Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
move to multi-queue
  • Loading branch information
clayton committed Nov 28, 2017
1 parent 21d0529 commit c2c03cc
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 117 deletions.
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -15,7 +15,7 @@
[datascript "0.15.5"]
[de.mpg.shh/util-properties "0.0.1"]
[de.mpg.shh/util-message-server "0.0.15"]
[de.mpg.shh/util-overseer "0.0.89"]]
[de.mpg.shh/util-overseer "0.0.92"]]
:main overseer.core
:source-paths ["src/main/clojure"]
:profiles {:profiles {:uberjar {:aot :all}
Expand Down
121 changes: 99 additions & 22 deletions src/main/clojure/overseer/application.clj
Expand Up @@ -4,13 +4,18 @@
[com.stuartsierra.component :as component]
[de.mpg.shh.util-message-server.message-server :refer [map->MessageServer]]
[de.mpg.shh.util-message-server.message-dispatcher :refer [map->MessageDispatcher]]
;;[overseer.component.overseer :refer [map->Overseer]]
[de.mpg.shh.util-overseer.server :as overseer-server]
[overseer.component.endpoints-session-superuser :refer [map->EndpointsSessionSuperUser]]
[overseer.component.endpoints-session-user :refer [map->EndpointsSessionUser]]
[overseer.system-config :as system-config]))

(info "overseer-user-dispatcher-config: " system-config/overseer-user-dispatcher-config)
(defn build-overseer-conn
"Return the overseer connection"
[]
(let [conn (overseer-server/conn (System/getenv "OVERSEER_SESSION"))]
(when (map? system-config/overseer-environment)
(overseer-server/update-env! conn merge system-config/overseer-environment))
conn))

(def system
(atom
Expand All @@ -19,26 +24,98 @@
:request-channel (async/chan 2)
:message-server (component/using (map->MessageServer {})
[:request-channel])
;;:overseer (component/using (map->Overseer system-config/overseer-config)
;; [:request-channel])
:overseer (overseer-server/conn (System/getenv "OVERSEER_SESSION"))
:pass-channel (async/chan 2)
:dispatcher-request-transducer-session system-config/dispatcher-request-transducer
:end-points-superuser (component/using (map->EndpointsSessionSuperUser system-config/endpoints-superuser-config)
[:request-channel :overseer :hostname])
:overseer-dispatcher-superuser (component/using (map->MessageDispatcher system-config/overseer-superuser-dispatcher-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer-session
:end-points :end-points-superuser
:pass-channel :pass-channel})

:end-points-user (component/using (map->EndpointsSessionUser system-config/endpoints-user-config)
[:request-channel :overseer :hostname])
:overseer-dispatcher-user (component/using (map->MessageDispatcher system-config/overseer-user-dispatcher-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer-session
:end-points :end-points-user
:loopback-channel :pass-channel}))))
:overseer-conn (build-overseer-conn)
;;:pass-channel (async/chan 2)
:dispatcher-request-transducer system-config/dispatcher-request-transducer
:rx-config-superuser-list (reduce (partial system-config/build-x-config system-config/rx-config-superuser-base :listen-context-config)
[]
system-config/rx-servers)
:rx-config-list (reduce (partial system-config/build-x-config system-config/rx-config-base :listen-context-config)
[]
system-config/rx-servers)
:tx-config-list (reduce (partial system-config/build-x-config system-config/tx-config-base :listen-context-config)
[]
system-config/tx-servers)
;; :end-points-superuser (component/using (map->EndpointsSessionSuperUser system-config/endpoints-superuser-config)
;; [:request-channel :overseer :hostname])
;; :overseer-dispatcher-superuser (component/using (map->MessageDispatcher system-config/overseer-superuser-dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer-session
;; :end-points :end-points-superuser
;; :pass-channel :pass-channel})
;;
;; :end-points-user (component/using (map->EndpointsSessionUser system-config/endpoints-user-config)
;; [:request-channel :overseer :hostname])
;; :overseer-dispatcher-user (component/using (map->MessageDispatcher system-config/overseer-user-dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer-session
;; :end-points :end-points-user
;; :loopback-channel :pass-channel})

)))

(defn build-x-map
[accumulator item]
(let [_ (info "brxm item: " item)
suffix (str (first item))
[rx-superuser-config rx-config tx-config] (second item)
end-points-superuser-kw (keyword (str "end-points-superuser-" suffix))
dispatcher-superuser-kw (keyword (str "dispatcher-superuser-" suffix))
dispatcher-superuser-config (merge system-config/overseer-config
rx-superuser-config)
end-points-user-kw (keyword (str "end-points-user-" suffix))
dispatcher-user-kw (keyword (str "dispatcher-user-" suffix))
dispatcher-user-config (merge system-config/overseer-config
rx-config)
pass-channel-kw (keyword (str "pass-channel-" suffix))

;;end-points-kw (keyword (str "end-points-" suffix))
;;dispatcher-kw (keyword (str "dispatcher-" suffix))
;;dispatcher-config (merge system-config/overseer-datastore-config
;; rx-config)
_ (info "dispatcher-superuser-config: " dispatcher-superuser-config)
_ (info "dispatcher-user-config: " dispatcher-user-config)

]
(assoc accumulator pass-channel-kw (async/chan 2)
end-points-superuser-kw (component/using (map->EndpointsSessionSuperUser {})
[:request-channel :overseer-conn :hostname :rx-config-list :tx-config-list])
dispatcher-superuser-kw (component/using (map->MessageDispatcher dispatcher-superuser-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer
:end-points end-points-superuser-kw
:pass-channel pass-channel-kw})
end-points-user-kw (component/using (map->EndpointsSessionUser {})
[:request-channel :overseer-conn :hostname :rx-config-superuser-list :rx-config-list :tx-config-list])
dispatcher-user-kw (component/using (map->MessageDispatcher dispatcher-user-config)
{:message-server :message-server
:dispatcher-request-transducer :dispatcher-request-transducer
:end-points end-points-user-kw
:loopback-channel pass-channel-kw})

;;end-points-kw (component/using (map->EndpointsSessionUser {})
;; [:request-channel :datastore :hostname :tx-config-list])
;;dispatcher-kw (component/using (map->MessageDispatcher dispatcher-config)
;; {:message-server :message-server
;; :dispatcher-request-transducer :dispatcher-request-transducer
;; :end-points end-points-kw
;; :loopback-channel pass-channel-kw})
)))

(defn build-system-from-config
"Add endpoints for message queue(s) found in config"
[]
(let [_ (info "rx-config-superuser-list: " (:rx-config-superuser-list @system))
_ (info "rx-config-list: " (:rx-config-list @system))
_ (info "tx-config-list: " (:tx-config-list @system))
x-map (reduce build-x-map {} (map-indexed vector (map vector (:rx-config-superuser-list @system) (:rx-config-list @system) (:tx-config-list @system))))
;;_ (info "x-map: " x-map)
]
(swap! system merge x-map)))

(defn debug-system
[]
(info "system-keys: " (keys @system)))

(defn start []
(swap! system component/start))
Expand Down
36 changes: 12 additions & 24 deletions src/main/clojure/overseer/component/endpoints_session_superuser.clj
@@ -1,30 +1,18 @@
(ns overseer.component.endpoints-session-superuser
(:require [clojure.tools.logging :refer [info error]]
[clojure.core.async :as async]
[com.stuartsierra.component :as component]
[datascript.core :as d]
[clojure.core.async :as async]
[de.mpg.shh.util-message-server.helpers :as message-helpers]
[de.mpg.shh.util-overseer.server :as overseer-server]
[overseer.utils :as utils]))

(defrecord EndpointsSessionSuperUser [response-context response-address overseer hostname]
(defrecord EndpointsSessionSuperUser [request-channel overseer-conn hostname rx-config-list tx-config-list]
component/Lifecycle
(start [this]
(let [reply (fn [request-channel target message]
(message-helpers/synch-send request-channel
(:host response-context)
(:port response-context)
(:username response-context)
(:password response-context)
response-address
message
:properties (merge {"origin" "overseer-datastore"}
(when-not (nil? target)
{"target" target}))
:timeout 1))
receive (fn [conn destination data]
(let [receive (fn [destination data]
(try
(overseer-server/receive conn (assoc data :destination destination))
(overseer-server/receive overseer-conn (assoc data :destination destination))
(catch Throwable t
(error "failed to receive '" (pr-str destination) "' message " (pr-str data) " " (utils/stack-trace-to-string t)))))]
{
Expand All @@ -33,26 +21,26 @@
(error "error endpoint received: " message))
:workflow-create
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-create data))
(receive :workflow-create data))
:workflow-play
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-play data))
(receive :workflow-play data))
:workflow-pause
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-pause data))
(receive :workflow-pause data))
:workflow-clear-failed
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-clear-failed data))
(receive :workflow-clear-failed data))
:workflow-clear-all
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-clear-all data))
(receive :workflow-clear-all data))
:workflow-restart-from-step
(fn [request-channel {data :data :as message}]
(receive overseer :workflow-restart-from-step data))
(receive :workflow-restart-from-step data))
:resource-toggle-backup
(fn [request-channel {data :data :as message}]
(receive overseer :resource-toggle-backup data))
(receive :resource-toggle-backup data))
:overseer-transaction
(fn [request-channel {data :data :as message}]
(recieve overseer :overseer-transaction data))
(receive :overseer-transaction data))
})))

0 comments on commit c2c03cc

Please sign in to comment.