From f4a5eb065f5e8911b927a7ce6ba8b8b98eb2f63a Mon Sep 17 00:00:00 2001 From: Stephen Clayton Date: Wed, 4 Oct 2017 13:07:43 +0200 Subject: [PATCH] add utils for converting table rows into tx-data --- project.clj | 10 +- .../mpg/shh/util_datomic_peer/impl/push.clj | 116 ++++++++++++++++++ .../util_datomic_peer/transaction_data.clj | 18 +++ src/test/clojure/data/transaction_data.clj | 59 +++++++++ .../transaction_data_test.clj | 34 +++++ 5 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 src/main/clojure/de/mpg/shh/util_datomic_peer/impl/push.clj create mode 100644 src/main/clojure/de/mpg/shh/util_datomic_peer/transaction_data.clj create mode 100644 src/test/clojure/data/transaction_data.clj create mode 100644 src/test/clojure/de/mpg/shh/util_datomic_peer/transaction_data_test.clj diff --git a/project.clj b/project.clj index 3f39cc0..6817959 100755 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject de.mpg.shh/util-datomic-peer "0.0.1" +(defproject de.mpg.shh/util-datomic-peer "0.0.2" :description "Utilities for using datomic databases" :url "http://www.shh.mpg.de/" :license {:name "Eclipse Public License" @@ -9,5 +9,9 @@ [org.apache.logging.log4j/log4j-core "2.5"] [org.apache.logging.log4j/log4j-1.2-api "2.5"] [org.slf4j/slf4j-log4j12 "1.6.4"] - [com.datomic/datomic-pro "0.9.5554" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j]]] - :source-paths ["src/main/clojure"]) + [com.datomic/datomic-pro "0.9.5554" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j]] + [datascript "0.15.5"]] + :source-paths ["src/main/clojure"] + :profiles {:test {:source-paths ["src/test/clojure"] + :resource-paths ["test-resources"] + :dependencies [[midje "1.8.3"]]}}) diff --git a/src/main/clojure/de/mpg/shh/util_datomic_peer/impl/push.clj b/src/main/clojure/de/mpg/shh/util_datomic_peer/impl/push.clj new file mode 100644 index 0000000..046cb5b --- /dev/null +++ b/src/main/clojure/de/mpg/shh/util_datomic_peer/impl/push.clj @@ -0,0 +1,116 
@@
+(ns de.mpg.shh.util-datomic-peer.impl.push
+  (:require [clojure.tools.logging :refer [info error]]
+            [clojure.string :as str]
+            [clojure.set :as set]
+            [datascript.core :as ds]
+            [datomic.api :as dt]))
+
+(defn push->path
+  "Appends to `accumulator` one path per leaf keyword of `push-pattern`: `path`,
+  the enclosing map keys, then the leaf. Throws ex-info on unsupported patterns."
+  [accumulator path push-pattern]
+  (cond
+    (keyword? push-pattern)
+    (conj accumulator (conj path push-pattern))
+    (map? push-pattern)
+    ;; visit every entry, so a map with several keys is not cut to its first
+    (reduce-kv (fn [acc attr sub-patterns]
+                 (reduce #(push->path %1 (conj path attr) %2) acc sub-patterns))
+               accumulator push-pattern)
+    (vector? push-pattern)
+    (reduce (fn [acc i] (push->path acc path i)) accumulator push-pattern)
+    :else (throw (ex-info "Cannot find path for push"
+                          {:cause (str "no handler for: '" (type push-pattern) "'")}))))
+
+(defn push->attr-set
+  "Adds every attribute keyword in `push-pattern` (leaves and the keys of all map
+  entries) to `accumulator`. Throws ex-info on unsupported patterns."
+  [accumulator push-pattern]
+  (cond
+    (keyword? push-pattern)
+    (conj accumulator push-pattern)
+    (map? push-pattern)
+    (reduce-kv (fn [acc attr sub-patterns]
+                 (reduce push->attr-set (conj acc attr) sub-patterns))
+               accumulator push-pattern)
+    (vector? push-pattern)
+    (reduce push->attr-set accumulator push-pattern)
+    :else (throw (ex-info "Cannot add to attr set for push"
+                          {:cause (str "no handler for: '" (type push-pattern) "'")}))))
+
+(defn component-attr?
+  "True when `schema-map` declares `attr` with `:db/isComponent true`."
+  [schema-map attr]
+  (true? (get-in schema-map [attr :db/isComponent])))
+
+(defn component-attr->id-attr
+  "Derives a component's id attr from its component attr, whose name must end
+  in '+': :a.run/lane+ gives :a.run.lane/lane-id. Throws ex-info otherwise."
+  [attr]
+  (let [attr-name (name attr)
+        _ (when-not (str/ends-with? attr-name "+")
+            (throw (ex-info "Can't convert component-attr to id-attr"
+                            {:cause (str "component-attr name '" attr-name "' doesn't end with '+'")})))
+        component-name (subs attr-name 0 (dec (count attr-name)))]
+    (keyword (str (namespace attr) "." component-name "/" component-name "-id"))))
+
+(defn where-clause-builder
+  "Reducing step: appends a hop from the last clause's entity through `item` to a
+  fresh ?e symbol, then a clause matching that entity's id attr to `row`'s value."
+  [component-attr-id-attr-map row accumulator item]
+  (let [next-sym (gensym "?e")
+        component-id-attr (get component-attr-id-attr-map item)
+        component-id-attr-val (get row component-id-attr)]
+    (conj accumulator [(first (last accumulator)) item next-sym]
+          [next-sym component-id-attr component-id-attr-val])))
+
+(defn cache-row
+  "Merges `row` into the datascript db of `ds-conn`. The row's pattern attrs are
+  ordered by namespace; the first identifies the root entity, added if absent.
+  Each later attr goes to the deepest entity found along its component path."
+  [ds-conn attr-paths-lookup component-attr-id-attr-map row]
+  (let [[root-attr & fields] (sort-by namespace (filter #(contains? attr-paths-lookup %) (keys row)))
+        root-val (get row root-attr)
+        root-result (first (ds/q '[:find [?e ...]
+                                   :in $ ?lookup-attr ?lookup-val
+                                   :where [?e ?lookup-attr ?lookup-val]]
+                                 (ds/db ds-conn) root-attr root-val))]
+    (when (nil? root-result)
+      (ds/transact ds-conn [[:db/add -1 root-attr root-val]]))
+    (doseq [field fields]
+      (let [field-attr-paths (get attr-paths-lookup field)
+            where-clauses (reduce (partial where-clause-builder component-attr-id-attr-map row)
+                                  [['?e root-attr root-val]] field-attr-paths)
+            component-id-attr (get component-attr-id-attr-map (last field-attr-paths))
+            component-id-attr-val (get row component-id-attr)]
+        (loop [wc where-clauses]
+          (when (seq wc)
+            (let [result (first (ds/q `[:find [(~'pull ~(first (last wc)) [~'*]) ~'...]
+                                        :where ~@wc]
+                                      (ds/db ds-conn)))]
+              (cond
+                ;; no match: retry without the deepest entity's clauses and the hop to it
+                (nil? result)
+                (recur (vec (butlast (apply concat (butlast (partition-by first wc))))))
+                ;; match lacks the target component id: attach a new component with the field
+                (not= (get result component-id-attr) component-id-attr-val)
+                (ds/transact ds-conn (vec (distinct [[:db/add (:db/id result) (last field-attr-paths) -1]
+                                                     [:db/add -1 component-id-attr component-id-attr-val]
+                                                     [:db/add -1 field (get row field)]])))
+                ;; match is the target entity: write the field only when it changed
+                (not= (get result field) (get row field))
+                (ds/transact ds-conn [[:db/add (:db/id result) field (get row field)]])))))))))
+
+(defn datascript->datomic-tx-data
+  "Lazily maps each datom of `ds-conn` to a datomic [:db/add e a v]; entity ids and
+  ref values become tempids in `dt-partition`, :db.type/long values become Longs."
+  [ds-conn dt-partition dt-value-type-lookup]
+  (let [db (ds/db ds-conn)
+        schema (:schema db)
+        ->tempid #(dt/tempid dt-partition (- %))]
+    (for [datom (ds/datoms db :eavt) :let [a (:a datom) v (:v datom)]]
+      [:db/add (->tempid (:e datom)) a
+       (cond
+         (= :db.type/ref (get-in schema [a :db/valueType])) (->tempid v)
+         (= :db.type/long (get dt-value-type-lookup a)) (Long/valueOf v)
+         :else v)])))
diff --git a/src/main/clojure/de/mpg/shh/util_datomic_peer/transaction_data.clj b/src/main/clojure/de/mpg/shh/util_datomic_peer/transaction_data.clj
new file mode 100644
index 0000000..c986f1e
--- /dev/null
+++ b/src/main/clojure/de/mpg/shh/util_datomic_peer/transaction_data.clj
@@ -0,0 +1,18 @@
+(ns de.mpg.shh.util-datomic-peer.transaction-data
+  (:require [datascript.core :as ds]
+            [de.mpg.shh.util-datomic-peer.impl.push :as peer-push]))
+
+(defn push->tx-data
+  "Builds datomic tx-data from `rows` by caching them in a scratch datascript db
+  shaped by `push-pattern`. Without `component-attr-id-attr-map`, id attrs are
+  derived from the component attrs `schema-map` declares for the pattern."
+  ([dt-partition dt-value-type-lookup schema-map push-pattern rows]
+   (let [component-attrs (filter (partial peer-push/component-attr? schema-map)
+                                 (peer-push/push->attr-set #{} push-pattern))]
+     (push->tx-data dt-partition dt-value-type-lookup schema-map push-pattern rows
+                    (into {} (map (juxt identity peer-push/component-attr->id-attr)) component-attrs))))
+  ([dt-partition dt-value-type-lookup schema-map push-pattern rows component-attr-id-attr-map]
+   (let [ds-conn (ds/conn-from-db (ds/init-db [] (assoc schema-map :db/ident {:db/unique :db.unique/identity})))
+         attr-paths-lookup (into {} (map (juxt last (comp vec butlast))) (peer-push/push->path [] [] push-pattern))]
+     (run! (partial peer-push/cache-row ds-conn attr-paths-lookup component-attr-id-attr-map) rows)
+     (peer-push/datascript->datomic-tx-data ds-conn dt-partition dt-value-type-lookup))))
diff --git a/src/test/clojure/data/transaction_data.clj b/src/test/clojure/data/transaction_data.clj
new file mode 100644
index 0000000..3ee5050
--- /dev/null
+++ b/src/test/clojure/data/transaction_data.clj
@@ -0,0 +1,59 @@
+(ns data.transaction-data
+  (:require [datomic.api :as dt]))
+
+(def run-lane-library-data [{:domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"
+                             :domain.data.sequencing.illumina.run.lane/lane-id "L001"
+                             :domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"
+                             :domain.data.sequencing.illumina.run.lane.library/read-count "250"
+                             :file-name "JAE006.A0101_S0_L001_R1_001.fastq.gz"
+                             :sha1sum "e92e815426b0bcb00ce3968512973e74b35c0650"
+                             :path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R1_001.fastq.gz"}
+                            {:domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"
+                             :domain.data.sequencing.illumina.run.lane/lane-id "L001"
+                             :domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"
+                             :domain.data.sequencing.illumina.run.lane.library/read-count "250"
+                             :file-name "JAE006.A0101_S0_L001_R2_001.fastq.gz"
+                             :sha1sum "1fa8fb21a8162aa4731f3f300eb0de7a065962ac"
+                             :path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R2_001.fastq.gz"}])
+
+(def domain-data-value-type {:domain.data.sequencing.illumina.run.lane.library/read-count :db.type/long
+                             :domain.data.sequencing.illumina.run.read/indexed? :db.type/boolean
+                             :domain.data/uuid :db.type/uuid
+                             :domain.data.sequencing.run/run-date :db.type/instant
+                             :domain.data.sequencing.run.read/cycles :db.type/long
+                             :domain.data.sequencing.illumina.run/run-date :db.type/instant
+                             :domain.data.sequencing.run.lane.library/read-count :db.type/long
+                             :domain.data.sequencing.illumina.run.read/cycles :db.type/long
+                             :domain.data.sequencing.run.read/indexed? :db.type/boolean})
+
+(def domain-data-datascript-schema {:domain.data.sequencing.illumina.run/read+ {:db/cardinality :db.cardinality/many
+                                                                                :db/isComponent true
+                                                                                :db/valueType :db.type/ref}
+                                    :domain.data.sequencing.illumina.run/lane+ {:db/valueType :db.type/ref
+                                                                                :db/cardinality :db.cardinality/many
+                                                                                :db/isComponent true}
+                                    :domain.data.sequencing.run/lane+ {:db/valueType :db.type/ref
+                                                                       :db/cardinality :db.cardinality/many
+                                                                       :db/isComponent true}
+                                    :domain.data.sequencing.run.lane/library+ {:db/valueType :db.type/ref
+                                                                               :db/cardinality :db.cardinality/many}
+                                    :domain.data/uuid {:db/unique :db.unique/identity}
+                                    :domain.data.sequencing.run/run-id {:db/unique :db.unique/identity}
+                                    :domain.data.sequencing.illumina.run.lane/library+ {:db/isComponent true
+                                                                                        :db/cardinality :db.cardinality/many
+                                                                                        :db/valueType :db.type/ref}
+                                    :domain.data.sequencing.illumina.run/run-id {:db/unique :db.unique/identity}
+                                    :domain.data.sequencing.run/read+ {:db/isComponent true
+                                                                       :db/cardinality :db.cardinality/many
+                                                                       :db/valueType :db.type/ref}})
+
+(def tx-data [[:db/add (dt/tempid :db.part/data -1) :domain.data.sequencing.illumina.run/lane+ (dt/tempid :db.part/data -2)]
+              [:db/add (dt/tempid :db.part/data -1) :domain.data.sequencing.illumina.run/run-id "170911_NS500559_0049_AHKJ2TBGX3"]
+              [:db/add (dt/tempid :db.part/data -2) :domain.data.sequencing.illumina.run.lane/lane-id "L001"]
+              [:db/add (dt/tempid :db.part/data -2) :domain.data.sequencing.illumina.run.lane/library+ (dt/tempid :db.part/data -3)]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/library-id "JAE006.A0101"]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-count 250]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-one-file-path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R1_001.fastq.gz"]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum "e92e815426b0bcb00ce3968512973e74b35c0650"]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-two-file-path "/rawdata1/releases/2017/170911_NS500559_0049_AHKJ2TBGX3/JAE006.A0101/JAE006.A0101_S0_L001_R2_001.fastq.gz"]
+              [:db/add (dt/tempid :db.part/data -3) :domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum "1fa8fb21a8162aa4731f3f300eb0de7a065962ac"]])
diff --git a/src/test/clojure/de/mpg/shh/util_datomic_peer/transaction_data_test.clj b/src/test/clojure/de/mpg/shh/util_datomic_peer/transaction_data_test.clj
new file mode 100644
index 0000000..7ab0f9e
--- /dev/null
+++ b/src/test/clojure/de/mpg/shh/util_datomic_peer/transaction_data_test.clj
@@ -0,0 +1,34 @@
+(ns de.mpg.shh.util-datomic-peer.transaction-data-test
+  (:require [clojure.tools.logging :refer [info error]]
+            [clojure.string :as str]
+            [clojure.set :as set]
+            [midje.sweet :refer :all]
+            [datomic.api :as dt]
+            [datascript.core :as ds]
+            [data.transaction-data :as transaction-data]
+            [de.mpg.shh.util-datomic-peer.transaction-data :as peer-transaction-data]))
+
+;; Each input row describes one fastq file. row->tx renames :path and :sha1sum to
+;; the read-one/read-two library attrs, chosen by the R1/R2 token of :file-name,
+;; so that both rows of a library end up on a single library entity.
+(fact "Create tx-data for run-lane-library data"
+      (let [row->tx (fn [row]
+                      (let [read-num (-> row :file-name (str/split #"_") reverse second)
+                            path-kw (if (= read-num "R1")
+                                      :domain.data.sequencing.illumina.run.lane.library/read-one-file-path
+                                      :domain.data.sequencing.illumina.run.lane.library/read-two-file-path)
+                            sum-kw (if (= read-num "R1")
+                                     :domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum
+                                     :domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum)]
+                        (set/rename-keys row {:path path-kw :sha1sum sum-kw})))
+            rows (mapv row->tx transaction-data/run-lane-library-data)
+            push-pattern [:domain.data.sequencing.illumina.run/run-id
+                          {:domain.data.sequencing.illumina.run/lane+ [:domain.data.sequencing.illumina.run.lane/lane-id
+                                                                       {:domain.data.sequencing.illumina.run.lane/library+ [:domain.data.sequencing.illumina.run.lane.library/library-id
+                                                                                                                            :domain.data.sequencing.illumina.run.lane.library/read-one-file-path
+                                                                                                                            :domain.data.sequencing.illumina.run.lane.library/read-one-sha1sum
+                                                                                                                            :domain.data.sequencing.illumina.run.lane.library/read-two-file-path
+                                                                                                                            :domain.data.sequencing.illumina.run.lane.library/read-two-sha1sum
+                                                                                                                            :domain.data.sequencing.illumina.run.lane.library/read-count]}]}]]
+        (vec (peer-transaction-data/push->tx-data :db.part/data transaction-data/domain-data-value-type transaction-data/domain-data-datascript-schema push-pattern rows))) => transaction-data/tx-data)
+