Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add utils for converting table rows into tx-data
  • Loading branch information
clayton committed Oct 4, 2017
1 parent 50e8cb7 commit f4a5eb0
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 3 deletions.
10 changes: 7 additions & 3 deletions project.clj
@@ -1,4 +1,4 @@
(defproject de.mpg.shh/util-datomic-peer "0.0.1"
(defproject de.mpg.shh/util-datomic-peer "0.0.2"
:description "Utilities for using datomic databases"
:url "http://www.shh.mpg.de/"
:license {:name "Eclipse Public License"
Expand All @@ -9,5 +9,9 @@
[org.apache.logging.log4j/log4j-core "2.5"]
[org.apache.logging.log4j/log4j-1.2-api "2.5"]
[org.slf4j/slf4j-log4j12 "1.6.4"]
[com.datomic/datomic-pro "0.9.5554" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j]]]
:source-paths ["src/main/clojure"])
[com.datomic/datomic-pro "0.9.5554" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j]]
[datascript "0.15.5"]]
:source-paths ["src/main/clojure"]
:profiles {:test {:source-paths ["src/test/clojure"]
:resource-paths ["test-resources"]
:dependencies [[midje "1.8.3"]]}})
116 changes: 116 additions & 0 deletions src/main/clojure/de/mpg/shh/util_datomic_peer/impl/push.clj
@@ -0,0 +1,116 @@
(ns de.mpg.shh.util-datomic-peer.impl.push
(:require [clojure.tools.logging :refer [info error]]
[clojure.string :as str]
[clojure.set :as set]
[datascript.core :as ds]
[datomic.api :as dt]))

(defn push->path
[accumulator path push-pattern]
(cond
(keyword? push-pattern)
(conj accumulator (conj path push-pattern))
(map? push-pattern)
(reduce (fn [acc i]
(push->path acc (conj path (ffirst push-pattern)) i)) accumulator (second (first push-pattern)))
(vector? push-pattern)
(reduce (fn [acc i]
(push->path acc path i)) accumulator push-pattern)
:else (throw (ex-info "Cannot find path for push" {:cause (str "no handler for: '" (type push-pattern) "'")}))))

(defn push->attr-set
[accumulator push-pattern]
(cond
(keyword? push-pattern)
(conj accumulator push-pattern)
(map? push-pattern)
(reduce (fn [acc i]
(push->attr-set acc i)) (conj accumulator (ffirst push-pattern)) (second (first push-pattern)))
(vector? push-pattern)
(reduce (fn [acc i]
(push->attr-set acc i)) accumulator push-pattern)
:else (throw (ex-info "Cannot add to attr set for push" {:cause (str "no handler for: '" (type push-pattern) "'")}))))

(defn component-attr?
[schema-map attr]
(= true (get-in schema-map [attr :db/isComponent])))

(defn component-attr->id-attr
[attr]
(let [attr-name (name attr)
_ (when-not (str/ends-with? attr-name "+") (throw (ex-info "Can't convert component-attr to id-attr" {:cause (str "component-attr name '" (name attr) "' doesn't end with '+'")})))
component-name (subs attr-name 0 (- (count attr-name) 1))]
(keyword (str (namespace attr) "." component-name "/" component-name "-id"))))

(defn where-clause-builder
[component-attr-id-attr-map row accumulator item]
(let [next-sym (gensym "?e")
component-id-attr (get component-attr-id-attr-map item)
component-id-attr-val (get row component-id-attr)]
(conj accumulator [(first (last accumulator)) item next-sym]
[next-sym component-id-attr component-id-attr-val])))

;; query for the id of the parent entity
(defn cache-row
[ds-conn attr-paths-lookup component-attr-id-attr-map row]
(let [;;_ (info "attr-paths-lookup: " attr-paths-lookup)
insertion-order (sort-by namespace (filter #(contains? attr-paths-lookup %) (keys row)))
;;_ (info "insertion-order: " insertion-order)
root-result (-> (ds/q '[:find [?e ...]
:in $ ?lookup-attr ?lookup-val
:where [?e ?lookup-attr ?lookup-val]]
(ds/db ds-conn)
(first insertion-order)
(get row (first insertion-order)))
first)
;;_ (info "root-result: " root-result)
_ (when (nil? root-result)
(ds/transact ds-conn [[:db/add -1 (first insertion-order) (get row (first insertion-order))]]))]
(doseq [field (rest insertion-order)]
;;(dorun (map (partial cache-field conn attr-paths-lookup component-attr-id-attr-map row) insertion-order))
(let [field-attr-paths (get attr-paths-lookup field)
;;_ (info "field-attr-paths: " field-attr-paths)
where-clauses (vec (reduce (partial where-clause-builder component-attr-id-attr-map row) [[(symbol "?e") (first insertion-order) (get row (first insertion-order))]] field-attr-paths))
component-id-attr (get component-attr-id-attr-map (last field-attr-paths))
component-id-attr-val (get row component-id-attr)
;;_ (info "field: " field)
;;_ (info "cid: " component-id-attr " cv: " component-id-attr-val)
;;_ (info "where clauses: " where-clauses)
]
(loop [wc where-clauses]
(when-not (empty? (into #{} (map first wc)))
(let [;;_ (info "search wc: " wc)
result (-> (ds/q `[:find [(~'pull ~(first (last wc)) [~'*]) ~'...]
:where ~@wc]
(ds/db ds-conn))
first)
;;_ (info "result: " result)
]
(cond
(nil? result)
(recur (vec (butlast (apply concat (butlast (partition-by first wc))))))
(not= (get result component-id-attr) component-id-attr-val)
(let [;;_ (info "found parent: " result)
tx-data (vec (distinct [[:db/add (:db/id result) (last field-attr-paths) -1]
[:db/add -1 component-id-attr component-id-attr-val]
[:db/add -1 field (get row field)]]))
;;_ (info "tx-data: " tx-data)
]
(ds/transact ds-conn tx-data))
(not= (get result field) (get row field))
(ds/transact ds-conn [[:db/add (:db/id result) field (get row field)]])))))
))
))

(defn datascript->datomic-tx-data
[ds-conn dt-partition dt-value-type-lookup]
(let [schema (:schema (ds/db ds-conn))]
(for [datom (ds/datoms (ds/db ds-conn) :eavt)]
(let [;;_ (info "e: " (:e datom) " a: " (:a datom) " v: " (:v datom) " :db/valueType " (get-in schema [(:a datom) :db/valueType]) " dt type " (get dt-value-type-lookup (:a datom)))
datom-val (cond
(= (get-in schema [(:a datom) :db/valueType]) :db.type/ref)
(dt/tempid dt-partition (* (:v datom) -1))
(= (get dt-value-type-lookup (:a datom)) :db.type/long)
(Long/valueOf (:v datom))
:else (:v datom))]
[:db/add (dt/tempid dt-partition (* (:e datom) -1)) (:a datom) datom-val]))))
18 changes: 18 additions & 0 deletions src/main/clojure/de/mpg/shh/util_datomic_peer/transaction_data.clj
@@ -0,0 +1,18 @@
(ns de.mpg.shh.util-datomic-peer.transaction-data
(:require [datascript.core :as ds]
[de.mpg.shh.util-datomic-peer.impl.push :as peer-push]))

(defn push->tx-data
([dt-partition dt-value-type-lookup schema-map push-pattern rows]
(let [attr-set (peer-push/push->attr-set #{} push-pattern)
component-attrs (filter (partial peer-push/component-attr? schema-map) attr-set)
component-attr-id-attr-map (into {} (map #(vector % (peer-push/component-attr->id-attr %)) component-attrs))]
(push->tx-data dt-partition dt-value-type-lookup schema-map push-pattern rows component-attr-id-attr-map)))
([dt-partition dt-value-type-lookup schema-map push-pattern rows component-attr-id-attr-map]
(let [schema (assoc schema-map :db/ident {:db/unique :db.unique/identity})
datoms []
ds-conn (ds/conn-from-db (ds/init-db datoms schema))
attr-paths (peer-push/push->path [] [] push-pattern)
attr-paths-lookup (into {} (map #(vector (last %) (vec (butlast %))) attr-paths))
_ (dorun (map (partial peer-push/cache-row ds-conn attr-paths-lookup component-attr-id-attr-map) rows))]
(peer-push/datascript->datomic-tx-data ds-conn dt-partition dt-value-type-lookup))))
59 changes: 59 additions & 0 deletions src/test/clojure/data/transaction_data.clj
@@ -0,0 +1,59 @@
(ns data.transaction-data
(:require [datomic.api :as dt]))

(def run-lane-library-data [{:domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"
:domain.data.sequencing.illumina.run.lane/lane-id "L001"
:domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"
:domain.data.sequencing.illumina.run.lane.library/read-count "250"
:file-name "JAE006.A0101_S0_L001_R1_001.fastq.gz"
:sha1sum "e92e815426b0bcb00ce3968512973e74b35c0650"
:path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R1_001.fastq.gz"}
{:domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"
:domain.data.sequencing.illumina.run.lane/lane-id "L001"
:domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"
:domain.data.sequencing.illumina.run.lane.library/read-count "250"
:file-name "JAE006.A0101_S0_L001_R2_001.fastq.gz"
:sha1sum "1fa8fb21a8162aa4731f3f300eb0de7a065962ac"
:path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R2_001.fastq.gz"}])

(def domain-data-value-type {:domain.data.sequencing.illumina.run.lane.library/read-count :db.type/long
:domain.data.sequencing.illumina.run.read/indexed? :db.type/boolean
:domain.data/uuid :db.type/uuid
:domain.data.sequencing.run/run-date :db.type/instant
:domain.data.sequencing.run.read/cycles :db.type/long
:domain.data.sequencing.illumina.run/run-date :db.type/instant
:domain.data.sequencing.run.lane.library/read-count :db.type/long
:domain.data.sequencing.illumina.run.read/cycles :db.type/long
:domain.data.sequencing.run.read/indexed? :db.type/boolean})

(def domain-data-datascript-schema {:domain.data.sequencing.illumina.run/read+ {:db/cardinality :db.cardinality/many
:db/isComponent true
:db/valueType :db.type/ref}
:domain.data.sequencing.illumina.run/lane+ {:db/valueType :db.type/ref
:db/cardinality :db.cardinality/many
:db/isComponent true}
:domain.data.sequencing.run/lane+ {:db/valueType :db.type/ref
:db/cardinality :db.cardinality/many
:db/isComponent true}
:domain.data.sequencing.run.lane/library+ {:db/valueType :db.type/ref
:db/cardinality :db.cardinality/many}
:domain.data/uuid {:db/unique :db.unique/identity}
:domain.data.sequencing.run/run-id {:db/unique :db.unique/identity}
:domain.data.sequencing.illumina.run.lane/library+ {:db/isComponent true
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref}
:domain.data.sequencing.illumina.run/run-id {:db/unique :db.unique/identity}
:domain.data.sequencing.run/read+ {:db/isComponent true
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref}})

(def tx-data [[:db/add (dt/tempid :db.part/data -1) :domain.data.sequencing.illumina.run/lane+ (dt/tempid :db.part/data -2)]
[:db/add (dt/tempid :db.part/data -1) :domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"]
[:db/add (dt/tempid :db.part/data -2) :domain.data.sequencing.illumina.run.lane/lane-id "L001"]
[:db/add (dt/tempid :db.part/data -2) :domain.data.sequencing.illumina.run.lane/library+ (dt/tempid :db.part/data -3)]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-count 250]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-one-file-path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R1_001.fastq.gz"]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum "e92e815426b0bcb00ce3968512973e74b35c0650"]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-two-file-path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R2_001.fastq.gz"]
[:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum "1fa8fb21a8162aa4731f3f300eb0de7a065962ac"]])
@@ -0,0 +1,34 @@
(ns de.mpg.shh.util-datomic-peer.transaction-data-test
(:require [clojure.tools.logging :refer [info error]]
[clojure.string :as str]
[clojure.set :as set]
[midje.sweet :refer :all]
[datomic.api :as dt]
[datascript.core :as ds]
[data.transaction-data :as transaction-data]
[de.mpg.shh.util-datomic-peer.transaction-data :as peer-transaction-data]))

(fact "Create tx-data for run-lane-library data"
(let [row->tx (fn [row]
(let [read-num (str/split (:file-name row) #"_")
read-num (-> row :file-name (str/split #"_") reverse second)
;;_ (info "read-num: " read-num)
path-kw (if (= read-num "R1")
:domain.data.sequencing.illumina.run.lane.library/read-one-file-path
:domain.data.sequencing.illumina.run.lane.library/read-two-file-path)
sum-kw (if (= read-num "R1")
:domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum
:domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum)]
(clojure.set/rename-keys row {:path path-kw :sha1sum sum-kw})))
rows (vec (map row->tx transaction-data/run-lane-library-data))
push-pattern [:domain.data.sequencing.illumina.run/run-id
{:domain.data.sequencing.illumina.run/lane+ [:domain.data.sequencing.illumina.run.lane/lane-id
{:domain.data.sequencing.illumina.run.lane/library+ [:domain.data.sequencing.illumina.run.lane.library/library-id
:domain.data.sequencing.illumina.run.lane.library/read-one-file-path
:domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum
:domain.data.sequencing.illumina.run.lane.library/read-two-file-path
:domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum
:domain.data.sequencing.illumina.run.lane.library/read-count]}]}]]
(vec (peer-transaction-data/push->tx-data :db.part/data transaction-data/domain-data-value-type transaction-data/domain-data-datascript-schema push-pattern rows))) => transaction-data/tx-data)


0 comments on commit f4a5eb0

Please sign in to comment.