Skip to content

Commit

Permalink
Use Transformer instead of ValueTransformer in jackdaw.streams.xform
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles Reese committed Sep 24, 2019
1 parent b40642c commit e7e7a21
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
6 changes: 2 additions & 4 deletions examples/xf-word-count/dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@
which libs are actually required."
(:gen-class)
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[jackdaw.admin :as ja]
[jackdaw.serdes :as js]
[jackdaw.repl :refer :all]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf]
[xf-word-count :as xfwc])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))
[xf-word-count :as xfwc]))


(def repl-config
Expand Down
20 changes: 9 additions & 11 deletions examples/xf-word-count/src/xf_word_count.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
[clojure.tools.logging :refer [info]]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))
[jackdaw.streams.xform :as jxf]))


(defn xf-running-total
Expand All @@ -18,17 +15,18 @@
([] (rf))
([result] (rf result))
([result input]
(let [next (as-> input %
(let [[k v] input
next (as-> v %
(swap-fn state #(merge-with (fnil + 0) %1 %2) %)
(select-keys % (keys input))
(select-keys % (keys v))
(map vec %))]
(rf result next))))))

(defn xf
[state swap-fn]
(comp
(map (fn [x] (str/split x #" ")))
(map frequencies)
(map (fn [[k v]] [k (str/split v #" ")]))
(map (fn [[k v]] [k (frequencies v)]))
(xf-running-total state swap-fn)))


Expand All @@ -47,9 +45,9 @@

;; Evaluate the form:
(def coll
["inside every large program"
"is a small program"
"struggling to get out"])
[[nil "inside every large program"]
[nil "is a small program"]
[nil "struggling to get out"]])

;; Let's counts the words. Evaluate the form:
(transduce (xf (atom {}) swap!) concat coll)
Expand Down
41 changes: 22 additions & 19 deletions src/jackdaw/streams/xform.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
(:gen-class)
(:require [jackdaw.serdes :as js]
[jackdaw.streams :as j])
(:import org.apache.kafka.streams.kstream.ValueTransformer
(:import org.apache.kafka.streams.kstream.Transformer
org.apache.kafka.streams.KeyValue
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))

Expand Down Expand Up @@ -32,33 +33,35 @@
(doall (map (fn [[k v]] (.put store k v)) next))
next))

(defn value-transformer
"Creates an instance of org.apache.kafka.streams.kstream.ValueTransformer
with overrides for init, transform, and close."
(defn add-state-store!
[builder]
"Takes a builder and adds a state store."
(doto ^StreamsBuilder (j/streams-builder* builder)
(.addStateStore (Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore "transducer")
(js/edn-serde)
(js/edn-serde))))
builder)

(defn transformer
"Takes a transducer and creates an instance of
org.apache.kafka.streams.kstream.Transformer with overrides for
init, transform, and close."
[xf]
(let [ctx (atom nil)]
(reify
ValueTransformer
Transformer
(init [_ context]
(reset! ctx context))
(transform [_ v]
(let [^KeyValueStore store (.getStateStore @ctx "transducer")]
(first (into [] (xf store) [v]))))
(transform [_ k v]
(let [^KeyValueStore store (.getStateStore @ctx "transducer")
v (first (into [] (xf store) [[k v]]))]
(KeyValue/pair k v)))
(close [_]))))

(defn transduce-kstream
[kstream xf]
"Takes a kstream and xf and transduces the stream."
(-> kstream
(j/transform-values (fn [] (value-transformer xf)) ["transducer"])
(j/transform (fn [] (transformer xf)) ["transducer"])
(j/flat-map (fn [[_ v]] v))))

(defn add-state-store!
[builder]
"Takes a builder and adds a state store."
(doto ^StreamsBuilder (j/streams-builder* builder)
(.addStateStore (Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore "transducer")
(js/edn-serde)
(js/edn-serde))))
builder)

0 comments on commit e7e7a21

Please sign in to comment.