Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
implement loopback properly, add pass channel
  • Loading branch information
clayton committed Sep 9, 2017
1 parent 4cf85af commit 8aeec18
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject de.mpg.shh/util-message-server "0.0.13"
(defproject de.mpg.shh/util-message-server "0.0.15"
:description "Interop with hornetq"
:url "http://www.shh.mpg.de/"
:license {:name "Eclipse Public License"
Expand Down
Expand Up @@ -29,7 +29,7 @@
true)))

;; This provides a convenience dispatcher, just supply a map of end points
(defn dispatch [request-channel end-points ?data]
(defn dispatch [request-channel end-points pass-channel ?data]
(if (vector? ?data)
(if (= (first ?data) :stop)
false
Expand All @@ -40,7 +40,12 @@
true)
(catch Throwable t
(report-error-truthfully request-channel end-points (stack-trace-to-string t))))
(report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"]))))
(if (nil? pass-channel)
(report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"]))
(do
(when-not (async/offer! pass-channel ?data)
(info "pass-channel full, dropping message"))
true))))
(report-error-truthfully request-channel end-points (str/join "" ["Event format not recognised " (str ?data)]))))

(defrecord MessageDispatcher [listen-context-config listen-address listen-selector dispatcher-request-buffer-size stop-message]
Expand All @@ -50,13 +55,14 @@
dispatcher-request-transducer :dispatcher-request-transducer
end-points :end-points
loopback-channel :loopback-channel
pass-channel :pass-channel
:as component}]
(let [dispatcher-request-channel (async/chan dispatcher-request-buffer-size dispatcher-request-transducer)
_ (life-cycle/listen message-server listen-context-config listen-address dispatcher-request-channel listen-selector)
dispatcher-request-merge-channel (async/merge (remove nil? [dispatcher-request-channel loopback-channel]))]
(async/go-loop [dispatcher-request (async/<! dispatcher-request-merge-channel)]
(if (dispatch request-channel end-points dispatcher-request)
(recur (async/<! dispatcher-request-channel))
dispatcher-channels (vec (remove nil? [dispatcher-request-channel loopback-channel]))]
(async/go-loop [[dispatcher-request dispatcher-channel] (async/alts! dispatcher-channels)]
(if (dispatch request-channel end-points pass-channel dispatcher-request)
(recur (async/alts! dispatcher-channels))
(info "Shutdown dispatcher request channel")))
(assoc component :dispatcher-request-channel dispatcher-request-channel
:stop-message stop-message
Expand Down

0 comments on commit 8aeec18

Please sign in to comment.