Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL-195 Async Reaction Processing #316

Merged
merged 8 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
org.clojure/tools.logging {:mvn/version "1.1.0"}
org.clojure/core.memoize {:mvn/version "1.0.250"}
clojure-interop/java.security {:mvn/version "1.0.5"}
org.clojure/core.async {:mvn/version "1.6.681"}
;; Util deps

camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.2"}
Expand Down
2 changes: 2 additions & 0 deletions doc/env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ The following options are used for advanced database performance tuning and may
| `LRSQL_OIDC_SCOPE_PREFIX` | `oidcScopePrefix` | An optional prefix prepended to OIDC scope. For example, setting this to `lrs:` would change the expected `all` scope to `lrs:all` | `""` |
| `LRSQL_STMT_RETRY_LIMIT` | `stmtRetryLimit` | The number of times to retry a statement post transaction before failing. | `10` |
| `LRSQL_STMT_RETRY_BUDGET` | `stmtRetryBudget` | The max amount of time allowed for statement POST transaction retries before failing (ms). | `1000` |
| `LRSQL_ENABLE_REACTIONS` | `enableReactions` | Whether or not to enable statement reactions. | `false` |
| `LRSQL_REACTION_BUFFER_SIZE` | `reactionBufferSize` | Number of pending reactions to allow. Additional reactions will be dropped with a warning message. | `10000` |

_NOTE:_ `LRSQL_STMT_RETRY_LIMIT` and `LRSQL_STMT_RETRY_BUDGET` are used to mitigate a rare scenario where specific Actors or Activities are updated many times in large concurrent batches. In this situation the DBMS can encounter locking and these settings are used to allow retries that eventually write all the conflicting transactions, but may incur performance degradation. If you are experiencing this situation the first step would be to look at why your data needs to rewrite specific Actors or Activities rapidly with different values, which could potentially solve it at the source. If the issue cannot be avoided by data design alone, another possible solution is reducing batch sizes to decrease or eliminate locks. As a last resort, increasing these settings will at least ensure the statements get written but as mentioned may incur a slowdown in concurrent throughput.

Expand Down
4 changes: 3 additions & 1 deletion resources/lrsql/config/prod/default/lrs.edn
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@
:oidc-authority-template #or [#env LRSQL_OIDC_AUTHORITY_TEMPLATE "config/oidc_authority.json.template"]
:oidc-scope-prefix #or [#env LRSQL_OIDC_SCOPE_PREFIX ""]
:stmt-retry-limit #or [#env LRSQL_STMT_RETRY_LIMIT 10]
:stmt-retry-budget #or [#env LRSQL_STMT_RETRY_BUDGET 1000]}
:stmt-retry-budget #or [#env LRSQL_STMT_RETRY_BUDGET 1000]
:enable-reactions #boolean #or [#env LRSQL_ENABLE_REACTIONS false]
:reaction-buffer-size #long #or [#env LRSQL_REACTION_BUFFER_SIZE 10000]}
4 changes: 3 additions & 1 deletion resources/lrsql/config/test/default/lrs.edn
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@
:oidc-authority-template "config/oidc_authority.json.template"
:oidc-scope-prefix ""
:stmt-retry-limit 20
:stmt-retry-budget 10000}
:stmt-retry-budget 10000
:enable-reactions true
:reaction-buffer-size 10000}
8 changes: 6 additions & 2 deletions src/db/postgres/lrsql/postgres/main.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns lrsql.postgres.main
(:require [com.stuartsierra.component :as component]
[lrsql.system :as system]
[lrsql.system.util :as su]
[lrsql.postgres.record :as pr])
(:gen-class))

Expand All @@ -10,8 +11,11 @@
"Run a Postgres-backed LRSQL instance based on the `:test-postgres`
config profile. For use with `clojure -X:db-postgres`."
[_] ; Need to pass in a map for -X
(component/start (system/system postgres-backend :test-postgres)))
(-> (system/system postgres-backend :test-postgres)
component/start
su/add-shutdown-hook!))

(defn -main [& _args]
(-> (system/system postgres-backend :prod-postgres)
component/start))
component/start
su/add-shutdown-hook!))
8 changes: 6 additions & 2 deletions src/db/sqlite/lrsql/sqlite/main.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns lrsql.sqlite.main
(:require [com.stuartsierra.component :as component]
[lrsql.system :as system]
[lrsql.system.util :as su]
[lrsql.sqlite.record :as sr])
(:gen-class))

Expand All @@ -14,7 +15,9 @@
override-profile]}]
(let [profile (or override-profile
(if ephemeral? :test-sqlite-mem :test-sqlite))]
(component/start (system/system sqlite-backend profile))))
(-> (system/system sqlite-backend profile)
component/start
su/add-shutdown-hook!)))

(defn -main
"Main entrypoint for SQLite-backed LRSQL instances. Passing `--ephemeral true`
Expand All @@ -25,4 +28,5 @@
ephemeral? (Boolean/parseBoolean ?per-str)
profile (if ephemeral? :prod-sqlite-mem :prod-sqlite)]
(-> (system/system sqlite-backend profile)
component/start)))
component/start
su/add-shutdown-hook!)))
90 changes: 90 additions & 0 deletions src/main/lrsql/init/reaction.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
(ns lrsql.init.reaction
"Reaction initialization functions."
(:require [clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[lrsql.reaction.protocol :as rp]
[lrsql.util :as u]
[lrsql.spec.config :as config-spec]
[lrsql.spec.common :as common-spec]
[lrsql.spec.reaction :as rs]
[xapi-schema.spec :as xs]
[clojure.string :as cs]))

(s/fdef reaction-channel
:args (s/cat :config ::config-spec/lrs)
:ret (s/nilable ::common-spec/channel))

(defn reaction-channel
"Based on config, return a channel to receive reactions or nil if reactions
are disabled"
[{enable-reactions :enable-reactions
reaction-buffer-size :reaction-buffer-size}]
(when enable-reactions
(a/chan reaction-buffer-size)))

(s/fdef offer-trigger!
:args (s/cat :?reaction-channel (s/nilable ::common-spec/channel)
:trigger-id ::xs/uuid)
:ret nil?)

(defn offer-trigger!
"Given a (possibly nil) reaction channel and a string statement ID, submit the
ID to the channel as a UUID if it exists, or do nothing if it is nil.
Log if the channel exists but the ID cannot be submitted."
[?reaction-channel trigger-id]
(when ?reaction-channel
(when-not (a/offer! ?reaction-channel (u/str->uuid trigger-id))
cliffcaseyyet marked this conversation as resolved.
Show resolved Hide resolved
(log/warnf "Reaction channel full, dropping statement ID: %s"
trigger-id))))

(s/fdef reaction-executor
:args (s/cat :?reaction-channel (s/nilable ::common-spec/channel)
:reactor rs/reactor?)
:ret (s/nilable ::common-spec/channel))

(defn reaction-executor
"Given a (possibly nil) reaction channel and a reactor implementation, process
reactions in a thread pool. If the channel is nil, returns nil."
[?reaction-channel reactor]
(when ?reaction-channel
(log/info "Starting reaction processor...")
(let [reaction-executor
(a/go-loop []
(log/debug "Listening for reaction trigger...")
(if-let [trigger-id (a/<! ?reaction-channel)]
(let [_ (log/debugf "Reacting to statement ID: %s"
trigger-id)
{:keys [statement-ids]}
(a/<!
(a/thread
(rp/-react-to-statement reactor trigger-id)))]
(log/debugf "Created reaction to %s, statement IDs: %s"
trigger-id
(cs/join ", " statement-ids))
(recur))
(do
(log/debugf "Reaction channel shutdown")
::shutdown)))]
(log/info "Reaction processor started.")
reaction-executor)))

(s/fdef shutdown-reactions!
:args (s/cat :?reaction-channel (s/nilable ::common-spec/channel)
:?reaction-executor (s/nilable ::common-spec/channel))
:ret nil?)

(defn shutdown-reactions!
"Given a (possibly nil) reaction channel and reaction executor channel,
gracefully shut them down."
[?reaction-channel
?reaction-executor]
(when (and ?reaction-channel ?reaction-executor)
(log/info "Stopping reaction processor...")
(log/debug "Closing reaction channel...")
(a/close! ?reaction-channel)
(log/debug "Draining reaction buffer...")
cliffcaseyyet marked this conversation as resolved.
Show resolved Hide resolved
;; Block until all reactions are processed
(a/<!! ?reaction-executor)
(log/debug "Reaction executor shut down.")
(log/info "Reaction processor stopped.")))
6 changes: 5 additions & 1 deletion src/main/lrsql/spec/common.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns lrsql.spec.common
(:require [clojure.spec.alpha :as s]
[clojure.spec.gen.alpha :as sgen]
[next.jdbc.protocols :as jp])
[next.jdbc.protocols :as jp]
[clojure.core.async.impl.protocols :as ap])
(:import [java.time Instant]))

;; UUIDs
Expand Down Expand Up @@ -35,3 +36,6 @@
(sgen/fmap
#(Instant/ofEpochSecond %)
(sgen/large-integer* {:min 0})))))

;; Core.async channels
(s/def ::channel #(satisfies? ap/Channel %))
7 changes: 6 additions & 1 deletion src/main/lrsql/spec/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@
(s/def ::oidc-authority-template string?)
(s/def ::oidc-scope-prefix string?)

(s/def ::enable-reactions boolean?)
(s/def ::reaction-buffer-size pos-int?)

(s/def ::lrs
(s/and (s/conformer u/remove-nil-vals)
(s/conformer u/remove-neg-vals)
Expand All @@ -130,7 +133,9 @@
::authority-template
::authority-url
::oidc-authority-template
::oidc-scope-prefix]
::oidc-scope-prefix
::enable-reactions
::reaction-buffer-size]
:opt-un [::admin-user-default
::admin-pass-default
::api-key-default
Expand Down
5 changes: 5 additions & 0 deletions src/main/lrsql/spec/reaction.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [clojure.spec.alpha :as s]
[xapi-schema.spec :as xs]
[lrsql.backend.protocol :as bp]
[lrsql.reaction.protocol :as rp]
[lrsql.spec.common :as c]))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand All @@ -12,6 +13,10 @@
[bk]
(satisfies? bp/ReactionBackend bk))

(defn reactor?
[x]
(satisfies? rp/StatementReactor x))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Axioms
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down
10 changes: 7 additions & 3 deletions src/main/lrsql/system.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[lrsql.system.tuning :as tuning]
[lrsql.system.lrs :as lrs]
[lrsql.system.webserver :as webserver]
[lrsql.system.reactor :as reactor]
[lrsql.init.config :refer [read-config]]))

(defn system
Expand Down Expand Up @@ -32,13 +33,16 @@
:lrs (component/using
(lrs/map->LearningRecordStore {})
[:connection :backend])
:reactor (component/using
(reactor/map->Reactor {})
[:backend :lrs])
:webserver (component/using
(webserver/map->Webserver {})
[:lrs]))
[:lrs :reactor]))
assoc-config
(fn [m config-m] (assoc m :config config-m))]
;; This code can be confusing. What is happening is that the above creates
;; a system map with empty maps and then based on the key of the system,
;; This code can be confusing. What is happening is that the above creates
;; a system map with empty maps and then based on the key of the system,
;; populates the corresponding config (by key) from the overall aero config
(-> (merge-with assoc-config initial-sys config)
(component/system-using {}))))
56 changes: 16 additions & 40 deletions src/main/lrsql/system/lrs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[lrsql.admin.protocol :as adp]
[lrsql.init :as init]
[lrsql.init.oidc :as oidc-init]
[lrsql.init.reaction :as react-init]
[lrsql.backend.protocol :as bp]
[lrsql.input.actor :as agent-input]
[lrsql.input.activity :as activity-input]
Expand All @@ -28,7 +29,6 @@
[lrsql.ops.query.document :as doc-q]
[lrsql.ops.query.reaction :as react-q]
[lrsql.ops.query.statement :as stmt-q]
[lrsql.reaction.protocol :as rp]
[lrsql.spec.config :as cs]
[lrsql.util.auth :as auth-util]
[lrsql.util.oidc :as oidc-util]
Expand All @@ -49,7 +49,8 @@
backend
config
authority-fn
oidc-authority-fn]
oidc-authority-fn
reaction-channel]
cmp/Lifecycle
(start
[lrs]
Expand Down Expand Up @@ -78,11 +79,16 @@
(assoc lrs
:connection connection
:authority-fn auth-fn
:oidc-authority-fn oidc-auth-fn))))
:oidc-authority-fn oidc-auth-fn
:reaction-channel (react-init/reaction-channel config)))))
(stop
[lrs]
(log/info "Stopping LRS...")
(assoc lrs :connection nil :authority-fn nil))
(assoc lrs
:connection nil
:authority-fn nil
:oidc-authority-fn nil
:reaction-channel nil))

lrsp/AboutResource
(-get-about
Expand Down Expand Up @@ -136,8 +142,11 @@
stmt-result)
;; Non-error result - continue
(if-some [stmt-id (:statement-id stmt-result)]
(recur (rest stmt-ins)
(update stmt-res :statement-ids conj stmt-id))
(do
;; Submit statement for reaction if enabled
(react-init/offer-trigger! reaction-channel stmt-id)
(recur (rest stmt-ins)
cliffcaseyyet marked this conversation as resolved.
Show resolved Hide resolved
(update stmt-res :statement-ids conj stmt-id)))
(recur (rest stmt-ins)
stmt-res))))
;; No more statement inputs - return
Expand Down Expand Up @@ -376,37 +385,4 @@
(let [conn (lrs-conn this)
input (react-input/delete-reaction-input reaction-id)]
(jdbc/with-transaction [tx conn]
(react-cmd/delete-reaction! backend tx input))))
rp/StatementReactor
(-react-to-statement [this statement-id]
(let [conn (lrs-conn this)
statement-results
(jdbc/with-transaction [tx conn]
(reduce
(fn [acc {:keys [reaction-id
error]
:as result}]
(if error
(let [input (react-input/error-reaction-input
reaction-id error)]
(react-cmd/error-reaction! backend tx input)
acc)
(conj acc (select-keys result [:statement :authority]))))
[]
(:result
(react-q/query-statement-reactions
backend tx {:trigger-id statement-id}))))]
;; Submit statements one at a time with varying authority
{:statement-ids
(reduce
(fn [acc {:keys [statement authority]}]
(into acc
(:statement-ids
(lrsp/-store-statements
this
{:agent authority
:scopes #{:scope/statements.write}}
[statement]
[]))))
[]
statement-results)})))
(react-cmd/delete-reaction! backend tx input)))))
Loading