diff --git a/project.clj b/project.clj index 0d16d15..2109555 100644 --- a/project.clj +++ b/project.clj @@ -15,7 +15,7 @@ [datascript "0.15.5"] [de.mpg.shh/util-properties "0.0.1"] [de.mpg.shh/util-message-server "0.0.15"] - [de.mpg.shh/util-overseer "0.0.89"]] + [de.mpg.shh/util-overseer "0.0.92"]] :main overseer.core :source-paths ["src/main/clojure"] :profiles {:profiles {:uberjar {:aot :all} diff --git a/src/main/clojure/overseer/application.clj b/src/main/clojure/overseer/application.clj index 0492343..f73db98 100644 --- a/src/main/clojure/overseer/application.clj +++ b/src/main/clojure/overseer/application.clj @@ -4,13 +4,18 @@ [com.stuartsierra.component :as component] [de.mpg.shh.util-message-server.message-server :refer [map->MessageServer]] [de.mpg.shh.util-message-server.message-dispatcher :refer [map->MessageDispatcher]] - ;;[overseer.component.overseer :refer [map->Overseer]] [de.mpg.shh.util-overseer.server :as overseer-server] [overseer.component.endpoints-session-superuser :refer [map->EndpointsSessionSuperUser]] [overseer.component.endpoints-session-user :refer [map->EndpointsSessionUser]] [overseer.system-config :as system-config])) -(info "overseer-user-dispatcher-config: " system-config/overseer-user-dispatcher-config) +(defn build-overseer-conn + "Return the overseer connection" + [] + (let [conn (overseer-server/conn (System/getenv "OVERSEER_SESSION"))] + (when (map? system-config/overseer-environment) + (overseer-server/update-env! conn merge system-config/overseer-environment)) + conn)) (def system (atom @@ -19,26 +24,98 @@ :request-channel (async/chan 2) :message-server (component/using (map->MessageServer {}) [:request-channel]) - ;;:overseer (component/using (map->Overseer system-config/overseer-config) - ;; [:request-channel]) - :overseer (overseer-server/conn (System/getenv "OVERSEER_SESSION")) - :pass-channel (async/chan 2) - :dispatcher-request-transducer-session system-config/dispatcher-request-transducer - :end-points-superuser (component/using (map->EndpointsSessionSuperUser system-config/endpoints-superuser-config) - [:request-channel :overseer :hostname]) - :overseer-dispatcher-superuser (component/using (map->MessageDispatcher system-config/overseer-superuser-dispatcher-config) - {:message-server :message-server - :dispatcher-request-transducer :dispatcher-request-transducer-session - :end-points :end-points-superuser - :pass-channel :pass-channel}) - - :end-points-user (component/using (map->EndpointsSessionUser system-config/endpoints-user-config) - [:request-channel :overseer :hostname]) - :overseer-dispatcher-user (component/using (map->MessageDispatcher system-config/overseer-user-dispatcher-config) - {:message-server :message-server - :dispatcher-request-transducer :dispatcher-request-transducer-session - :end-points :end-points-user - :loopback-channel :pass-channel})))) + :overseer-conn (build-overseer-conn) + ;;:pass-channel (async/chan 2) + :dispatcher-request-transducer system-config/dispatcher-request-transducer + :rx-config-superuser-list (reduce (partial system-config/build-x-config system-config/rx-config-superuser-base :listen-context-config) + [] + system-config/rx-servers) + :rx-config-list (reduce (partial system-config/build-x-config system-config/rx-config-base :listen-context-config) + [] + system-config/rx-servers) + :tx-config-list (reduce (partial system-config/build-x-config system-config/tx-config-base :listen-context-config) + [] + system-config/tx-servers) +;; :end-points-superuser (component/using (map->EndpointsSessionSuperUser system-config/endpoints-superuser-config) +;; [:request-channel :overseer :hostname]) +;; :overseer-dispatcher-superuser (component/using (map->MessageDispatcher system-config/overseer-superuser-dispatcher-config) +;; {:message-server :message-server +;; :dispatcher-request-transducer :dispatcher-request-transducer-session +;; :end-points :end-points-superuser +;; :pass-channel :pass-channel}) +;; +;; :end-points-user (component/using (map->EndpointsSessionUser system-config/endpoints-user-config) +;; [:request-channel :overseer :hostname]) +;; :overseer-dispatcher-user (component/using (map->MessageDispatcher system-config/overseer-user-dispatcher-config) +;; {:message-server :message-server +;; :dispatcher-request-transducer :dispatcher-request-transducer-session +;; :end-points :end-points-user +;; :loopback-channel :pass-channel}) + + ))) + +(defn build-x-map + [accumulator item] + (let [_ (info "brxm item: " item) + suffix (str (first item)) + [rx-superuser-config rx-config tx-config] (second item) + end-points-superuser-kw (keyword (str "end-points-superuser-" suffix)) + dispatcher-superuser-kw (keyword (str "dispatcher-superuser-" suffix)) + dispatcher-superuser-config (merge system-config/overseer-config + rx-superuser-config) + end-points-user-kw (keyword (str "end-points-user-" suffix)) + dispatcher-user-kw (keyword (str "dispatcher-user-" suffix)) + dispatcher-user-config (merge system-config/overseer-config + rx-config) + pass-channel-kw (keyword (str "pass-channel-" suffix)) + + ;;end-points-kw (keyword (str "end-points-" suffix)) + ;;dispatcher-kw (keyword (str "dispatcher-" suffix)) + ;;dispatcher-config (merge system-config/overseer-datastore-config + ;; rx-config) + _ (info "dispatcher-superuser-config: " dispatcher-superuser-config) + _ (info "dispatcher-user-config: " dispatcher-user-config) + + ] + (assoc accumulator pass-channel-kw (async/chan 2) + end-points-superuser-kw (component/using (map->EndpointsSessionSuperUser {}) + [:request-channel :overseer-conn :hostname :rx-config-list :tx-config-list]) + dispatcher-superuser-kw (component/using (map->MessageDispatcher dispatcher-superuser-config) + {:message-server :message-server + :dispatcher-request-transducer :dispatcher-request-transducer + :end-points end-points-superuser-kw + :pass-channel pass-channel-kw}) + end-points-user-kw (component/using (map->EndpointsSessionUser {}) + [:request-channel :overseer-conn :hostname :rx-config-superuser-list :rx-config-list :tx-config-list]) + dispatcher-user-kw (component/using (map->MessageDispatcher dispatcher-user-config) + {:message-server :message-server + :dispatcher-request-transducer :dispatcher-request-transducer + :end-points end-points-user-kw + :loopback-channel pass-channel-kw}) + + ;;end-points-kw (component/using (map->EndpointsSessionUser {}) + ;; [:request-channel :datastore :hostname :tx-config-list]) + ;;dispatcher-kw (component/using (map->MessageDispatcher dispatcher-config) + ;; {:message-server :message-server + ;; :dispatcher-request-transducer :dispatcher-request-transducer + ;; :end-points end-points-kw + ;; :loopback-channel pass-channel-kw}) + ))) + +(defn build-system-from-config + "Add endpoints for message queue(s) found in config" + [] + (let [_ (info "rx-config-superuser-list: " (:rx-config-superuser-list @system)) + _ (info "rx-config-list: " (:rx-config-list @system)) + _ (info "tx-config-list: " (:tx-config-list @system)) + x-map (reduce build-x-map {} (map-indexed vector (map vector (:rx-config-superuser-list @system) (:rx-config-list @system) (:tx-config-list @system)))) + ;;_ (info "x-map: " x-map) + ] + (swap! system merge x-map))) + +(defn debug-system + [] + (info "system-keys: " (keys @system))) (defn start [] (swap! system component/start)) diff --git a/src/main/clojure/overseer/component/endpoints_session_superuser.clj b/src/main/clojure/overseer/component/endpoints_session_superuser.clj index a05b97a..97e7223 100644 --- a/src/main/clojure/overseer/component/endpoints_session_superuser.clj +++ b/src/main/clojure/overseer/component/endpoints_session_superuser.clj @@ -1,30 +1,18 @@ (ns overseer.component.endpoints-session-superuser (:require [clojure.tools.logging :refer [info error]] + [clojure.core.async :as async] [com.stuartsierra.component :as component] [datascript.core :as d] - [clojure.core.async :as async] [de.mpg.shh.util-message-server.helpers :as message-helpers] [de.mpg.shh.util-overseer.server :as overseer-server] [overseer.utils :as utils])) -(defrecord EndpointsSessionSuperUser [response-context response-address overseer hostname] +(defrecord EndpointsSessionSuperUser [request-channel overseer-conn hostname rx-config-list tx-config-list] component/Lifecycle (start [this] - (let [reply (fn [request-channel target message] - (message-helpers/synch-send request-channel - (:host response-context) - (:port response-context) - (:username response-context) - (:password response-context) - response-address - message - :properties (merge {"origin" "overseer-datastore"} - (when-not (nil? target) - {"target" target})) - :timeout 1)) - receive (fn [conn destination data] + (let [receive (fn [destination data] (try - (overseer-server/receive conn (assoc data :destination destination)) + (overseer-server/receive overseer-conn (assoc data :destination destination)) (catch Throwable t (error "failed to receive '" (pr-str destination) "' message " (pr-str data) " " (utils/stack-trace-to-string t)))))] { @@ -33,26 +21,26 @@ (error "error endpoint received: " message)) :workflow-create (fn [request-channel {data :data :as message}] - (receive overseer :workflow-create data)) + (receive :workflow-create data)) :workflow-play (fn [request-channel {data :data :as message}] - (receive overseer :workflow-play data)) + (receive :workflow-play data)) :workflow-pause (fn [request-channel {data :data :as message}] - (receive overseer :workflow-pause data)) + (receive :workflow-pause data)) :workflow-clear-failed (fn [request-channel {data :data :as message}] - (receive overseer :workflow-clear-failed data)) + (receive :workflow-clear-failed data)) :workflow-clear-all (fn [request-channel {data :data :as message}] - (receive overseer :workflow-clear-all data)) + (receive :workflow-clear-all data)) :workflow-restart-from-step (fn [request-channel {data :data :as message}] - (receive overseer :workflow-restart-from-step data)) + (receive :workflow-restart-from-step data)) :resource-toggle-backup (fn [request-channel {data :data :as message}] - (receive overseer :resource-toggle-backup data)) + (receive :resource-toggle-backup data)) :overseer-transaction (fn [request-channel {data :data :as message}] - (recieve overseer :overseer-transaction data)) + (receive :overseer-transaction data)) }))) diff --git a/src/main/clojure/overseer/component/endpoints_session_user.clj b/src/main/clojure/overseer/component/endpoints_session_user.clj index d854d2e..b6f0b1d 100644 --- a/src/main/clojure/overseer/component/endpoints_session_user.clj +++ b/src/main/clojure/overseer/component/endpoints_session_user.clj @@ -7,66 +7,80 @@ [de.mpg.shh.util-overseer.server :as overseer-server] [overseer.utils :as utils])) -(defrecord EndpointsSessionUser [response-context response-address environment overseer request-channel hostname] +(defrecord EndpointsSessionUser [request-channel overseer-conn hostname rx-config-superuser-list rx-config-list tx-config-list] component/Lifecycle (start [this] - (let [scheduler-session-id (-> (d/pull (d/db overseer) '[:overseer.resource/session-id] [:db/ident :overseer]) :overseer.resource/session-id) + (let [scheduler-session-id (-> (d/pull (d/db overseer-conn) '[:overseer.resource/session-id] [:db/ident :overseer]) :overseer.resource/session-id) _ (info "scheduler-session-id: " scheduler-session-id) - reply (fn [request-channel target message] - (message-helpers/synch-send request-channel - (:host response-context) - (:port response-context) - (:username response-context) - (:password response-context) - response-address + reply-rx-superuser (fn [request-channel target message] + (doseq [rx-config rx-config-list] + (message-helpers/synch-send request-channel + (-> rx-config :listen-context-config :host) + (-> rx-config :listen-context-config :port) + (-> rx-config :listen-context-config :username) + (-> rx-config :listen-context-config :password) + (-> rx-config :listen-address) message :properties (merge {"origin" "overseer"} (when-not (nil? target) {"target" target})) :timeout 1 - :send-meta true)) - reply-rx-superuser (fn [request-channel target message] - (message-helpers/synch-send request-channel - (:host response-context-su) - (:port response-context-su) - (:username response-context-su) - (:password response-context-su) - response-address-su + :send-meta true))) + reply-rx (fn [request-channel target message] + (doseq [rx-config rx-config-list] + (message-helpers/synch-send request-channel + (-> rx-config :listen-context-config :host) + (-> rx-config :listen-context-config :port) + (-> rx-config :listen-context-config :username) + (-> rx-config :listen-context-config :password) + (-> rx-config :listen-address) + message + :properties (merge {"origin" "overseer"} + (when-not (nil? target) + {"target" target})) + :timeout 1 + :send-meta true))) + reply-tx (fn [request-channel target message] + (doseq [tx-config tx-config-list] + (info "es tx config: " tx-config) + (message-helpers/synch-send request-channel + (-> tx-config :listen-context-config :host) + (-> tx-config :listen-context-config :port) + (-> tx-config :listen-context-config :username) + (-> tx-config :listen-context-config :password) + (-> tx-config :listen-address) message :properties (merge {"origin" "overseer"} (when-not (nil? target) {"target" target})) - :timeout 1 - :send-meta true)) + :timeout 1))) ch-close (async/chan) - _ (when (map? environment) - (overseer-server/update-env! overseer merge environment)) ch-transaction (async/chan 2) - _ (async/go-loop [[v ch] (aysnc/alts! [ch-close + _ (async/go-loop [[v ch] (async/alts! [ch-close ch-transaction])] (when-not (= ch ch-close) (info "ch-transaction loop: " v) - (when-let [msg (overseer-server/process-transaction conn v)] - (when (overseer-server/cluster? conn) + (when-let [msg (overseer-server/process-transaction overseer-conn v)] + (when (overseer-server/cluster? overseer-conn) (reply-rx-superuser request-channel nil [:overseer-transaction msg])) - (overseer-server/recieve conn (assoc msg :destination :overseer-transaction))) + (overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction))) (recur (async/alts! [ch-close ch-transaction])))) - _ (overseer-server/listen-transaction overseer (partial async/put>!! ch-transaction)) + _ (overseer-server/listen-transaction overseer-conn (partial async/>!! ch-transaction)) scheduler-registration {:overseer.resource/type :overseer/scheduler :session-id scheduler-session-id :host hostname} - _ (overseer-server/register-resource overseer scheduler-registration) + _ (overseer-server/register-resource overseer-conn scheduler-registration) _ (async/go-loop [[v ch] (async/alts! [ch-close (async/timeout 1000)])] (when-not (= ch ch-close) - (when-let [task (overseer-server/task overseer)] - (reply request-channel nil [:overseer/task {:data task}])) + (when-let [task (overseer-server/task overseer-conn)] + (reply-tx request-channel nil [:overseer/task {:data task}])) (recur (async/alts! [ch-close (async/timeout 10000)])))) - receive (fn [conn destination data] + receive (fn [destination data] (try - (overseer-server/receive conn (assoc data :destination destination)) + (overseer-server/receive overseer-conn (assoc data :destination destination)) (catch Throwable t (error "failed to receive '" (pr-str destination) "' message " (pr-str data) " " (utils/stack-trace-to-string t)))))] { @@ -77,43 +91,43 @@ :ping (fn [request-channel message] ;;(info "got ping: " message) - (reply request-channel (:client-id message) [:pong {}])) + (reply-tx request-channel (:client-id message) [:pong {}])) ;; Overseer client api calls, e.g. registering a runtime or accepting a task :status (fn [request-channel message] (let [;;_ (info "got status: " (pr-str message)) result (try - {:data (overseer-server/status overseer)} + {:data (overseer-server/status overseer-conn)} (catch Throwable t {:error (utils/stack-trace-to-string t)}))] - (reply request-channel - (:client-id message) - [:message (merge result (select-keys message [:session-request-id]))]))) + (reply-tx request-channel + (:client-id message) + [:message (merge result (select-keys message [:session-request-id]))]))) :register-resource (fn [request-channel {data :data :as message}] (let [_ (info "es got register-resource: " (pr-str message)) result (try - {:data (overseer-server/register-resource overseer data)} + {:data (overseer-server/register-resource overseer-conn data)} (catch Throwable t {:error (utils/stack-trace-to-string t)}))] - (reply request-channel - (:client-id message) - [:resource-session result]))) + (reply-tx request-channel + (:client-id message) + [:resource-session result]))) :deregister-resource (fn [request-channel {data :data :as message}] - (overseer-server/deregister-resource overseer data)) + (overseer-server/deregister-resource overseer-conn data)) :workflow-task-accept (fn [request-channel {data :data :as message}] - (receive overseer :workflow-task-accept data)) + (receive :workflow-task-accept data)) :workflow-task-done (fn [request-channel {data :data :as message}] - (receive overseer :workflow-task-done data)) + (receive :workflow-task-done data)) :workflow-task-failed (fn [request-channel {data :data :as message}] - (receive overseer :workflow-task-failed data)) + (receive :workflow-task-failed data)) :workflow-task-running (fn [request-channel {data :data :as message}] - (receive overseer :workflow-task-running data)) + (receive :workflow-task-running data)) :stop (fn [request-channel message] (info "got stop message: " message))})) diff --git a/src/main/clojure/overseer/config.clj b/src/main/clojure/overseer/config.clj index 55e222e..66fb30c 100644 --- a/src/main/clojure/overseer/config.clj +++ b/src/main/clojure/overseer/config.clj @@ -8,18 +8,22 @@ (def ^{:private true} config (atom nil)) +(defn csv-split [s] (str/split s #",")) -(def transform-map {}) +(def transform-map {[:overseer :rx :context :host] csv-split + [:overseer :rx :context :port] csv-split + [:overseer :tx :context :host] csv-split + [:overseer :tx :context :port] csv-split}) (defn read-and-cache-config! -"Read `config-file`, apply `config-transformers` and cache in an atom" -[] -(let [new-config (load-properties config-file transform-map)] - (reset! config new-config))) + "Read `config-file`, apply `config-transformers` and cache in an atom" + [] + (let [new-config (load-properties config-file transform-map)] + (reset! config new-config))) (defn get-in -([ks] + ([ks] (get-in ks nil)) -([ks default] + ([ks default] (when (nil? @config) (read-and-cache-config!)) - (clojure.core/get-in @config ks default))) + (clojure.core/get-in @config ks default))) diff --git a/src/main/clojure/overseer/core.clj b/src/main/clojure/overseer/core.clj index 33998cc..b582beb 100644 --- a/src/main/clojure/overseer/core.clj +++ b/src/main/clojure/overseer/core.clj @@ -2,7 +2,7 @@ (:require [clojure.tools.logging :refer [info error]] [clojure.string :as str] [clojure.core.async :as async] - [overseer.application :refer [start stop]]) + [overseer.application :refer [start stop build-system-from-config debug-system]]) (:gen-class)) (defn usage @@ -14,6 +14,8 @@ (let [shutdown-channel (async/chan 1) _ (when (nil? (System/getenv "OVERSEER_SESSION")) (do (usage) (System/exit 1))) _ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (println "Shutting down...") (async/>!! shutdown-channel :stop)))) + _ (build-system-from-config) + _ (debug-system) _ (start)] (async/ 'overseer'" + :dispatcher-request-buffer-size 16 + :stop-message (pr-str [:stop])}) + +(defn build-x-config + [context context-key accumulator item] + (conj accumulator (update context context-key merge {:host (first item) + :port (second item)}))) + + ;;(def overseer-config ;; {:response-context overseer-tx-context ;; :response-address overseer-tx-address ;; :environment overseer-environment}) -(def overseer-dispatcher-config - {:listen-context-config overseer-rx-context - :listen-selector "origin <> 'overseer'" - :dispatcher-request-buffer-size 16 - :stop-message (pr-str [:stop])}) -(def endpoints-superuser-config - {:response-context overseer-tx-context - :response-address overseer-tx-address}) -(def overseer-superuser-dispatcher-config (assoc overseer-dispatcher-config - :listen-address overseer-rx-superuser-address)) +;;(def endpoints-superuser-config +;; {:response-context overseer-tx-context +;; :response-address overseer-tx-address}) + +;;(def overseer-superuser-dispatcher-config (assoc overseer-dispatcher-config +;; :listen-address overseer-rx-superuser-address)) -(def endpoints-user-config - {:response-context overseer-tx-context - :response-address overseer-tx-address - :environment overseer-environment}) +;;(def endpoints-user-config +;; {:response-context overseer-tx-context +;; :response-address overseer-tx-address +;; :environment overseer-environment}) -(def overseer-user-dispatcher-config (assoc overseer-dispatcher-config - :listen-address overseer-rx-address)) +;;(def overseer-user-dispatcher-config (assoc overseer-dispatcher-config +;; :listen-address overseer-rx-address))