Skip to content

Commit

Permalink
Ensure History Changes only if Resource Changes
Browse files Browse the repository at this point in the history
This change introduces a new transaction command called "keep" that just
means: "Keep the resource of it is not changed in between." Unlike
"put", "keep" will not introduce a new history entry. The command "keep"
is always conditional because the detection of no changes will be done
outside of the transaction.

Closes: #777
  • Loading branch information
alexanderkiel committed Jun 29, 2023
1 parent d5e17ff commit ceac9d2
Show file tree
Hide file tree
Showing 73 changed files with 2,077 additions and 679 deletions.
49 changes: 49 additions & 0 deletions .github/scripts/patient-identical-update.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash -e

#
# This script tests that an update without changes of the resource content
# doesn't create a new history entry.
#

SCRIPT_DIR="$(dirname "$(readlink -f "$0")")"
. "$SCRIPT_DIR/util.sh"

bundle() {
cat <<END
{
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"resource": $1,
"request": {
"method": "PUT",
"url": "Patient/$2"
}
}
]
}
END
}

BASE="http://localhost:8080/fhir"
PATIENT_IDENTIFIER="X79746011X"
PATIENT=$(curl -sH "Accept: application/fhir+json" "$BASE/Patient?identifier=$PATIENT_IDENTIFIER" | jq -cM '.entry[0].resource')
ID="$(echo "$PATIENT" | jq -r .id)"
VERSION_ID="$(echo "$PATIENT" | jq -r .meta.versionId)"

# Update Interaction
RESULT=$(curl -sXPUT -H "Content-Type: application/fhir+json" -d "$PATIENT" "$BASE/Patient/$ID")
RESULT_VERSION_ID="$(echo "$RESULT" | jq -r .meta.versionId)"

test "update versionId" "$RESULT_VERSION_ID" "$VERSION_ID"

# Transaction Interaction
RESULT=$(curl -sH "Content-Type: application/fhir+json" -H "Prefer: return=representation" -d "$(bundle "$PATIENT" "$ID")" "$BASE")
RESULT_VERSION_ID="$(echo "$RESULT" | jq -r '.entry[0].resource.meta.versionId')"

test "transaction versionId" "$RESULT_VERSION_ID" "$VERSION_ID"

HISTORY_TOTAL=$(curl -sH "Accept: application/fhir+json" "$BASE/Patient/$ID/_history" | jq -r '.total')

test "history total" "$HISTORY_TOTAL" "1"
7 changes: 7 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
- kv
- luid
- metrics
- module-base
- openid-auth
- operation-graphql
- operation-measure-evaluate-measure
Expand Down Expand Up @@ -620,6 +621,9 @@ jobs:
- name: Patient Everything
run: .github/scripts/patient-everything.sh

- name: Patient Identical Update
run: .github/scripts/patient-identical-update.sh

not-enforcing-referential-integrity-test:
needs: build
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -1519,6 +1523,9 @@ jobs:
- name: Patient Everything
run: .github/scripts/patient-everything.sh

- name: Patient Identical Update
run: .github/scripts/patient-identical-update.sh

- name: Docker Stats
run: docker stats --no-stream

Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ target
classes
.cpcache
.cache
blaze-load-tests/node_modules
.nrepl-port
6 changes: 6 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Blaze exposes a [FHIR RESTful API][1] under the default context path of `/fhir`. The [CapabilityStatement][2] exposed under `/fhir/metadata` can be used to discover the capabilities of Blaze. Everything stated there can be considered to be implemented correctly. If not please [file an issue][3].

## Interactions

### Update

Blaze keeps track over the history of all updates of each resource. However if the content of the resource update is equal to the current version of the resource, no new history entry is created. Usually such identical content updates will only cost a very small amount of transaction handling storage but no additional resource or index storage.

## Operations

The following Operations are implemented:
Expand Down
46 changes: 41 additions & 5 deletions docs/implementation/database.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ After concatenation, the strings are hashed with the [Murmur3][7] algorithm in i
For this example, we don't use the hashed versions of the key parts except for the content-hash.

| Key (search-param, type, value, id, content-hash) |
|---|
| gender, Patient, female, 1, 6744ed32 |
| gender, Patient, female, 2, b7e3e5f8 |
| gender, Patient, male, 0, ba9c9b24 |
|---------------------------------------------------|
| gender, Patient, female, 1, 6744ed32 |
| gender, Patient, female, 2, b7e3e5f8 |
| gender, Patient, male, 0, ba9c9b24 |

In case one searches for female patients, Blaze will seek into the index with the key prefix (gender, Patient, female) and scan over it while the prefix stays the same. The result will be the `[id, hash]` tuples:
* `[1, 6744ed32]` and
Expand Down Expand Up @@ -213,7 +213,43 @@ That tuples are further processed against the `ResourceAsOf` index in order to c
* this node submits the transaction commands to the central transaction log
* all nodes (inkl. the transaction submitter) receive the transaction commands from the central transaction log

**TODO: continue...**
### Transaction Commands

### Create

**TODO**

### Put

The `put` command is used to create or update a resource.

#### Properties

| Name | Required | Data Type | Description |
|---------------|----------|---------------|---------------------------------------------|
| type | yes | string | resource type |
| id | yes | string | resource id |
| hash | yes | string | resource content hash |
| refs | no | list | references to other resources |
| if-match | no | number | the t the resource to update has to match |
| if-none-match | no | "*" or number | the t the resource to update must not match |

### Keep

The `keep` command can be used instead of a `put` command if it's likely that the update of the resource will result in no changes. In that sense, the `keep` command is an optimization of the `put` command that has to be retried if it fails.

#### Properties

| Name | Required | Data Type | Description |
|----------|----------|-----------|---------------------------------------------------------------|
| type | yes | string | resource type |
| id | yes | string | resource id |
| hash | yes | string | the resource content hash the resource to update has to match |
| if-match | no | number | the t the resource to update has to match |

### Delete

**TODO**

[1]: <https://www.datomic.com>
[2]: <https://xtdb.com>
Expand Down
22 changes: 22 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ It's recommended to use [Prometheus][1] and [Grafana][2] to monitor the runtime

![](monitoring/prometheus.png)

## Prometheus Config

A basic Prometheus config looks like this:

```yml
global:
scrape_interval: 15s

scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['<server-ip-addr>:9100']
- labels:
instance: 'blaze'

- job_name: 'blaze'
static_configs:
- targets: ['<server-ip-addr>:8081']
- labels:
instance: 'blaze'
```
## Import the Blaze Dashboard
In order to import the Blaze dashboard into your Grafana instance, please copy the contents of [blaze.json](monitoring/blaze.json) and pate it into the import dialog on the Import dashboard site:
Expand Down
11 changes: 9 additions & 2 deletions modules/anomaly/src/blaze/anomaly.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
(identical? ::anom/not-found (::anom/category x)))


(defn conflict? [x]
(identical? ::anom/conflict (::anom/category x)))


(defn fault? [x]
(identical? ::anom/fault (::anom/category x)))

Expand Down Expand Up @@ -70,8 +74,11 @@
(anomaly* ::anom/fault msg kvs)))


(defn busy [msg & {:as kvs}]
(anomaly* ::anom/busy msg kvs))
(defn busy
([]
(busy nil))
([msg & {:as kvs}]
(anomaly* ::anom/busy msg kvs)))


(defn- format-exception
Expand Down
7 changes: 6 additions & 1 deletion modules/anomaly/src/blaze/anomaly_spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
:ret boolean?)


(s/fdef ba/conflict?
:args (s/cat :x any?)
:ret boolean?)


(s/fdef ba/fault?
:args (s/cat :x any?)
:ret boolean?)
Expand Down Expand Up @@ -67,7 +72,7 @@


(s/fdef ba/busy
:args (s/cat :msg (s/nilable string?) :kvs (s/* (s/cat :k keyword? :v any?)))
:args (s/cat :msg (s/? (s/nilable string?)) :kvs (s/* (s/cat :k keyword? :v any?)))
:ret ::anom/anomaly)


Expand Down
15 changes: 14 additions & 1 deletion modules/anomaly/test/blaze/anomaly_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@
(is (not (ba/anomaly? nil)))))


(deftest conflict?-test
(testing "a conflict anomaly has to have the right category"
(is (ba/conflict? {::anom/category ::anom/conflict})))

(testing "anomalies with other categories are no conflict anomalies"
(is (not (ba/conflict? {::anom/category ::anom/fault}))))

(testing "nil is no conflict anomaly"
(is (not (ba/anomaly? nil)))))


(deftest fault?-test
(testing "a fault anomaly has to have the right category"
(is (ba/fault? {::anom/category ::anom/fault})))
Expand Down Expand Up @@ -152,7 +163,6 @@
(testing "without message"
(is (= (ba/fault) {::anom/category ::anom/fault})))


(testing "with nil message"
(is (= (ba/fault nil) {::anom/category ::anom/fault})))

Expand All @@ -169,6 +179,9 @@


(deftest busy-test
(testing "without message"
(is (= (ba/busy) {::anom/category ::anom/busy})))

(testing "with nil message"
(is (= (ba/busy nil) {::anom/category ::anom/busy})))

Expand Down
92 changes: 80 additions & 12 deletions modules/async/src/blaze/async/comp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
(.complete ^CompletableFuture future x))


(defn- ->Supplier [f]
(reify Supplier
(get [_]
(ba/throw-when (f)))))


(defn complete-async!
"Completes `future` with the result of `f` invoked with no arguments from an
asynchronous task using the default executor."
[future f]
(.completeAsync ^CompletableFuture future (->Supplier f)))


(defn or-timeout!
"Exceptionally completes `future` with a TimeoutException if not otherwise
completed before `timeout` in `unit`.
Expand Down Expand Up @@ -135,12 +148,6 @@
(.cancel ^CompletableFuture future false))


(defn- ->Supplier [f]
(reify Supplier
(get [_]
(ba/throw-when (f)))))


(defn supply-async
"Returns a CompletableFuture that is asynchronously completed by a task
running in `executor` with the value obtained by calling the function `f`
Expand Down Expand Up @@ -236,6 +243,12 @@
(-completion-cause [e] e))


(defn- ->BiFunction [f]
(reify BiFunction
(apply [_ x e]
(ba/throw-when (f x (some-> e -completion-cause ba/anomaly))))))


(defn handle
"Returns a CompletionStage that, when `stage` completes either normally or
exceptionally, is executed with `stage`'s result and exception as arguments to
Expand All @@ -247,9 +260,22 @@
[stage f]
(.handle
^CompletionStage stage
(reify BiFunction
(apply [_ x e]
(ba/throw-when (f x (some-> e -completion-cause ba/anomaly)))))))
(->BiFunction f)))


(defn handle-async
"Returns a CompletionStage that, when `stage` completes either normally or
exceptionally, is executed using `stage`'s default asynchronous execution
facility, with `stage`'s result and exception as arguments to the function
`f`.
When `stage` is complete, the function `f` is invoked with the result (or nil
if none) and the anomaly (or nil if none) of `stage` as arguments, and the
`f`'s result is used to complete the returned stage."
[stage f]
(.handleAsync
^CompletionStage stage
(->BiFunction f)))


(defn exceptionally
Expand All @@ -275,6 +301,24 @@
(then-compose identity)))


(defn exceptionally-compose-async
"Returns a CompletionStage that, when `stage` completes exceptionally, is
composed using the results of the function `f` applied to `stage`'s anomaly,
using `stage`'s default asynchronous execution facility."
[stage f]
(-> stage
(handle
(fn [_ e]
(if (nil? e)
stage
(-> stage
(handle-async
(fn [_ e]
(f e)))
(then-compose identity)))))
(then-compose identity)))


(defn when-complete
"Returns a CompletionStage with the same result or exception as `stage`, that
executes the given action when `stage` completes.
Expand Down Expand Up @@ -340,7 +384,31 @@


(defn retry
"Please be aware that `num-retries` shouldn't be higher than the max stack
"Returns a CompletionStage that, when the CompletionStage as result of calling
the function`f` with no arguments completes normally will complete with its
result.
Otherwise retires by calling `f` again with no arguments. Wait's between
retries starting with 100 ms growing exponentially.
Please be aware that `num-retries` shouldn't be higher than the max stack
depth. Otherwise, the CompletionStage would fail with a StackOverflowException."
[future-fn num-retries]
(retry* future-fn num-retries 0))
[f num-retries]
(retry* f num-retries 0))


(defn retry2
"Returns a CompletionStage that, when the CompletionStage as result of calling
the function`f` with no arguments completes normally will complete with its
result.
Otherwise retires by calling `f` again with no arguments if calling the
function `retry?` with the anomaly returned by the CompletionStage returns
true."
[f retry?]
(-> (f)
(exceptionally-compose-async
(fn [e]
(if (retry? e)
(retry2 f retry?)
(completed-future e))))))
Loading

0 comments on commit ceac9d2

Please sign in to comment.