# mentat-kafka

Kafka server for a mentat db.

***You should run this with at least Java 9.***

This relies on having the libmentat_ffi library on your classpath, plus Java bindings suitable for the JVM (adapted from the project's Android bindings).

https://github.com/mozilla/mentat

This project is designed to act as the database for multiple clients via kafka topics. We access kafka via onyx-kafka.
```bash
lein uberjar
java -cp ${HOME}/repos/mentat/target/release:target/mentat-kafka-0.0.1-standalone.jar de.mpg.shh.mentat_kafka.core
```
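The server needs Java 9 or later, while the onyx-dashboard build later on wants Java 8, so it is easy to end up on the wrong JVM. A small guard before launching can catch this. This is a minimal sketch, not part of the project: the function names `parse_java_major` and `require_java` are hypothetical, and it assumes the usual `java -version` output of Oracle/OpenJDK builds (`java version "1.8.0_181"` or `openjdk version "11.0.2"`).

```bash
#!/usr/bin/env bash
# Hypothetical helpers (not part of mentat-kafka).

# Read `java -version` output on stdin and print the major version number.
parse_java_major() {
  local v
  # Take the quoted version string from the first line that has one.
  v=$(sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$v" in
    1.*) v=${v#1.} ;;  # pre-Java 9 scheme: "1.8.0_181" -> "8.0_181"
  esac
  # Keep everything before the first '.', '_' or '-': "11.0.2" -> "11".
  echo "${v%%[._-]*}"
}

# Return non-zero unless the java on PATH is at least major version $1.
require_java() {
  local want=$1 have
  # `java -version` prints to stderr, hence 2>&1.
  have=$(java -version 2>&1 | parse_java_major)
  if [ -z "$have" ] || [ "$have" -lt "$want" ]; then
    echo "need java >= $want, found '${have:-none}'" >&2
    return 1
  fi
}
```

For example, prefix the `java -cp ...` command with `require_java 9 &&`.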
##### Fixing logging when using onyx
Unfortunately, adding onyx to the project stomps on all the logging, because onyx-kafka pulls in slf4j-nop, the SLF4J no-op logger. To fix this we need to do some voodoo with the dependencies: a real logger implementation (here slf4j-simple) has to be on the classpath in place of the no-op logger, so we add it and exclude slf4j-nop from onyx-kafka.
```diff
--- a/deps.edn
+++ b/deps.edn
@@ -5,4 +5,5 @@
   org.apache.logging.log4j/log4j-core {:mvn/version "2.5"}
   org.apache.logging.log4j/log4j-1.2-api {:mvn/version "2.5"}
   org.mozilla/mentat {:mvn/version "0.11.1"}
-  org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4"}}}
+  org.slf4j/slf4j-simple {:mvn/version "1.7.12"}
+  org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4" :exclusions [org.slf4j/slf4j-nop]}}}
```
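To confirm the exclusion worked, look at which SLF4J bindings actually end up on the classpath; with several bindings present, SLF4J 1.7 warns about "multiple SLF4J bindings" and uses only one of them. A minimal sketch with hypothetical function names: it assumes the classpath comes from `clojure -Spath` (for the `deps.edn` setup) or is passed in explicitly, e.g. from `lein classpath`, and its list of binding jars is not exhaustive.

```bash
#!/usr/bin/env bash
# Hypothetical helpers (not part of mentat-kafka).

# Print SLF4J binding jars in classpath order. The classpath is taken from $1,
# or from `clojure -Spath` when no argument is given.
slf4j_bindings() {
  local cp=${1:-$(clojure -Spath)}
  printf '%s\n' "$cp" | tr ':' '\n' |
    grep -E 'slf4j-(nop|simple|log4j12|jdk14)|log4j-slf4j-impl' || true
}

# Fail if the no-op binding pulled in by onyx is still on the classpath.
check_no_nop() {
  if slf4j_bindings "$1" | grep -q 'slf4j-nop'; then
    echo "slf4j-nop is still on the classpath; log output will be discarded" >&2
    return 1
  fi
}
```

For a Leiningen build, `check_no_nop "$(lein classpath)"` runs the same check.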
##### Fixing connection to zookeeper
Once again we have to adjust the dependencies, this time to be able to connect to a recent version of zookeeper. Without this change the connection fails, and the zookeeper log shows an error like this:
```bash
tail -n 5 /projects1/pipeline_projects/overseer/var/zookeeper/mpi-cruncher1/zookeeper.log
2018-08-20 16:40:22,995 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /172.16.240.205:42870
2018-08-20 16:40:23,015 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@948] - Client attempting to establish new session at /172.16.240.205:42870
2018-08-20 16:40:23,137 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@693] - Established session 0x102155b07d30007 with negotiated timeout 40000 for client /172.16.240.205:42870
2018-08-20 16:40:23,142 [myid:1] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@755] - Received packet at server of unknown type 15
2018-08-20 16:40:23,143 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /172.16.240.205:42870 which had sessionid 0x102155b07d30007
```
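The warning `Received packet at server of unknown type 15` points to a protocol mismatch: type 15 is the `create2` operation introduced in ZooKeeper 3.5, which a 3.4.x server does not recognize. The solution is to pull in the version of zookeeper that we want (3.4.12) and stop Curator from bringing in its own. Curator 4 supports 3.4.x servers in a compatibility mode when its ZooKeeper dependency is excluded and a 3.4.x ZooKeeper is added instead: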
```diff
--- a/deps.edn
+++ b/deps.edn
@@ -5,4 +5,9 @@
   org.apache.logging.log4j/log4j-core {:mvn/version "2.5"}
   org.apache.logging.log4j/log4j-1.2-api {:mvn/version "2.5"}
   org.mozilla/mentat {:mvn/version "0.11.1"}
-  org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4"}}}
+  org.slf4j/slf4j-simple {:mvn/version "1.7.12"}
+  org.apache.curator/curator-framework {:mvn/version "4.0.1"}
+  org.apache.curator/curator-test {:mvn/version "2.9.1"}
+  org.apache.zookeeper/zookeeper {:mvn/version "3.4.12"}
+  org.apache.curator/curator-client {:mvn/version "4.0.1" :exclusions [org.apache.zookeeper/zookeeper]}
+  org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4" :exclusions [org.slf4j/slf4j-nop org.apache.curator/curator-framework org.apache.curator/curator-test org.apache.zookeeper/zookeeper]}}}
```
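Choosing which zookeeper client to pin is easier once you know the server's version. A minimal sketch using ZooKeeper's `srvr` four-letter-word command, which 3.4.x servers answer and which is in the default `4lw.commands.whitelist` of 3.5.x. The function names are hypothetical, and it assumes `nc` can reach the zookeeper client port (2181 here).

```bash
#!/usr/bin/env bash
# Hypothetical helpers (not part of mentat-kafka).

# Extract "3.4.12" from a line such as
#   Zookeeper version: 3.4.12-<commit>, built on <date>
parse_zk_version() {
  sed -n 's/^Zookeeper version: \([0-9][0-9.]*\).*/\1/p' | head -n 1
}

# Ask a ZooKeeper server for its version via the "srvr" command.
zk_server_version() {
  local host=${1:-localhost} port=${2:-2181}
  echo srvr | nc "$host" "$port" | parse_zk_version
}
```

On the zookeeper host, `zk_server_version localhost 2181` should print the server's version, which is the one to match with the `org.apache.zookeeper/zookeeper` dependency.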
###### Sending test messages
We can use the kafka command line tools to send some test messages to our topics.
```bash
ssh pipedag@crunch-1
# To create a topic by hand (our topic is set to auto-create, so we don't need to do this)
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic mentat.in --partitions 1 --replication-factor 3
# To send a message to our topic
echo "{:foo \"mary had a little lamb\"}" | /projects1/tools/kafka/2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9002,mpi-cruncher2.sdag.ppj.shh.mpg.de:9002,mpi-cruncher4.sdag.ppj.shh.mpg.de:9002 --topic mentat.in
# To view all messages on the topic
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9002 --topic mentat.in --from-beginning
# To delete the topic
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mentat.in
# To delete the messages on the topic, shorten the retention time...
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mentat.in --add-config retention.ms=1000
# ...then, once the broker has purged the old messages, reset the retention time to the default
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mentat.in --delete-config retention.ms
```
###### Using the java kafka consumer client to receive messages
The onyx-kafka client has an `onyx.kafka.helpers` namespace, which can be used as follows to receive all messages from our topic.
```clojure
(ns de.mpg.shh.mentat-kafka.application
  (:require [clojure.edn :as edn]
            [clojure.tools.logging :refer [info]]
            [onyx.kafka.helpers :as kph]))

(defn deserialise-message
  "Decode a kafka message value (UTF-8 encoded EDN) into Clojure data."
  [^bytes value]
  (let [s (String. value "UTF-8")]
    (info "bytes:" s)
    ;; edn/read-string, unlike clojure.core/read-string, never evaluates code
    (edn/read-string s)))

(def kafka-address "mpi-cruncher2.sdag.ppj.shh.mpg.de:9002")

(defn start
  "Read every message currently on the mentat.in topic and log it."
  []
  (info "reading from mentat.in")
  (doseq [result (kph/take-now kafka-address "mentat.in" deserialise-message)]
    (info "result:" result))
  (info "done, exiting"))
```
###### Using onyx-kafka plugin to receive messages
This proved to be more difficult. Onyx is built for distributed computing, so if your job cannot start you may not receive any error message at all: the job could still start later if the state of your cluster improves. The onyx-dashboard was helpful for detecting these errors.
```bash
# On my mac
git clone https://github.com/onyx-platform/onyx-dashboard
cd onyx-dashboard
# Make sure you are running java 8 here.
lein uberjar
java -jar target/onyx-dashboard.jar mpi-cruncher2.sdag.ppj.shh.mpg.de:2181 false
```
Now browse to http://localhost:3000/ to see what onyx is doing and any errors in your jobs.
##### Successfully running onyx
Onyx communicates using UDP, so we can use netcat to check that the machines can reach each other via UDP.
```bash
# On the remote we start a server that will reply with 'pong'
ssh pipedag@crunch-2
echo "pong" | nc -u -l 40200
# On your local machine (or other remote)
ssh overseer
echo "ping" | nc -u mpi-cruncher2.sdag.ppj.shh.mpg.de 40200
# Each side should receive the other's message via UDP
```
In our case, onyx reported a UDP connection error because there really was nothing to connect to on cruncher-2. Zookeeper and kafka both run there, which is why the kafka client can be used directly. The onyx-kafka plugin, however, needs a peer to run our job, and we hadn't started any on cruncher-2. We had only started some virtual peers locally, so try localhost instead.
###### Incoming message is repeated
In this case the metadata of the incoming message specified the input topic. I passed this metadata unmodified to write-message, so the message was delivered back to the input topic, and the output task's config in the catalog was ignored.