diff --git a/bin/install.sh b/bin/install.sh new file mode 100755 index 0000000..7780818 --- /dev/null +++ b/bin/install.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +numArgs=$# + +set -e -o pipefail + +function die() { + MESG="${1:-Died}" + echo "${MESG}" >&2 + exit 1 +} + + +if [[ !$numArgs -gt 0 ]]; then + die "Please supply a version number e.g. 0.0.1" +fi + +group="de.mpg.shh" +artifact="util-message-server" +version=$1 + +lein clean +lein deps +lein jar +lein pom + +test -f "target/${artifact}-${version}.jar" || die "Build failed: jar not found" + +lein localrepo install -p pom.xml target/${artifact}-${version}.jar ${group}/${artifact} ${version} + +# create sha1sums for the jar and pom +group_path=$(echo "${group}" | tr "." "/") +jar_path="${HOME}/.m2/repository/${group_path}/${artifact}/${version}/${artifact}-${version}.jar" +jar_sum_path="${jar_path}.sha1" + +pom_path="${HOME}/.m2/repository/${group_path}/${artifact}/${version}/${artifact}-${version}.pom" +pom_sum_path="${pom_path}.sha1" + +shasum ${jar_path} | cut -d ' ' -f 1 > ${jar_sum_path} +shasum ${pom_path} | cut -d ' ' -f 1 > ${pom_sum_path} diff --git a/bin/test b/bin/test new file mode 100755 index 0000000..2f37538 --- /dev/null +++ b/bin/test @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +numArgs=$# + +if [[ $numArgs -gt 0 ]]; then + test_name=$1 + shift + + case "$test_name" in + server) + exec lein with-profiles production,shhdocker trampoline run -m clojure.main script/test.clj + ;; + dispatcher) + exec lein with-profiles production,shhdocker,test trampoline run -m clojure.main script/test_dispatcher.clj + ;; + *) + echo "Test '$test_name' not found" + exit 1 + esac +else + echo "Please supply the name of the test you would like to run. e.g. server,dispatcher" +fi + diff --git a/project.clj b/project.clj new file mode 100755 index 0000000..b6c051c --- /dev/null +++ b/project.clj @@ -0,0 +1,16 @@ +(defproject de.mpg.shh/util-message-server "0.0.11" + :description "Interop with hornetq" + :url "http://www.shh.mpg.de/" + :license {:name "Eclipse Public License" + :url "http://www.eclipse.org/legal/epl-v10.html"} + :dependencies [[org.clojure/clojure "1.8.0"] + [org.clojure/tools.logging "0.3.1"] + [org.clojure/core.async "0.2.385"] + [org.apache.logging.log4j/log4j-api "2.5"] + [org.apache.logging.log4j/log4j-core "2.5"] + [org.apache.logging.log4j/log4j-1.2-api "2.5"] + [com.stuartsierra/component "0.3.1"] + [org.immutant/messaging "2.1.5"]] + :source-paths ["src/main/clojure"] + :profiles {:test {:resource-paths ["test-resources"] + :dependencies [[com.taoensso/encore "2.81.1"]]}}) diff --git a/script/test.clj b/script/test.clj new file mode 100644 index 0000000..b7c599b --- /dev/null +++ b/script/test.clj @@ -0,0 +1,61 @@ +(require '[clojure.tools.logging :refer [info error]] + '[clojure.core.async :as async] + '[com.stuartsierra.component :as component] + '[immutant.messaging :as messaging] + '[de.mpg.shh.util-message-server.message-server :as message-server] + '[de.mpg.shh.util-message-server.life-cycle :as life-cycle] + '[de.mpg.shh.util-message-server.helpers :as message-helpers]) + +(defonce system (let [request-channel (async/chan) + system (atom (component/system-map :request-channel request-channel + :message-server (component/using (message-server/map->MessageServer {}) [:request-channel])))] + system)) + +(def test-listen-address "queue.shh.test.response") +(def test-listen-selector nil) +(def test-listen-context-config {:host "127.0.0.1" :port 5445 :username "shh-test" :password "batman"}) +(def test-listen-transducer (filter (fn [x] true))) +(def test-listen-channel (async/chan 16 test-listen-transducer)) + + +(info "Hello") +(swap! system component/start) + +(let [result-chan (async/chan 1)] + (async/>!! (get-in @system [:request-channel]) {:msg "foo" :encoding :none :command :send :host "127.0.0.1" :port 5445 :username "shh-test" :password "batman" :address "queue.shh.test.response" :result-channel result-chan}) + (let [timeout-channel (async/timeout 1000)] + (async/go (info "result: " (async/alt! + timeout-channel :timed-out + result-chan ([e] e)))))) + +;; Test new synch-send +(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "synchfoo" :timeout 1)) + +(async/go-loop [interesting-resp (async/!! (get-in @system [:request-channel]) {:command :stop}) +;;(info "sent stop command") +;;(Thread/sleep 1400) +;;(info "finish third sleep") +(swap! system component/stop) +(Thread/sleep 1400) +(info "finish third sleep") +(System/exit 0) diff --git a/script/test_dispatcher.clj b/script/test_dispatcher.clj new file mode 100644 index 0000000..cdbb9b6 --- /dev/null +++ b/script/test_dispatcher.clj @@ -0,0 +1,70 @@ +(require '[clojure.tools.logging :refer [info error]] + '[clojure.core.async :as async] + '[com.stuartsierra.component :as component] + '[de.mpg.shh.util-message-server.message-server :as message-server] + '[de.mpg.shh.util-message-server.message-dispatcher :as message-dispatcher] + '[de.mpg.shh.util-message-server.helpers :as message-helpers] + '[taoensso.encore :as enc]) + +(defn test-array + [t] + (let [check (type (t []))] + (fn [arg] (instance? check arg)))) + +(def byte-array? + (test-array byte-array)) + +(defn decode-bytes-message + [maybe-bytes] + (if (byte-array? maybe-bytes) + (String. maybe-bytes "UTF-8") + maybe-bytes)) + +;; edn transducer +(def dispatcher-request-transducer + (comp + (map decode-bytes-message) + (map enc/read-edn))) + +(def message-dispatcher-config + {:listen-context-config {:host "127.0.0.1" + :port 5445 + :username "shh-test" + :password "batman"} + :listen-address "queue.shh.test.request" + :dispatcher-request-buffer-size 16 + :dispatcher-request-transducer dispatcher-request-transducer + :end-points {:test-endpoint-one (fn [request-channel ?data] (info "test-end-point-one: " ?data)) + :test-endpoint-two (fn [request-channel ?data] (info "test-end-point-two: " ?data))} + :stop-message "[:stop]"}) + +(def selective-message-dispatcher-config + (assoc message-dispatcher-config :listen-address "queue.shh.test.response" + :listen-selector "origin <> 'foo'")) + +(defonce system (let [request-channel (async/chan) + system (atom (component/system-map :request-channel request-channel + :message-server (component/using (message-server/map->MessageServer {}) [:request-channel]) + :message-dispatcher (component/using (message-dispatcher/map->MessageDispatcher message-dispatcher-config) [:message-server]) + :selective-message-dispatcher (component/using (message-dispatcher/map->MessageDispatcher selective-message-dispatcher-config) [:message-server])))] + system)) + + + +(info "Hello from Dispatcher test") +(swap! system component/start) + + +;; Test new synch-send +(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.request" "[:test-endpoint-one \"Hello from test one\"]" :timeout 1)) + +;; Test selectors +(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"One, two, \"]" :timeout 1)) +(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"skip a few, \"]" :timeout 1 :properties {"origin" "foo"})) +(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"ninety nine, one hundred\"]" :timeout 1 :properties {"origin" "bar"})) + + +(Thread/sleep 1400) +(info "finish sleep") +(swap! system component/stop) +(System/exit 0) diff --git a/src/main/clojure/de/mpg/shh/util_message_server/helpers.clj b/src/main/clojure/de/mpg/shh/util_message_server/helpers.clj new file mode 100644 index 0000000..99566cd --- /dev/null +++ b/src/main/clojure/de/mpg/shh/util_message_server/helpers.clj @@ -0,0 +1,17 @@ +(ns de.mpg.shh.util-message-server.helpers + (:require [clojure.tools.logging :refer [info error]] + [clojure.core.async :as async])) + +(defn synch-send + [request-channel host port username password address msg & {:keys [properties encoding timeout result-channel] :or {properties nil encoding :edn timeout 1000 result-channel (async/chan 1)}}] + (async/>!! request-channel {:msg msg + :properties properties + :encoding encoding + :command :send + :host host + :port port + :username username + :password password + :address address + :result-channel result-channel}) + (first (async/alts!! [(async/timeout timeout) result-channel]))) diff --git a/src/main/clojure/de/mpg/shh/util_message_server/life_cycle.clj b/src/main/clojure/de/mpg/shh/util_message_server/life_cycle.clj new file mode 100644 index 0000000..490ae9a --- /dev/null +++ b/src/main/clojure/de/mpg/shh/util_message_server/life_cycle.clj @@ -0,0 +1,7 @@ +(ns de.mpg.shh.util-message-server.life-cycle) + +(defprotocol ListenableLifeCycle + (listen [component context-config address channel selector] "Subscribes to the address at the given context and copies the internal response channel onto the supplied channel") + (unlisten [component context-config address channel] "Stop copying the internal response channel onto the given channel. If this is the last listener for this context and address, unsubscribe")) + + diff --git a/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj b/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj new file mode 100644 index 0000000..c06e17b --- /dev/null +++ b/src/main/clojure/de/mpg/shh/util_message_server/message_dispatcher.clj @@ -0,0 +1,66 @@ +(ns de.mpg.shh.util-message-server.message-dispatcher + (:require [clojure.tools.logging :refer [info error]] + [clojure.string :as str] + [clojure.core.async :as async] + [com.stuartsierra.component :as component] + [de.mpg.shh.util-message-server.life-cycle :as life-cycle]) + (:import [java.io StringWriter PrintWriter])) + +(defn stack-trace-to-string + "Returns a string containing the output of .printStackTrace" + [t] + (let [sw (StringWriter.) + pw (PrintWriter. sw) + _ (.printStackTrace t pw)] + (.toString sw))) + +(defn report-error-truthfully + [request-channel end-points error-msg] + (if (contains? end-points :error) + (try + (do + ((get end-points :error) request-channel error-msg) + true) + (catch Throwable t + (error (stack-trace-to-string t)) + true)) + (do + (error error-msg) + true))) + +;; This provides a convenience dispatcher, just supply a map of end points +(defn dispatch [request-channel end-points ?data] + (if (vector? ?data) + (if (= (first ?data) :stop) + false + (if (contains? end-points (first ?data)) + (try + (do + ((get end-points (first ?data)) request-channel (second ?data)) + true) + (catch Throwable t + (report-error-truthfully request-channel end-points (stack-trace-to-string t)))) + (report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"])))) + (report-error-truthfully request-channel end-points (str/join "" ["Event format not recognised " (str ?data)])))) + +(defrecord MessageDispatcher [listen-context-config listen-address listen-selector dispatcher-request-buffer-size stop-message] + component/Lifecycle + (start [{{request-channel :request-channel + :as message-server} :message-server + dispatcher-request-transducer :dispatcher-request-transducer + end-points :end-points + :as component}] + (let [dispatcher-request-channel (async/chan dispatcher-request-buffer-size dispatcher-request-transducer) + _ (life-cycle/listen message-server listen-context-config listen-address dispatcher-request-channel listen-selector)] + (async/go-loop [dispatcher-request (async/!! dispatcher-request-channel stop-message))) + diff --git a/src/main/clojure/de/mpg/shh/util_message_server/message_server.clj b/src/main/clojure/de/mpg/shh/util_message_server/message_server.clj new file mode 100644 index 0000000..f070119 --- /dev/null +++ b/src/main/clojure/de/mpg/shh/util_message_server/message_server.clj @@ -0,0 +1,174 @@ +(ns de.mpg.shh.util-message-server.message-server + (:require [clojure.tools.logging :refer [info error]] + [clojure.core.async :as async] + [clojure.walk :as walk] + [clojure.string :as str] + [com.stuartsierra.component :as component] + [immutant.messaging :as messaging] + [de.mpg.shh.util-message-server.life-cycle :as life-cycle])) + +(defn aquire-context + [contexts context-config] + (if (contains? @contexts context-config) + (get @contexts context-config) + (let [context (try + (messaging/context :host (:host context-config) + :port (:port context-config) + :username (:username context-config) + :password (:password context-config)) + (catch Exception e e)) + context-info {:context context + :addresses {}} + _ (swap! contexts assoc context-config context-info)] + context-info))) + +(defn create-destination + [context address] + (cond + (nil? context) (Exception. (str/join "" ["Cannot create destination '" address "' context was nil"])) + (instance? Throwable context) (ex-info (str/join "" ["Cannot create destination '" address "' context caused an exception"]) {:cause context}) + (re-find #"^queue" address) (try (messaging/queue address :context context) (catch Exception e e)) + (re-find #"^topic" address) (try (messaging/topic address :context context) (catch Exception e e)) + :else (Exception. (str/join "" ["Address '" address "' does not start with queue|topic"])))) + +(defn aquire-destination + [contexts context-config address] + (let [addresses (get-in @contexts [context-config :addresses])] + (if (contains? addresses address) + (get addresses address) + (let [channel (async/chan) + destination-info {:destination (create-destination (get-in @contexts [context-config :context]) address) + :channel channel + :mult (async/mult channel) + :consumer-channels #{}} + _ (swap! contexts assoc-in [context-config :addresses address] destination-info)] + destination-info)))) + +(defn cleanup-contexts + [node] + (if (map? node) + (do + (when (contains? node :context) + (messaging/stop (:context node))) + (when (contains? node :listener) + (messaging/stop (:listener node))) + (when (contains? node :mult) + (async/untap-all (:mult node)))) + node)) + +(defn dispatch-result-message + [result-channel result-message] + (when (not (nil? result-channel)) + (async/offer! result-channel result-message) + (async/close! result-channel))) + +(defmulti command :command) +(defmethod command :send [{msg :msg + properties :properties + encoding :encoding + contexts :contexts + address :address + result-channel :result-channel + :as request}] + (if (not (contains? request :host)) + (do (dispatch-result-message result-channel (Exception. "Only sending to remote queues is allowed. Please supply :host")) true) + (let [context-key (select-keys request [:host :port :username :password]) + context-info (aquire-context contexts context-key) + destination-info (aquire-destination contexts context-key address)] + (if (instance? Throwable (:destination destination-info)) + (do (dispatch-result-message result-channel (Throwable->map (:destination destination-info))) true) + (try + (messaging/publish (:destination destination-info) msg :properties properties :encoding encoding) + (dispatch-result-message result-channel :sent) + true + (catch Exception e + (dispatch-result-message result-channel (Throwable->map e)) + true)))))) +(defmethod command :stop [_] false) +(defmethod command :default [{result-channel :result-channel + :as request}] + (dispatch-result-message result-channel (ex-info "MessageServer failed to understand your command" {:cause request})) + true) + + +;; Clients may send messages using the request channel, supplying context config and +;; and address. +;; May also supply :error-channel via which errors will be returned +;; This channel will be closed after an attempt is made to put the +;; error on the channel +;; +;; Clients are exepected to supply a channel (filtered according to their interest) +;; and the internal response channel for that address will be tapped onto the supplied +;; channel +;; Clients supply a context configuration (i.e. host of the queue) +;; and the address of the queue +;; MessageServer will start listening to this destination +;; If there are no more channels listening to this destination +;; MessageServer will stop listening +(defrecord MessageServer [] + component/Lifecycle + (start [{request-channel :request-channel + :as component}] + (let [contexts (atom {})] + (async/go-loop [request (async/!! request-channel {:command :stop}) + (async/close! request-channel) + (async/into [] request-channel) + (walk/postwalk cleanup-contexts @contexts) + component)) + +(extend-type MessageServer + life-cycle/ListenableLifeCycle + (listen [{contexts :contexts + :as component} + context-config + address + filtered-channel + selector] + (let [context-key (select-keys context-config [:host :port :username :password]) + context-info (aquire-context contexts context-key) + ;;_ (info "aquired context-info: " context-info) + destination-info (aquire-destination contexts context-key address) + ;;_ (info "aquired destination-info: " destination-info) + _ (async/tap (:mult destination-info) filtered-channel)] + (if (instance? Throwable (:destination destination-info)) + (async/>!! filtered-channel (Throwable->map (:destination destination-info))) + (let [listener (try + (messaging/listen (:destination destination-info) + (partial async/>!! (:channel destination-info)) + :selector selector) + (catch Exception e e))] + (if (instance? Throwable listener) + (async/>!! filtered-channel (Throwable->map listener)) + (do + (swap! contexts assoc-in [context-key :addresses address :listener] listener) + (swap! contexts assoc-in [context-key :addresses address :consumer-channels] filtered-channel))))) + component)) + (unlisten [{contexts :contexts + :as component} + context-config + address + filtered-channel] + (let [context-key (select-keys context-config [:host :port :username :password])] + (when (contains? @contexts context-key) + (when (contains? (get-in @contexts [context-key :addresses]) address) + (let [destination-info (get-in @contexts [context-key :addresses address]) + _ (info "unlisten for address: " address " destination info: " destination-info) + _ (async/untap (:mult destination-info) filtered-channel) + _ (swap! contexts update-in [context-key :addresses address :consumer-channels] disj filtered-channel)] + (when (empty? (get-in @contexts [context-key :addresses address :consumer-channels])) + (swap! contexts update-in [context-key :addresses] dissoc address) + (when (empty? (get-in @contexts [context-key :addresses])) + (messaging/stop (get-in @contexts [context-key :context])) + (swap! contexts dissoc context-key))))))) + component)) + +