Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify connection updates, add cost center. #658

Merged
merged 6 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/datahike/experimental/versioning.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
[datahike.core :refer [with]]
[datahike.store :refer [store-identity]]
[datahike.writing :refer [stored->db db->stored stored-db?
update-connection! commit! add-commit-meta!
create-commit-id flush-pending-writes]]
complete-db-update commit! create-commit-id flush-pending-writes]]
[superv.async :refer [<? S go-loop-try]]
[datahike.db.utils :refer [db?]]
[datahike.tools :as dt]))
Expand Down Expand Up @@ -135,12 +134,16 @@
"Create a merge commit to the current branch of this connection for parent
commit uuids. It is the responsibility of the caller to make sure that tx-data
contains the data to be merged into the branch from the parents. This function
ensures that the parent commits are properly tracked."
ensures that the parent commits are properly tracked.

NOTE: Currently merge! requires that you release all connections to conn and reconnect afterwards to reset the writer state. This will be fixed in the future by handling merge! through the writer."
([conn parents tx-data]
(merge! conn parents tx-data nil))
([conn parents tx-data tx-meta]
(parent-check parents)
(let [db (:db-after (update-connection! conn tx-data tx-meta #(with %1 %2 %3)))
parents (conj parents (get-in @conn [:config :branch]))]
(add-commit-meta! conn (commit! db parents))
(let [old @conn
db (:db-after (complete-db-update old (with old tx-data tx-meta)))
parents (conj parents (get-in old [:config :branch]))
commit-db (commit! db parents)]
(reset! conn commit-db)
true)))
9 changes: 7 additions & 2 deletions src/datahike/index/persistent_set.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,24 @@
(uuid (mapv (comp vec seq) (.keys node))))
(uuid)))

(defrecord CachedStorage [store config cache stats pending-writes]
(defrecord CachedStorage [store config cache stats pending-writes cost-center-fn]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we plan to use cost-center-fn? Is it for profiling and collecting statistics about how CachedStorage is being used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function can track a compute budget, e.g. for an API user and can throw when the budget is used up. I did not expose an interface to it yet to see whether it will prove useful. In our blockchainesque experiments with the datopia project we were able to render the access to the database permissionless by validating incoming data with Datalog (as a smart contract language) and this is complementary functionality to also make sure that such contracts get interrupted as soon as their budget is consumed. But I think this might be of interest also as an independent functionality just to meter an API.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation :-)

IStorage
(store [_ node]
(@cost-center-fn :store)
(swap! stats update :writes inc)
(let [address (gen-address node (:crypto-hash? config))
_ (trace "writing storage: " address " crypto: " (:crypto-hash? config))]
(swap! pending-writes conj (k/assoc store address node {:sync? false}))
(wrapped/miss cache address node)
address))
(accessed [_ address]
(@cost-center-fn :accessed)
(trace "accessing storage: " address)
(swap! stats update :accessed inc)
(wrapped/hit cache address)
nil)
(restore [_ address]
(@cost-center-fn :restore)
(trace "reading: " address)
(if-let [cached (wrapped/lookup cache address)]
cached
Expand All @@ -234,7 +237,8 @@
(CachedStorage. store config
(atom (cache/lru-cache-factory {} :threshold (:store-cache-size config)))
(atom init-stats)
(atom [])))
(atom [])
(atom (fn [_] nil))))

(def ^:const DEFAULT_BRANCHING_FACTOR 512)

Expand Down Expand Up @@ -268,6 +272,7 @@
(int (or (:branching-factor m) 0))
nil ;; weak ref default
))

(defmethod di/add-konserve-handlers :datahike.index/persistent-set [config store]
;; deal with circular reference between storage and store
(let [settings (map->settings {:branching-factor DEFAULT_BRANCHING_FACTOR})
Expand Down
135 changes: 72 additions & 63 deletions src/datahike/writer.cljc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns ^:no-doc datahike.writer
(:require [superv.async :refer [S thread-try <?-]]
(:require [superv.async :refer [S thread-try <?- go-try]]
[taoensso.timbre :as log]
[datahike.core]
[datahike.writing :as w]
Expand Down Expand Up @@ -40,77 +40,86 @@
[transaction-queue commit-queue
(thread-try
S
(let [store (:store @(:wrapped-atom connection))]
(do
;; processing loop
(go-loop []
(if-let [{:keys [op args callback] :as invocation} (<?- transaction-queue)]
(do
(when (> (count transaction-queue-buffer) (* 0.9 transaction-queue-size))
(log/warn "Transaction queue buffer more than 90% full, "
(count transaction-queue-buffer) "of" transaction-queue-size " filled."
"Reduce transaction frequency."))
(let [op-fn (write-fn-map op)
res (try
(apply op-fn connection args)
(go-try S
;; delay processing until the writer we are part of in connection is set
(while (not (:writer @(:wrapped-atom connection)))
(<! (timeout 10)))
(loop [old @(:wrapped-atom connection)]
(if-let [{:keys [op args callback] :as invocation} (<?- transaction-queue)]
(do
(when (> (count transaction-queue-buffer) (* 0.9 transaction-queue-size))
(log/warn "Transaction queue buffer more than 90% full, "
(count transaction-queue-buffer) "of" transaction-queue-size " filled."
"Reduce transaction frequency."))
(let [old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection)))
(do
(log/warn "DEPRECATED. Connection was changed outside of writer.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it happens that the connection was changed outside of the writer, what would a potential reason be? Could it be that the user of Datahike is using the library incorrectly in some way?

If it should never happen in a correct program that we enter this line, maybe we should consider doing something more drastic here than showing a warning, but I am not sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataScript puts the connection into a plain atom, which worked fine for Datahike until I implemented the distributed memory model last year. Now we have an explicit Connection type that wraps an atom. The connection is still manipulated directly by merge! and import, which we need to reroute through the writer next. But there might be other parties messing with atom and I wanted to give them some grace period to port their code as well.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. Thanks for the explanation :-)

(assoc old :max-tx (:max-tx @(:wrapped-atom connection))))
old)

op-fn (write-fn-map op)
res (try
(apply op-fn old args)
;; Only catch ExceptionInfo here (intentionally rejected transactions).
;; Any other exceptions should crash the writer and signal the supervisor.
(catch Exception e
(log/error "Error during invocation" invocation e args)
(catch Exception e
(log/error "Error during invocation" invocation e args)
;; take a guess that a NPE was triggered by an invalid connection
;; short circuit on errors
(put! callback
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e))
:error))]
(when-not (= res :error)
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])))
(recur))
(do
(close! commit-queue)
(log/debug "Writer thread gracefully closed"))))
(put! callback
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e))
:error))]
(if-not (= res :error)
(do
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])
(recur (:db-after res)))
(recur old))))
(do
(close! commit-queue)
(log/debug "Writer thread gracefully closed")))))
;; commit loop
(go-loop [tx (<?- commit-queue)]
(when tx
(let [txs (atom [tx])]
(go-try S
(loop [tx (<?- commit-queue)]
(when tx
(let [txs (into [tx] (take-while some?) (repeatedly #(poll! commit-queue)))]
;; empty channel of pending transactions
(loop [tx (poll! commit-queue)]
(when tx
(swap! txs conj tx)
(recur (poll! commit-queue))))
(log/trace "Batched transaction count: " (count @txs))
(log/trace "Batched transaction count: " (count txs))
;; commit latest tx to disk
(let [db (:db-after (first (peek @txs)))]
(try
(let [start-ts (get-time-ms)
{{:keys [datahike/commit-id datahike/parents]} :meta
:as commit-db} (<?- (w/commit! db nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(w/add-commit-meta! connection commit-db)
(let [db (:db-after (first (peek txs)))]
(try
(let [start-ts (get-time-ms)
{{:keys [datahike/commit-id]} :meta
:as commit-db} (<?- (w/commit! db nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(reset! connection commit-db)
;; notify all processes that transaction is complete
(doseq [[res callback] @txs]
(let [res (-> res
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback res))))
(catch Exception e
(doseq [[_ callback] @txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error " e)
(close! commit-queue)
(close! transaction-queue)))
(<! (timeout commit-wait-time))
(recur (<?- commit-queue))))))))]))
(doseq [[tx-report callback] txs]
(let [tx-report (-> tx-report
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback tx-report))))
(catch Exception e
(doseq [[_ callback] txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error." e)
(close! commit-queue)
(close! transaction-queue)))
(<! (timeout commit-wait-time))
(recur (<?- commit-queue)))))))))]))

;; public API

Expand Down
40 changes: 14 additions & 26 deletions src/datahike/writing.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[hasch.core :refer [uuid]]
[superv.async :refer [go-try- <?-]]
[clojure.core.async :refer [poll!]]
[konserve.utils :refer [async+sync *default-sync-translation*]]))
[konserve.utils :refer [async+sync *default-sync-translation*]]
[taoensso.timbre :as t]))

;; mapping to storage

Expand Down Expand Up @@ -147,27 +148,14 @@
(<?- branch-op)
db)))))

(defn add-commit-meta! [connection commit-db]
(let [{{:keys [datahike/commit-id datahike/parents]} :meta} commit-db]
(swap! connection #(-> %
(assoc-in [:meta :datahike/parents] parents)
(assoc-in [:meta :datahike/commit-id] commit-id)))))

(defn update-connection! [connection tx-data tx-meta update-fn]
(let [ret-atom (atom nil)]
(swap! connection
(fn [old]
(let [{:keys [writer]} old
{:keys [db-after]
{:keys [db/txInstant]}
:tx-meta
:as tx-report} (update-fn old tx-data tx-meta)
new-meta (assoc (:meta db-after) :datahike/updated-at txInstant)
db (assoc db-after :meta new-meta :writer writer)
tx-report (assoc tx-report :db-after db)]
(reset! ret-atom tx-report)
db)))
@ret-atom))
(defn complete-db-update [old tx-report]
(let [{:keys [writer]} old
{:keys [db-after]
{:keys [db/txInstant]} :tx-meta} tx-report
new-meta (assoc (:meta db-after) :datahike/updated-at txInstant)
db (assoc db-after :meta new-meta :writer writer)
tx-report (assoc tx-report :db-after db)]
tx-report))

(defprotocol PDatabaseManager
(-create-database [config opts])
Expand Down Expand Up @@ -274,11 +262,11 @@
;; TODO log deprecation notice with #54
(-database-exists? config)))

(defn transact! [connection {:keys [tx-data tx-meta]}]
(defn transact! [old {:keys [tx-data tx-meta]}]
(log/debug "Transacting" (count tx-data) " objects with meta: " tx-meta)
(log/trace "Transaction data" tx-data)
(update-connection! connection tx-data tx-meta #(core/with %1 %2 %3)))
(complete-db-update old (core/with old tx-data tx-meta)))

(defn load-entities [connection entities]
(defn load-entities [old entities]
(log/debug "Loading" (count entities) " entities.")
(update-connection! connection entities nil #(core/load-entities-with %1 %2 %3)))
(complete-db-update old (core/load-entities-with old entities nil)))
2 changes: 1 addition & 1 deletion test/datahike/test/api_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
(def date (java.util.Date.))

(Thread/sleep 100)

(d/transact conn {:tx-data [{:db/id [:name "Alice"] :age 35}]})

(is (= #{["Alice" 25] ["Bob" 30]}
Expand Down