Skip to content
Permalink
dd80894f8c
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
61 lines (50 sloc) 2.76 KB
(require '[clojure.tools.logging :refer [info error]]
'[clojure.core.async :as async]
'[com.stuartsierra.component :as component]
'[immutant.messaging :as messaging]
'[de.mpg.shh.util-message-server.message-server :as message-server]
'[de.mpg.shh.util-message-server.life-cycle :as life-cycle]
'[de.mpg.shh.util-message-server.helpers :as message-helpers])
;; Global system under test: an atom wrapping a component system map with two
;; entries. :request-channel is a fresh unbuffered core.async channel, and
;; :message-server is an empty MessageServer record declared (via
;; component/using) to depend on :request-channel.
;; defonce keeps the same atom if this file is evaluated again.
(defonce system
  (let [requests (async/chan)
        server   (component/using (message-server/map->MessageServer {})
                                  [:request-channel])]
    (atom (component/system-map :request-channel requests
                                :message-server server))))
;; Settings handed to life-cycle/listen further down in this script.

;; Address the test listener is attached to.
(def test-listen-address
  "queue.shh.test.response")

;; No selector is supplied for the listener.
(def test-listen-selector
  nil)

;; Connection settings passed as the listener's context config.
(def test-listen-context-config
  {:host     "127.0.0.1"
   :port     5445
   :username "shh-test"
   :password "batman"})

;; Transducer for the listen channel; the predicate always answers true, so
;; every value passes through unchanged.
(def test-listen-transducer
  (filter (constantly true)))
;; Channel handed to the listener: a fixed buffer of 16 with
;; test-listen-transducer applied to the values flowing through it.
(def test-listen-channel
  (async/chan (async/buffer 16) test-listen-transducer))
(info "Hello")
;; Start every component held in the system atom.
(swap! system component/start)
;; Put one :send request on the system's request channel (blocking put), then
;; log from a go block whichever comes first: a value on the request's result
;; channel, or :timed-out once the 1000 ms timeout channel closes.
(let [reply-chan (async/chan 1)
      request    {:msg "foo"
                  :encoding :none
                  :command :send
                  :host "127.0.0.1"
                  :port 5445
                  :username "shh-test"
                  :password "batman"
                  :address "queue.shh.test.response"
                  :result-channel reply-chan}]
  (async/>!! (:request-channel @system) request)
  (let [deadline (async/timeout 1000)]
    (async/go
      (info "result: " (async/alt!
                         deadline :timed-out
                         reply-chan ([reply] reply))))))
;; Test new synch-send
;; The call is made outside the logging form on purpose: clojure.tools.logging
;; macros only evaluate their message arguments when the level is enabled, so
;; leaving the send inside (info ...) would skip it whenever INFO is off.
;; NOTE(review): the unit of :timeout is not visible from this file -- confirm
;; against message-helpers/synch-send.
(let [outcome (message-helpers/synch-send (get-in @system [:request-channel])
                                          "127.0.0.1" 5445 "shh-test" "batman"
                                          "queue.shh.test.response" "synchfoo"
                                          :timeout 1)]
  (info "synch-send: " outcome))
;; Background consumer for test-listen-channel: logs every value taken from
;; the channel. The loop ends when the :stop sentinel arrives or when the
;; channel has been closed. A closed, drained channel makes <! return nil
;; immediately, so without the nil check the loop would spin forever logging
;; nil.
(async/go-loop [interesting-resp (async/<! test-listen-channel)]
  (info "interesting-resp: " interesting-resp)
  (if (or (nil? interesting-resp) (= :stop interesting-resp))
    (info "stop listening to interesting response channel")
    (recur (async/<! test-listen-channel))))
;; Install a listener: replace the :message-server entry of the system with
;; the result of life-cycle/listen applied to it plus the test listen settings.
(info "install listener")
(swap! system update :message-server life-cycle/listen
       test-listen-context-config
       test-listen-address
       test-listen-channel
       test-listen-selector)
(info "done install listener")
;; Disabled: un-install the listener again after a short pause.
;;(Thread/sleep 1600)
;;(info "finish first sleep")
;;(info "un-install listener")
;;(swap! system update-in [:message-server] life-cycle/unlisten test-listen-context-config test-listen-address test-listen-channel)
;;(info "done un-install listener")
;; Keep the script alive for 16 s while the listener is installed.
(Thread/sleep 16000)
(info "finish second sleep")
;; Disabled: stop via an explicit :stop command on the request channel.
;;(async/>!! (get-in @system [:request-channel]) {:command :stop})
;;(info "sent stop command")
;;(Thread/sleep 1400)
;;(info "finish third sleep")
;; Stop all components, pause 1.4 s, then terminate the JVM with status 0.
(swap! system component/stop)
(Thread/sleep 1400)
(info "finish third sleep")
(System/exit 0)