Skip to content
Permalink
526cdc58d6
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
128 lines (116 sloc) 7.53 KB
(ns overseer.application
(:require [clojure.tools.logging :refer [info error]]
[clojure.core.async :as async]
[com.stuartsierra.component :as component]
[de.mpg.shh.util-message-server.message-server :refer [map->MessageServer]]
[de.mpg.shh.util-message-server.message-dispatcher :refer [map->MessageDispatcher]]
[de.mpg.shh.util-overseer.server :as overseer-server]
[overseer.component.endpoints-session-superuser :refer [map->EndpointsSessionSuperUser]]
[overseer.component.endpoints-session-user :refer [map->EndpointsSessionUser]]
[overseer.system-config :as system-config]))
(defn build-overseer-conn
"Return the overseer connection"
[]
(let [conn (overseer-server/conn system-config/overseer-session)]
(when (map? system-config/overseer-environment)
(overseer-server/update-env! conn merge system-config/overseer-environment))
conn))
(def system
(atom
(component/system-map
:hostname system-config/hostname
:request-channel (async/chan 2)
:message-server (component/using (map->MessageServer {})
[:request-channel])
:overseer-conn (build-overseer-conn)
;;:pass-channel (async/chan 2)
:dispatcher-request-transducer system-config/dispatcher-request-transducer
:rx-config-superuser-list (reduce (partial system-config/build-x-config system-config/rx-config-superuser-base :listen-context-config)
[]
system-config/rx-servers)
:rx-config-list (reduce (partial system-config/build-x-config system-config/rx-config-base :listen-context-config)
[]
system-config/rx-servers)
:tx-config-list (reduce (partial system-config/build-x-config system-config/tx-config-base :listen-context-config)
[]
system-config/tx-servers)
;; :end-points-superuser (component/using (map->EndpointsSessionSuperUser system-config/endpoints-superuser-config)
;; [:request-channel :overseer :hostname])
;; :overseer-dispatcher-superuser (component/using (map->MessageDispatcher system-config/overseer-superuser-dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer-session
;; :end-points :end-points-superuser
;; :pass-channel :pass-channel})
;;
;; :end-points-user (component/using (map->EndpointsSessionUser system-config/endpoints-user-config)
;; [:request-channel :overseer :hostname])
;; :overseer-dispatcher-user (component/using (map->MessageDispatcher system-config/overseer-user-dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer-session
;; :end-points :end-points-user
;; :loopback-channel :pass-channel})
)))
(defn build-x-map
[accumulator item]
(let [_ (info "brxm item: " item)
suffix (str (first item))
[rx-superuser-config rx-config tx-config] (second item)
end-points-superuser-kw (keyword (str "end-points-superuser-" suffix))
dispatcher-superuser-kw (keyword (str "dispatcher-superuser-" suffix))
dispatcher-superuser-config (merge system-config/overseer-config
rx-superuser-config)
end-points-user-kw (keyword (str "end-points-user-" suffix))
dispatcher-user-kw (keyword (str "dispatcher-user-" suffix))
dispatcher-user-config (merge system-config/overseer-config
rx-config)
pass-channel-kw (keyword (str "pass-channel-" suffix))
;;end-points-kw (keyword (str "end-points-" suffix))
;;dispatcher-kw (keyword (str "dispatcher-" suffix))
;;dispatcher-config (merge system-config/overseer-datastore-config
;; rx-config)
_ (info "dispatcher-superuser-config: " dispatcher-superuser-config)
_ (info "dispatcher-user-config: " dispatcher-user-config)
]
(assoc accumulator pass-channel-kw (async/chan 2)
end-points-superuser-kw (component/using (map->EndpointsSessionSuperUser {})
[:request-channel :overseer-conn :hostname :rx-config-list :tx-config-list])
dispatcher-superuser-kw (component/using (map->MessageDispatcher dispatcher-superuser-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer
:end-points end-points-superuser-kw
:pass-channel pass-channel-kw})
end-points-user-kw (component/using (map->EndpointsSessionUser {})
[:request-channel :overseer-conn :hostname :rx-config-superuser-list :rx-config-list :tx-config-list])
dispatcher-user-kw (component/using (map->MessageDispatcher dispatcher-user-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer
:end-points end-points-user-kw
:loopback-channel pass-channel-kw})
;;end-points-kw (component/using (map->EndpointsSessionUser {})
;; [:request-channel :datastore :hostname :tx-config-list])
;;dispatcher-kw (component/using (map->MessageDispatcher dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer
;; :end-points end-points-kw
;; :loopback-channel pass-channel-kw})
)))
(defn build-system-from-config
"Add endpoints for message queue(s) found in config"
[]
(let [_ (info "rx-config-superuser-list: " (:rx-config-superuser-list @system))
_ (info "rx-config-list: " (:rx-config-list @system))
_ (info "tx-config-list: " (:tx-config-list @system))
x-map (reduce build-x-map {} (map-indexed vector (map vector (:rx-config-superuser-list @system) (:rx-config-list @system) (:tx-config-list @system))))
;;_ (info "x-map: " x-map)
]
(swap! system merge x-map)))
(defn debug-system
[]
(info "system-keys: " (keys @system)))
(defn start []
(swap! system component/start))
(defn stop []
(swap! system component/stop))
(defn reload []
(stop)
(start))