From 526cdc58d6efc0b3429075ce788c538682d3d4cb Mon Sep 17 00:00:00 2001 From: Stephen Clayton Date: Fri, 1 Dec 2017 17:40:55 +0100 Subject: [PATCH] add receive buffering and begin work on synch messaging --- bin/dev | 2 +- bin/dev-two | 4 ++ project.clj | 2 +- src/main/clojure/overseer/application.clj | 2 +- .../component/endpoints_session_superuser.clj | 7 +- .../component/endpoints_session_user.clj | 72 +++++++++++++------ src/main/clojure/overseer/system_config.clj | 4 +- 7 files changed, 65 insertions(+), 28 deletions(-) create mode 100755 bin/dev-two diff --git a/bin/dev b/bin/dev index b72bbee..a0895a2 100755 --- a/bin/dev +++ b/bin/dev @@ -1,4 +1,4 @@ #!/usr/bin/env bash lein clean -OVERSEER_SESSION="dispatcher-foobar" lein with-profiles +dev,+shhdocker trampoline run +OVERSEER_SESSION="dispatcher-foo" lein with-profiles +dev,+shhdocker trampoline run diff --git a/bin/dev-two b/bin/dev-two new file mode 100755 index 0000000..e10ad89 --- /dev/null +++ b/bin/dev-two @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +lein clean +OVERSEER_SESSION="dispatcher-bar" lein with-profiles +dev,+shhdocker trampoline run diff --git a/project.clj b/project.clj index 2109555..da5c3bc 100644 --- a/project.clj +++ b/project.clj @@ -15,7 +15,7 @@ [datascript "0.15.5"] [de.mpg.shh/util-properties "0.0.1"] [de.mpg.shh/util-message-server "0.0.15"] - [de.mpg.shh/util-overseer "0.0.92"]] + [de.mpg.shh/util-overseer "0.0.101"]] :main overseer.core :source-paths ["src/main/clojure"] :profiles {:profiles {:uberjar {:aot :all} diff --git a/src/main/clojure/overseer/application.clj b/src/main/clojure/overseer/application.clj index f73db98..6f6bf60 100644 --- a/src/main/clojure/overseer/application.clj +++ b/src/main/clojure/overseer/application.clj @@ -12,7 +12,7 @@ (defn build-overseer-conn "Return the overseer connection" [] - (let [conn (overseer-server/conn (System/getenv "OVERSEER_SESSION"))] + (let [conn (overseer-server/conn system-config/overseer-session)] (when (map? system-config/overseer-environment) (overseer-server/update-env! conn merge system-config/overseer-environment)) conn)) diff --git a/src/main/clojure/overseer/component/endpoints_session_superuser.clj b/src/main/clojure/overseer/component/endpoints_session_superuser.clj index 97e7223..21314cb 100644 --- a/src/main/clojure/overseer/component/endpoints_session_superuser.clj +++ b/src/main/clojure/overseer/component/endpoints_session_superuser.clj @@ -40,7 +40,10 @@ :resource-toggle-backup (fn [request-channel {data :data :as message}] (receive :resource-toggle-backup data)) - :overseer-transaction + :overseer-synch-scheduler (fn [request-channel {data :data :as message}] - (receive :overseer-transaction data)) + (receive :overseer-synch-scheduler data)) + :overseer-synch-transaction + (fn [request-channel {data :data :as message}] + (receive :overseer-synch-transaction data)) }))) diff --git a/src/main/clojure/overseer/component/endpoints_session_user.clj b/src/main/clojure/overseer/component/endpoints_session_user.clj index b6f0b1d..e7b418b 100644 --- a/src/main/clojure/overseer/component/endpoints_session_user.clj +++ b/src/main/clojure/overseer/component/endpoints_session_user.clj @@ -13,7 +13,8 @@ (let [scheduler-session-id (-> (d/pull (d/db overseer-conn) '[:overseer.resource/session-id] [:db/ident :overseer]) :overseer.resource/session-id) _ (info "scheduler-session-id: " scheduler-session-id) reply-rx-superuser (fn [request-channel target message] - (doseq [rx-config rx-config-list] + (doseq [rx-config rx-config-superuser-list] + (info "reply-rx-superuser send to: " rx-config) (message-helpers/synch-send request-channel (-> rx-config :listen-context-config :host) (-> rx-config :listen-context-config :port) @@ -21,7 +22,7 @@ (-> rx-config :listen-context-config :password) (-> rx-config :listen-address) message - :properties (merge {"origin" "overseer"} + :properties (merge {"origin" scheduler-session-id} (when-not (nil? target) {"target" target})) :timeout 1 @@ -35,7 +36,7 @@ (-> rx-config :listen-context-config :password) (-> rx-config :listen-address) message - :properties (merge {"origin" "overseer"} + :properties (merge {"origin" scheduler-session-id} (when-not (nil? target) {"target" target})) :timeout 1 @@ -50,27 +51,58 @@ (-> tx-config :listen-context-config :password) (-> tx-config :listen-address) message - :properties (merge {"origin" "overseer"} + :properties (merge {"origin" scheduler-session-id} (when-not (nil? target) {"target" target})) :timeout 1))) ch-close (async/chan) - ch-transaction (async/chan 2) + ch-synch (async/chan 2) _ (async/go-loop [[v ch] (async/alts! [ch-close - ch-transaction])] + ch-synch])] (when-not (= ch ch-close) - (info "ch-transaction loop: " v) - (when-let [msg (overseer-server/process-transaction overseer-conn v)] - (when (overseer-server/cluster? overseer-conn) - (reply-rx-superuser request-channel nil [:overseer-transaction msg])) - (overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction))) + (info "ch-synch loop: " v) + (condp = (first v) + :overseer-synch-transaction + (do + (info "process transaction event: " (second v)) + (when-let [msg (overseer-server/process-transaction overseer-conn v)] + (info "broadcasting message: " msg) + (reply-rx-superuser request-channel nil [:overseer-transaction {:data msg}]) + (overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction)))) + :overseer-synch-scheduler + (do + (info "process schdeduler event: " (second v)) + (reply-rx-superuser request-channel nil v)) + (throw (ex-info (str "Failed to understand overseer transaction event '" v "'") {:cause "Only :overseer-synch-transaction or :overseer-synch-scheduler accepted"}))) (recur (async/alts! [ch-close - ch-transaction])))) - _ (overseer-server/listen-transaction overseer-conn (partial async/>!! ch-transaction)) + ch-synch])))) + _ (overseer-server/listen-synch overseer-conn (partial async/>!! ch-synch)) scheduler-registration {:overseer.resource/type :overseer/scheduler :session-id scheduler-session-id :host hostname} _ (overseer-server/register-resource overseer-conn scheduler-registration) + scheduler-uuid (overseer-server/resource-session->uuid overseer-conn :overseer/scheduler scheduler-session-id) + _ (info "self scheduler-uuid: " scheduler-uuid) + scheduler-broadcast-interval 30000 + _ (async/go-loop [[v ch] (async/alts! [ch-close + (async/timeout scheduler-broadcast-interval)])] + (when-not (= ch ch-close) + (reply-rx request-channel nil [:register-resource {:data (assoc scheduler-registration :uuid scheduler-uuid)}]) + (recur (async/alts! [ch-close + (async/timeout scheduler-broadcast-interval)])))) + ch-receive-buffer (async/chan 500) + _ (async/go-loop [[v ch] (async/alts! [ch-close + ch-receive-buffer])] + (when-not (= ch ch-close) + (info "ch-receive-buffer loop: " v) + (if (overseer-server/synched? overseer-conn) + (try + (overseer-server/receive overseer-conn v) + (catch Throwable t + (error "failed to receive '" (get v :destination) "' message " v " " (utils/stack-trace-to-string t)))) + (Thread/sleep 2000)) + (recur (async/alts! [ch-close + ch-receive-buffer])))) _ (async/go-loop [[v ch] (async/alts! [ch-close (async/timeout 1000)])] (when-not (= ch ch-close) @@ -78,11 +110,7 @@ (reply-tx request-channel nil [:overseer/task {:data task}])) (recur (async/alts! [ch-close (async/timeout 10000)])))) - receive (fn [destination data] - (try - (overseer-server/receive overseer-conn (assoc data :destination destination)) - (catch Throwable t - (error "failed to receive '" (pr-str destination) "' message " (pr-str data) " " (utils/stack-trace-to-string t)))))] + ] { :ch-close ch-close :error @@ -118,16 +146,16 @@ (overseer-server/deregister-resource overseer-conn data)) :workflow-task-accept (fn [request-channel {data :data :as message}] - (receive :workflow-task-accept data)) + (async/>!! ch-receive-buffer (assoc data :destination :workflow-task-accept))) :workflow-task-done (fn [request-channel {data :data :as message}] - (receive :workflow-task-done data)) + (async/>!! ch-receive-buffer (assoc data :destination :workflow-task-done))) :workflow-task-failed (fn [request-channel {data :data :as message}] - (receive :workflow-task-failed data)) + (async/>!! ch-receive-buffer (assoc data :destination :workflow-task-failed))) :workflow-task-running (fn [request-channel {data :data :as message}] - (receive :workflow-task-running data)) + (async/>!! ch-receive-buffer (assoc data :destination :workflow-task-running))) :stop (fn [request-channel message] (info "got stop message: " message))})) diff --git a/src/main/clojure/overseer/system_config.clj b/src/main/clojure/overseer/system_config.clj index c44f378..f6040c7 100644 --- a/src/main/clojure/overseer/system_config.clj +++ b/src/main/clojure/overseer/system_config.clj @@ -24,6 +24,8 @@ (def hostname (.getCanonicalHostName (java.net.InetAddress/getLocalHost))) +(def overseer-session (System/getenv "OVERSEER_SESSION")) + (def overseer-environment (config/get-in [:overseer :environment])) (def overseer-rx-context (config/get-in [:overseer :rx :context])) @@ -50,7 +52,7 @@ (:port overseer-tx-context))) (def overseer-config - {:listen-selector "origin <> 'overseer'" + {:listen-selector (str "origin <> '" overseer-session "'") :dispatcher-request-buffer-size 16 :stop-message (pr-str [:stop])})