diff --git a/project.clj b/project.clj index d0153a1..de7f741 100755 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject de.mpg.shh/util-message-server "0.0.13" +(defproject de.mpg.shh/util-message-server "0.0.15" :description "Interop with hornetq" :url "http://www.shh.mpg.de/" :license {:name "Eclipse Public License" diff --git a/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj b/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj index 6141d6f..8da2240 100644 --- a/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj +++ b/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj @@ -29,7 +29,7 @@ true))) ;; This provides a convenience dispatcher, just supply a map of end points -(defn dispatch [request-channel end-points ?data] +(defn dispatch [request-channel end-points pass-channel ?data] (if (vector? ?data) (if (= (first ?data) :stop) false @@ -40,7 +40,12 @@ true) (catch Throwable t (report-error-truthfully request-channel end-points (stack-trace-to-string t)))) - (report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"])))) + (if (nil? pass-channel) + (report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"])) + (do + (when-not (async/offer! pass-channel ?data) + (info "pass-channel full, dropping message")) + true)))) (report-error-truthfully request-channel end-points (str/join "" ["Event format not recognised " (str ?data)])))) (defrecord MessageDispatcher [listen-context-config listen-address listen-selector dispatcher-request-buffer-size stop-message] @@ -50,13 +55,14 @@ dispatcher-request-transducer :dispatcher-request-transducer end-points :end-points loopback-channel :loopback-channel + pass-channel :pass-channel :as component}] (let [dispatcher-request-channel (async/chan dispatcher-request-buffer-size dispatcher-request-transducer) _ (life-cycle/listen message-server listen-context-config listen-address dispatcher-request-channel listen-selector) - dispatcher-request-merge-channel (async/merge (remove nil? [dispatcher-request-channel loopback-channel]))] - (async/go-loop [dispatcher-request (async/