From 4dffa12591a2948167b0de4e6bc686a7463046eb Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Sun, 22 Sep 2019 19:40:55 -0700 Subject: [PATCH 1/8] Adds helper functions for working with transducers and two examples. --- examples/simple-ledger/.gitignore | 1 + examples/simple-ledger/README.md | 3 + examples/simple-ledger/deps.edn | 41 +- examples/simple-ledger/dev/system.clj | 68 -- examples/simple-ledger/dev/user.clj | 90 +++ examples/simple-ledger/resources/logback.xml | 1 + examples/simple-ledger/src/simple_ledger.clj | 601 ++++++------------ .../simple-ledger/test/simple_ledger_test.clj | 64 -- examples/word-count/dev/user.clj | 7 +- examples/xf-word-count/.gitignore | 1 + examples/xf-word-count/README.md | 3 + examples/xf-word-count/deps.edn | 28 + examples/xf-word-count/dev/user.clj | 69 ++ examples/xf-word-count/resources/logback.xml | 1 + examples/xf-word-count/src/xf_word_count.clj | 145 +++++ .../xf-word-count/test/xf_word_count_test.clj | 76 +++ src/jackdaw/streams/xform.clj | 67 ++ 17 files changed, 719 insertions(+), 547 deletions(-) create mode 100644 examples/simple-ledger/.gitignore create mode 100644 examples/simple-ledger/README.md delete mode 100644 examples/simple-ledger/dev/system.clj create mode 100644 examples/simple-ledger/dev/user.clj create mode 120000 examples/simple-ledger/resources/logback.xml delete mode 100644 examples/simple-ledger/test/simple_ledger_test.clj create mode 100644 examples/xf-word-count/.gitignore create mode 100644 examples/xf-word-count/README.md create mode 100644 examples/xf-word-count/deps.edn create mode 100644 examples/xf-word-count/dev/user.clj create mode 120000 examples/xf-word-count/resources/logback.xml create mode 100644 examples/xf-word-count/src/xf_word_count.clj create mode 100644 examples/xf-word-count/test/xf_word_count_test.clj create mode 100644 src/jackdaw/streams/xform.clj diff --git a/examples/simple-ledger/.gitignore b/examples/simple-ledger/.gitignore new file mode 100644 index 
00000000..80dd262d --- /dev/null +++ b/examples/simple-ledger/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/simple-ledger/README.md b/examples/simple-ledger/README.md new file mode 100644 index 00000000..6984287b --- /dev/null +++ b/examples/simple-ledger/README.md @@ -0,0 +1,3 @@ +# Simple Ledger + +This example creates a simple accounting ledger using transducers. diff --git a/examples/simple-ledger/deps.edn b/examples/simple-ledger/deps.edn index 2fb64673..f71d53ce 100644 --- a/examples/simple-ledger/deps.edn +++ b/examples/simple-ledger/deps.edn @@ -1,19 +1,28 @@ -{:deps - {danlentz/clj-uuid {:mvn/version "0.1.7"} - fundingcircle/jackdaw {:mvn/version "0.6.4"} - org.apache.kafka/kafka-streams {:mvn/version "2.1.0"} - org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"} - org.clojure/clojure {:mvn/version "1.10.0"} - org.clojure/tools.logging {:mvn/version "0.4.1"}} +{:paths + ["src" "resources"] - :mvn/repos - {"confluent" {:url "https://packages.confluent.io/maven/"}} - - :paths - ["src" "test" "dev" "../dev"] + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} :aliases - {:test {:extra-deps {com.cognitect/test-runner - {:git/url "https://github.com/cognitect-labs/test-runner.git" - :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} - :main-opts ["-m" "cognitect.test-runner"]}}} + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url 
"https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/simple-ledger/dev/system.clj b/examples/simple-ledger/dev/system.clj deleted file mode 100644 index 4e6a80bb..00000000 --- a/examples/simple-ledger/dev/system.clj +++ /dev/null @@ -1,68 +0,0 @@ -(ns system - "Functions to start and stop the system, used for interactive - development. - The `system/start` and `system/stop` functions are required by the - `user` namespace and should not be called directly." - (:require [clojure.tools.logging :as log] - [clojure.java.shell :refer [sh]] - [jackdaw.admin :as ja] - [simple-ledger])) - -(def system nil) - -(defn kafka-admin-client-config - [] - {"bootstrap.servers" "localhost:9092"}) - -(defn create-topics - "Takes a list of topics and creates these using the names given." - [topic-config-list] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (ja/create-topics! client topic-config-list))) - -(defn re-delete-topics - "Takes an instance of java.util.regex.Pattern and deletes any Kafka - topics that match." - [re] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (let [topics-to-delete (->> (ja/list-topics client) - (filter #(re-find re (:topic-name %))))] - (ja/delete-topics! client topics-to-delete)))) - -(defn application-id - "Takes an application config and returns an `application.id`." - [app-config] - (get app-config "application.id")) - -(defn destroy-state-stores - "Takes an application config and deletes local files associated with - internal state." - [app-config] - (sh "rm" "-rf" (str "/tmp/kafka-streams/" - (application-id app-config))) - (log/info "internal state is deleted")) - -(defn stop - "Stops the app, and deletes topics and internal state. 
- This functions is required by the `user` namespace and should not - be called directly." - [] - (when system - (simple-ledger/stop-app (:app system))) - (let [app-id (application-id (simple-ledger/app-config)) - re (re-pattern (str "(ledger-entries-requested" - "|ledger-transaction-added" - "|" app-id - ".*)"))] - (re-delete-topics re) - (destroy-state-stores (simple-ledger/app-config)))) - -(defn start - "Creates topics, and starts the app. - This functions is required by the `user` namespace and should not - be called directly." - [] - (with-out-str (stop)) - (create-topics (map simple-ledger/topic-config ["ledger-entries-requested" - "ledger-transaction-added"])) - {:app (simple-ledger/start-app (simple-ledger/app-config))}) diff --git a/examples/simple-ledger/dev/user.clj b/examples/simple-ledger/dev/user.clj new file mode 100644 index 00000000..cc2d6a3d --- /dev/null +++ b/examples/simple-ledger/dev/user.clj @@ -0,0 +1,90 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [simple-ledger :as sl])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." 
+ {:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"]) + :topic-metadata {:entry-requested + {:topic-name "entry-requested" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-pending + {:topic-name "transaction-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-added + {:topic-name "transaction-added" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}} + + :topology {:topology-builder sl/topology-builder + :xforms [#'sl/xf-split-entries #'sl/xf-running-balances] + :swap-fn jxf/kv-store-swap-fn} + + :app {:streams-config sl/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata)) + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}] + (let [xform-map (reduce-kv (fn [m k v] + (let [k (keyword (str (:ns (meta v))) + (str (:name (meta v))))] + (assoc m k #(v % swap-fn)))) + {} + xforms) + streams-builder (j/streams-builder)] + ((topology-builder topic-metadata xform-map) streams-builder))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! 
:app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/simple-ledger/resources/logback.xml b/examples/simple-ledger/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/simple-ledger/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index 4f5a3d60..b3679e91 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -1,414 +1,225 @@ (ns simple-ledger - "This tutorial contains a simple stream processing application using - Jackdaw and Kafka Streams. - - It begins with Pipe which is then extended using an interactive - workflow. The result is a simple ledger." + "This example creates a simple accounting ledger using transducers." (:gen-class) - (:require [clojure.spec.alpha :as s] + (:require [clojure.string :as str] [clojure.tools.logging :refer [info]] - [clj-uuid :as uuid] + [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.serdes.edn :as jse]) - (:import [org.apache.kafka.common.serialization Serdes])) - - -;;; Topic Configuration -;;; - -(defn topic-config - "Takes a topic name and (optionally) a key and value serde and - returns a topic configuration map, which may be used to create a - topic or produce/consume records." 
- ([topic-name] - (topic-config topic-name (jse/serde) (jse/serde))) + [jackdaw.streams.xform :as jxf])) + + +(defn xf-split-entries + [_ _] + (map (fn [[k {:keys [debit-account credit-account amount] :as entry}]] + [[debit-account + {:account-name debit-account + :debit-credit-indicator :dr + :amount amount}] + [credit-account + {:account-name credit-account + :debit-credit-indicator :cr + :amount amount}]]))) + +(defn next-balances + [starting-balances {:keys [account-name debit-credit-indicator amount] + :as transaction}] + (let [op (if (= :dr debit-credit-indicator) - +)] + (update starting-balances account-name (fnil op 0) amount))) + +(defn xf-running-balances + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + {:keys [account-name debit-credit-indicator amount] :as entry} v + next (as-> entry % + (swap-fn state next-balances %) + (select-keys % [account-name]) + ((juxt (comp first keys) (comp first vals)) %) + (zipmap [:account-name :current-balance] %) + (assoc % :starting-balance (if (= :dr debit-credit-indicator) + (+ amount (:current-balance %)) + (- (:current-balance %) amount))) ; a credit added `amount`, so subtract it to recover the start + (vector k %) + (vector %))] + (rf result next)))))) - ([topic-name key-serde value-serde] - {:topic-name topic-name - :partition-count 1 - :replication-factor 1 - :topic-config {} - :key-serde key-serde - :value-serde value-serde})) - -;;; App Template -;;; - -(defn app-config - "Returns the application config." - [] +(comment + ;; Use this comment block to explore the Simple Ledger using Clojure + ;; transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/simple-ledger + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. 
+ + ;; Evaluate the form: + (def coll + [[nil {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + [nil {:debit-account "cash" + :credit-account "sales" + :amount 2000}]]) + + ;; Let's record the entries. 
Evaluate the form: + (->> coll + (transduce (xf-split-entries nil nil) concat) + (transduce (xf-running-balances (atom {}) swap!) concat)) + + ;; You should see output like the following: + + ;; (["tech" + ;; {:account-name "tech" + ;; :starting-balance 0 + ;; :current-balance -1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :starting-balance 0 + ;; :current-balance 1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :starting-balance 1000 + ;; :current-balance -1000}] + ;; ["sales" + ;; {:account-name "sales" + ;; :starting-balance 0 + ;; :current-balance 2000}]) + + + ;; This time, let's record the entries using + ;; `jackdaw.streams.xform/fake-kv-store` which implements the + ;; KeyValueStore interface with overrides for get and put. + + ;; Evaluate the form: + (->> coll + (transduce (xf-split-entries nil nil) concat) + (transduce (xf-running-balances (jxf/fake-kv-store {}) + jxf/kv-store-swap-fn) concat)) + + ;; You should see the same output. + ) + + +(def streams-config {"application.id" "simple-ledger" - "bootstrap.servers" "localhost:9092" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") "cache.max.bytes.buffering" "0"}) -(defn build-topology - "Returns a topology builder. - - WARNING: This is just a stub. Before publishing to the input topic, - evaluate one of the `build-topology` functions in the comment forms - below." - [builder] - (-> (j/kstream builder (topic-config "ledger-entries-requested")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "ledger-transaction-added"))) - builder) - -(defn start-app - "Starts the stream processing application." - [app-config] - (let [builder (j/streams-builder) - topology (build-topology builder) - app (j/kafka-streams topology app-config)] - (j/start app) - (info "simple-ledger is up") - app)) - -(defn stop-app - "Stops the stream processing application." 
- [app] - (j/close app) - (info "simple-ledger is down")) - - -(defn -main - [& _] - (start-app (app-config))) - - -(comment - ;;; Start - ;;; - - ;; Needed to invoke the forms from this namespace. When typing - ;; directly in the REPL, skip this step. - (require '[user :refer :all :exclude [topic-config]]) - - - ;; Start ZooKeeper and Kafka. - ;; This requires the Confluent Platform CLI which may be obtained - ;; from `https://www.confluent.io/download/`. If ZooKeeper and Kafka - ;; are already running, skip this step. - (confluent/start) - - - ;; Create the topics, and start the app. - (start) - - - ;; Write to the input stream. - (publish (topic-config "ledger-entries-requested") - nil - "this is a pipe") - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added"))) +(defn topology-builder + [{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder entry-requested) + (jxf/transduce-kstream (::xf-split-entries xforms)) + (j/through transaction-pending) + (jxf/transduce-kstream (::xf-running-balances xforms)) + (j/to transaction-added)) + builder)) (comment - ;;; Add input validation - ;;; - - (do - - (s/def ::ledger-entries-requested-value - (s/keys :req-un [::id - ::entries])) - - (s/def ::id uuid?) - (s/def ::entries (s/+ ::entry)) - - (s/def ::entry - (s/and (s/keys :req-un [::debit-account-name - ::credit-account-name - ::amount]) - #(not= (:debit-account-name %) - (:credit-account-name %)))) - - (s/def ::debit-account-name string?) - (s/def ::credit-account-name string?) - (s/def ::amount pos-int?)) - - - (do - - (defn valid-input? - [[_ v]] - (s/valid? ::ledger-entries-requested-value v)) - + ;; Use this comment block to explore the Simple Ledger as a stream + ;; processing application. 
- (defn log-bad-input - [[k v]] - (info (str "Bad input: " - (s/explain-data ::ledger-entries-requested-value v)))) + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/simple-ledger + ;; clj -A:dev + ;; ``` - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input)] - - (-> valid - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Reset the app. + ;; Evaluate the form: (reset) - - ;; Write invalid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "foo" - :amount 10}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added"))) - - -(comment - ;;; Split `ledger-entries-requested` events into ledger - ;;; transactions (aka debits and credits). 
- ;;; - - (do - - (defn entry-sides - [{:keys [debit-account-name - credit-account-name - amount]}] - [[debit-account-name - {:account-name debit-account-name - :amount (- amount)}] - [credit-account-name - {:account-name credit-account-name - :amount amount}]]) - - - (defn entries->transactions - [[_ v]] - (reduce #(concat %1 (entry-sides %2)) [] (:entries v))) - - - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) - - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input) - - transactions (j/flat-map valid entries->transactions)] - - (-> transactions - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added"))) - - -(comment - ;;; Add unique identifiers and reference IDs. - ;;; - - (defn entries->transactions - [[_ v]] - (->> (:entries v) - (reduce #(concat %1 (entry-sides %2)) []) - (map-indexed #(assoc-in %2 [1 :id] (uuid/v5 (:id v) %1))) - (map #(assoc-in %1 [1 :causation-id] (:id v))))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. 
- (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Read from the output stream ("foo" only). - (->> (get-keyvals (topic-config "ledger-transaction-added")) - (filter (fn [[k v]] (= "foo" k))))) - - -(comment - ;;; Keep track of running balances. - ;;; - - (do - - (defn account-balance-reducer - [x y] - (let [starting-balance (:current-balance x)] - (merge y {:starting-balance starting-balance - :current-balance (+ starting-balance (:amount y))}))) - - - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) - - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input) - - transactions (j/flat-map valid entries->transactions) - - balances - (-> transactions - (j/map-values (fn [v] - (merge v - {:starting-balance 0 - :current-balance (:amount v)}))) - (j/group-by-key (topic-config nil (Serdes/String) - (jse/serde))) - (j/reduce account-balance-reducer - (topic-config "balances")))] - - (-> balances - (j/to-kstream) - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. 
- (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Read from the output stream ("foo" only). - (->> (get-keyvals (topic-config "ledger-transaction-added")) - (filter (fn [[k v]] (= "foo" k))))) + ;; Evaluate the form: + (let [coll [{:debit-account "tech" + :credit-account "cash" + :amount 1000} + {:debit-account "cash" + :credit-account "sales" + :amount 2000}]] + (doseq [x coll] + (publish (:entry-requested topic-metadata) nil x))) + + ;; Evaluate the form: + (get-keyvals (:transaction-added topic-metadata)) + + ;; You should see output like the following. Notice transaction + ;; order is not preserved: + + ;; (["sales" + ;; {:account-name "sales" + ;; :starting-balance 0 + ;; :current-balance 2000}] + ;; ["tech" + ;; {:account-name "tech" + ;; :starting-balance 0 + ;; :current-balance -1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :starting-balance 0 + ;; :current-balance 1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :starting-balance 1000 + ;; :current-balance -1000}]) + + + ;; The `transaction-added` topic has 15 partitions. Let's see how + ;; the records distributed. Evaluate the form: + (->> (get-records (:transaction-added topic-metadata)) + (map (fn [x] + (select-keys x [:key :offset :partition :value])))) + + ;; You should see output like the following. 
The balances are spread + ;; across partitions 0, 11, and 14. Transaction order is preserved + ;; only for each account. There is no global order. + + ;; ({:key "sales" + ;; :offset 0 + ;; :partition 0 + ;; :value + ;; {:account-name "sales" + ;; :starting-balance 0 + ;; :current-balance 2000}} + ;; {:key "tech" + ;; :offset 0 + ;; :partition 11 + ;; :value + ;; {:account-name "tech" + ;; :starting-balance 0 + ;; :current-balance -1000}} + ;; {:key "cash" + ;; :offset 0 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :starting-balance 0 + ;; :current-balance 1000}} + ;; {:key "cash" + ;; :offset 1 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :starting-balance 1000 + ;; :current-balance -1000}}) + ) diff --git a/examples/simple-ledger/test/simple_ledger_test.clj b/examples/simple-ledger/test/simple_ledger_test.clj deleted file mode 100644 index e497e483..00000000 --- a/examples/simple-ledger/test/simple_ledger_test.clj +++ /dev/null @@ -1,64 +0,0 @@ -(ns simple-ledger-test - (:require [simple-ledger :as sut] - [jackdaw.streams.mock :as jsm] - [clojure.test :refer :all])) - - -(deftest build-topology-unit-test - (testing "simple ledger unit test" - (let [driver (jsm/build-driver sut/build-topology) - publish (partial jsm/publish driver) - get-keyvals (partial jsm/get-keyvals driver)] - - (publish (sut/topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 2} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 3}]}) - - (publish (sut/topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 5} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 7}]}) - - (let [keyvals (get-keyvals - (sut/topic-config "ledger-transaction-added"))] - - (is (= [-2 -3 -5 -7] - (->> (filter (fn [[k _]] (= "foo" k)) 
keyvals) - (map second) - (map :amount)))) - - (is (= [0 -2 -5 -10] - (->> (filter (fn [[k _]] (= "foo" k)) keyvals) - (map second) - (map :starting-balance)))) - - (is (= [-2 -5 -10 -17] - (->> (filter (fn [[k _]] (= "foo" k)) keyvals) - (map second) - (map :current-balance)))) - - (is (= [2 5] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :amount)))) - - (is (= [0 2] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :starting-balance)))) - - (is (= [2 7] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :current-balance)))))))) diff --git a/examples/word-count/dev/user.clj b/examples/word-count/dev/user.clj index 309a58b8..25722d52 100644 --- a/examples/word-count/dev/user.clj +++ b/examples/word-count/dev/user.clj @@ -23,6 +23,9 @@ :topics (ig/ref :topics)}}) +(integrant.repl/set-prep! (constantly repl-config)) + + (defmethod ig/init-key :topology [_ {:keys [topology-builder]}] (let [streams-builder (j/streams-builder)] ((topology-builder topic-metadata) streams-builder))) @@ -49,10 +52,6 @@ (defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] (j/close streams-app) - ;; BUG: Does not delete state on reset! (destroy-state-stores streams-config) (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] (re-delete-topics (:client-config topics) re))) - - -(integrant.repl/set-prep! (constantly repl-config)) diff --git a/examples/xf-word-count/.gitignore b/examples/xf-word-count/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/xf-word-count/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/xf-word-count/README.md b/examples/xf-word-count/README.md new file mode 100644 index 00000000..e6871924 --- /dev/null +++ b/examples/xf-word-count/README.md @@ -0,0 +1,3 @@ +# XF Word Count + +This is the classic 'word count' example done using transducers. 
diff --git a/examples/xf-word-count/deps.edn b/examples/xf-word-count/deps.edn new file mode 100644 index 00000000..f71d53ce --- /dev/null +++ b/examples/xf-word-count/deps.edn @@ -0,0 +1,28 @@ +{:paths + ["src" "resources"] + + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} + + :aliases + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj new file mode 100644 index 00000000..5b59b440 --- /dev/null +++ b/examples/xf-word-count/dev/user.clj @@ -0,0 +1,69 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." 
+ (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [xf-word-count :as xfwc])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topology {:topology-builder xfwc/topology-builder + :xform xfwc/xf + :swap-fn jxf/kv-store-swap-fn} + + :topics {:streams-config xfwc/streams-config + :client-config (select-keys xfwc/streams-config + ["bootstrap.servers"]) + :topology (ig/ref :topology)} + + :app {:streams-config xfwc/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) ; must come after the def: the symbol is resolved when this form is compiled + + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] + (let [streams-builder (j/streams-builder)] + ((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) + +(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology] + :as opts}] + (let [topic-metadata (topology->topic-metadata topology streams-config)] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! 
:app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/xf-word-count/resources/logback.xml b/examples/xf-word-count/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/xf-word-count/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj new file mode 100644 index 00000000..f8eed98c --- /dev/null +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -0,0 +1,145 @@ +(ns xf-word-count + "This is the classic 'word count' example done using transducers." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf])) + + +(defn xf-running-total + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + next (as-> v % + (swap-fn state #(merge-with (fnil + 0) %1 %2) %) + (select-keys % (keys v)) + (map vec %))] + (rf result next)))))) + +(defn xf + [state swap-fn] + (comp + (map (fn [[k v]] [k (str/split v #" ")])) + (map (fn [[k v]] [k (frequencies v)])) + (xf-running-total state swap-fn))) + + +(comment + ;; Use this comment block to explore Word Count using Clojure + ;; transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def coll + [[nil "inside every large program"] + [nil "is a small program"] + [nil "struggling to get out"]]) + + ;; Let's counts the words. Evaluate the form: + (transduce (xf (atom {}) swap!) 
concat coll) + + ;; You should see output like the following: + + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + + + ;; This time, let's count the words using + ;; `jackdaw.streams.xform/fake-kv-store` which implements the + ;; KeyValueStore interface with overrides for get and put." + + ;; Evaluate the form: + (transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) + + ;; You should see the same output. + ) + + +(def streams-config + {"application.id" "xf-word-count" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + "cache.max.bytes.buffering" "0"}) + +(defn topology-builder + [{:keys [input output] :as topics} xf] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder input) + (jxf/transduce-kstream xf) + (j/to output)) + builder)) + + +(comment + ;; Use this comment block to explore Word Count as a stream + ;; processing application. + + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. 
+ + ;; Evaluate the form: + (reset) + + ;; Evaluate the form: + (let [coll ["inside every large program" + "is a small program" + "struggling to get out"]] + (doseq [x coll] + (publish (:input topic-metadata) nil x))) + + ;; Evaluate the form: + (get-keyvals (:output topic-metadata)) + + ;; You should see output like the following: + + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + ) diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj new file mode 100644 index 00000000..e024f0d4 --- /dev/null +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -0,0 +1,76 @@ +(ns xf-word-count-test + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jtf] + [xf-word-count :as xfwc]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) + +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata {:input + {:topic-name "input" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}} + :app-config xfwc/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn))) + +(defn props-for + [x] + (doto (Properties.) 
+ (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(def mock-transport-config + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (j/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(def test-transport + (jt/mock-transport mock-transport-config (:topic-metadata test-config))) + +(defn done? + [journal] + (= 12 (count (get-in journal [:topics :output])))) + +(def commands + [[:write! :input "inside every large program" {:key-fn (constantly "")}] + [:write! :input "is a small program" {:key-fn (constantly "")}] + [:write! :input "struggling to get out" {:key-fn (constantly "")}] + [:watch done? {:timeout 2000}]]) + +(defn word-count + [journal word] + (->> (get-in journal [:topics :output]) + (filter (fn [x] (= word (:key x)))) + last + :value)) + +(deftest test-xf-word-count + (jtf/with-fixtures [(jtf/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine test-transport + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + + (is (= 1 (word-count journal "large"))) + (is (= 2 (word-count journal "program")))))))) diff --git a/src/jackdaw/streams/xform.clj b/src/jackdaw/streams/xform.clj new file mode 100644 index 00000000..3162fcc3 --- /dev/null +++ b/src/jackdaw/streams/xform.clj @@ -0,0 +1,67 @@ +(ns jackdaw.streams.xform + "Helper functions for working with transducers." + (:gen-class) + (:require [jackdaw.serdes :as js] + [jackdaw.streams :as j]) + (:import org.apache.kafka.streams.kstream.Transformer + org.apache.kafka.streams.KeyValue + [org.apache.kafka.streams.state KeyValueStore Stores] + org.apache.kafka.streams.StreamsBuilder)) + +(defn fake-kv-store + "Creates an instance of org.apache.kafka.streams.state.KeyValueStore + with overrides for get and put." + [init] + (let [store (volatile! 
init)] + (reify KeyValueStore + (get [_ k] + (clojure.core/get @store k)) + + (put [_ k v] + (vswap! store assoc k v))))) + +(defn kv-store-swap-fn + "Takes an instance of KeyValueStore, a function f, and map m, and + updates the store in a manner similar to `clojure.core/swap!`." + [^KeyValueStore store f m] + (let [ks (keys (f {} m)) + prev (reduce (fn [p k] + (assoc p k (.get store k))) + {} + ks) + next (f prev m)] + (doall (map (fn [[k v]] (.put store k v)) next)) + next)) + +(defn add-state-store! + [builder] + "Takes a builder and adds a state store." + (doto ^StreamsBuilder (j/streams-builder* builder) + (.addStateStore (Stores/keyValueStoreBuilder + (Stores/persistentKeyValueStore "transducer") + (js/edn-serde) + (js/edn-serde)))) + builder) + +(defn transformer + "Takes a transducer and creates an instance of + org.apache.kafka.streams.kstream.Transformer with overrides for + init, transform, and close." + [xf] + (let [ctx (atom nil)] + (reify + Transformer + (init [_ context] + (reset! ctx context)) + (transform [_ k v] + (let [^KeyValueStore store (.getStateStore @ctx "transducer") + v (first (into [] (xf store) [[k v]]))] + (KeyValue/pair k v))) + (close [_])))) + +(defn transduce-kstream + [kstream xf] + "Takes a kstream and xf and transduces the stream." 
+ (-> kstream + (j/transform (fn [] (transformer xf)) ["transducer"]) + (j/flat-map (fn [[_ v]] v)))) From 08ae80f28ee1e70d841a13ea256f817669834399 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Thu, 26 Sep 2019 09:29:55 -0700 Subject: [PATCH 2/8] Replace topology xf-word-count topology with classic Word Count to rule out testing bug --- examples/xf-word-count/dev/user.clj | 6 ++--- examples/xf-word-count/src/xf_word_count.clj | 5 ++++ .../xf-word-count/test/xf_word_count_test.clj | 27 ++++++++++++++++++- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj index 5b59b440..6f27ccb2 100644 --- a/examples/xf-word-count/dev/user.clj +++ b/examples/xf-word-count/dev/user.clj @@ -18,9 +18,6 @@ [xf-word-count :as xfwc])) -(integrant.repl/set-prep! (constantly repl-config)) - - (def repl-config "The development config. When the 'dev' alias is active, this config will be used." @@ -38,6 +35,9 @@ :topics (ig/ref :topics)}}) +(integrant.repl/set-prep! 
(constantly repl-config)) + + (defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] (let [streams-builder (j/streams-builder)] ((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj index f8eed98c..ad3f3c0e 100644 --- a/examples/xf-word-count/src/xf_word_count.clj +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -82,6 +82,11 @@ (def streams-config {"application.id" "xf-word-count" "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + + ;; TEMP: These were added to rule out a testing bug so we can test with classic Word Count + "default.key.serde" "jackdaw.serdes.EdnSerde" + "default.value.serde" "jackdaw.serdes.EdnSerde" + "cache.max.bytes.buffering" "0"}) (defn topology-builder diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj index e024f0d4..1c2b43da 100644 --- a/examples/xf-word-count/test/xf_word_count_test.clj +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -28,10 +28,35 @@ :app-config xfwc/streams-config :enable? (System/getenv "BOOTSTRAP_SERVERS")}) -(defn topology-builder +#_(defn topology-builder [topic-metadata] (xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn))) +;; TEMP: This was added to rule out a testing bug so we can test with classic Word Count +(defn split-lines + "Takes an input string and returns a list of words with the + whitespace removed." + [s] + (clojure.string/split (clojure.string/lower-case s) #"\W+")) + +;; TEMP: This was added to rule out a testing bug so we can test with classic Word Count +(defn topology-builder + "Takes topic metadata and returns a function that builds the topology." 
+ [topic-metadata] + (fn [builder] + (let [text-input (j/kstream builder (:input topic-metadata)) + + counts (-> text-input + (j/flat-map-values split-lines) + (j/group-by (fn [[_ v]] v)) + (j/count))] + + (-> counts + (j/to-kstream) + (j/to (:output topic-metadata))) + + builder))) + (defn props-for [x] (doto (Properties.) From 05cfd616e2a21b30897045ecf9693fc9a0a63e80 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Thu, 26 Sep 2019 11:38:26 -0700 Subject: [PATCH 3/8] Fix test-xf-word-count --- examples/xf-word-count/src/xf_word_count.clj | 5 -- .../xf-word-count/test/xf_word_count_test.clj | 77 +++++++------------ 2 files changed, 29 insertions(+), 53 deletions(-) diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj index ad3f3c0e..f8eed98c 100644 --- a/examples/xf-word-count/src/xf_word_count.clj +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -82,11 +82,6 @@ (def streams-config {"application.id" "xf-word-count" "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") - - ;; TEMP: These were added to rule out a testing bug so we can test with classic Word Count - "default.key.serde" "jackdaw.serdes.EdnSerde" - "default.value.serde" "jackdaw.serdes.EdnSerde" - "cache.max.bytes.buffering" "0"}) (defn topology-builder diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj index 1c2b43da..cfd68388 100644 --- a/examples/xf-word-count/test/xf_word_count_test.clj +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -4,58 +4,37 @@ [jackdaw.serdes :as js] [jackdaw.streams :as j] [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] [jackdaw.test :as jt] - [jackdaw.test.fixtures :as jtf] + [jackdaw.test.fixtures :as jt.fix] [xf-word-count :as xfwc]) (:import java.util.Properties org.apache.kafka.streams.TopologyTestDriver)) +(def topic-metadata + {:input + {:topic-name "input" + :partition-count 1 + 
:replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) + (def test-config {:broker-config {"bootstrap.servers" "localhost:9092"} - :topic-metadata {:input - {:topic-name "input" - :partition-count 1 - :replication-factor 1 - :key-serde (js/edn-serde) - :value-serde (js/edn-serde)} - - :output - {:topic-name "output" - :partition-count 1 - :replication-factor 1 - :key-serde (js/edn-serde) - :value-serde (js/edn-serde)}} - :app-config xfwc/streams-config - :enable? (System/getenv "BOOTSTRAP_SERVERS")}) - -#_(defn topology-builder - [topic-metadata] - (xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn))) + :topic-metadata topic-metadata + :app-config xfwc/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) -;; TEMP: This was added to rule out a testing bug so we can test with classic Word Count -(defn split-lines - "Takes an input string and returns a list of words with the - whitespace removed." - [s] - (clojure.string/split (clojure.string/lower-case s) #"\W+")) - -;; TEMP: This was added to rule out a testing bug so we can test with classic Word Count (defn topology-builder - "Takes topic metadata and returns a function that builds the topology." [topic-metadata] - (fn [builder] - (let [text-input (j/kstream builder (:input topic-metadata)) - - counts (-> text-input - (j/flat-map-values split-lines) - (j/group-by (fn [[_ v]] v)) - (j/count))] - - (-> counts - (j/to-kstream) - (j/to (:output topic-metadata))) - - builder))) + (xfwc/topology-builder topic-metadata #(xfwc/xf % jxf/kv-store-swap-fn))) (defn props-for [x] @@ -65,14 +44,16 @@ {} x)))) -(def mock-transport-config +(defn mock-transport-config + [] {:driver (let [streams-builder (j/streams-builder) topology ((topology-builder (:topic-metadata test-config)) streams-builder)] - (TopologyTestDriver. 
(.build (j/streams-builder* topology)) + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) (props-for (:app-config test-config))))}) -(def test-transport - (jt/mock-transport mock-transport-config (:topic-metadata test-config))) +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) (defn done? [journal] @@ -92,8 +73,8 @@ :value)) (deftest test-xf-word-count - (jtf/with-fixtures [(jtf/integration-fixture topology-builder test-config)] - (jackdaw.test/with-test-machine test-transport + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) (fn [machine] (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] From 2136e1c086edc6718ed729b661c3567c2288aa9e Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Sun, 29 Sep 2019 21:52:53 -0700 Subject: [PATCH 4/8] Remove nils from keys in examples and change simple ledger map keys --- examples/simple-ledger/src/simple_ledger.clj | 60 ++++++++++---------- examples/xf-word-count/src/xf_word_count.clj | 6 +- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index b3679e91..7e6e7ca6 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -39,10 +39,10 @@ (swap-fn state next-balances %) (select-keys % [account-name]) ((juxt (comp first keys) (comp first vals)) %) - (zipmap [:account-name :current-balance] %) - (assoc % :starting-balance (if (= :dr debit-credit-indicator) - (+ amount (:current-balance %)) - (- amount (:current-balance %)))) + (zipmap [:account-name :after-balance] %) + (assoc % :before-balance (if (= :dr debit-credit-indicator) + (+ amount (:after-balance %)) + (- amount (:after-balance %)))) (vector k %) (vector %))] (rf result next)))))) @@ -63,10 +63,10 @@ ;; 
Evaluate the form: (def coll - [[nil {:debit-account "tech" + [["1" {:debit-account "tech" :credit-account "cash" :amount 1000}] - [nil {:debit-account "cash" + ["2" {:debit-account "cash" :credit-account "sales" :amount 2000}]]) @@ -79,20 +79,20 @@ ;; (["tech" ;; {:account-name "tech" - ;; :starting-balance 0 - ;; :current-balance -1000}] + ;; :before-balance 0 + ;; :after-balance -1000}] ;; ["cash" ;; {:account-name "cash" - ;; :starting-balance 0 - ;; :current-balance 1000}] + ;; :before-balance 0 + ;; :after-balance 1000}] ;; ["cash" ;; {:account-name "cash" - ;; :starting-balance 1000 - ;; :current-balance -1000}] + ;; :before-balance 1000 + ;; :after-balance -1000}] ;; ["sales" ;; {:account-name "sales" - ;; :starting-balance 0 - ;; :current-balance 2000}]) + ;; :before-balance 0 + ;; :after-balance 2000}]) ;; This time, let's count the words using @@ -168,20 +168,20 @@ ;; (["sales" ;; {:account-name "sales" - ;; :starting-balance 0 - ;; :current-balance 2000}] + ;; :before-balance 0 + ;; :after-balance 2000}] ;; ["tech" ;; {:account-name "tech" - ;; :starting-balance 0 - ;; :current-balance -1000}] + ;; :before-balance 0 + ;; :after-balance -1000}] ;; ["cash" ;; {:account-name "cash" - ;; :starting-balance 0 - ;; :current-balance 1000}] + ;; :before-balance 0 + ;; :after-balance 1000}] ;; ["cash" ;; {:account-name "cash" - ;; :starting-balance 1000 - ;; :current-balance -1000}]) + ;; :before-balance 1000 + ;; :after-balance -1000}]) ;; The `transaction-added` topic has 15 partitions. 
Let's see how @@ -199,27 +199,27 @@ ;; :partition 0 ;; :value ;; {:account-name "sales" - ;; :starting-balance 0 - ;; :current-balance 2000}} + ;; :before-balance 0 + ;; :after-balance 2000}} ;; {:key "tech" ;; :offset 0 ;; :partition 11 ;; :value ;; {:account-name "tech" - ;; :starting-balance 0 - ;; :current-balance -1000}} + ;; :before-balance 0 + ;; :after-balance -1000}} ;; {:key "cash" ;; :offset 0 ;; :partition 14 ;; :value ;; {:account-name "cash" - ;; :starting-balance 0 - ;; :current-balance 1000}} + ;; :before-balance 0 + ;; :after-balance 1000}} ;; {:key "cash" ;; :offset 1 ;; :partition 14 ;; :value ;; {:account-name "cash" - ;; :starting-balance 1000 - ;; :current-balance -1000}}) + ;; :before-balance 1000 + ;; :after-balance -1000}}) ) diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj index f8eed98c..80f6603b 100644 --- a/examples/xf-word-count/src/xf_word_count.clj +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -45,9 +45,9 @@ ;; Evaluate the form: (def coll - [[nil "inside every large program"] - [nil "is a small program"] - [nil "struggling to get out"]]) + [["1" "inside every large program"] + ["2" "is a small program"] + ["3" "struggling to get out"]]) ;; Let's counts the words. Evaluate the form: (transduce (xf (atom {}) swap!) 
concat coll) From 4ee5d7fc4ee7befe77956533cd58c49325eecd1d Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Wed, 2 Oct 2019 17:29:29 -0700 Subject: [PATCH 5/8] Remove xf- prefix from transducer naming, move fake-kv-store to different ns --- examples/simple-ledger/dev/user.clj | 4 +-- examples/simple-ledger/src/simple_ledger.clj | 29 +++++++++++--------- examples/xf-word-count/dev/user.clj | 2 +- examples/xf-word-count/src/xf_word_count.clj | 15 +++++----- src/jackdaw/streams/xform.clj | 12 -------- src/jackdaw/streams/xform/fakes.clj | 15 ++++++++++ 6 files changed, 42 insertions(+), 35 deletions(-) create mode 100644 src/jackdaw/streams/xform/fakes.clj diff --git a/examples/simple-ledger/dev/user.clj b/examples/simple-ledger/dev/user.clj index cc2d6a3d..f2f47e8e 100644 --- a/examples/simple-ledger/dev/user.clj +++ b/examples/simple-ledger/dev/user.clj @@ -23,7 +23,7 @@ When the 'dev' alias is active, this config will be used." {:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"]) :topic-metadata {:entry-added - {:topic-name "entry-requested" + {:topic-name "entry-pending" :partition-count 15 :replication-factor 1 :key-serde (js/edn-serde) @@ -44,7 +44,7 @@ :value-serde (js/edn-serde)}}} :topology {:topology-builder sl/topology-builder - :xforms [#'sl/xf-split-entries #'sl/xf-running-balances] + :xforms [#'sl/split-entries #'sl/running-balances] :swap-fn jxf/kv-store-swap-fn} :app {:streams-config sl/streams-config diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index 7e6e7ca6..44a1c286 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -5,10 +5,11 @@ [clojure.tools.logging :refer [info]] [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.streams.xform :as jxf])) + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) -(defn xf-split-entries +(defn split-entries [_ _] (map (fn [[k {:keys 
[debit-account credit-account amount] :as entry}]] [[debit-account @@ -26,7 +27,7 @@ (let [op (if (= :dr debit-credit-indicator) - +)] (update starting-balances account-name (fnil op 0) amount))) -(defn xf-running-balances +(defn running-balances [state swap-fn] (fn [rf] (fn @@ -72,8 +73,8 @@ ;; Let's record the entries. Evaluate the form: (->> coll - (transduce (xf-split-entries nil nil) concat) - (transduce (xf-running-balances (atom {}) swap!) concat)) + (transduce (split-entries nil nil) concat) + (transduce (running-balances (atom {}) swap!) concat)) ;; You should see output like the following: @@ -96,14 +97,14 @@ ;; This time, let's count the words using - ;; `jackdaw.streams.xform/fake-kv-store` which implements the + ;; `jackdaw.streams.xform.fakes/fake-kv-store` which implements the ;; KeyValueStore interface with overrides for get and put." ;; Evaluate the form: (->> coll - (transduce (xf-split-entries nil nil) concat) - (transduce (xf-running-balances (jxf/fake-kv-store {}) - jxf/kv-store-swap-fn) concat)) + (transduce (split-entries nil nil) concat) + (transduce (running-balances (fakes/fake-kv-store {}) + jxf/kv-store-swap-fn) concat)) ;; You should see the same output. ) @@ -115,13 +116,15 @@ "cache.max.bytes.buffering" "0"}) (defn topology-builder - [{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms] + [{:keys [entry-pending + transaction-pending + transaction-added] :as topics} xforms] (fn [builder] (jxf/add-state-store! 
builder) - (-> (j/kstream builder entry-requested) - (jxf/transduce-kstream (::xf-split-entries xforms)) + (-> (j/kstream builder entry-pending) + (jxf/transduce-kstream (::split-entries xforms)) (j/through transaction-pending) - (jxf/transduce-kstream (::xf-running-balances xforms)) + (jxf/transduce-kstream (::running-balances xforms)) (j/to transaction-added)) builder)) diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj index 6f27ccb2..6b5c4312 100644 --- a/examples/xf-word-count/dev/user.clj +++ b/examples/xf-word-count/dev/user.clj @@ -22,7 +22,7 @@ "The development config. When the 'dev' alias is active, this config will be used." {:topology {:topology-builder xfwc/topology-builder - :xform xfwc/xf + :xform xfwc/count-words :swap-fn jxf/kv-store-swap-fn} :topics {:streams-config xfwc/streams-config diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj index 80f6603b..d0339cc9 100644 --- a/examples/xf-word-count/src/xf_word_count.clj +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -5,10 +5,11 @@ [clojure.tools.logging :refer [info]] [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.streams.xform :as jxf])) + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) -(defn xf-running-total +(defn running-total [state swap-fn] (fn [rf] (fn @@ -22,12 +23,12 @@ (map vec %))] (rf result next)))))) -(defn xf +(defn count-words [state swap-fn] (comp (map (fn [[k v]] [k (str/split v #" ")])) (map (fn [[k v]] [k (frequencies v)])) - (xf-running-total state swap-fn))) + (running-total state swap-fn))) (comment @@ -50,7 +51,7 @@ ["3" "struggling to get out"]]) ;; Let's counts the words. Evaluate the form: - (transduce (xf (atom {}) swap!) concat coll) + (transduce (count-words (atom {}) swap!) 
concat coll) ;; You should see output like the following: @@ -69,11 +70,11 @@ ;; This time, let's count the words using - ;; `jackdaw.streams.xform/fake-kv-store` which implements the + ;; `jackdaw.streams.xform.fakes/fake-kv-store` which implements the ;; KeyValueStore interface with overrides for get and put." ;; Evaluate the form: - (transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) + (transduce (count-words (fakes/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) ;; You should see the same output. ) diff --git a/src/jackdaw/streams/xform.clj b/src/jackdaw/streams/xform.clj index 3162fcc3..9b3cfd9f 100644 --- a/src/jackdaw/streams/xform.clj +++ b/src/jackdaw/streams/xform.clj @@ -8,18 +8,6 @@ [org.apache.kafka.streams.state KeyValueStore Stores] org.apache.kafka.streams.StreamsBuilder)) -(defn fake-kv-store - "Creates an instance of org.apache.kafka.streams.state.KeyValueStore - with overrides for get and put." - [init] - (let [store (volatile! init)] - (reify KeyValueStore - (get [_ k] - (clojure.core/get @store k)) - - (put [_ k v] - (vswap! store assoc k v))))) - (defn kv-store-swap-fn "Takes an instance of KeyValueStore, a function f, and map m, and updates the store in a manner similar to `clojure.core/swap!`." diff --git a/src/jackdaw/streams/xform/fakes.clj b/src/jackdaw/streams/xform/fakes.clj new file mode 100644 index 00000000..2e3e1bfd --- /dev/null +++ b/src/jackdaw/streams/xform/fakes.clj @@ -0,0 +1,15 @@ +(ns jackdaw.streams.xform.fakes + (:import + [org.apache.kafka.streams.state KeyValueStore])) + +(defn fake-kv-store + "Creates an instance of org.apache.kafka.streams.state.KeyValueStore + with overrides for get and put." + [init] + (let [store (volatile! init)] + (reify KeyValueStore + (get [_ k] + (clojure.core/get @store k)) + + (put [_ k v] + (vswap! 
store assoc k v))))) From fe1a74b43821c2a3b7447ccdc9f332bd3eb1b9e8 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Wed, 9 Oct 2019 15:26:04 -0700 Subject: [PATCH 6/8] When publishing test data, use "entry-pending" --- examples/simple-ledger/src/simple_ledger.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index 44a1c286..f6325d6d 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -161,7 +161,7 @@ :credit-account "sales" :amount 2000}]] (doseq [x coll] - (publish (:entry-requested topic-metadata) nil x))) + (publish (:entry-pending topic-metadata) nil x))) ;; Evaluate the form: (get-keyvals (:transaction-added topic-metadata)) From 01317c2a9ed45397365376015aa903acb06ef877 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Thu, 12 Dec 2019 20:40:48 -0800 Subject: [PATCH 7/8] Use `forward` method on the processor context --- examples/pipe/.gitignore | 1 + examples/pipe/README.md | 95 +---------- examples/pipe/deps.edn | 40 +++-- examples/pipe/dev/system.clj | 49 ------ examples/pipe/dev/user.clj | 83 ++++++++++ examples/pipe/resources/logback.xml | 1 + examples/pipe/src/pipe.clj | 153 +++++++++--------- examples/pipe/test/pipe_test.clj | 95 +++++++++-- examples/simple-ledger/dev/user.clj | 2 +- examples/simple-ledger/src/simple_ledger.clj | 26 +-- .../simple-ledger/test/simple_ledger_test.clj | 111 +++++++++++++ examples/xf-word-count/src/xf_word_count.clj | 12 +- .../xf-word-count/test/xf_word_count_test.clj | 10 +- src/jackdaw/streams/xform.clj | 14 +- 14 files changed, 412 insertions(+), 280 deletions(-) create mode 100644 examples/pipe/.gitignore delete mode 100644 examples/pipe/dev/system.clj create mode 100644 examples/pipe/dev/user.clj create mode 120000 examples/pipe/resources/logback.xml create mode 100644 examples/simple-ledger/test/simple_ledger_test.clj diff --git 
a/examples/pipe/.gitignore b/examples/pipe/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/pipe/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/pipe/README.md b/examples/pipe/README.md index 6ab5aca2..3581230b 100644 --- a/examples/pipe/README.md +++ b/examples/pipe/README.md @@ -1,96 +1,3 @@ # Pipe -This tutorial contains a simple stream processing application using Jackdaw and Kafka Streams. - -## Setting up - -Before starting, it is recommended to install the Confluent Platform CLI which can be obtained from [https://www.confluent.io/download/](https://www.confluent.io/download/). - -To install Clojure: [https://clojure.org/guides/getting_started](https://clojure.org/guides/getting_started). - -## Project structure - -The project structure looks like this: -``` -$ tree pipe -pipe -├── README.md -├── deps.edn -├── dev -│   └── system.clj -├── src -│   └── pipe.clj -└── test - └── pipe_test.clj -``` - -The `deps.edn` file describes the project's dependencies and source paths. - -The `system.clj` file contains functions to start, stop, and reset the app. These are required by the `user` namespace for interactive development and should not be invoked directly. - -The `pipe.clj` file describes the app and topology. Pipe reads from a Kafka topic called "input", logs the key and value, and writes to a Kafka topic called "output": -``` -(defn build-topology - [builder] - (-> (j/kstream builder (topic-config "input")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "output"))) - builder) -``` - -The `pipe_test.clj` file contains a test. - -## Running the app - -Let's get started! Fire up a Clojure REPL and load the `pipe` namespace. Then, start ZooKeeper and Kafka. If these services are already running, you may skip this step: -``` -user> (confluent/start) -INFO zookeeper is up (confluent:288) -INFO kafka is up (confluent:288) -nil -``` - -Now, start the app. 
-``` -user> (start) -INFO topic 'input' is created (jackdaw.admin.client:288) -INFO topic 'output' is created (jackdaw.admin.client:288) -INFO pipe is up (pipe:288) -{:app #object[org.apache.kafka.streams.KafkaStreams 0x225dcbb9 "org.apache.kafka.streams.KafkaStreams@225dcbb9"]} -``` - -The `user/start` function creates two Kafka topics needed by Pipe and starts it. - -For the full list of topics, type: -``` -user> (get-topics) -#{"output" "__confluent.support.metrics" "input"} -``` - -With the app running, place a new record on the input stream: -``` -user> (publish (topic-config "input") nil "this is a pipe") -INFO {:key nil, :value "this is a pipe"} (pipe:288) -nil -``` -Pipe logs the key and value to the standard output. - -To read from the output stream: -``` -user> (get-keyvals (topic-config "output")) -((nil "this is a pipe")) -``` - -This concludes this tutorial. - -## Interactive development - -For interactive development, reload the file and invoke `user/reset`. These stops the app, deletes topics and internal state using a regex, and recreates the topics and restarts the app. The details are in the `system` namespace. - -## Running tests - -To run tests, load the `pipe-test` namespace and invoke a test runner using your editor, or from the command line: -``` -clj -Atest -``` +This example creates a simple stream processing application using transducers. 
diff --git a/examples/pipe/deps.edn b/examples/pipe/deps.edn index cd018879..f71d53ce 100644 --- a/examples/pipe/deps.edn +++ b/examples/pipe/deps.edn @@ -1,18 +1,28 @@ -{:deps - {fundingcircle/jackdaw {:mvn/version "0.6.4"} - org.apache.kafka/kafka-streams {:mvn/version "2.1.0"} - org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"} - org.clojure/clojure {:mvn/version "1.10.0"} - org.clojure/tools.logging {:mvn/version "0.4.1"}} +{:paths + ["src" "resources"] - :mvn/repos - {"confluent" {:url "https://packages.confluent.io/maven/"}} - - :paths - ["src" "test" "dev" "../dev"] + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} :aliases - {:test {:extra-deps {com.cognitect/test-runner - {:git/url "https://github.com/cognitect-labs/test-runner.git" - :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} - :main-opts ["-m" "cognitect.test-runner"]}}} + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/pipe/dev/system.clj b/examples/pipe/dev/system.clj deleted file mode 100644 index 083bb361..00000000 --- a/examples/pipe/dev/system.clj +++ /dev/null @@ -1,49 +0,0 @@ -(ns system - "Functions to start and stop the system, used for 
interactive - development. - The `system/start` and `system/stop` functions are required by the - `user` namespace and should not be called directly." - (:require [clojure.string :as str] - [clojure.tools.logging :as log] - [clojure.java.shell :refer [sh]] - [jackdaw.admin :as ja] - [pipe])) - -(def system nil) - -(defn kafka-admin-client-config - [] - {"bootstrap.servers" "localhost:9092"}) - -(defn create-topics - "Takes a list of topics and creates these using the names given." - [topic-config-list] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (ja/create-topics! client topic-config-list))) - -(defn re-delete-topics - "Takes an instance of java.util.regex.Pattern and deletes any Kafka - topics that match." - [re] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (let [topics-to-delete (->> (ja/list-topics client) - (filter #(re-find re (:topic-name %))))] - (ja/delete-topics! client topics-to-delete)))) - -(defn stop - "Stops the app and deletes topics. - This functions is required by the `user` namespace and should not - be called directly." - [] - (when system - (pipe/stop-app (:app system))) - (re-delete-topics #"(input|output)")) - -(defn start - "Creates topics, and starts the app. - This functions is required by the `user` namespace and should not - be called directly." - [] - (with-out-str (stop)) - (create-topics (map pipe/topic-config ["input" "output"])) - {:app (pipe/start-app (pipe/app-config))}) diff --git a/examples/pipe/dev/user.clj b/examples/pipe/dev/user.clj new file mode 100644 index 00000000..2ebd5b08 --- /dev/null +++ b/examples/pipe/dev/user.clj @@ -0,0 +1,83 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do not use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required."
+ (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [pipe])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topics {:client-config (select-keys pipe/streams-config ["bootstrap.servers"]) + :topic-metadata {:input + {:topic-name "input" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}} + + :topology {:topology-builder pipe/topology-builder + :xforms [#'pipe/xf] + :swap-fn jxf/kv-store-swap-fn} + + :app {:streams-config pipe/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata)) + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}] + (let [xform-map (reduce-kv (fn [m k v] + (let [k (keyword (str (:ns (meta v))) + (str (:name (meta v))))] + (assoc m k #(v % jxf/kv-store-swap-fn)))) + {} + xforms) + streams-builder (j/streams-builder)] + ((topology-builder topic-metadata xform-map) streams-builder))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! 
:topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/pipe/resources/logback.xml b/examples/pipe/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/pipe/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/pipe/src/pipe.clj b/examples/pipe/src/pipe.clj index fc4ae956..0fc1af38 100644 --- a/examples/pipe/src/pipe.clj +++ b/examples/pipe/src/pipe.clj @@ -1,95 +1,98 @@ (ns pipe - "This tutorial contains a simple stream processing application using - Jackdaw and Kafka Streams. - - Pipe reads from a Kafka topic called `input`, logs the key and - value, and writes these to a Kafka topic called `output`." + "This example creates a simple stream processing application using transducers." (:gen-class) (:require [clojure.string :as str] [clojure.tools.logging :refer [info]] + [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.serdes.edn :as jse]) - (:import [org.apache.kafka.common.serialization Serdes])) - - -(defn topic-config - "Takes a topic name and returns a topic configuration map, which may - be used to create a topic or produce/consume records." - [topic-name] - {:topic-name topic-name - :partition-count 1 - :replication-factor 1 - :key-serde (jse/serde) - :value-serde (jse/serde)}) - - -(defn app-config - "Returns the application config." 
- [] - {"application.id" "word-count" - "bootstrap.servers" "localhost:9092" - "cache.max.bytes.buffering" "0"}) + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) -(defn build-topology - "Reads from a Kafka topic called `input`, logs the key and value, - and writes these to a Kafka topic called `output`. Returns a - topology builder." - [builder] - (-> (j/kstream builder (topic-config "input")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "output"))) - builder) - -(defn start-app - "Starts the stream processing application." - [app-config] - (let [builder (j/streams-builder) - topology (build-topology builder) - app (j/kafka-streams topology app-config)] - (j/start app) - (info "pipe is up") - app)) - -(defn stop-app - "Stops the stream processing application." - [app] - (j/close app) - (info "pipe is down")) - - -(defn -main - [& _] - (start-app (app-config))) + +(defn xf + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + next [[k v]]] + (rf result next)))))) (comment - ;;; Evaluate the forms. - ;;; + ;; Use this comment block to explore Pipe using Clojure transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/pipe + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def input [["1" "foo"] ["2" "bar"]]) + + ;; Let's record the entries. Evaluate the form: + (transduce (xf (atom {}) swap!) concat input) - ;; Needed to invoke the forms from this namespace. When typing - ;; directly in the REPL, skip this step. 
- (require '[user :refer :all :exclude [topic-config]]) + ;; You should see output like the following: + ;; (["1" "foo"] ["2" "bar"]) + ) + + +(def streams-config + {"application.id" "pipe" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + "cache.max.bytes.buffering" "0"}) + +(defn topology-builder + [{:keys [input output] :as topics} xforms] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder input) + (jxf/transduce (::xf xforms)) + (j/to output)) + builder)) + + +(comment + ;; Use this comment block to explore Pipe as a stream processing + ;; application. - ;; Start ZooKeeper and Kafka. - ;; This requires the Confluent Platform CLI which may be obtained - ;; from `https://www.confluent.io/download/`. If ZooKeeper and Kafka - ;; are already running, skip this step. - (confluent/start) + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` - ;; Create the `input` and `output` topics, and start the app. - (start) + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/pipe + ;; clj -A:dev + ;; ``` + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. - ;; Get a list of current topics. - (list-topics) + ;; Evaluate the form: + (reset) + ;; Evaluate the form: + (let [input [["1" "foo"] ["2" "bar"]]] + (doseq [[k v] input] + (publish (:input topic-metadata) k v))) - ;; Write to the input stream. - (publish (topic-config "input") nil "this is a pipe") + ;; Evaluate the form: + (get-keyvals (:output topic-metadata)) + ;; You should see output like the following. - ;; Read from the output stream. 
- (get-keyvals (topic-config "output"))) + ;; (["1" "foo"] ["2" "bar"]) + ) diff --git a/examples/pipe/test/pipe_test.clj b/examples/pipe/test/pipe_test.clj index 17b48e18..8b014693 100644 --- a/examples/pipe/test/pipe_test.clj +++ b/examples/pipe/test/pipe_test.clj @@ -1,19 +1,86 @@ (ns pipe-test - "This illustrates the use of the TopologyTestDriver and jackdaw.test - to test Kafka Streams topologies." - (:require [pipe :as sut] - [jackdaw.streams.mock :as jsm] - [clojure.test :refer :all])) + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jt.fix] + [pipe]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) +(deftest pipe-unit-test + (let [input [["1" "foo"] ["2" "bar"]] + output (transduce (pipe/xf nil nil) concat input)] + (is (= "foo" (get (into {} output) "1"))) + (is (= "bar" (get (into {} output) "2"))))) -(deftest build-topology-unit-test - (testing "pipe unit test" - (let [driver (jsm/build-driver sut/build-topology) - publish (partial jsm/publish driver) - get-keyvals (partial jsm/get-keyvals driver)] +(def topic-metadata + {:input + {:topic-name "input" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} - (publish (sut/topic-config "input") nil "this is a pipe") + :output + {:topic-name "output" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) - (let [keyvals (get-keyvals (sut/topic-config "output"))] - (is (= 1 (count keyvals))) - (is (= [nil "this is a pipe"] (first keyvals))))))) +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata topic-metadata + :app-config pipe/streams-config + :enable? 
(System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (pipe/topology-builder topic-metadata {:pipe/xf #(pipe/xf % jxf/kv-store-swap-fn)})) + +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(defn mock-transport-config + [] + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) + +(defn done? + [journal] + (= 2 (count (get-in journal [:topics :output])))) + +(def commands + [[:write! :input "foo" {:key-fn (constantly "1")}] + [:write! :input "bar" {:key-fn (constantly "2")}] + [:watch done? {:timeout 2000}]]) + +(defn pipe + [journal k] + (->> (get-in journal [:topics :output]) + (filter (fn [x] (= k (:key x)))) + last + :value)) + +(deftest simple-ledger-end-to-end-test + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + (is (= "foo" (pipe journal "1"))) + (is (= "bar" (pipe journal "2")))))))) diff --git a/examples/simple-ledger/dev/user.clj b/examples/simple-ledger/dev/user.clj index f2f47e8e..b8f8cba3 100644 --- a/examples/simple-ledger/dev/user.clj +++ b/examples/simple-ledger/dev/user.clj @@ -22,7 +22,7 @@ "The development config. When the 'dev' alias is active, this config will be used." 
{:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"]) - :topic-metadata {:entry-added + :topic-metadata {:entry-pending {:topic-name "entry-pending" :partition-count 15 :replication-factor 1 diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index f6325d6d..c3fae42a 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -63,7 +63,7 @@ ;; `M-x cider-jack-in`. ;; Evaluate the form: - (def coll + (def entries [["1" {:debit-account "tech" :credit-account "cash" :amount 1000}] @@ -72,7 +72,7 @@ :amount 2000}]]) ;; Let's record the entries. Evaluate the form: - (->> coll + (->> entries (transduce (split-entries nil nil) concat) (transduce (running-balances (atom {}) swap!) concat)) @@ -101,7 +101,7 @@ ;; KeyValueStore interface with overrides for get and put." ;; Evaluate the form: - (->> coll + (->> entries (transduce (split-entries nil nil) concat) (transduce (running-balances (fakes/fake-kv-store {}) jxf/kv-store-swap-fn) concat)) @@ -122,9 +122,9 @@ (fn [builder] (jxf/add-state-store! 
builder) (-> (j/kstream builder entry-pending) - (jxf/transduce-kstream (::split-entries xforms)) + (jxf/transduce (::split-entries xforms)) (j/through transaction-pending) - (jxf/transduce-kstream (::running-balances xforms)) + (jxf/transduce (::running-balances xforms)) (j/to transaction-added)) builder)) @@ -154,14 +154,14 @@ (reset) ;; Evaluate the form: - (let [coll [{:debit-account "tech" - :credit-account "cash" - :amount 1000} - {:debit-account "cash" - :credit-account "sales" - :amount 2000}]] - (doseq [x coll] - (publish (:entry-pending topic-metadata) nil x))) + (let [entries [["1" {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + ["2" {:debit-account "cash" + :credit-account "sales" + :amount 2000}]]] + (doseq [[k v] entries] + (publish (:entry-pending topic-metadata) k v))) ;; Evaluate the form: (get-keyvals (:transaction-added topic-metadata)) diff --git a/examples/simple-ledger/test/simple_ledger_test.clj b/examples/simple-ledger/test/simple_ledger_test.clj new file mode 100644 index 00000000..327178b6 --- /dev/null +++ b/examples/simple-ledger/test/simple_ledger_test.clj @@ -0,0 +1,111 @@ +(ns simple-ledger-test + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jt.fix] + [simple-ledger :as sl]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) + +(deftest simple-ledger-unit-test + (let [entries + [["1" {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + ["2" {:debit-account "cash" + :credit-account "sales" + :amount 2000}]] + transactions (->> entries + (transduce (sl/split-entries nil nil) concat) + (transduce (sl/running-balances (atom {}) swap!) 
concat))] + (is (= -1000 (:after-balance (get (into {} transactions) "tech")))) + (is (= -1000 (:after-balance (get (into {} transactions) "cash")))) + (is (= 2000 (:after-balance (get (into {} transactions) "sales")))))) + +(def topic-metadata + {:entry-pending + {:topic-name "entry-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-pending + {:topic-name "transaction-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-added + {:topic-name "transaction-added" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) + +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata topic-metadata + :app-config sl/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (sl/topology-builder topic-metadata + {::sl/split-entries #(sl/split-entries % nil) + ::sl/running-balances #(sl/running-balances % jxf/kv-store-swap-fn)})) + +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(defn mock-transport-config + [] + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) + +(defn done? + [journal] + (= 4 (count (get-in journal [:topics :transaction-added])))) + +(def commands + [[:write! + :entry-pending + {:debit-account "tech" :credit-account "cash" :amount 1000} + {:key-fn (constantly "1")}] + [:write! 
+ :entry-pending + {:debit-account "cash" :credit-account "sales" :amount 2000} + {:key-fn (constantly "2")}] + [:watch done? {:timeout 2000}]]) + +(defn simple-ledger + [journal account-name] + (->> (get-in journal [:topics :transaction-added]) + (filter (fn [x] (= account-name (:account-name (:value x))))) + last + :value)) + +(deftest simple-ledger-end-to-end-test + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + (is (= -1000 (:after-balance (simple-ledger journal "tech")))) + (is (= -1000 (:after-balance (simple-ledger journal "cash")))) + (is (= 2000 (:after-balance (simple-ledger journal "sales"))))))))) diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj index d0339cc9..2e562b71 100644 --- a/examples/xf-word-count/src/xf_word_count.clj +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -90,7 +90,7 @@ (fn [builder] (jxf/add-state-store! 
builder) (-> (j/kstream builder input) - (jxf/transduce-kstream xf) + (jxf/transduce xf) (j/to output)) builder)) @@ -120,11 +120,11 @@ (reset) ;; Evaluate the form: - (let [coll ["inside every large program" - "is a small program" - "struggling to get out"]] - (doseq [x coll] - (publish (:input topic-metadata) nil x))) + (let [coll [["1" "inside every large program"] + ["2" "is a small program"] + ["3" "struggling to get out"]]] + (doseq [[k v] coll] + (publish (:input topic-metadata) k v))) ;; Evaluate the form: (get-keyvals (:output topic-metadata)) diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj index cfd68388..33fcbfe9 100644 --- a/examples/xf-word-count/test/xf_word_count_test.clj +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -34,7 +34,7 @@ (defn topology-builder [topic-metadata] - (xfwc/topology-builder topic-metadata #(xfwc/xf % jxf/kv-store-swap-fn))) + (xfwc/topology-builder topic-metadata #(xfwc/count-words % jxf/kv-store-swap-fn))) (defn props-for [x] @@ -60,9 +60,9 @@ (= 12 (count (get-in journal [:topics :output])))) (def commands - [[:write! :input "inside every large program" {:key-fn (constantly "")}] - [:write! :input "is a small program" {:key-fn (constantly "")}] - [:write! :input "struggling to get out" {:key-fn (constantly "")}] + [[:write! :input "inside every large program" {:key-fn (constantly "1")}] + [:write! :input "is a small program" {:key-fn (constantly "2")}] + [:write! :input "struggling to get out" {:key-fn (constantly "3")}] [:watch done? 
{:timeout 2000}]]) (defn word-count @@ -72,7 +72,7 @@ last :value)) -(deftest test-xf-word-count +(deftest xf-word-count-end-to-end-test (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] (jackdaw.test/with-test-machine (test-transport test-config) (fn [machine] diff --git a/src/jackdaw/streams/xform.clj b/src/jackdaw/streams/xform.clj index 9b3cfd9f..bf0c0d93 100644 --- a/src/jackdaw/streams/xform.clj +++ b/src/jackdaw/streams/xform.clj @@ -42,14 +42,12 @@ (init [_ context] (reset! ctx context)) (transform [_ k v] - (let [^KeyValueStore store (.getStateStore @ctx "transducer") - v (first (into [] (xf store) [[k v]]))] - (KeyValue/pair k v))) + (let [^KeyValueStore store (.getStateStore @ctx "transducer")] + (doseq [[result-k result-v] (first (sequence (xf store) [[k v]]))] + (.forward @ctx result-k result-v)))) (close [_])))) -(defn transduce-kstream +(defn transduce [kstream xf] - "Takes a kstream and xf and transduces the stream." - (-> kstream - (j/transform (fn [] (transformer xf)) ["transducer"]) - (j/flat-map (fn [[_ v]] v)))) + "Applies the transducer xf to each element of the kstream." 
+ (j/transform kstream (fn [] (transformer xf)) ["transducer"])) From 9efe7b7e6c93462f0ef8adedde4b888133ce40e0 Mon Sep 17 00:00:00 2001 From: Charles Reese Date: Fri, 13 Dec 2019 11:11:53 -0800 Subject: [PATCH 8/8] Rename var from entry to txn --- examples/simple-ledger/src/simple_ledger.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index c3fae42a..54176a18 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -35,8 +35,8 @@ ([result] (rf result)) ([result input] (let [[k v] input - {:keys [account-name debit-credit-indicator amount] :as entry} v - next (as-> entry % + {:keys [account-name debit-credit-indicator amount] :as txn} v + next (as-> txn % (swap-fn state next-balances %) (select-keys % [account-name]) ((juxt (comp first keys) (comp first vals)) %)