Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
import message server
  • Loading branch information
clayton committed Jul 11, 2017
1 parent 6a04c57 commit dd80894
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 0 deletions.
40 changes: 40 additions & 0 deletions bin/install.sh
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

numArgs=$#

set -e -o pipefail

function die() {
MESG="${1:-Died}"
echo "${MESG}" >&2
exit 1
}


if [[ !$numArgs -gt 0 ]]; then
die "Please supply a version number e.g. 0.0.1"
fi

group="de.mpg.shh"
artifact="util-message-server"
version=$1

lein clean
lein deps
lein jar
lein pom

test -f "target/${artifact}-${version}.jar" || die "Build failed: jar not found"

lein localrepo install -p pom.xml target/${artifact}-${version}.jar ${group}/${artifact} ${version}

# create sha1sums for the jar and pom
group_path=$(echo "${group}" | tr "." "/")
jar_path="${HOME}/.m2/repository/${group_path}/${artifact}/${version}/${artifact}-${version}.jar"
jar_sum_path="${jar_path}.sha1"

pom_path="${HOME}/.m2/repository/${group_path}/${artifact}/${version}/${artifact}-${version}.pom"
pom_sum_path="${pom_path}.sha1"

shasum ${jar_path} | cut -d ' ' -f 1 > ${jar_sum_path}
shasum ${pom_path} | cut -d ' ' -f 1 > ${pom_sum_path}
23 changes: 23 additions & 0 deletions bin/test
@@ -0,0 +1,23 @@
#!/usr/bin/env bash

numArgs=$#

if [[ $numArgs -gt 0 ]]; then
test_name=$1
shift

case "$test_name" in
server)
exec lein with-profiles production,shhdocker trampoline run -m clojure.main script/test.clj
;;
dispatcher)
exec lein with-profiles production,shhdocker,test trampoline run -m clojure.main script/test_dispatcher.clj
;;
*)
echo "Test '$test_name' not found"
exit 1
esac
else
echo "Please supply the name of the test you would like to run. e.g. server,dispatcher"
fi

16 changes: 16 additions & 0 deletions project.clj
@@ -0,0 +1,16 @@
(defproject de.mpg.shh/util-message-server "0.0.11"
:description "Interop with hornetq"
:url "http://www.shh.mpg.de/"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.8.0"]
[org.clojure/tools.logging "0.3.1"]
[org.clojure/core.async "0.2.385"]
[org.apache.logging.log4j/log4j-api "2.5"]
[org.apache.logging.log4j/log4j-core "2.5"]
[org.apache.logging.log4j/log4j-1.2-api "2.5"]
[com.stuartsierra/component "0.3.1"]
[org.immutant/messaging "2.1.5"]]
:source-paths ["src/main/clojure"]
:profiles {:test {:resource-paths ["test-resources"]
:dependencies [[com.taoensso/encore "2.81.1"]]}})
61 changes: 61 additions & 0 deletions script/test.clj
@@ -0,0 +1,61 @@
(require '[clojure.tools.logging :refer [info error]]
'[clojure.core.async :as async]
'[com.stuartsierra.component :as component]
'[immutant.messaging :as messaging]
'[de.mpg.shh.util-message-server.message-server :as message-server]
'[de.mpg.shh.util-message-server.life-cycle :as life-cycle]
'[de.mpg.shh.util-message-server.helpers :as message-helpers])

(defonce system (let [request-channel (async/chan)
system (atom (component/system-map :request-channel request-channel
:message-server (component/using (message-server/map->MessageServer {}) [:request-channel])))]
system))

(def test-listen-address "queue.shh.test.response")
(def test-listen-selector nil)
(def test-listen-context-config {:host "127.0.0.1" :port 5445 :username "shh-test" :password "batman"})
(def test-listen-transducer (filter (fn [x] true)))
(def test-listen-channel (async/chan 16 test-listen-transducer))


(info "Hello")
(swap! system component/start)

(let [result-chan (async/chan 1)]
(async/>!! (get-in @system [:request-channel]) {:msg "foo" :encoding :none :command :send :host "127.0.0.1" :port 5445 :username "shh-test" :password "batman" :address "queue.shh.test.response" :result-channel result-chan})
(let [timeout-channel (async/timeout 1000)]
(async/go (info "result: " (async/alt!
timeout-channel :timed-out
result-chan ([e] e))))))

;; Test new synch-send
(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "synchfoo" :timeout 1))

(async/go-loop [interesting-resp (async/<! test-listen-channel)]
(info "interesting-resp: " interesting-resp)
(if (= :stop interesting-resp)
(info "stop listening to interesting response channel")
(recur (async/<! test-listen-channel))))

;; Install a listener
(info "install listener")
(swap! system update-in [:message-server] life-cycle/listen test-listen-context-config test-listen-address test-listen-channel test-listen-selector)
(info "done install listener")



;;(Thread/sleep 1600)
;;(info "finish first sleep")
;;(info "un-install listener")
;;(swap! system update-in [:message-server] life-cycle/unlisten test-listen-context-config test-listen-address test-listen-channel)
;;(info "done un-install listener")
(Thread/sleep 16000)
(info "finish second sleep")
;;(async/>!! (get-in @system [:request-channel]) {:command :stop})
;;(info "sent stop command")
;;(Thread/sleep 1400)
;;(info "finish third sleep")
(swap! system component/stop)
(Thread/sleep 1400)
(info "finish third sleep")
(System/exit 0)
70 changes: 70 additions & 0 deletions script/test_dispatcher.clj
@@ -0,0 +1,70 @@
(require '[clojure.tools.logging :refer [info error]]
'[clojure.core.async :as async]
'[com.stuartsierra.component :as component]
'[de.mpg.shh.util-message-server.message-server :as message-server]
'[de.mpg.shh.util-message-server.message-dispatcher :as message-dispatcher]
'[de.mpg.shh.util-message-server.helpers :as message-helpers]
'[taoensso.encore :as enc])

(defn test-array
[t]
(let [check (type (t []))]
(fn [arg] (instance? check arg))))

(def byte-array?
(test-array byte-array))

(defn decode-bytes-message
[maybe-bytes]
(if (byte-array? maybe-bytes)
(String. maybe-bytes "UTF-8")
maybe-bytes))

;; edn transducer
(def dispatcher-request-transducer
(comp
(map decode-bytes-message)
(map enc/read-edn)))

(def message-dispatcher-config
{:listen-context-config {:host "127.0.0.1"
:port 5445
:username "shh-test"
:password "batman"}
:listen-address "queue.shh.test.request"
:dispatcher-request-buffer-size 16
:dispatcher-request-transducer dispatcher-request-transducer
:end-points {:test-endpoint-one (fn [request-channel ?data] (info "test-end-point-one: " ?data))
:test-endpoint-two (fn [request-channel ?data] (info "test-end-point-two: " ?data))}
:stop-message "[:stop]"})

(def selective-message-dispatcher-config
(assoc message-dispatcher-config :listen-address "queue.shh.test.response"
:listen-selector "origin <> 'foo'"))

(defonce system (let [request-channel (async/chan)
system (atom (component/system-map :request-channel request-channel
:message-server (component/using (message-server/map->MessageServer {}) [:request-channel])
:message-dispatcher (component/using (message-dispatcher/map->MessageDispatcher message-dispatcher-config) [:message-server])
:selective-message-dispatcher (component/using (message-dispatcher/map->MessageDispatcher selective-message-dispatcher-config) [:message-server])))]
system))



(info "Hello from Dispatcher test")
(swap! system component/start)


;; Test new synch-send
(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.request" "[:test-endpoint-one \"Hello from test one\"]" :timeout 1))

;; Test selectors
(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"One, two, \"]" :timeout 1))
(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"skip a few, \"]" :timeout 1 :properties {"origin" "foo"}))
(info "synch-send: " (message-helpers/synch-send (get-in @system [:request-channel]) "127.0.0.1" 5445 "shh-test" "batman" "queue.shh.test.response" "[:test-endpoint-two \"ninety nine, one hundred\"]" :timeout 1 :properties {"origin" "bar"}))


(Thread/sleep 1400)
(info "finish sleep")
(swap! system component/stop)
(System/exit 0)
17 changes: 17 additions & 0 deletions src/main/clojure/de/mpg/shh/util_message_server/helpers.clj
@@ -0,0 +1,17 @@
(ns de.mpg.shh.util-message-server.helpers
(:require [clojure.tools.logging :refer [info error]]
[clojure.core.async :as async]))

(defn synch-send
[request-channel host port username password address msg & {:keys [properties encoding timeout result-channel] :or {properties nil encoding :edn timeout 1000 result-channel (async/chan 1)}}]
(async/>!! request-channel {:msg msg
:properties properties
:encoding encoding
:command :send
:host host
:port port
:username username
:password password
:address address
:result-channel result-channel})
(first (async/alts!! [(async/timeout timeout) result-channel])))
@@ -0,0 +1,7 @@
(ns de.mpg.shh.util-message-server.life-cycle)

(defprotocol ListenableLifeCycle
(listen [component context-config address channel selector] "Subscribes to the address at the given context and copies the internal response channel onto the supplied channel")
(unlisten [component context-config address channel] "Stop copying the internal response channel onto the given channel. If this is the last listener for this context and address, unsubscribe"))


@@ -0,0 +1,66 @@
(ns de.mpg.shh.util-message-server.message-dispatcher
(:require [clojure.tools.logging :refer [info error]]
[clojure.string :as str]
[clojure.core.async :as async]
[com.stuartsierra.component :as component]
[de.mpg.shh.util-message-server.life-cycle :as life-cycle])
(:import [java.io StringWriter PrintWriter]))

(defn stack-trace-to-string
"Returns a string containing the output of .printStackTrace"
[t]
(let [sw (StringWriter.)
pw (PrintWriter. sw)
_ (.printStackTrace t pw)]
(.toString sw)))

(defn report-error-truthfully
[request-channel end-points error-msg]
(if (contains? end-points :error)
(try
(do
((get end-points :error) request-channel error-msg)
true)
(catch Throwable t
(error (stack-trace-to-string t))
true))
(do
(error error-msg)
true)))

;; This provides a convenience dispatcher, just supply a map of end points
(defn dispatch [request-channel end-points ?data]
(if (vector? ?data)
(if (= (first ?data) :stop)
false
(if (contains? end-points (first ?data))
(try
(do
((get end-points (first ?data)) request-channel (second ?data))
true)
(catch Throwable t
(report-error-truthfully request-channel end-points (stack-trace-to-string t))))
(report-error-truthfully request-channel end-points (str/join "" ["Unhandled event " (first ?data) " not found in end-points map"]))))
(report-error-truthfully request-channel end-points (str/join "" ["Event format not recognised " (str ?data)]))))

(defrecord MessageDispatcher [listen-context-config listen-address listen-selector dispatcher-request-buffer-size stop-message]
component/Lifecycle
(start [{{request-channel :request-channel
:as message-server} :message-server
dispatcher-request-transducer :dispatcher-request-transducer
end-points :end-points
:as component}]
(let [dispatcher-request-channel (async/chan dispatcher-request-buffer-size dispatcher-request-transducer)
_ (life-cycle/listen message-server listen-context-config listen-address dispatcher-request-channel listen-selector)]
(async/go-loop [dispatcher-request (async/<! dispatcher-request-channel)]
(if (dispatch request-channel end-points dispatcher-request)
(recur (async/<! dispatcher-request-channel))
(info "Shutdown dispatcher request channel")))
(assoc component :dispatcher-request-channel dispatcher-request-channel
:stop-message stop-message
:dispatcher-request-buffer-size dispatcher-request-buffer-size)))
(stop [{dispatcher-request-channel :dispatcher-request-channel
stop-message :stop-message
:as component}]
(async/>!! dispatcher-request-channel stop-message)))

0 comments on commit dd80894

Please sign in to comment.