shh-services/mentat-kafka

mentat-kafka

Kafka server for a Mentat database.

*** You should run this with at least Java 9 ***

This relies on having the libmentat_ffi native library on your classpath, together with Java bindings suitable for the JVM (adapted from the project's Android bindings).

https://github.com/mozilla/mentat

This project is designed to act as the database for multiple clients via Kafka topics. We access Kafka via onyx-kafka.

To build the uberjar and run it, with the Mentat release build directory (where libmentat_ffi lives) on the classpath:

lein uberjar

java -cp ${HOME}/repos/mentat/target/release:target/mentat-kafka-0.0.1-standalone.jar de.mpg.shh.mentat_kafka.core
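The run command only works if libmentat_ffi is visible on the classpath. You can check this from a REPL before starting the server. The sketch below assumes the library file sits at the root of a classpath entry, as it does with target/release in the command above. `native-lib-on-classpath` is a hypothetical helper. `System/mapLibraryName` supplies the platform-specific file name, for example libmentat_ffi.so on Linux or libmentat_ffi.dylib on macOS.

```clojure
(ns mentat-kafka.native-check
  (:require [clojure.java.io :as io]))

;; Hypothetical helper: look for the platform-specific native library file
;; at the root of the classpath. Returns its URL, or nil if it is not found.
(defn native-lib-on-classpath
  [lib-name]
  (io/resource (System/mapLibraryName lib-name)))

;; Usage:
;; (native-lib-on-classpath "mentat_ffi")
;; A file: URL pointing into target/release means the library is visible;
;; nil means the directory containing it is missing from -cp.
```

This only checks visibility. How the Java bindings actually load the library is not covered here.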
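Fixing logging when using Onyx

Unfortunately, adding Onyx to the project stomps on all the logging. onyx-kafka pulls in slf4j-nop, the SLF4J no-op binding, which silently discards every log message. To fix this I need to do some voodoo with the dependencies. Essentially, a real logger implementation (here slf4j-simple) has to be the SLF4J binding found on the classpath, rather than the no-op logger used by Onyx. So we add slf4j-simple and exclude slf4j-nop from onyx-kafka: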

--- a/deps.edn
+++ b/deps.edn
@@ -5,4 +5,5 @@
         org.apache.logging.log4j/log4j-core {:mvn/version "2.5"}
         org.apache.logging.log4j/log4j-1.2-api {:mvn/version "2.5"}
         org.mozilla/mentat {:mvn/version "0.11.1"}
-        org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4"}}}
+        org.slf4j/slf4j-simple {:mvn/version "1.7.12"}
+        org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4" :exclusions [org.slf4j/slf4j-nop]}}}
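To confirm the exclusion worked, you can ask SLF4J at runtime which logger factory it bound to. This sketch assumes SLF4J 1.7.x, matching the slf4j-simple version pinned above. In that version, `LoggerFactory/getILoggerFactory` returns an `org.slf4j.impl.SimpleLoggerFactory` when slf4j-simple is active. It returns an `org.slf4j.helpers.NOPLoggerFactory` when the no-op binding is in use, or when no binding is found. The class is looked up reflectively so the namespace also loads without SLF4J on the classpath. `active-slf4j-factory` is a hypothetical name.

```clojure
(ns mentat-kafka.logging-check)

;; Return the class name of the ILoggerFactory that SLF4J is using, or nil
;; when SLF4J is not on the classpath at all.
(defn active-slf4j-factory
  []
  (try
    (let [logger-factory (Class/forName "org.slf4j.LoggerFactory")
          getter (.getMethod logger-factory "getILoggerFactory" (make-array Class 0))]
      (.getName (class (.invoke getter nil (object-array 0)))))
    (catch ClassNotFoundException _ nil)))

;; Usage (in the project REPL):
;; (active-slf4j-factory)
;; "org.slf4j.impl.SimpleLoggerFactory" -> slf4j-simple is in charge
;; "org.slf4j.helpers.NOPLoggerFactory" -> log output is still being discarded
```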
Fixing the connection to ZooKeeper

Unfortunately, we again have to adjust the dependencies, this time to be able to connect to our ZooKeeper server. Without this change the client fails to connect, and the ZooKeeper log shows an error like this:

tail -n 5 /projects1/pipeline_projects/overseer/var/zookeeper/mpi-cruncher1/zookeeper.log
2018-08-20 16:40:22,995 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /172.16.240.205:42870
2018-08-20 16:40:23,015 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@948] - Client attempting to establish new session at /172.16.240.205:42870
2018-08-20 16:40:23,137 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@693] - Established session 0x102155b07d30007 with negotiated timeout 40000 for client /172.16.240.205:42870
2018-08-20 16:40:23,142 [myid:1] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@755] - Received packet at server of unknown type 15
2018-08-20 16:40:23,143 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /172.16.240.205:42870 which had sessionid 0x102155b07d30007

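Packet type 15 is ZooKeeper's create2 operation, which was only added in ZooKeeper 3.5. This error therefore suggests that the ZooKeeper client pulled in transitively is newer than our server, which evidently predates 3.5 (Curator 4.0.x depends on a 3.5 beta client by default). The solution is to pull in the version of the ZooKeeper client that we want (3.4.12) and exclude the versions that Curator and onyx-kafka would otherwise bring in: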

--- a/deps.edn
+++ b/deps.edn
@@ -5,4 +5,9 @@
         org.apache.logging.log4j/log4j-core {:mvn/version "2.5"}
         org.apache.logging.log4j/log4j-1.2-api {:mvn/version "2.5"}
         org.mozilla/mentat {:mvn/version "0.11.1"}
-        org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4"}}}
+        org.slf4j/slf4j-simple {:mvn/version "1.7.12"}
+        org.apache.curator/curator-framework {:mvn/version "4.0.1"}
+        org.apache.curator/curator-test {:mvn/version "2.9.1"}
+        org.apache.zookeeper/zookeeper {:mvn/version "3.4.12"}
+        org.apache.curator/curator-client {:mvn/version "4.0.1" :exclusions [org.apache.zookeeper/zookeeper]}
+        org.onyxplatform/onyx-kafka {:mvn/version "0.13.3.0-alpha4" :exclusions [org.slf4j/slf4j-nop org.apache.curator/curator-framework org.apache.curator/curator-test org.apache.zookeeper/zookeeper]}}}
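The error appears to come from a client that is newer than the server. After changing the dependencies, it is therefore worth checking which ZooKeeper client actually ends up on the classpath. This sketch calls `org.apache.zookeeper.Version/getFullVersion`, a static method on the ZooKeeper client's `Version` class. It looks the class up reflectively so the namespace also loads without ZooKeeper present. `zookeeper-client-version` and `zookeeper-3-4-client?` are hypothetical helpers. The 3.4.x server version is an assumption inferred from the error above.

```clojure
(ns mentat-kafka.zookeeper-check
  (:require [clojure.string :as str]))

;; Return the full version string of the ZooKeeper client on the classpath
;; (for 3.4.12 it starts with "3.4.12"), or nil if ZooKeeper is not present.
(defn zookeeper-client-version
  []
  (try
    (let [version-class (Class/forName "org.apache.zookeeper.Version")
          getter (.getMethod version-class "getFullVersion" (make-array Class 0))]
      (.invoke getter nil (object-array 0)))
    (catch ClassNotFoundException _ nil)))

;; True when the client is a 3.4.x release, i.e. one that should only send
;; request types an (assumed) 3.4.x server understands.
(defn zookeeper-3-4-client?
  []
  (boolean (some-> (zookeeper-client-version) (str/starts-with? "3.4."))))
```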
Sending test messages

We can use the Kafka command-line tools to send test messages to our topics.

ssh pipedag@crunch-1

# To create a topic by hand (our topic is set to auto create so we don't need to do this)
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic mentat.in --partitions 1 --replication-factor 3

# To send a message to our topic
echo "{:foo \"mary had a little lamb\"}" | /projects1/tools/kafka/2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9002,mpi-cruncher2.sdag.ppj.shh.mpg.de:9002,mpi-cruncher4.sdag.ppj.shh.mpg.de:9002 --topic mentat.in

# To view all messages on the topic
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9002 --topic mentat.in --from-beginning

# To delete the messages on the topic, temporarily lower the retention time,
# then wait for the broker's next retention check (log.retention.check.interval.ms)
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mentat.in --add-config retention.ms=1000

# Reset the retention time back to the default
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mentat.in --delete-config retention.ms

# To delete the topic itself
/projects1/tools/kafka/2.11-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mentat.in
Using the Java Kafka consumer client to receive messages

The onyx-kafka library has an onyx.kafka.helpers namespace, which can be used as follows to receive all messages from our topic.

(ns de.mpg.shh.mentat-kafka.application
  (:require [clojure.edn :as edn]
            [clojure.tools.logging :refer [info]]
            [onyx.kafka.helpers :as kph]))

;; Decode a message value (UTF-8 bytes) as EDN. clojure.edn/read-string only
;; reads data, whereas clojure.core/read-string can evaluate #=(...) forms.
(defn deserialise-message [^bytes message-bytes]
  (let [s (String. message-bytes "UTF-8")]
    (info "bytes:" s)
    (edn/read-string s)))

(def kafka-address "mpi-cruncher2.sdag.ppj.shh.mpg.de:9002")

;; Read the messages currently on the mentat.in topic and log each one.
(defn start
  []
  (info "reading from mentat.in")
  (doseq [result (kph/take-now kafka-address "mentat.in" deserialise-message)]
    (info "result:" result))
  (info "done, exiting"))
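The console producer example sends plain EDN text. A consumer therefore only needs to decode UTF-8 bytes as EDN, and anything writing back to Kafka needs the inverse. The sketch below shows such a pair, assuming messages are EDN maps like {:foo "mary had a little lamb"}. The namespace and `serialise-message` are hypothetical. It uses `clojure.edn/read-string` because, unlike `clojure.core/read-string`, it never evaluates code embedded in a message, such as a `#=(...)` form.

```clojure
(ns mentat-kafka.serde
  (:require [clojure.edn :as edn]))

;; Encode a Clojure value as UTF-8 EDN bytes, e.g. for a Kafka producer.
(defn serialise-message
  [value]
  (.getBytes (pr-str value) "UTF-8"))

;; Decode UTF-8 EDN bytes back into a Clojure value. edn/read-string reads
;; data only and throws on reader-eval forms such as #=(...).
(defn deserialise-message
  [^bytes message-bytes]
  (edn/read-string (String. message-bytes "UTF-8")))

;; Usage:
;; (deserialise-message (.getBytes "{:foo \"mary had a little lamb\"}" "UTF-8"))
;; => {:foo "mary had a little lamb"}
```

In an onyx-kafka catalog entry, functions like these are referenced by keyword, e.g. `:kafka/deserializer-fn :mentat-kafka.serde/deserialise-message`. Check the exact key names against the onyx-kafka README for your version.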
Using the onyx-kafka plugin to receive messages

This proved difficult. Onyx is built for distributed computing, so if your job cannot start you may not receive any error message at all. The job just waits, because it could still start later if the state of your cluster improves (for example, once enough peers have joined). The onyx-dashboard is helpful for detecting these problems.

# On my Mac
git clone https://github.com/onyx-platform/onyx-dashboard
cd onyx-dashboard

# Make sure you are running Java 8 here (unlike mentat-kafka itself, which needs Java 9+).
lein uberjar
java -jar target/onyx-dashboard.jar mpi-cruncher2.sdag.ppj.shh.mpg.de:2181 false

Now browse to http://localhost:3000/ to see what Onyx is doing and any errors in your jobs.

Successfully running Onyx

Onyx peers communicate over UDP (Onyx uses the Aeron messaging library). We can use netcat to check that two machines can exchange UDP packets.

# On the remote machine, start a UDP listener that replies with 'pong'
ssh pipedag@crunch-2
echo "pong" | nc -u -l 40200

# On your local machine (or other remote)
ssh overseer
echo "ping" | nc -u mpi-cruncher2.sdag.ppj.shh.mpg.de 40200 -

# Each side should receive the other's message via UDP
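If netcat is not available, or you want to run the same check from a REPL on the JVM that will host the peers, the ping/pong exchange can be reproduced with `java.net.DatagramSocket`. This is a minimal sketch. `udp-ping` and `start-pong-responder` are hypothetical helpers. Port 40200 is just the port from the netcat example, not necessarily the port your Onyx peers are configured to use.

```clojure
(ns mentat-kafka.udp-check
  (:import (java.net DatagramPacket DatagramSocket InetAddress SocketTimeoutException)))

(defn- utf8-bytes [^String s] (.getBytes s "UTF-8"))

;; Like `echo "pong" | nc -u -l PORT`: bind PORT now, then answer the first
;; datagram received with "pong" on a background thread. Returns a future.
(defn start-pong-responder
  [port]
  (let [socket (DatagramSocket. (int port))]
    (future
      (with-open [s socket]
        (let [buf (byte-array 1024)
              request (DatagramPacket. buf (alength buf))
              ^bytes reply (utf8-bytes "pong")]
          (.receive s request)
          (.send s (DatagramPacket. reply (alength reply) (.getSocketAddress request))))))))

;; Like `echo "ping" | nc -u HOST PORT`: send "ping" and wait up to timeout-ms
;; for a reply. Returns the reply text, or nil if nothing arrives in time.
(defn udp-ping
  [host port timeout-ms]
  (with-open [socket (DatagramSocket.)]
    (.setSoTimeout socket (int timeout-ms))
    (let [^bytes ping (utf8-bytes "ping")
          buf (byte-array 1024)
          response (DatagramPacket. buf (alength buf))]
      (.send socket (DatagramPacket. ping (alength ping) (InetAddress/getByName host) (int port)))
      (try
        (.receive socket response)
        (String. (.getData response) 0 (.getLength response) "UTF-8")
        (catch SocketTimeoutException _ nil)))))
```

Usage: run `(start-pong-responder 40200)` on the remote machine, then `(udp-ping "mpi-cruncher2.sdag.ppj.shh.mpg.de" 40200 2000)` locally. A "pong" result means UDP gets through in both directions; nil means the reply never arrived.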

In our case, the UDP connection error reported by Onyx occurred because there was indeed nothing to connect to on cruncher-2. ZooKeeper and Kafka both run there, which is why the Kafka client can be used directly. The onyx-kafka plugin, however, needs a peer to run our job, and we had not started any peers on cruncher-2. We had only started some virtual peers locally, so try localhost instead.

Incoming message is repeated

In this case, the metadata of the incoming message named the input topic. I passed the message, metadata included, unmodified to write-message. The metadata therefore caused it to be delivered back to the input topic, and the output topic configured for the output task in the catalog was ignored.
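One way to avoid this is to build a fresh output segment instead of forwarding the incoming one. This sketch makes two assumptions. First, the input task wraps each message with metadata keys such as `:topic`, `:partition` and `:offset` alongside `:message`. Second, the output task honours a `:topic` key on a segment in preference to its catalog topic, which is the behaviour described above. `->output-segment` is a hypothetical helper, meant to be called from the task function that feeds write-message.

```clojure
;; Keep only the payload (and the message key, if any) from an incoming
;; segment. Dropping :topic means the output task falls back to the topic
;; configured for it in the catalog, instead of echoing to the input topic.
(defn ->output-segment
  [segment]
  (select-keys segment [:message :key]))

;; Usage:
;; (->output-segment {:topic "mentat.in" :partition 0 :offset 42
;;                    :message {:foo "mary had a little lamb"}})
;; => {:message {:foo "mary had a little lamb"}}
```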
