diff --git a/examples/pipe/.gitignore b/examples/pipe/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/pipe/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/pipe/README.md b/examples/pipe/README.md index 6ab5aca2..3581230b 100644 --- a/examples/pipe/README.md +++ b/examples/pipe/README.md @@ -1,96 +1,3 @@ # Pipe -This tutorial contains a simple stream processing application using Jackdaw and Kafka Streams. - -## Setting up - -Before starting, it is recommended to install the Confluent Platform CLI which can be obtained from [https://www.confluent.io/download/](https://www.confluent.io/download/). - -To install Clojure: [https://clojure.org/guides/getting_started](https://clojure.org/guides/getting_started). - -## Project structure - -The project structure looks like this: -``` -$ tree pipe -pipe -├── README.md -├── deps.edn -├── dev -│   └── system.clj -├── src -│   └── pipe.clj -└── test - └── pipe_test.clj -``` - -The `deps.edn` file describes the project's dependencies and source paths. - -The `system.clj` file contains functions to start, stop, and reset the app. These are required by the `user` namespace for interactive development and should not be invoked directly. - -The `pipe.clj` file describes the app and topology. Pipe reads from a Kafka topic called "input", logs the key and value, and writes to a Kafka topic called "output": -``` -(defn build-topology - [builder] - (-> (j/kstream builder (topic-config "input")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "output"))) - builder) -``` - -The `pipe_test.clj` file contains a test. - -## Running the app - -Let's get started! Fire up a Clojure REPL and load the `pipe` namespace. Then, start ZooKeeper and Kafka. If these services are already running, you may skip this step: -``` -user> (confluent/start) -INFO zookeeper is up (confluent:288) -INFO kafka is up (confluent:288) -nil -``` - -Now, start the app. -``` -user> (start) -INFO topic 'input' is created (jackdaw.admin.client:288) -INFO topic 'output' is created (jackdaw.admin.client:288) -INFO pipe is up (pipe:288) -{:app #object[org.apache.kafka.streams.KafkaStreams 0x225dcbb9 "org.apache.kafka.streams.KafkaStreams@225dcbb9"]} -``` - -The `user/start` function creates two Kafka topics needed by Pipe and starts it. - -For the full list of topics, type: -``` -user> (get-topics) -#{"output" "__confluent.support.metrics" "input"} -``` - -With the app running, place a new record on the input stream: -``` -user> (publish (topic-config "input") nil "this is a pipe") -INFO {:key nil, :value "this is a pipe"} (pipe:288) -nil -``` -Pipe logs the key and value to the standard output. - -To read from the output stream: -``` -user> (get-keyvals (topic-config "output")) -((nil "this is a pipe")) -``` - -This concludes this tutorial. - -## Interactive development - -For interactive development, reload the file and invoke `user/reset`. These stops the app, deletes topics and internal state using a regex, and recreates the topics and restarts the app. The details are in the `system` namespace. - -## Running tests - -To run tests, load the `pipe-test` namespace and invoke a test runner using your editor, or from the command line: -``` -clj -Atest -``` +This example creates a simple stream processing application using transducers. diff --git a/examples/pipe/deps.edn b/examples/pipe/deps.edn index cd018879..f71d53ce 100644 --- a/examples/pipe/deps.edn +++ b/examples/pipe/deps.edn @@ -1,18 +1,28 @@ -{:deps - {fundingcircle/jackdaw {:mvn/version "0.6.4"} - org.apache.kafka/kafka-streams {:mvn/version "2.1.0"} - org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"} - org.clojure/clojure {:mvn/version "1.10.0"} - org.clojure/tools.logging {:mvn/version "0.4.1"}} +{:paths + ["src" "resources"] - :mvn/repos - {"confluent" {:url "https://packages.confluent.io/maven/"}} - - :paths - ["src" "test" "dev" "../dev"] + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} :aliases - {:test {:extra-deps {com.cognitect/test-runner - {:git/url "https://github.com/cognitect-labs/test-runner.git" - :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} - :main-opts ["-m" "cognitect.test-runner"]}}} + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/pipe/dev/system.clj b/examples/pipe/dev/system.clj deleted file mode 100644 index 083bb361..00000000 --- a/examples/pipe/dev/system.clj +++ /dev/null @@ -1,49 +0,0 @@ -(ns system - "Functions to start and stop the system, used for interactive - development. - The `system/start` and `system/stop` functions are required by the - `user` namespace and should not be called directly." - (:require [clojure.string :as str] - [clojure.tools.logging :as log] - [clojure.java.shell :refer [sh]] - [jackdaw.admin :as ja] - [pipe])) - -(def system nil) - -(defn kafka-admin-client-config - [] - {"bootstrap.servers" "localhost:9092"}) - -(defn create-topics - "Takes a list of topics and creates these using the names given." - [topic-config-list] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (ja/create-topics! client topic-config-list))) - -(defn re-delete-topics - "Takes an instance of java.util.regex.Pattern and deletes any Kafka - topics that match." - [re] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (let [topics-to-delete (->> (ja/list-topics client) - (filter #(re-find re (:topic-name %))))] - (ja/delete-topics! client topics-to-delete)))) - -(defn stop - "Stops the app and deletes topics. - This functions is required by the `user` namespace and should not - be called directly." - [] - (when system - (pipe/stop-app (:app system))) - (re-delete-topics #"(input|output)")) - -(defn start - "Creates topics, and starts the app. - This functions is required by the `user` namespace and should not - be called directly." - [] - (with-out-str (stop)) - (create-topics (map pipe/topic-config ["input" "output"])) - {:app (pipe/start-app (pipe/app-config))}) diff --git a/examples/pipe/dev/user.clj b/examples/pipe/dev/user.clj new file mode 100644 index 00000000..2ebd5b08 --- /dev/null +++ b/examples/pipe/dev/user.clj @@ -0,0 +1,83 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [pipe])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topics {:client-config (select-keys pipe/streams-config ["bootstrap.servers"]) + :topic-metadata {:input + {:topic-name "input" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}} + + :topology {:topology-builder pipe/topology-builder + :xforms [#'pipe/xf] + :swap-fn jxf/kv-store-swap-fn} + + :app {:streams-config pipe/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata)) + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}] + (let [xform-map (reduce-kv (fn [m k v] + (let [k (keyword (str (:ns (meta v))) + (str (:name (meta v))))] + (assoc m k #(v % jxf/kv-store-swap-fn)))) + {} + xforms) + streams-builder (j/streams-builder)] + ((topology-builder topic-metadata xform-map) streams-builder))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/pipe/resources/logback.xml b/examples/pipe/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/pipe/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/pipe/src/pipe.clj b/examples/pipe/src/pipe.clj index fc4ae956..0fc1af38 100644 --- a/examples/pipe/src/pipe.clj +++ b/examples/pipe/src/pipe.clj @@ -1,95 +1,98 @@ (ns pipe - "This tutorial contains a simple stream processing application using - Jackdaw and Kafka Streams. - - Pipe reads from a Kafka topic called `input`, logs the key and - value, and writes these to a Kafka topic called `output`." + "This example creates a simple stream processing application using transducers." (:gen-class) (:require [clojure.string :as str] [clojure.tools.logging :refer [info]] + [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.serdes.edn :as jse]) - (:import [org.apache.kafka.common.serialization Serdes])) - - -(defn topic-config - "Takes a topic name and returns a topic configuration map, which may - be used to create a topic or produce/consume records." - [topic-name] - {:topic-name topic-name - :partition-count 1 - :replication-factor 1 - :key-serde (jse/serde) - :value-serde (jse/serde)}) - - -(defn app-config - "Returns the application config." - [] - {"application.id" "word-count" - "bootstrap.servers" "localhost:9092" - "cache.max.bytes.buffering" "0"}) + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) -(defn build-topology - "Reads from a Kafka topic called `input`, logs the key and value, - and writes these to a Kafka topic called `output`. Returns a - topology builder." - [builder] - (-> (j/kstream builder (topic-config "input")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "output"))) - builder) - -(defn start-app - "Starts the stream processing application." - [app-config] - (let [builder (j/streams-builder) - topology (build-topology builder) - app (j/kafka-streams topology app-config)] - (j/start app) - (info "pipe is up") - app)) - -(defn stop-app - "Stops the stream processing application." - [app] - (j/close app) - (info "pipe is down")) - - -(defn -main - [& _] - (start-app (app-config))) + +(defn xf + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + next [[k v]]] + (rf result next)))))) (comment - ;;; Evaluate the forms. - ;;; + ;; Use this comment block to explore Pipe using Clojure transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/pipe + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def input [["1" "foo"] ["2" "bar"]]) + + ;; Let's record the entries. Evaluate the form: + (transduce (xf (atom {}) swap!) concat input) - ;; Needed to invoke the forms from this namespace. When typing - ;; directly in the REPL, skip this step. - (require '[user :refer :all :exclude [topic-config]]) + ;; You should see output like the following: + ;; (["1" "foo"] ["2" "bar"]) + ) + + +(def streams-config + {"application.id" "pipe" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + "cache.max.bytes.buffering" "0"}) + +(defn topology-builder + [{:keys [input output] :as topics} xforms] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder input) + (jxf/transduce (::xf xforms)) + (j/to output)) + builder)) + + +(comment + ;; Use this comment block to explore Pipe as a stream processing + ;; application. - ;; Start ZooKeeper and Kafka. - ;; This requires the Confluent Platform CLI which may be obtained - ;; from `https://www.confluent.io/download/`. If ZooKeeper and Kafka - ;; are already running, skip this step. - (confluent/start) + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` - ;; Create the `input` and `output` topics, and start the app. - (start) + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/pipe + ;; clj -A:dev + ;; ``` + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. - ;; Get a list of current topics. - (list-topics) + ;; Evaluate the form: + (reset) + ;; Evaluate the form: + (let [input [["1" "foo"] ["2" "bar"]]] + (doseq [[k v] input] + (publish (:input topic-metadata) k v))) - ;; Write to the input stream. - (publish (topic-config "input") nil "this is a pipe") + ;; Evaluate the form: + (get-keyvals (:output topic-metadata)) + ;; You should see output like the following. - ;; Read from the output stream. - (get-keyvals (topic-config "output"))) + ;; (["1" "foo"] ["2" "bar"]) + ) diff --git a/examples/pipe/test/pipe_test.clj b/examples/pipe/test/pipe_test.clj index 17b48e18..8b014693 100644 --- a/examples/pipe/test/pipe_test.clj +++ b/examples/pipe/test/pipe_test.clj @@ -1,19 +1,86 @@ (ns pipe-test - "This illustrates the use of the TopologyTestDriver and jackdaw.test - to test Kafka Streams topologies." - (:require [pipe :as sut] - [jackdaw.streams.mock :as jsm] - [clojure.test :refer :all])) + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jt.fix] + [pipe]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) +(deftest pipe-unit-test + (let [input [["1" "foo"] ["2" "bar"]] + output (transduce (pipe/xf nil nil) concat input)] + (is (= "foo" (get (into {} output) "1"))) + (is (= "bar" (get (into {} output) "2"))))) -(deftest build-topology-unit-test - (testing "pipe unit test" - (let [driver (jsm/build-driver sut/build-topology) - publish (partial jsm/publish driver) - get-keyvals (partial jsm/get-keyvals driver)] +(def topic-metadata + {:input + {:topic-name "input" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} - (publish (sut/topic-config "input") nil "this is a pipe") + :output + {:topic-name "output" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) - (let [keyvals (get-keyvals (sut/topic-config "output"))] - (is (= 1 (count keyvals))) - (is (= [nil "this is a pipe"] (first keyvals))))))) +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata topic-metadata + :app-config pipe/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (pipe/topology-builder topic-metadata {:pipe/xf #(pipe/xf % jxf/kv-store-swap-fn)})) + +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(defn mock-transport-config + [] + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) + +(defn done? + [journal] + (= 2 (count (get-in journal [:topics :output])))) + +(def commands + [[:write! :input "foo" {:key-fn (constantly "1")}] + [:write! :input "bar" {:key-fn (constantly "2")}] + [:watch done? {:timeout 2000}]]) + +(defn pipe + [journal k] + (->> (get-in journal [:topics :output]) + (filter (fn [x] (= k (:key x)))) + last + :value)) + +(deftest simple-ledger-end-to-end-test + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + (is (= "foo" (pipe journal "1"))) + (is (= "bar" (pipe journal "2")))))))) diff --git a/examples/simple-ledger/.gitignore b/examples/simple-ledger/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/simple-ledger/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/simple-ledger/README.md b/examples/simple-ledger/README.md new file mode 100644 index 00000000..6984287b --- /dev/null +++ b/examples/simple-ledger/README.md @@ -0,0 +1,3 @@ +# Simple Ledger + +This example creates a simple accounting ledger using transducers. diff --git a/examples/simple-ledger/deps.edn b/examples/simple-ledger/deps.edn index 2fb64673..f71d53ce 100644 --- a/examples/simple-ledger/deps.edn +++ b/examples/simple-ledger/deps.edn @@ -1,19 +1,28 @@ -{:deps - {danlentz/clj-uuid {:mvn/version "0.1.7"} - fundingcircle/jackdaw {:mvn/version "0.6.4"} - org.apache.kafka/kafka-streams {:mvn/version "2.1.0"} - org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"} - org.clojure/clojure {:mvn/version "1.10.0"} - org.clojure/tools.logging {:mvn/version "0.4.1"}} +{:paths + ["src" "resources"] - :mvn/repos - {"confluent" {:url "https://packages.confluent.io/maven/"}} - - :paths - ["src" "test" "dev" "../dev"] + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} :aliases - {:test {:extra-deps {com.cognitect/test-runner - {:git/url "https://github.com/cognitect-labs/test-runner.git" - :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} - :main-opts ["-m" "cognitect.test-runner"]}}} + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/simple-ledger/dev/system.clj b/examples/simple-ledger/dev/system.clj deleted file mode 100644 index 4e6a80bb..00000000 --- a/examples/simple-ledger/dev/system.clj +++ /dev/null @@ -1,68 +0,0 @@ -(ns system - "Functions to start and stop the system, used for interactive - development. - The `system/start` and `system/stop` functions are required by the - `user` namespace and should not be called directly." - (:require [clojure.tools.logging :as log] - [clojure.java.shell :refer [sh]] - [jackdaw.admin :as ja] - [simple-ledger])) - -(def system nil) - -(defn kafka-admin-client-config - [] - {"bootstrap.servers" "localhost:9092"}) - -(defn create-topics - "Takes a list of topics and creates these using the names given." - [topic-config-list] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (ja/create-topics! client topic-config-list))) - -(defn re-delete-topics - "Takes an instance of java.util.regex.Pattern and deletes any Kafka - topics that match." - [re] - (with-open [client (ja/->AdminClient (kafka-admin-client-config))] - (let [topics-to-delete (->> (ja/list-topics client) - (filter #(re-find re (:topic-name %))))] - (ja/delete-topics! client topics-to-delete)))) - -(defn application-id - "Takes an application config and returns an `application.id`." - [app-config] - (get app-config "application.id")) - -(defn destroy-state-stores - "Takes an application config and deletes local files associated with - internal state." - [app-config] - (sh "rm" "-rf" (str "/tmp/kafka-streams/" - (application-id app-config))) - (log/info "internal state is deleted")) - -(defn stop - "Stops the app, and deletes topics and internal state. - This functions is required by the `user` namespace and should not - be called directly." - [] - (when system - (simple-ledger/stop-app (:app system))) - (let [app-id (application-id (simple-ledger/app-config)) - re (re-pattern (str "(ledger-entries-requested" - "|ledger-transaction-added" - "|" app-id - ".*)"))] - (re-delete-topics re) - (destroy-state-stores (simple-ledger/app-config)))) - -(defn start - "Creates topics, and starts the app. - This functions is required by the `user` namespace and should not - be called directly." - [] - (with-out-str (stop)) - (create-topics (map simple-ledger/topic-config ["ledger-entries-requested" - "ledger-transaction-added"])) - {:app (simple-ledger/start-app (simple-ledger/app-config))}) diff --git a/examples/simple-ledger/dev/user.clj b/examples/simple-ledger/dev/user.clj new file mode 100644 index 00000000..b8f8cba3 --- /dev/null +++ b/examples/simple-ledger/dev/user.clj @@ -0,0 +1,90 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [simple-ledger :as sl])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"]) + :topic-metadata {:entry-pending + {:topic-name "entry-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-pending + {:topic-name "transaction-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :transaction-added + {:topic-name "transaction-added" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}} + + :topology {:topology-builder sl/topology-builder + :xforms [#'sl/split-entries #'sl/running-balances] + :swap-fn jxf/kv-store-swap-fn} + + :app {:streams-config sl/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata)) + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}] + (let [xform-map (reduce-kv (fn [m k v] + (let [k (keyword (str (:ns (meta v))) + (str (:name (meta v))))] + (assoc m k #(v % jxf/kv-store-swap-fn)))) + {} + xforms) + streams-builder (j/streams-builder)] + ((topology-builder topic-metadata xform-map) streams-builder))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/simple-ledger/resources/logback.xml b/examples/simple-ledger/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/simple-ledger/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/simple-ledger/src/simple_ledger.clj b/examples/simple-ledger/src/simple_ledger.clj index 4f5a3d60..54176a18 100644 --- a/examples/simple-ledger/src/simple_ledger.clj +++ b/examples/simple-ledger/src/simple_ledger.clj @@ -1,414 +1,228 @@ (ns simple-ledger - "This tutorial contains a simple stream processing application using - Jackdaw and Kafka Streams. - - It begins with Pipe which is then extended using an interactive - workflow. The result is a simple ledger." + "This example creates a simple accounting ledger using transducers." (:gen-class) - (:require [clojure.spec.alpha :as s] + (:require [clojure.string :as str] [clojure.tools.logging :refer [info]] - [clj-uuid :as uuid] + [jackdaw.serdes :as js] [jackdaw.streams :as j] - [jackdaw.serdes.edn :as jse]) - (:import [org.apache.kafka.common.serialization Serdes])) - - -;;; Topic Configuration -;;; - -(defn topic-config - "Takes a topic name and (optionally) a key and value serde and - returns a topic configuration map, which may be used to create a - topic or produce/consume records." - ([topic-name] - (topic-config topic-name (jse/serde) (jse/serde))) + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) + + +(defn split-entries + [_ _] + (map (fn [[k {:keys [debit-account credit-account amount] :as entry}]] + [[debit-account + {:account-name debit-account + :debit-credit-indicator :dr + :amount amount}] + [credit-account + {:account-name credit-account + :debit-credit-indicator :cr + :amount amount}]]))) + +(defn next-balances + [starting-balances {:keys [account-name debit-credit-indicator amount] + :as transaction}] + (let [op (if (= :dr debit-credit-indicator) - +)] + (update starting-balances account-name (fnil op 0) amount))) + +(defn running-balances + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + {:keys [account-name debit-credit-indicator amount] :as txn} v + next (as-> txn % + (swap-fn state next-balances %) + (select-keys % [account-name]) + ((juxt (comp first keys) (comp first vals)) %) + (zipmap [:account-name :after-balance] %) + (assoc % :before-balance (if (= :dr debit-credit-indicator) + (+ amount (:after-balance %)) + (- amount (:after-balance %)))) + (vector k %) + (vector %))] + (rf result next)))))) - ([topic-name key-serde value-serde] - {:topic-name topic-name - :partition-count 1 - :replication-factor 1 - :topic-config {} - :key-serde key-serde - :value-serde value-serde})) - -;;; App Template -;;; - -(defn app-config - "Returns the application config." - [] +(comment + ;; Use this comment block to explore the Simple Ledger using Clojure + ;; transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/simple-ledger + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def entries + [["1" {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + ["2" {:debit-account "cash" + :credit-account "sales" + :amount 2000}]]) + + ;; Let's record the entries. Evaluate the form: + (->> entries + (transduce (split-entries nil nil) concat) + (transduce (running-balances (atom {}) swap!) concat)) + + ;; You should see output like the following: + + ;; (["tech" + ;; {:account-name "tech" + ;; :before-balance 0 + ;; :after-balance -1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :before-balance 0 + ;; :after-balance 1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :before-balance 1000 + ;; :after-balance -1000}] + ;; ["sales" + ;; {:account-name "sales" + ;; :before-balance 0 + ;; :after-balance 2000}]) + + + ;; This time, let's count the words using + ;; `jackdaw.streams.xform.fakes/fake-kv-store` which implements the + ;; KeyValueStore interface with overrides for get and put." + + ;; Evaluate the form: + (->> entries + (transduce (split-entries nil nil) concat) + (transduce (running-balances (fakes/fake-kv-store {}) + jxf/kv-store-swap-fn) concat)) + + ;; You should see the same output. + ) + + +(def streams-config {"application.id" "simple-ledger" - "bootstrap.servers" "localhost:9092" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") "cache.max.bytes.buffering" "0"}) -(defn build-topology - "Returns a topology builder. - - WARNING: This is just a stub. Before publishing to the input topic, - evaluate one of the `build-topology` functions in the comment forms - below." - [builder] - (-> (j/kstream builder (topic-config "ledger-entries-requested")) - (j/peek (fn [[k v]] - (info (str {:key k :value v})))) - (j/to (topic-config "ledger-transaction-added"))) - builder) - -(defn start-app - "Starts the stream processing application." - [app-config] - (let [builder (j/streams-builder) - topology (build-topology builder) - app (j/kafka-streams topology app-config)] - (j/start app) - (info "simple-ledger is up") - app)) - -(defn stop-app - "Stops the stream processing application." - [app] - (j/close app) - (info "simple-ledger is down")) - - -(defn -main - [& _] - (start-app (app-config))) - - -(comment - ;;; Start - ;;; - - ;; Needed to invoke the forms from this namespace. When typing - ;; directly in the REPL, skip this step. - (require '[user :refer :all :exclude [topic-config]]) - - - ;; Start ZooKeeper and Kafka. - ;; This requires the Confluent Platform CLI which may be obtained - ;; from `https://www.confluent.io/download/`. If ZooKeeper and Kafka - ;; are already running, skip this step. - (confluent/start) - - - ;; Create the topics, and start the app. - (start) - - - ;; Write to the input stream. - (publish (topic-config "ledger-entries-requested") - nil - "this is a pipe") - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added"))) +(defn topology-builder + [{:keys [entry-pending + transaction-pending + transaction-added] :as topics} xforms] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder entry-pending) + (jxf/transduce (::split-entries xforms)) + (j/through transaction-pending) + (jxf/transduce (::running-balances xforms)) + (j/to transaction-added)) + builder)) (comment - ;;; Add input validation - ;;; - - (do - - (s/def ::ledger-entries-requested-value - (s/keys :req-un [::id - ::entries])) - - (s/def ::id uuid?) - (s/def ::entries (s/+ ::entry)) - - (s/def ::entry - (s/and (s/keys :req-un [::debit-account-name - ::credit-account-name - ::amount]) - #(not= (:debit-account-name %) - (:credit-account-name %)))) - - (s/def ::debit-account-name string?) - (s/def ::credit-account-name string?) - (s/def ::amount pos-int?)) - - - (do - - (defn valid-input? - [[_ v]] - (s/valid? ::ledger-entries-requested-value v)) - + ;; Use this comment block to explore the Simple Ledger as a stream + ;; processing application. - (defn log-bad-input - [[k v]] - (info (str "Bad input: " - (s/explain-data ::ledger-entries-requested-value v)))) + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/simple-ledger + ;; clj -A:dev + ;; ``` - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input)] - - (-> valid - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Reset the app. + ;; Evaluate the form: (reset) - - ;; Write invalid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "foo" - :amount 10}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added"))) - - -(comment - ;;; Split `ledger-entries-requested` events into ledger - ;;; transactions (aka debits and credits). - ;;; - - (do - - (defn entry-sides - [{:keys [debit-account-name - credit-account-name - amount]}] - [[debit-account-name - {:account-name debit-account-name - :amount (- amount)}] - [credit-account-name - {:account-name credit-account-name - :amount amount}]]) - - - (defn entries->transactions - [[_ v]] - (reduce #(concat %1 (entry-sides %2)) [] (:entries v))) - - - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) - - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input) - - transactions (j/flat-map valid entries->transactions)] - - (-> transactions - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added"))) - - -(comment - ;;; Add unique identifiers and reference IDs. - ;;; - - (defn entries->transactions - [[_ v]] - (->> (:entries v) - (reduce #(concat %1 (entry-sides %2)) []) - (map-indexed #(assoc-in %2 [1 :id] (uuid/v5 (:id v) %1))) - (map #(assoc-in %1 [1 :causation-id] (:id v))))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Read from the output stream ("foo" only). - (->> (get-keyvals (topic-config "ledger-transaction-added")) - (filter (fn [[k v]] (= "foo" k))))) - - -(comment - ;;; Keep track of running balances. - ;;; - - (do - - (defn account-balance-reducer - [x y] - (let [starting-balance (:current-balance x)] - (merge y {:starting-balance starting-balance - :current-balance (+ starting-balance (:amount y))}))) - - - (defn build-topology - [builder] - (let [input (j/kstream builder - (topic-config "ledger-entries-requested")) - - [valid invalid] (j/branch input [valid-input? - (constantly true)]) - - _ (j/peek invalid log-bad-input) - - transactions (j/flat-map valid entries->transactions) - - balances - (-> transactions - (j/map-values (fn [v] - (merge v - {:starting-balance 0 - :current-balance (:amount v)}))) - (j/group-by-key (topic-config nil (Serdes/String) - (jse/serde))) - (j/reduce account-balance-reducer - (topic-config "balances")))] - - (-> balances - (j/to-kstream) - (j/to (topic-config "ledger-transaction-added"))) - - builder))) - - - ;; Reset the app. - (reset) - - - ;; Write valid input. - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream. - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Write valid input (again). - (publish (topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 10} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 20}]}) - - - ;; Read from the output stream (again). - (get-keyvals (topic-config "ledger-transaction-added")) - - - ;; Read from the output stream ("foo" only). - (->> (get-keyvals (topic-config "ledger-transaction-added")) - (filter (fn [[k v]] (= "foo" k))))) + ;; Evaluate the form: + (let [entries [["1" {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + ["2" {:debit-account "cash" + :credit-account "sales" + :amount 2000}]]] + (doseq [[k v] entries] + (publish (:entry-pending topic-metadata) k v))) + + ;; Evaluate the form: + (get-keyvals (:transaction-added topic-metadata)) + + ;; You should see output like the following. Notice transaction + ;; order is not preserved: + + ;; (["sales" + ;; {:account-name "sales" + ;; :before-balance 0 + ;; :after-balance 2000}] + ;; ["tech" + ;; {:account-name "tech" + ;; :before-balance 0 + ;; :after-balance -1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :before-balance 0 + ;; :after-balance 1000}] + ;; ["cash" + ;; {:account-name "cash" + ;; :before-balance 1000 + ;; :after-balance -1000}]) + + + ;; The `transaction-added` topic has 15 partitions. Let's see how + ;; the records distributed. Evaluate the form: + (->> (get-records (:transaction-added topic-metadata)) + (map (fn [x] + (select-keys x [:key :offset :partition :value])))) + + ;; You should see output like the following. The balances are spread + ;; across partitions 0, 11, and 14. Transaction order is preserved + ;; only for each account. There is no global order. + + ;; ({:key "sales" + ;; :offset 0 + ;; :partition 0 + ;; :value + ;; {:account-name "sales" + ;; :before-balance 0 + ;; :after-balance 2000}} + ;; {:key "tech" + ;; :offset 0 + ;; :partition 11 + ;; :value + ;; {:account-name "tech" + ;; :before-balance 0 + ;; :after-balance -1000}} + ;; {:key "cash" + ;; :offset 0 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :before-balance 0 + ;; :after-balance 1000}} + ;; {:key "cash" + ;; :offset 1 + ;; :partition 14 + ;; :value + ;; {:account-name "cash" + ;; :before-balance 1000 + ;; :after-balance -1000}}) + ) diff --git a/examples/simple-ledger/test/simple_ledger_test.clj b/examples/simple-ledger/test/simple_ledger_test.clj index e497e483..327178b6 100644 --- a/examples/simple-ledger/test/simple_ledger_test.clj +++ b/examples/simple-ledger/test/simple_ledger_test.clj @@ -1,64 +1,111 @@ (ns simple-ledger-test - (:require [simple-ledger :as sut] - [jackdaw.streams.mock :as jsm] - [clojure.test :refer :all])) + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jt.fix] + [simple-ledger :as sl]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) +(deftest simple-ledger-unit-test + (let [entries + [["1" {:debit-account "tech" + :credit-account "cash" + :amount 1000}] + ["2" {:debit-account "cash" + :credit-account "sales" + :amount 2000}]] + transactions (->> entries + (transduce (sl/split-entries nil nil) concat) + (transduce (sl/running-balances (atom {}) swap!) concat))] + (is (= -1000 (:after-balance (get (into {} transactions) "tech")))) + (is (= -1000 (:after-balance (get (into {} transactions) "cash")))) + (is (= 2000 (:after-balance (get (into {} transactions) "sales")))))) -(deftest build-topology-unit-test - (testing "simple ledger unit test" - (let [driver (jsm/build-driver sut/build-topology) - publish (partial jsm/publish driver) - get-keyvals (partial jsm/get-keyvals driver)] +(def topic-metadata + {:entry-pending + {:topic-name "entry-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} - (publish (sut/topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 2} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 3}]}) + :transaction-pending + {:topic-name "transaction-pending" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} - (publish (sut/topic-config "ledger-entries-requested") - nil - {:id (java.util.UUID/randomUUID) - :entries [{:debit-account-name "foo" - :credit-account-name "bar" - :amount 5} - {:debit-account-name "foo" - :credit-account-name "qux" - :amount 7}]}) + :transaction-added + {:topic-name "transaction-added" + :partition-count 15 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) - (let [keyvals (get-keyvals - (sut/topic-config "ledger-transaction-added"))] +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata topic-metadata + :app-config sl/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) - (is (= [-2 -3 -5 -7] - (->> (filter (fn [[k _]] (= "foo" k)) keyvals) - (map second) - (map :amount)))) +(defn topology-builder + [topic-metadata] + (sl/topology-builder topic-metadata + {::sl/split-entries #(sl/split-entries % nil) + ::sl/running-balances #(sl/running-balances % jxf/kv-store-swap-fn)})) - (is (= [0 -2 -5 -10] - (->> (filter (fn [[k _]] (= "foo" k)) keyvals) - (map second) - (map :starting-balance)))) +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) - (is (= [-2 -5 -10 -17] - (->> (filter (fn [[k _]] (= "foo" k)) keyvals) - (map second) - (map :current-balance)))) +(defn mock-transport-config + [] + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) + (props-for (:app-config test-config))))}) - (is (= [2 5] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :amount)))) +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) - (is (= [0 2] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :starting-balance)))) +(defn done? + [journal] + (= 4 (count (get-in journal [:topics :transaction-added])))) - (is (= [2 7] - (->> (filter (fn [[k _]] (= "bar" k)) keyvals) - (map second) - (map :current-balance)))))))) +(def commands + [[:write! + :entry-pending + {:debit-account "tech" :credit-account "cash" :amount 1000} + {:key-fn (constantly "1")}] + [:write! + :entry-pending + {:debit-account "cash" :credit-account "sales" :amount 2000} + {:key-fn (constantly "2")}] + [:watch done? {:timeout 2000}]]) + +(defn simple-ledger + [journal account-name] + (->> (get-in journal [:topics :transaction-added]) + (filter (fn [x] (= account-name (:account-name (:value x))))) + last + :value)) + +(deftest simple-ledger-end-to-end-test + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + (is (= -1000 (:after-balance (simple-ledger journal "tech")))) + (is (= -1000 (:after-balance (simple-ledger journal "cash")))) + (is (= 2000 (:after-balance (simple-ledger journal "sales"))))))))) diff --git a/examples/word-count/dev/user.clj b/examples/word-count/dev/user.clj index 309a58b8..25722d52 100644 --- a/examples/word-count/dev/user.clj +++ b/examples/word-count/dev/user.clj @@ -23,6 +23,9 @@ :topics (ig/ref :topics)}}) +(integrant.repl/set-prep! (constantly repl-config)) + + (defmethod ig/init-key :topology [_ {:keys [topology-builder]}] (let [streams-builder (j/streams-builder)] ((topology-builder topic-metadata) streams-builder))) @@ -49,10 +52,6 @@ (defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] (j/close streams-app) - ;; BUG: Does not delete state on reset! (destroy-state-stores streams-config) (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] (re-delete-topics (:client-config topics) re))) - - -(integrant.repl/set-prep! (constantly repl-config)) diff --git a/examples/xf-word-count/.gitignore b/examples/xf-word-count/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/xf-word-count/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/xf-word-count/README.md b/examples/xf-word-count/README.md new file mode 100644 index 00000000..e6871924 --- /dev/null +++ b/examples/xf-word-count/README.md @@ -0,0 +1,3 @@ +# XF Word Count + +This is the classic 'word count' example done using transducers. diff --git a/examples/xf-word-count/deps.edn b/examples/xf-word-count/deps.edn new file mode 100644 index 00000000..f71d53ce --- /dev/null +++ b/examples/xf-word-count/deps.edn @@ -0,0 +1,28 @@ +{:paths + ["src" "resources"] + + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} + + :aliases + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj new file mode 100644 index 00000000..6b5c4312 --- /dev/null +++ b/examples/xf-word-count/dev/user.clj @@ -0,0 +1,69 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [xf-word-count :as xfwc])) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topology {:topology-builder xfwc/topology-builder + :xform xfwc/count-words + :swap-fn jxf/kv-store-swap-fn} + + :topics {:streams-config xfwc/streams-config + :client-config (select-keys xfwc/streams-config + ["bootstrap.servers"]) + :topology (ig/ref :topology)} + + :app {:streams-config xfwc/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(integrant.repl/set-prep! (constantly repl-config)) + + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] + (let [streams-builder (j/streams-builder)] + ((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) + +(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology] + :as opts}] + (let [topic-metadata (topology->topic-metadata topology streams-config)] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) diff --git a/examples/xf-word-count/resources/logback.xml b/examples/xf-word-count/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/xf-word-count/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj new file mode 100644 index 00000000..2e562b71 --- /dev/null +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -0,0 +1,146 @@ +(ns xf-word-count + "This is the classic 'word count' example done using transducers." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [jackdaw.streams.xform.fakes :as fakes])) + + +(defn running-total + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [[k v] input + next (as-> v % + (swap-fn state #(merge-with (fnil + 0) %1 %2) %) + (select-keys % (keys v)) + (map vec %))] + (rf result next)))))) + +(defn count-words + [state swap-fn] + (comp + (map (fn [[k v]] [k (str/split v #" ")])) + (map (fn [[k v]] [k (frequencies v)])) + (running-total state swap-fn))) + + +(comment + ;; Use this comment block to explore Word Count using Clojure + ;; transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def coll + [["1" "inside every large program"] + ["2" "is a small program"] + ["3" "struggling to get out"]]) + + ;; Let's counts the words. Evaluate the form: + (transduce (count-words (atom {}) swap!) concat coll) + + ;; You should see output like the following: + + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + + + ;; This time, let's count the words using + ;; `jackdaw.streams.xform.fakes/fake-kv-store` which implements the + ;; KeyValueStore interface with overrides for get and put." + + ;; Evaluate the form: + (transduce (count-words (fakes/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) + + ;; You should see the same output. + ) + + +(def streams-config + {"application.id" "xf-word-count" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + "cache.max.bytes.buffering" "0"}) + +(defn topology-builder + [{:keys [input output] :as topics} xf] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder input) + (jxf/transduce xf) + (j/to output)) + builder)) + + +(comment + ;; Use this comment block to explore Word Count as a stream + ;; processing application. + + ;; For more details on dynamic development, see the comment block in + ;; /examples/word-count/src/word_count.clj + + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (reset) + + ;; Evaluate the form: + (let [coll [["1" "inside every large program"] + ["2" "is a small program"] + ["3" "struggling to get out"]]] + (doseq [[k v] coll] + (publish (:input topic-metadata) k v))) + + ;; Evaluate the form: + (get-keyvals (:output topic-metadata)) + + ;; You should see output like the following: + + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + ) diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj new file mode 100644 index 00000000..33fcbfe9 --- /dev/null +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -0,0 +1,82 @@ +(ns xf-word-count-test + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.streams.xform :as jxf] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jt.fix] + [xf-word-count :as xfwc]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) + +(def topic-metadata + {:input + {:topic-name "input" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}}) + +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata topic-metadata + :app-config xfwc/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (xfwc/topology-builder topic-metadata #(xfwc/count-words % jxf/kv-store-swap-fn))) + +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(defn mock-transport-config + [] + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (jsp/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(defn test-transport + [{:keys [topic-metadata] :as test-config}] + (jt/mock-transport (mock-transport-config) topic-metadata)) + +(defn done? + [journal] + (= 12 (count (get-in journal [:topics :output])))) + +(def commands + [[:write! :input "inside every large program" {:key-fn (constantly "1")}] + [:write! :input "is a small program" {:key-fn (constantly "2")}] + [:write! :input "struggling to get out" {:key-fn (constantly "3")}] + [:watch done? {:timeout 2000}]]) + +(defn word-count + [journal word] + (->> (get-in journal [:topics :output]) + (filter (fn [x] (= word (:key x)))) + last + :value)) + +(deftest xf-word-count-end-to-end-test + (jt.fix/with-fixtures [(jt.fix/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine (test-transport test-config) + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + + (is (= 1 (word-count journal "large"))) + (is (= 2 (word-count journal "program")))))))) diff --git a/src/jackdaw/streams/xform.clj b/src/jackdaw/streams/xform.clj new file mode 100644 index 00000000..bf0c0d93 --- /dev/null +++ b/src/jackdaw/streams/xform.clj @@ -0,0 +1,53 @@ +(ns jackdaw.streams.xform + "Helper functions for working with transducers." + (:gen-class) + (:require [jackdaw.serdes :as js] + [jackdaw.streams :as j]) + (:import org.apache.kafka.streams.kstream.Transformer + org.apache.kafka.streams.KeyValue + [org.apache.kafka.streams.state KeyValueStore Stores] + org.apache.kafka.streams.StreamsBuilder)) + +(defn kv-store-swap-fn + "Takes an instance of KeyValueStore, a function f, and map m, and + updates the store in a manner similar to `clojure.core/swap!`." + [^KeyValueStore store f m] + (let [ks (keys (f {} m)) + prev (reduce (fn [p k] + (assoc p k (.get store k))) + {} + ks) + next (f prev m)] + (doall (map (fn [[k v]] (.put store k v)) next)) + next)) + +(defn add-state-store! + [builder] + "Takes a builder and adds a state store." + (doto ^StreamsBuilder (j/streams-builder* builder) + (.addStateStore (Stores/keyValueStoreBuilder + (Stores/persistentKeyValueStore "transducer") + (js/edn-serde) + (js/edn-serde)))) + builder) + +(defn transformer + "Takes a transducer and creates an instance of + org.apache.kafka.streams.kstream.Transformer with overrides for + init, transform, and close." + [xf] + (let [ctx (atom nil)] + (reify + Transformer + (init [_ context] + (reset! ctx context)) + (transform [_ k v] + (let [^KeyValueStore store (.getStateStore @ctx "transducer")] + (doseq [[result-k result-v] (first (sequence (xf store) [[k v]]))] + (.forward @ctx result-k result-v)))) + (close [_])))) + +(defn transduce + [kstream xf] + "Applies the transducer xf to each element of the kstream." + (j/transform kstream (fn [] (transformer xf)) ["transducer"])) diff --git a/src/jackdaw/streams/xform/fakes.clj b/src/jackdaw/streams/xform/fakes.clj new file mode 100644 index 00000000..2e3e1bfd --- /dev/null +++ b/src/jackdaw/streams/xform/fakes.clj @@ -0,0 +1,15 @@ +(ns jackdaw.streams.xform.fakes + (:import + [org.apache.kafka.streams.state KeyValueStore])) + +(defn fake-kv-store + "Creates an instance of org.apache.kafka.streams.state.KeyValueStore + with overrides for get and put." + [init] + (let [store (volatile! init)] + (reify KeyValueStore + (get [_ k] + (clojure.core/get @store k)) + + (put [_ k v] + (vswap! store assoc k v)))))