Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add receive buffering and begin work on synch messaging
  • Loading branch information
clayton committed Dec 1, 2017
1 parent c2c03cc commit 526cdc5
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 28 deletions.
2 changes: 1 addition & 1 deletion bin/dev
@@ -1,4 +1,4 @@
#!/usr/bin/env bash

lein clean
OVERSEER_SESSION="dispatcher-foobar" lein with-profiles +dev,+shhdocker trampoline run
OVERSEER_SESSION="dispatcher-foo" lein with-profiles +dev,+shhdocker trampoline run
4 changes: 4 additions & 0 deletions bin/dev-two
@@ -0,0 +1,4 @@
#!/usr/bin/env bash

lein clean
OVERSEER_SESSION="dispatcher-bar" lein with-profiles +dev,+shhdocker trampoline run
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -15,7 +15,7 @@
[datascript "0.15.5"]
[de.mpg.shh/util-properties "0.0.1"]
[de.mpg.shh/util-message-server "0.0.15"]
[de.mpg.shh/util-overseer "0.0.92"]]
[de.mpg.shh/util-overseer "0.0.101"]]
:main overseer.core
:source-paths ["src/main/clojure"]
:profiles {:profiles {:uberjar {:aot :all}
Expand Down
2 changes: 1 addition & 1 deletion src/main/clojure/overseer/application.clj
Expand Up @@ -12,7 +12,7 @@
(defn build-overseer-conn
"Return the overseer connection"
[]
(let [conn (overseer-server/conn (System/getenv "OVERSEER_SESSION"))]
(let [conn (overseer-server/conn system-config/overseer-session)]
(when (map? system-config/overseer-environment)
(overseer-server/update-env! conn merge system-config/overseer-environment))
conn))
Expand Down
Expand Up @@ -40,7 +40,10 @@
:resource-toggle-backup
(fn [request-channel {data :data :as message}]
(receive :resource-toggle-backup data))
:overseer-transaction
:overseer-synch-scheduler
(fn [request-channel {data :data :as message}]
(receive :overseer-transaction data))
(receive :overseer-synch-scheduler data))
:overseer-synch-transaction
(fn [request-channel {data :data :as message}]
(receive :overseer-synch-transaction data))
})))
72 changes: 50 additions & 22 deletions src/main/clojure/overseer/component/endpoints_session_user.clj
Expand Up @@ -13,15 +13,16 @@
(let [scheduler-session-id (-> (d/pull (d/db overseer-conn) '[:overseer.resource/session-id] [:db/ident :overseer]) :overseer.resource/session-id)
_ (info "scheduler-session-id: " scheduler-session-id)
reply-rx-superuser (fn [request-channel target message]
(doseq [rx-config rx-config-list]
(doseq [rx-config rx-config-superuser-list]
(info "reply-rx-superuser send to: " rx-config)
(message-helpers/synch-send request-channel
(-> rx-config :listen-context-config :host)
(-> rx-config :listen-context-config :port)
(-> rx-config :listen-context-config :username)
(-> rx-config :listen-context-config :password)
(-> rx-config :listen-address)
message
:properties (merge {"origin" "overseer"}
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1
Expand All @@ -35,7 +36,7 @@
(-> rx-config :listen-context-config :password)
(-> rx-config :listen-address)
message
:properties (merge {"origin" "overseer"}
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1
Expand All @@ -50,39 +51,66 @@
(-> tx-config :listen-context-config :password)
(-> tx-config :listen-address)
message
:properties (merge {"origin" "overseer"}
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1)))
ch-close (async/chan)
ch-transaction (async/chan 2)
ch-synch (async/chan 2)
_ (async/go-loop [[v ch] (async/alts! [ch-close
ch-transaction])]
ch-synch])]
(when-not (= ch ch-close)
(info "ch-transaction loop: " v)
(when-let [msg (overseer-server/process-transaction overseer-conn v)]
(when (overseer-server/cluster? overseer-conn)
(reply-rx-superuser request-channel nil [:overseer-transaction msg]))
(overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction)))
(info "ch-synch loop: " v)
(condp = (first v)
:overseer-synch-transaction
(do
(info "process transaction event: " (second v))
(when-let [msg (overseer-server/process-transaction overseer-conn v)]
(info "broadcasting message: " msg)
(reply-rx-superuser request-channel nil [:overseer-transaction {:data msg}])
(overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction))))
:overseer-synch-scheduler
(do
(info "process schdeduler event: " (second v))
(reply-rx-superuser request-channel nil v))
(throw (ex-info (str "Failed to understand overseer transaction event '" v "'") {:cause "Only :overseer-synch-transaction or :overseer-synch-scheduler accepted"})))
(recur (async/alts! [ch-close
ch-transaction]))))
_ (overseer-server/listen-transaction overseer-conn (partial async/>!! ch-transaction))
ch-synch]))))
_ (overseer-server/listen-synch overseer-conn (partial async/>!! ch-synch))
scheduler-registration {:overseer.resource/type :overseer/scheduler
:session-id scheduler-session-id
:host hostname}
_ (overseer-server/register-resource overseer-conn scheduler-registration)
scheduler-uuid (overseer-server/resource-session->uuid overseer-conn :overseer/scheduler scheduler-session-id)
_ (info "self scheduler-uuid: " scheduler-uuid)
scheduler-broadcast-interval 30000
_ (async/go-loop [[v ch] (async/alts! [ch-close
(async/timeout scheduler-broadcast-interval)])]
(when-not (= ch ch-close)
(reply-rx request-channel nil [:register-resource {:data (assoc scheduler-registration :uuid scheduler-uuid)}])
(recur (async/alts! [ch-close
(async/timeout scheduler-broadcast-interval)]))))
ch-receive-buffer (async/chan 500)
_ (async/go-loop [[v ch] (async/alts! [ch-close
ch-receive-buffer])]
(when-not (= ch ch-close)
(info "ch-receive-buffer loop: " v)
(if (overseer-server/synched? overseer-conn)
(try
(overseer-server/receive overseer-conn v)
(catch Throwable t
(error "failed to receive '" (get v :destination) "' message " v " " (utils/stack-trace-to-string t))))
(Thread/sleep 2000))
(recur (async/alts! [ch-close
ch-receive-buffer]))))
_ (async/go-loop [[v ch] (async/alts! [ch-close
(async/timeout 1000)])]
(when-not (= ch ch-close)
(when-let [task (overseer-server/task overseer-conn)]
(reply-tx request-channel nil [:overseer/task {:data task}]))
(recur (async/alts! [ch-close
(async/timeout 10000)]))))
receive (fn [destination data]
(try
(overseer-server/receive overseer-conn (assoc data :destination destination))
(catch Throwable t
(error "failed to receive '" (pr-str destination) "' message " (pr-str data) " " (utils/stack-trace-to-string t)))))]
]
{
:ch-close ch-close
:error
Expand Down Expand Up @@ -118,16 +146,16 @@
(overseer-server/deregister-resource overseer-conn data))
:workflow-task-accept
(fn [request-channel {data :data :as message}]
(receive :workflow-task-accept data))
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-accept)))
:workflow-task-done
(fn [request-channel {data :data :as message}]
(receive :workflow-task-done data))
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-done)))
:workflow-task-failed
(fn [request-channel {data :data :as message}]
(receive :workflow-task-failed data))
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-failed)))
:workflow-task-running
(fn [request-channel {data :data :as message}]
(receive :workflow-task-running data))
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-running)))
:stop
(fn [request-channel message]
(info "got stop message: " message))}))
Expand Down
4 changes: 3 additions & 1 deletion src/main/clojure/overseer/system_config.clj
Expand Up @@ -24,6 +24,8 @@

(def hostname (.getCanonicalHostName (java.net.InetAddress/getLocalHost)))

(def overseer-session (System/getenv "OVERSEER_SESSION"))

(def overseer-environment (config/get-in [:overseer :environment]))

(def overseer-rx-context (config/get-in [:overseer :rx :context]))
Expand All @@ -50,7 +52,7 @@
(:port overseer-tx-context)))

(def overseer-config
{:listen-selector "origin <> 'overseer'"
{:listen-selector (str "origin <> '" overseer-session "'")
:dispatcher-request-buffer-size 16
:stop-message (pr-str [:stop])})

Expand Down

0 comments on commit 526cdc5

Please sign in to comment.