From de98739fb0dcbec5d1b23647b29ca578c744ca67 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Tue, 24 Sep 2019 22:18:47 -0700 Subject: [PATCH] Modify Simple Ledger save balances after rekey --- examples/simple-ledger/dev/user.clj | 67 +++++---- examples/simple-ledger/src/simple_ledger.clj | 144 ++++++++++++------- examples/word-count/dev/user.clj | 7 +- examples/xf-word-count/dev/user.clj | 6 +- 4 files changed, 143 insertions(+), 81 deletions(-) diff --git a/examples/simple-ledger/dev/user.clj b/examples/simple-ledger/dev/user.clj index 38ecd7e7..cc2d6a3d 100644 --- a/examples/simple-ledger/dev/user.clj +++ b/examples/simple-ledger/dev/user.clj @@ -7,6 +7,7 @@ which libs are actually required." (:gen-class) (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] [integrant.core :as ig] [integrant.repl :refer [clear go halt prep init reset reset-all]] [jackdaw.admin :as ja] @@ -14,39 +15,60 @@ [jackdaw.repl :refer :all] [jackdaw.streams :as j] [jackdaw.streams.xform :as jxf] - simple-ledger) - (:import org.apache.kafka.streams.kstream.ValueTransformer - [org.apache.kafka.streams.state KeyValueStore Stores] - org.apache.kafka.streams.StreamsBuilder)) + [simple-ledger :as sl])) (def repl-config "The development config. When the 'dev' alias is active, this config will be used." - {:topology {:topology-builder simple-ledger/topology-builder - :xform simple-ledger/xf - :swap-fn jxf/kv-store-swap-fn} + {:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"]) + :topic-metadata {:entry-added + {:topic-name "entry-requested" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-pending + {:topic-name "transaction-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} - :topics {:streams-config simple-ledger/streams-config - :client-config (select-keys simple-ledger/streams-config - ["bootstrap.servers"]) - :topology (ig/ref :topology)} + :transaction-added + {:topic-name "transaction-added" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}} - :app {:streams-config simple-ledger/streams-config + :topology {:topology-builder sl/topology-builder + :xforms [#'sl/xf-split-entries #'sl/xf-running-balances] + :swap-fn jxf/kv-store-swap-fn} + + :app {:streams-config sl/streams-config :topology (ig/ref :topology) :topics (ig/ref :topics)}}) -(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] - (let [streams-builder (j/streams-builder)] - ((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata)) -(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology] - :as opts}] - (let [topic-metadata (topology->topic-metadata topology streams-config)] - (with-open [client (ja/->AdminClient client-config)] - (ja/create-topics! client (vals topic-metadata))) - (assoc opts :topic-metadata topic-metadata))) +(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}] + (let [xform-map (reduce-kv (fn [m k v] + (let [k (keyword (str (:ns (meta v))) + (str (:name (meta v))))] + (assoc m k #(v % jxf/kv-store-swap-fn)))) + {} + xforms) + streams-builder (j/streams-builder)] + ((topology-builder topic-metadata xform-map) streams-builder))) (defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] (let [streams-app (j/kafka-streams topology streams-config)] @@ -66,6 +88,3 @@ (destroy-state-stores streams-config) (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] (re-delete-topics (:client-config topics) re))) - - -(integrant.repl/set-prep! (constantly repl-config)) diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index d5cc452e..b3679e91 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -5,20 +5,26 @@ [clojure.tools.logging :refer [info]] [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.streams.xform :as jxf]) - (:import org.apache.kafka.streams.kstream.ValueTransformer - [org.apache.kafka.streams.state KeyValueStore Stores] - org.apache.kafka.streams.StreamsBuilder)) + [jackdaw.streams.xform :as jxf])) +(defn xf-split-entries + [_ _] + (map (fn [[k {:keys [debit-account credit-account amount] :as entry}]] + [[debit-account + {:account-name debit-account + :debit-credit-indicator :dr + :amount amount}] + [credit-account + {:account-name credit-account + :debit-credit-indicator :cr + :amount amount}]]))) + (defn next-balances - [starting-balances {:keys [debit-account - credit-account - amount] - :as entry}] - (-> starting-balances - (update debit-account (fnil - 0) amount) - (update credit-account (fnil + 0) amount))) + [starting-balances {:keys [account-name debit-credit-indicator amount] + :as transaction}] + (let [op (if (= :dr debit-credit-indicator) - +)] + (update starting-balances account-name (fnil op 0) amount))) (defn xf-running-balances [state swap-fn] @@ -27,30 +33,20 @@ ([] (rf)) ([result] (rf result)) ([result input] - (let [{:keys [debit-account credit-account] :as entry} input - next (as-> input % + (let [[k v] input + {:keys [account-name debit-credit-indicator amount] :as entry} v + next (as-> entry % (swap-fn state next-balances %) - (select-keys % [debit-account credit-account]) - (hash-map :entry entry :current-balances %))] + (select-keys % [account-name]) + ((juxt (comp first keys) (comp first vals)) %) + (zipmap [:account-name :current-balance] %) + (assoc % :starting-balance (if (= :dr debit-credit-indicator) + (+ amount (:current-balance %)) + (- amount (:current-balance %)))) + (vector k %) + (vector %))] (rf result next)))))) -(defn balances->transactions - [{{:keys [debit-account amount]} :entry - current-balances :current-balances}] - (map (fn [[k v]] - [k {:account-name k - :starting-balance (if (= debit-account k) - (+ amount v) - (- amount v)) - :current-balance v}]) - current-balances)) - -(defn xf - [state swap-fn] - (comp - (xf-running-balances state swap-fn) - (map balances->transactions))) - (comment ;; Use this comment block to explore the Simple Ledger using Clojure @@ -58,7 +54,7 @@ ;; Launch a Clojure REPL: ;; ``` - ;; cd /examples/xf-word-count + ;; cd /examples/simple-ledger ;; clj -A:dev ;; ``` @@ -67,15 +63,17 @@ ;; Evaluate the form: (def coll - [{:debit-account "tech" - :credit-account "cash" - :amount 1000} - {:debit-account "cash" - :credit-account "sales" - :amount 2000}]) + [[nil {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + [nil {:debit-account "cash" + :credit-account "sales" + :amount 2000}]]) ;; Let's record the entries. Evaluate the form: - (transduce (xf (atom {}) swap!) concat coll) + (->> coll + (transduce (xf-split-entries nil nil) concat) + (transduce (xf-running-balances (atom {}) swap!) concat)) ;; You should see output like the following: @@ -102,7 +100,10 @@ ;; KeyValueStore interface with overrides for get and put." ;; Evaluate the form: - (transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) + (->> coll + (transduce (xf-split-entries nil nil) concat) + (transduce (xf-running-balances (jxf/fake-kv-store {}) + jxf/kv-store-swap-fn) concat)) ;; You should see the same output. ) @@ -114,11 +115,13 @@ "cache.max.bytes.buffering" "0"}) (defn topology-builder - [{:keys [entry-requested transaction-added] :as topics} xf] + [{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms] (fn [builder] (jxf/add-state-store! builder) (-> (j/kstream builder entry-requested) - (jxf/transduce-kstream xf) + (jxf/transduce-kstream (::xf-split-entries xforms)) + (j/through transaction-pending) + (jxf/transduce-kstream (::xf-running-balances xforms)) (j/to transaction-added)) builder)) @@ -128,7 +131,7 @@ ;; processing application. ;; For more details on dynamic development, see the comment block in - ;; /examples/simple-ledger/src/simple_ledger.clj + ;; /examples/word-count/src/word_count.clj ;; Start ZooKeeper and Kafka: ;; ``` @@ -160,9 +163,14 @@ ;; Evaluate the form: (get-keyvals (:transaction-added topic-metadata)) - ;; You should see output like the following: + ;; You should see output like the following. Notice transaction + ;; order is not preserved: - ;; (["tech" + ;; (["sales" + ;; {:account-name "sales" + ;; :starting-balance 0 + ;; :current-balance 2000}] + ;; ["tech" ;; {:account-name "tech" ;; :starting-balance 0 ;; :current-balance -1000}] @@ -172,10 +180,46 @@ ;; :current-balance 1000}] ;; ["cash" ;; {:account-name "cash" - ;; :starting-balance 1000, - ;; :current-balance -1000}] - ;; ["sales" + ;; :starting-balance 1000 + ;; :current-balance -1000}]) + + + ;; The `transaction-added` topic has 15 partitions. Let's see how + ;; the records distributed. Evaluate the form: + (->> (get-records (:transaction-added topic-metadata)) + (map (fn [x] + (select-keys x [:key :offset :partition :value])))) + + ;; You should see output like the following. The balances are spread + ;; across partitions 0, 11, and 14. Transaction order is preserved + ;; only for each account. There is no global order. + + ;; ({:key "sales" + ;; :offset 0 + ;; :partition 0 + ;; :value ;; {:account-name "sales" ;; :starting-balance 0 - ;; :current-balance 2000}]) + ;; :current-balance 2000}} + ;; {:key "tech" + ;; :offset 0 + ;; :partition 11 + ;; :value + ;; {:account-name "tech" + ;; :starting-balance 0 + ;; :current-balance -1000}} + ;; {:key "cash" + ;; :offset 0 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :starting-balance 0 + ;; :current-balance 1000}} + ;; {:key "cash" + ;; :offset 1 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :starting-balance 1000 + ;; :current-balance -1000}}) ) diff --git a/examples/word-count/dev/user.clj b/examples/word-count/dev/user.clj index 309a58b8..25722d52 100644 --- a/examples/word-count/dev/user.clj +++ b/examples/word-count/dev/user.clj @@ -23,6 +23,9 @@ :topics (ig/ref :topics)}}) +(integrant.repl/set-prep! (constantly repl-config)) + + (defmethod ig/init-key :topology [_ {:keys [topology-builder]}] (let [streams-builder (j/streams-builder)] ((topology-builder topic-metadata) streams-builder))) @@ -49,10 +52,6 @@ (defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] (j/close streams-app) - ;; BUG: Does not delete state on reset! (destroy-state-stores streams-config) (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] (re-delete-topics (:client-config topics) re))) - - -(integrant.repl/set-prep! (constantly repl-config)) diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj index 216217fc..5b59b440 100644 --- a/examples/xf-word-count/dev/user.clj +++ b/examples/xf-word-count/dev/user.clj @@ -18,6 +18,9 @@ [xf-word-count :as xfwc])) +(integrant.repl/set-prep! (constantly repl-config)) + + (def repl-config "The development config. When the 'dev' alias is active, this config will be used." @@ -64,6 +67,3 @@ (destroy-state-stores streams-config) (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] (re-delete-topics (:client-config topics) re))) - - -(integrant.repl/set-prep! (constantly repl-config))