Skip to content

Commit

Permalink
Merge pull request #1039 from samply/db-sync
Browse files Browse the repository at this point in the history
Improve Database Sync Efficiency
  • Loading branch information
alexanderkiel authored Jun 23, 2023
2 parents 78649dd + 0431a68 commit d5e17ff
Show file tree
Hide file tree
Showing 69 changed files with 679 additions and 634 deletions.
22 changes: 11 additions & 11 deletions modules/cql/test/blaze/elm/compiler/external_data_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:require
[blaze.anomaly :as ba]
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.compiler :as c]
[blaze.elm.compiler.core :as core]
[blaze.elm.compiler.external-data]
Expand Down Expand Up @@ -64,7 +64,7 @@
(deftest compile-retrieve-test
(testing "Patient context"
(testing "Patient"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]]]

(let [context
Expand All @@ -84,7 +84,7 @@
(is (= '(retrieve-resource) (core/-form expr)))))))

(testing "Observation"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "1"
:subject
Expand All @@ -107,7 +107,7 @@
(is (= '(compartment-list-retrieve "Observation") (core/-form expr))))))

(testing "with one code"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -149,7 +149,7 @@
(core/-form expr)))))))

(testing "with two codes"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -198,7 +198,7 @@
[1 :id] := "2"))))

(testing "with one concept"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -251,7 +251,7 @@

(testing "Specimen context"
(testing "Patient"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Specimen :id "0"
:subject
Expand All @@ -271,7 +271,7 @@

(testing "Unfiltered context"
(testing "Medication"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Medication :id "0"
:code
#fhir/CodeableConcept
Expand Down Expand Up @@ -301,7 +301,7 @@
[0 :id] := "0"))))

(testing "unknown code property"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [context
{:node node
:eval-context "Unfiltered"
Expand All @@ -322,7 +322,7 @@

(testing "with related context"
(testing "with pre-compiled database query"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [library {:codeSystems
{:def [{:name "sys-def-174848" :id "system-174915"}]}
:statements
Expand All @@ -340,7 +340,7 @@
type := WithRelatedContextQueryRetrieveExpression))))

(testing "unknown code property"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [library {:codeSystems
{:def [{:name "sys-def-174848" :id "system-174915"}]}
:statements
Expand Down
20 changes: 10 additions & 10 deletions modules/cql/test/blaze/elm/compiler/library_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns blaze.elm.compiler.library-test
(:require
[blaze.cql-translator :as t]
[blaze.db.api-stub :refer [mem-node-system]]
[blaze.db.api-stub :refer [mem-node-config]]
[blaze.elm.compiler :as compiler]
[blaze.elm.compiler.library :as library]
[blaze.elm.compiler.library-spec]
Expand Down Expand Up @@ -64,13 +64,13 @@
(deftest compile-library-test
(testing "empty library"
(let [library (t/translate "library Test")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
:expression-defs := {}))))

(testing "one static expression"
(let [library (t/translate "library Test define Foo: true")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "Foo" :context] := "Patient"
[:expression-defs "Foo" :expression] := true))))
Expand All @@ -80,7 +80,7 @@
using FHIR version '4.0.0'
context Patient
define Gender: Patient.gender")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "Gender" :context] := "Patient"
[:expression-defs "Gender" :expression compiler/form] := '(:gender (expr-ref "Patient"))))))
Expand All @@ -91,7 +91,7 @@
context Patient
define function Gender(P Patient): P.gender
define InInitialPopulation: Gender(Patient)")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "InInitialPopulation" :context] := "Patient"
[:expression-defs "InInitialPopulation" :resultTypeName] := "{http://hl7.org/fhir}AdministrativeGender"
Expand All @@ -107,7 +107,7 @@
define function Inc(i System.Integer): i + 1
define function Inc2(i System.Integer): Inc(i) + 1
define InInitialPopulation: Inc2(1)")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "InInitialPopulation" :context] := "Patient"
[:expression-defs "InInitialPopulation" :expression compiler/form] := '(call "Inc2" 1)))))
Expand All @@ -116,31 +116,31 @@
(testing "function"
(let [library (t/translate "library Test
define function Error(): singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message := "More than one element in `SingletonFrom` expression."))))

(testing "expression"
(let [library (t/translate "library Test
define Error: singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message := "More than one element in `SingletonFrom` expression.")))))

(testing "with parameter default"
(let [library (t/translate "library Test
parameter \"Measurement Period\" Interval<Date> default Interval[@2020-01-01, @2020-12-31]")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:parameter-default-values "Measurement Period" :start] := #system/date"2020-01-01"
[:parameter-default-values "Measurement Period" :end] := #system/date"2020-12-31"))))

(testing "with invalid parameter default"
(let [library (t/translate "library Test
parameter \"Measurement Start\" Integer default singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message "More than one element in `SingletonFrom` expression.")))))
8 changes: 4 additions & 4 deletions modules/cql/test/blaze/elm/compiler/queries_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:require
[blaze.anomaly :as ba]
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.code :as code]
[blaze.elm.code-spec]
[blaze.elm.compiler :as c]
Expand Down Expand Up @@ -128,7 +128,7 @@
(is (= '(eduction-query distinct [1 1]) (core/-form expr)))))))

(testing "Retrieve queries"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]]]

(let [db (d/db node)
Expand Down Expand Up @@ -272,7 +272,7 @@
;; condition. This operation is known as a semi-join in database languages.
(deftest compile-with-clause-test
(testing "Equiv With with two Observations comparing there subjects."
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -312,7 +312,7 @@
(queries/-form xform-factory)))))))

(testing "Equiv With with one Patient and one Observation comparing the patient with the operation subject."
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down
4 changes: 2 additions & 2 deletions modules/cql/test/blaze/elm/compiler/string_operators_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
https://cql.hl7.org/04-logicalspecification.html."
(:require
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.compiler :as c]
[blaze.elm.compiler.core :as core]
[blaze.elm.compiler.test-util :as tu]
Expand Down Expand Up @@ -203,7 +203,7 @@

(testing "retrieve"
(are [count]
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[(into [[:put {:fhir/type :fhir/Patient :id "0"}]]
(map (fn [id]
[:put {:fhir/type :fhir/Observation :id (str id)
Expand Down
15 changes: 10 additions & 5 deletions modules/db-stub/src/blaze/db/api_stub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[java-time.api :as time]))


(defn create-mem-node-system [node-config]
(defn create-mem-node-config [node-config]
{:blaze.db/node
(merge
{:tx-log (ig/ref :blaze.db/tx-log)
Expand Down Expand Up @@ -78,11 +78,16 @@
{:structure-definition-repo structure-definition-repo}})


(def mem-node-system
(create-mem-node-system {}))
(def mem-node-config
(create-mem-node-config {}))


(defmacro with-system-data [[binding-form system] txs & body]
`(with-system [system# ~system]
(defmacro with-system-data
"Runs `body` inside a system that is initialized from `config`, bound to
`binding-form` and finally halted.
Additionally the database is initialized with `txs`."
[[binding-form config] txs & body]
`(with-system [system# ~config]
(run! #(deref (d/transact (:blaze.db/node system#) %)) ~txs)
(let [~binding-form system#] ~@body)))
18 changes: 9 additions & 9 deletions modules/db-tx-log-kafka/test/blaze/db/tx_log/kafka_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
(def tx-cmd {:op "create" :type "Patient" :id "0" :hash patient-hash-0})


(def system
(def config
{::tx-log/kafka
{:bootstrap-servers bootstrap-servers
:last-t-executor (ig/ref ::kafka/last-t-executor)}
Expand Down Expand Up @@ -105,7 +105,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(is (= 1 @(tx-log/submit tx-log [tx-cmd] nil)))))

(testing "RecordTooLargeException"
Expand All @@ -121,7 +121,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(given-failed-future (tx-log/submit tx-log [tx-cmd] nil)
::anom/category := ::anom/unsupported
::anom/message := "A transaction with 1 commands generated a Kafka message which is larger than the configured maximum of null bytes. In order to prevent this error, increase the maximum message size by setting DB_KAFKA_MAX_REQUEST_SIZE to a higher number. msg-173357"))))
Expand All @@ -139,7 +139,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(given-failed-future @(tx-log/submit tx-log [tx-cmd] nil)
::anom/category := ::anom/fault
::anom/message := "msg-175337")))))
Expand All @@ -161,7 +161,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(with-open [queue (tx-log/new-queue tx-log 1)]
(is (empty? (tx-log/poll! queue (time/seconds 1))))))))

Expand All @@ -178,19 +178,19 @@
(Map/of (first partitions) 104614))
AutoCloseable
(close [_])))]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(is (= 104614 @(tx-log/last-t tx-log)))))))


(def config {:bootstrap-servers "localhost:9092"})
(def producer-config {:bootstrap-servers "localhost:9092"})


(deftest create-producer-test
(is (instance? KafkaProducer (kafka/create-producer config))))
(is (instance? KafkaProducer (kafka/create-producer producer-config))))


(deftest create-consumer-test
(given (.assignment (kafka/create-consumer config))
(given (.assignment (kafka/create-consumer producer-config))
count := 1
[0] := (TopicPartition. "tx" 0)))

Expand Down
19 changes: 14 additions & 5 deletions modules/db/src/blaze/db/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@
add-subsetted-xf)))


(defrecord Node [context tx-log tx-cache kv-store resource-store
(defrecord Node [context tx-log tx-cache kv-store resource-store sync-fn
search-param-registry resource-indexer state run? poll-timeout
finished]
np/Node
Expand All @@ -336,8 +336,7 @@

(-sync [node]
(log/trace "sync on last t")
(-> (tx-log/last-t tx-log)
(ac/then-compose #(np/-sync node %))))
(sync-fn node))

(-sync [node t]
(log/trace "sync on t =" t)
Expand Down Expand Up @@ -515,15 +514,25 @@
expected-kv-store-version))))))


(defn- sync-fn [storage]
(condp identical? storage
:distributed
(fn sync-distributed [^Node node]
(-> (tx-log/last-t (.-tx_log node))
(ac/then-compose #(np/-sync node %))))
(fn sync-standalone [^Node node]
(ac/completed-future (db/db node (:t @(.-state node)))))))


(defmethod ig/init-key :blaze.db/node
[_ {:keys [tx-log tx-cache indexer-executor kv-store resource-indexer
[_ {:keys [storage tx-log tx-cache indexer-executor kv-store resource-indexer
resource-store search-param-registry poll-timeout]
:or {poll-timeout (time/seconds 1)}
:as config}]
(init-msg config)
(check-version! kv-store)
(let [node (->Node (ctx config) tx-log tx-cache kv-store resource-store
search-param-registry resource-indexer
(sync-fn storage) search-param-registry resource-indexer
(atom (initial-state kv-store))
(volatile! true)
poll-timeout
Expand Down
Loading

0 comments on commit d5e17ff

Please sign in to comment.