Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some additions and helpers for Kafka Streams interop #305

Merged
merged 4 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
[streams-builder]
(p/source-topics streams-builder))

(defn with-kv-state-store
"Adds a persistent state store to the topology with the configured name
and serdes"
[streams-builder store-config]
(p/with-kv-state-store streams-builder store-config))

(defn streams-builder*
"Returns the underlying KStreamBuilder."
[streams-builder]
Expand Down Expand Up @@ -220,6 +226,40 @@
[kstream other]
(p/merge kstream other))

(defn transform
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream."
([kstream transformer-supplier-fn]
(p/transform kstream transformer-supplier-fn))
([kstream transformer-supplier-fn state-store-names]
(p/transform kstream transformer-supplier-fn state-store-names)))

(defn flat-transform
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream. Result of the transform should be iterable,
and the resulting stream is as per flatMap"
([kstream transformer-supplier-fn]
(p/flat-transform kstream transformer-supplier-fn))
([kstream transformer-supplier-fn state-store-names]
(p/flat-transform kstream transformer-supplier-fn state-store-names)))

(defn transform-values
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream."
([kstream value-transformer-supplier-fn]
(p/transform-values kstream value-transformer-supplier-fn))
([kstream value-transformer-supplier-fn state-store-names]
(p/transform-values kstream value-transformer-supplier-fn state-store-names)))

(defn flat-transform-values
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream. Result of the transform should be iterable,
and the resulting stream is as per flatMap"
([kstream value-transformer-supplier-fn]
(p/flat-transform-values kstream value-transformer-supplier-fn))
([kstream value-transformer-supplier-fn state-store-names]
(p/flat-transform-values kstream value-transformer-supplier-fn state-store-names)))

(defn kstream*
"Returns the underlying KStream object."
[kstream]
Expand Down
34 changes: 34 additions & 0 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
ValueMapper ValueMapperWithKey ValueTransformerSupplier Windows]
[org.apache.kafka.streams.processor
StreamPartitioner]
[org.apache.kafka.streams.state
KeyValueStore Stores]
(org.apache.kafka.streams.processor.api
ProcessorSupplier)))

Expand Down Expand Up @@ -129,6 +131,15 @@
^String topic-name
^Consumed (topic->consumed topic-config))))

(with-kv-state-store
[builder {:keys [store-name key-serde value-serde] :as store-config}]
(.addStateStore ^StreamsBuilder streams-builder
(Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore store-name)
gphilipp marked this conversation as resolved.
Show resolved Hide resolved
key-serde
value-serde))
builder)

(streams-builder*
[_]
streams-builder))
Expand Down Expand Up @@ -335,6 +346,17 @@
^TransformerSupplier (transformer-supplier transformer-supplier-fn)
^"[Ljava.lang.String;" (into-array String state-store-names))))

(flat-transform
[this transformer-supplier-fn]
(flat-transform this transformer-supplier-fn []))

(flat-transform
[_ transformer-supplier-fn state-store-names]
(clj-kstream
(.flatTransform ^KStream kstream
^TransformerSupplier (transformer-supplier transformer-supplier-fn)
^"[Ljava.lang.String;" (into-array String
(clojure.core/map name state-store-names)))))
(transform-values
[this value-transformer-supplier-fn]
(transform-values this value-transformer-supplier-fn []))
Expand All @@ -346,6 +368,18 @@
^ValueTransformerSupplier (value-transformer-supplier value-transformer-supplier-fn)
^"[Ljava.lang.String;" (into-array String state-store-names))))

(flat-transform-values
[this value-transformer-supplier-fn]
(flat-transform-values this value-transformer-supplier-fn []))

(flat-transform-values
[_ value-transformer-supplier-fn state-store-names]
(clj-kstream
(.flatTransformValues ^KStream kstream
^ValueTransformerSupplier (value-transformer-supplier value-transformer-supplier-fn)
^"[Ljava.lang.String;" (into-array String
(clojure.core/map name state-store-names)))))

(join-global
[_ global-ktable key-value-mapper-fn joiner-fn]
(clj-kstream
Expand Down
52 changes: 50 additions & 2 deletions src/jackdaw/streams/lambdas.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
(:import org.apache.kafka.streams.KeyValue
[org.apache.kafka.streams.kstream
Aggregator ForeachAction Initializer KeyValueMapper
Merger Predicate Reducer TransformerSupplier ValueJoiner
ValueMapper ValueTransformerSupplier]
Merger Predicate Reducer Transformer TransformerSupplier
ValueJoiner ValueMapper ValueTransformer ValueTransformerSupplier]
[org.apache.kafka.streams.processor
Processor ProcessorSupplier StreamPartitioner]))

Expand Down Expand Up @@ -184,3 +184,51 @@
"Packages up a Clojure fn in a kstream value transformer supplier."
[value-transformer-supplier-fn]
(FnValueTransformerSupplier. value-transformer-supplier-fn))

(deftype FnTransformer [context xfm-fn]
Transformer
(init [this trasnformer-context]
99-not-out marked this conversation as resolved.
Show resolved Hide resolved
(reset! context trasnformer-context))
(close [this])
(transform [this k v]
(xfm-fn @context k v)))

(defn transformer-with-ctx
"Helper to create a Transformer for use inside the jackdaw transform wrapper.
Passed function should take three args - the context, key and value for the stream.
The processor context allows access to stream internals such as state stores.
Result is returned from the transform. E.g.
```
(-> builder
(k/stream topic)
(k/transform
(kl/transformer-with-ctx
(fn [ctx k v]
...))))
```"
[xfm-fn]
(fn [] (FnTransformer. (atom nil) xfm-fn)))

(deftype FnValueTransformer [context xfm-fn]
ValueTransformer
(init [this trasnformer-context]
99-not-out marked this conversation as resolved.
Show resolved Hide resolved
(reset! context trasnformer-context))
(close [this])
(transform [this v]
(xfm-fn @context v)))

(defn value-transformer-with-ctx
"Helper to create a ValueTransformer for use inside the jackdaw transform-values wrapper.
Passed function should take two args - the context and value for the stream.
The processor context allows access to stream internals such as state stores.
Result is returned from the transform-values. E.g.
```
(-> builder
(k/stream topic)
(k/transform-values
(kl/value-transformer-with-ctx
(fn [ctx v]
...))))
```"
[xfm-fn]
(fn [] (FnValueTransformer. (atom nil) xfm-fn)))
66 changes: 22 additions & 44 deletions src/jackdaw/streams/mock.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,36 @@
"Mocks for testing kafka streams."
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:refer-clojure :exclude [send])
(:require [jackdaw.streams.protocols :as k]
[jackdaw.streams.configurable :refer [config configure]]
[jackdaw.streams.configured :as configured]
(:require [jackdaw.streams :as js]
[jackdaw.streams.interop :as interop]
[jackdaw.streams.protocols :as k]
[jackdaw.data :as data])
(:import java.nio.file.Files
java.nio.file.attribute.FileAttribute
org.apache.kafka.streams.TopologyTestDriver
[org.apache.kafka.streams Topology TopologyTestDriver]
java.util.Properties
org.apache.kafka.streams.test.ConsumerRecordFactory
org.apache.kafka.common.header.internals.RecordHeaders
[org.apache.kafka.common.serialization Serde Serdes Serializer]))

(set! *warn-on-reflection* false)

(defn streams-builder
"Creates a mock streams-builder."
([]
(streams-builder (interop/streams-builder)))
([streams-builder]
(configured/streams-builder
{::streams-builder (k/streams-builder* streams-builder)}
streams-builder)))
gphilipp marked this conversation as resolved.
Show resolved Hide resolved
(defn topology->test-driver
""
[topology]
(TopologyTestDriver.
topology
(doto (Properties.)
(.put "application.id" (str (java.util.UUID/randomUUID)))
(.put "bootstrap.servers" "fake")
(.put "default.key.serde" "jackdaw.serdes.EdnSerde")
(.put "default.value.serde" "jackdaw.serdes.EdnSerde"))))

(defn streams-builder->test-driver
""
[streams-builder]
(let [topology (-> streams-builder config ::streams-builder .build)]
(TopologyTestDriver.
topology
(doto (Properties.)
(.put "application.id" (str (java.util.UUID/randomUUID)))
(.put "bootstrap.servers" "fake")
(.put "default.key.serde" "jackdaw.serdes.EdnSerde")
(.put "default.value.serde" "jackdaw.serdes.EdnSerde")))))

(defn send
"Publishes message to a topic."
[topology topic-config key message]
(let [test-driver (-> topology config ::test-driver)
time (or (-> topology config ::test-driver-time) 0)]
(.setTime test-driver time)
(.process test-driver
(:topic-name topic-config)
key
message)
(.flushState test-driver)
(-> topology
(configure ::test-driver-time (inc time)))))

(defn collect
"Collects the test results. The test driver returns a list of messages with
each message formatted like \"key:value\""
[streams-builder]
(let [processor-supplier (-> streams-builder config ::processor-supplier)
processed (vec (.processed processor-supplier))]
(.clear (.processed processor-supplier))
processed))
gphilipp marked this conversation as resolved.
Show resolved Hide resolved
(let [topology (.build (js/streams-builder* streams-builder))]
(topology->test-driver topology)))

(defn producer
""
Expand Down Expand Up @@ -107,10 +79,16 @@
(repeatedly-consume test-driver topic-config))

(defn build-driver [f]
(let [builder (streams-builder)]
(let [builder (interop/streams-builder)]
(f builder)
(streams-builder->test-driver builder)))

(defn build-topology-driver [f]
(let [topology (Topology.)]
(f topology)
(topology->test-driver topology)))


;; FIXME (arrdem 2018-11-24):
;; This is used by the test suite but has no bearing on anything else
(defn topic
Expand Down
17 changes: 17 additions & 0 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
[topology-builder]
"Gets the names of source topics for the topology.")

(with-kv-state-store
[topology-builder store-config]
"Adds a persistent state store to the topology with the configured name
and serdes.")

(streams-builder*
[streams-builder]
"Returns the underlying KStreamBuilder."))
Expand Down Expand Up @@ -166,12 +171,24 @@
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream.")

(flat-transform
[kstream transformer-supplier-fn]
[kstream transformer-supplier-fn state-store-names]
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream via flatTransform.")

(transform-values
[kstream value-transformer-supplier-fn]
[kstream value-transformer-supplier-fn state-store-names]
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream.")

(flat-transform-values
[kstream value-transformer-supplier-fn]
[kstream value-transformer-supplier-fn state-store-names]
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream via flatTransformValues.")

(join-global
[kstream global-ktable kv-mapper joiner])

Expand Down
12 changes: 12 additions & 0 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,24 @@
:state-store-names (s/? (s/coll-of string?)))
:ret kstream?)

(s/fdef k/flat-transform
:args (s/cat :kstream kstream?
:transformer-supplier-fn ifn?
:state-store-names (s/? (s/coll-of string?)))
:ret kstream?)

(s/fdef k/transform-values
:args (s/cat :kstream kstream?
:value-transformer-supplier-fn ifn?
:state-store-names (s/? (s/coll-of string?)))
:ret kstream?)

(s/fdef k/flat-transform-values
:args (s/cat :kstream kstream?
:value-transformer-supplier-fn ifn?
:state-store-names (s/? (s/coll-of string?)))
:ret kstream?)

(s/fdef k/join-global
:args (s/cat :kstream kstream?
:global-ktable global-ktable?
Expand Down
4 changes: 2 additions & 2 deletions src/jackdaw/test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
(:import
(java.io Closeable)
(java.util Properties)
(org.apache.kafka.streams TopologyTestDriver StreamsBuilder)))
(org.apache.kafka.streams Topology TopologyTestDriver StreamsBuilder)))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -247,4 +247,4 @@
.build))
props (-> (Properties.)
(set-properties app-config))]
(TopologyTestDriver. topology props)))
(TopologyTestDriver. ^Topology topology ^Properties props)))
gphilipp marked this conversation as resolved.
Show resolved Hide resolved
Loading