Skip to content
Permalink
526cdc58d6
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
164 lines (163 sloc) 10.5 KB
(ns overseer.component.endpoints-session-user
(:require [clojure.tools.logging :refer [info error]]
[clojure.core.async :as async]
[com.stuartsierra.component :as component]
[datascript.core :as d]
[de.mpg.shh.util-message-server.helpers :as message-helpers]
[de.mpg.shh.util-overseer.server :as overseer-server]
[overseer.utils :as utils]))
(defrecord EndpointsSessionUser [request-channel overseer-conn hostname rx-config-superuser-list rx-config-list tx-config-list]
component/Lifecycle
(start [this]
(let [scheduler-session-id (-> (d/pull (d/db overseer-conn) '[:overseer.resource/session-id] [:db/ident :overseer]) :overseer.resource/session-id)
_ (info "scheduler-session-id: " scheduler-session-id)
reply-rx-superuser (fn [request-channel target message]
(doseq [rx-config rx-config-superuser-list]
(info "reply-rx-superuser send to: " rx-config)
(message-helpers/synch-send request-channel
(-> rx-config :listen-context-config :host)
(-> rx-config :listen-context-config :port)
(-> rx-config :listen-context-config :username)
(-> rx-config :listen-context-config :password)
(-> rx-config :listen-address)
message
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1
:send-meta true)))
reply-rx (fn [request-channel target message]
(doseq [rx-config rx-config-list]
(message-helpers/synch-send request-channel
(-> rx-config :listen-context-config :host)
(-> rx-config :listen-context-config :port)
(-> rx-config :listen-context-config :username)
(-> rx-config :listen-context-config :password)
(-> rx-config :listen-address)
message
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1
:send-meta true)))
reply-tx (fn [request-channel target message]
(doseq [tx-config tx-config-list]
(info "es tx config: " tx-config)
(message-helpers/synch-send request-channel
(-> tx-config :listen-context-config :host)
(-> tx-config :listen-context-config :port)
(-> tx-config :listen-context-config :username)
(-> tx-config :listen-context-config :password)
(-> tx-config :listen-address)
message
:properties (merge {"origin" scheduler-session-id}
(when-not (nil? target)
{"target" target}))
:timeout 1)))
ch-close (async/chan)
ch-synch (async/chan 2)
_ (async/go-loop [[v ch] (async/alts! [ch-close
ch-synch])]
(when-not (= ch ch-close)
(info "ch-synch loop: " v)
(condp = (first v)
:overseer-synch-transaction
(do
(info "process transaction event: " (second v))
(when-let [msg (overseer-server/process-transaction overseer-conn v)]
(info "broadcasting message: " msg)
(reply-rx-superuser request-channel nil [:overseer-transaction {:data msg}])
(overseer-server/receive overseer-conn (assoc msg :destination :overseer-transaction))))
:overseer-synch-scheduler
(do
(info "process schdeduler event: " (second v))
(reply-rx-superuser request-channel nil v))
(throw (ex-info (str "Failed to understand overseer transaction event '" v "'") {:cause "Only :overseer-synch-transaction or :overseer-synch-scheduler accepted"})))
(recur (async/alts! [ch-close
ch-synch]))))
_ (overseer-server/listen-synch overseer-conn (partial async/>!! ch-synch))
scheduler-registration {:overseer.resource/type :overseer/scheduler
:session-id scheduler-session-id
:host hostname}
_ (overseer-server/register-resource overseer-conn scheduler-registration)
scheduler-uuid (overseer-server/resource-session->uuid overseer-conn :overseer/scheduler scheduler-session-id)
_ (info "self scheduler-uuid: " scheduler-uuid)
scheduler-broadcast-interval 30000
_ (async/go-loop [[v ch] (async/alts! [ch-close
(async/timeout scheduler-broadcast-interval)])]
(when-not (= ch ch-close)
(reply-rx request-channel nil [:register-resource {:data (assoc scheduler-registration :uuid scheduler-uuid)}])
(recur (async/alts! [ch-close
(async/timeout scheduler-broadcast-interval)]))))
ch-receive-buffer (async/chan 500)
_ (async/go-loop [[v ch] (async/alts! [ch-close
ch-receive-buffer])]
(when-not (= ch ch-close)
(info "ch-receive-buffer loop: " v)
(if (overseer-server/synched? overseer-conn)
(try
(overseer-server/receive overseer-conn v)
(catch Throwable t
(error "failed to receive '" (get v :destination) "' message " v " " (utils/stack-trace-to-string t))))
(Thread/sleep 2000))
(recur (async/alts! [ch-close
ch-receive-buffer]))))
_ (async/go-loop [[v ch] (async/alts! [ch-close
(async/timeout 1000)])]
(when-not (= ch ch-close)
(when-let [task (overseer-server/task overseer-conn)]
(reply-tx request-channel nil [:overseer/task {:data task}]))
(recur (async/alts! [ch-close
(async/timeout 10000)]))))
]
{
:ch-close ch-close
:error
(fn [request-channel message]
(error "error session endpoint received: " message))
:ping
(fn [request-channel message]
;;(info "got ping: " message)
(reply-tx request-channel (:client-id message) [:pong {}]))
;; Overseer client api calls, e.g. registering a runtime or accepting a task
:status
(fn [request-channel message]
(let [;;_ (info "got status: " (pr-str message))
result (try
{:data (overseer-server/status overseer-conn)}
(catch Throwable t
{:error (utils/stack-trace-to-string t)}))]
(reply-tx request-channel
(:client-id message)
[:message (merge result (select-keys message [:session-request-id]))])))
:register-resource
(fn [request-channel {data :data :as message}]
(let [_ (info "es got register-resource: " (pr-str message))
result (try
{:data (overseer-server/register-resource overseer-conn data)}
(catch Throwable t
{:error (utils/stack-trace-to-string t)}))]
(reply-tx request-channel
(:client-id message)
[:resource-session result])))
:deregister-resource
(fn [request-channel {data :data :as message}]
(overseer-server/deregister-resource overseer-conn data))
:workflow-task-accept
(fn [request-channel {data :data :as message}]
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-accept)))
:workflow-task-done
(fn [request-channel {data :data :as message}]
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-done)))
:workflow-task-failed
(fn [request-channel {data :data :as message}]
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-failed)))
:workflow-task-running
(fn [request-channel {data :data :as message}]
(async/>!! ch-receive-buffer (assoc data :destination :workflow-task-running)))
:stop
(fn [request-channel message]
(info "got stop message: " message))}))
(stop [this]
(info "esu component stop")
(async/close! (:ch-close this))))