diff --git a/examples/xf-word-count/.gitignore b/examples/xf-word-count/.gitignore new file mode 100644 index 00000000..80dd262d --- /dev/null +++ b/examples/xf-word-count/.gitignore @@ -0,0 +1 @@ +log/ diff --git a/examples/xf-word-count/README.md b/examples/xf-word-count/README.md new file mode 100644 index 00000000..e6871924 --- /dev/null +++ b/examples/xf-word-count/README.md @@ -0,0 +1,3 @@ +# XF Word Count + +This is the classic 'word count' example done using transducers. diff --git a/examples/xf-word-count/deps.edn b/examples/xf-word-count/deps.edn new file mode 100644 index 00000000..f71d53ce --- /dev/null +++ b/examples/xf-word-count/deps.edn @@ -0,0 +1,28 @@ +{:paths + ["src" "resources"] + + :deps + {fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" + :exclusions [org.apache.zookeeper/zookeeper]} + org.clojure/clojure {:mvn/version "1.10.1"} + org.clojure/tools.logging {:mvn/version "0.4.1"} + org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} + org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} + ch.qos.logback/logback-classic {:mvn/version "1.2.3"} + integrant {:mvn/version "0.7.0"}} + + :aliases + {:dev + {:extra-paths ["dev" "../../dev"] + :extra-deps {integrant/repl {:mvn/version "0.3.1"} + danlentz/clj-uuid {:mvn/version "0.1.7" + :exclusions [primitive-math]}}} + + :test + {:extra-paths ["test"] + :extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} + :main-opts ["-m" "cognitect.test-runner"]}} + + :mvn/repos + {"confluent" {:url "https://packages.confluent.io/maven/"}}} diff --git a/examples/xf-word-count/dev/user.clj b/examples/xf-word-count/dev/user.clj new file mode 100644 index 00000000..e8094787 --- /dev/null +++ b/examples/xf-word-count/dev/user.clj @@ -0,0 +1,71 @@ +(ns user + "Use this namespace for interactive development. + + This namespace requires libs needed to reset the app and helpers + from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or + equivalent) to clean this namespace since these tools cannot tell + which libs are actually required." + (:gen-class) + (:require [clojure.string :as str] + [integrant.core :as ig] + [integrant.repl :refer [clear go halt prep init reset reset-all]] + [jackdaw.admin :as ja] + [jackdaw.serdes :as js] + [jackdaw.repl :refer :all] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf] + [xf-word-count :as xfwc]) + (:import org.apache.kafka.streams.kstream.ValueTransformer + [org.apache.kafka.streams.state KeyValueStore Stores] + org.apache.kafka.streams.StreamsBuilder)) + + +(def repl-config + "The development config. + When the 'dev' alias is active, this config will be used." + {:topology {:topology-builder xfwc/topology-builder + :xform xfwc/xf + :swap-fn jxf/kv-store-swap-fn} + + :topics {:streams-config xfwc/streams-config + :client-config (select-keys xfwc/streams-config + ["bootstrap.servers"]) + :topology (ig/ref :topology)} + + :app {:streams-config xfwc/streams-config + :topology (ig/ref :topology) + :topics (ig/ref :topics)}}) + + +(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] + (let [streams-builder (j/streams-builder)] + ((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) + +(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology] + :as opts}] + (let [topic-metadata (topology->topic-metadata topology streams-config)] + (with-open [client (ja/->AdminClient client-config)] + (ja/create-topics! client (vals topic-metadata))) + (assoc opts :topic-metadata topic-metadata))) + +(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] + (let [streams-app (j/kafka-streams topology streams-config)] + (j/start streams-app) + (assoc opts :streams-app streams-app))) + +(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] + (let [re (re-pattern (str "(" (->> topic-metadata + keys + (map name) + (str/join "|")) + ")"))] + (re-delete-topics client-config re))) + +(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] + (j/close streams-app) + (destroy-state-stores streams-config) + (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] + (re-delete-topics (:client-config topics) re))) + + +(integrant.repl/set-prep! (constantly repl-config)) diff --git a/examples/xf-word-count/resources/logback.xml b/examples/xf-word-count/resources/logback.xml new file mode 120000 index 00000000..d1fd022d --- /dev/null +++ b/examples/xf-word-count/resources/logback.xml @@ -0,0 +1 @@ +../../resources/logback.xml \ No newline at end of file diff --git a/examples/xf-word-count/src/xf_word_count.clj b/examples/xf-word-count/src/xf_word_count.clj new file mode 100644 index 00000000..eaeaba64 --- /dev/null +++ b/examples/xf-word-count/src/xf_word_count.clj @@ -0,0 +1,147 @@ +(ns xf-word-count + "This is the classic 'word count' example done using transducers." + (:gen-class) + (:require [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.xform :as jxf]) + (:import org.apache.kafka.streams.kstream.ValueTransformer + [org.apache.kafka.streams.state KeyValueStore Stores] + org.apache.kafka.streams.StreamsBuilder)) + + +(defn xf-running-total + [state swap-fn] + (fn [rf] + (fn + ([] (rf)) + ([result] (rf result)) + ([result input] + (let [next (as-> input % + (swap-fn state #(merge-with (fnil + 0) %1 %2) %) + (select-keys % (keys input)) + (map vec %))] + (rf result next)))))) + +(defn xf + [state swap-fn] + (comp + (map (fn [x] (str/split x #" "))) + (map frequencies) + (xf-running-total state swap-fn))) + + +(comment + ;; Use this comment block to explore Word Count using Clojure + ;; transducers. + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (def coll + ["inside every large program" + "is a small program" + "struggling to get out"]) + + ;; Let's counts the words. + + ;; Evaluate the form: + (transduce (xf (atom {}) swap!) concat coll) + + ;; You should see output like the following: + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + + ;; This time, let's count the words using + ;; `jackdaw.streams.xform/fake-kv-store` which implements the + ;; KeyValueStore interface with overrides for get and put." + + ;; Evaluate the form: + (transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) + + ;; You should see the same output. + ) + + +(def streams-config + {"application.id" "xf-word-count" + "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") + "cache.max.bytes.buffering" "0"}) + + +(defn topology-builder + [{:keys [input output] :as topics} xf] + (fn [builder] + (jxf/add-state-store! builder) + (-> (j/kstream builder input) + (jxf/transduce-kstream xf) + (j/to output)) + builder)) + + +(comment + ;; Use this comment block to explore Word Count as a stream + ;; processing applicaton. + + ;; For more detail, see the comment block in + ;; /examples/word-count/src/word_count.clj + + ;; Start ZooKeeper and Kafka: + ;; ``` + ;; /bin/confluent local start kafka + ;; ``` + + ;; Launch a Clojure REPL: + ;; ``` + ;; cd /examples/xf-word-count + ;; clj -A:dev + ;; ``` + + ;; Emacs users: Open a project file, e.g. this one, and enter + ;; `M-x cider-jack-in`. + + ;; Evaluate the form: + (reset) + + ;; Evaluate the form: + (let [coll ["inside every large program" + "is a small program" + "struggling to get out"]] + (doseq [x coll] + (publish (:input topic-metadata) nil x))) + + ;; Evaluate the form: + (get-keyvals (:output topic-metadata)) + + ;; You should see output like the following: + ;; (["inside" 1] + ;; ["every" 1] + ;; ["large" 1] + ;; ["program" 1] + ;; ["is" 1] + ;; ["a" 1] + ;; ["small" 1] + ;; ["program" 2] + ;; ["struggling" 1] + ;; ["to" 1] + ;; ["get" 1] + ;; ["out" 1]) + ) diff --git a/examples/xf-word-count/test/xf_word_count_test.clj b/examples/xf-word-count/test/xf_word_count_test.clj new file mode 100644 index 00000000..e024f0d4 --- /dev/null +++ b/examples/xf-word-count/test/xf_word_count_test.clj @@ -0,0 +1,76 @@ +(ns xf-word-count-test + (:gen-class) + (:require [clojure.test :refer [deftest is]] + [jackdaw.serdes :as js] + [jackdaw.streams :as j] + [jackdaw.streams.protocols :as jsp] + [jackdaw.test :as jt] + [jackdaw.test.fixtures :as jtf] + [xf-word-count :as xfwc]) + (:import java.util.Properties + org.apache.kafka.streams.TopologyTestDriver)) + +(def test-config + {:broker-config {"bootstrap.servers" "localhost:9092"} + :topic-metadata {:input + {:topic-name "input" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)} + + :output + {:topic-name "output" + :partition-count 1 + :replication-factor 1 + :key-serde (js/edn-serde) + :value-serde (js/edn-serde)}} + :app-config xfwc/streams-config + :enable? (System/getenv "BOOTSTRAP_SERVERS")}) + +(defn topology-builder + [topic-metadata] + (xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn))) + +(defn props-for + [x] + (doto (Properties.) + (.putAll (reduce-kv (fn [m k v] + (assoc m (str k) (str v))) + {} + x)))) + +(def mock-transport-config + {:driver (let [streams-builder (j/streams-builder) + topology ((topology-builder (:topic-metadata test-config)) streams-builder)] + (TopologyTestDriver. (.build (j/streams-builder* topology)) + (props-for (:app-config test-config))))}) + +(def test-transport + (jt/mock-transport mock-transport-config (:topic-metadata test-config))) + +(defn done? + [journal] + (= 12 (count (get-in journal [:topics :output])))) + +(def commands + [[:write! :input "inside every large program" {:key-fn (constantly "")}] + [:write! :input "is a small program" {:key-fn (constantly "")}] + [:write! :input "struggling to get out" {:key-fn (constantly "")}] + [:watch done? {:timeout 2000}]]) + +(defn word-count + [journal word] + (->> (get-in journal [:topics :output]) + (filter (fn [x] (= word (:key x)))) + last + :value)) + +(deftest test-xf-word-count + (jtf/with-fixtures [(jtf/integration-fixture topology-builder test-config)] + (jackdaw.test/with-test-machine test-transport + (fn [machine] + (let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] + + (is (= 1 (word-count journal "large"))) + (is (= 2 (word-count journal "program")))))))) diff --git a/src/jackdaw/streams/xform.clj b/src/jackdaw/streams/xform.clj new file mode 100644 index 00000000..6211bb3d --- /dev/null +++ b/src/jackdaw/streams/xform.clj @@ -0,0 +1,63 @@ +(ns jackdaw.streams.xform + "Helper functions for working with transducers." + (:gen-class) + (:require [jackdaw.serdes :as js] + [jackdaw.streams :as j]) + (:import org.apache.kafka.streams.kstream.ValueTransformer + [org.apache.kafka.streams.state KeyValueStore Stores] + org.apache.kafka.streams.StreamsBuilder)) + +(defn fake-kv-store + "Creates an instance of org.apache.kafka.streams.state.KeyValueStore + with overrides for get and put." + [init] + (let [store (volatile! init)] + (reify KeyValueStore + (get [_ k] + (clojure.core/get @store k)) + + (put [_ k v] + (vswap! store assoc k v))))) + +(defn kv-store-swap-fn + "Takes an instance of KeyValueStore, a function f, and a map m, and + updates the store in a manner similar to `clojure.core/swap!`." + [^KeyValueStore store f m] + (let [prev (reduce (fn [m k] + (assoc m k (.get store k))) + {} + (keys m)) + next (f prev m)] + (doall (map (fn [[k v]] (.put store k v)) next)) + next)) + +(defn value-transformer + "Creates an instance of org.apache.kafka.streams.kstream.ValueTransformer + with overrides for init, transform, and close." + [xf] + (let [ctx (atom nil)] + (reify + ValueTransformer + (init [_ context] + (reset! ctx context)) + (transform [_ v] + (let [^KeyValueStore store (.getStateStore @ctx "transducer")] + (first (into [] (xf store) [v])))) + (close [_])))) + +(defn transduce-kstream + [kstream xf] + "Takes kstream and xf and transduces the stream." + (-> kstream + (j/transform-values (fn [] (value-transformer xf)) ["transducer"]) + (j/flat-map (fn [[_ v]] v)))) + +(defn add-state-store! + [builder] + "Takes a builder and adds a state store." + (doto ^StreamsBuilder (j/streams-builder* builder) + (.addStateStore (Stores/keyValueStoreBuilder + (Stores/persistentKeyValueStore "transducer") + (js/edn-serde) + (js/edn-serde)))) + builder)