Skip to content

Commit

Permalink
Add the suppress streams operator
Browse files Browse the repository at this point in the history
This adds the `suppress` operator for Kafka Streams apps. This allows to
only emit messages for a specific key once within a specific time
window, e.g.
  • Loading branch information
truemped committed Sep 13, 2019
1 parent 1474750 commit 2907ca1
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@
([ktable key-value-mapper-fn]
(p/to-kstream ktable key-value-mapper-fn)))

(defn suppress
"Suppress some updates from this changelog stream"
[ktable suppressed]
(p/suppress ktable suppressed))

(defn ktable*
"Returns the underlying KTable object."
[ktable]
Expand Down
6 changes: 6 additions & 0 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@
config
(outer-join ktable other-ktable value-joiner-fn)))

(suppress
[_ suppressed]
(configured-ktable
config
(suppress ktable suppressed)))

(to-kstream
[_]
(configured-kstream
Expand Down
26 changes: 24 additions & 2 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
Pattern]
[org.apache.kafka.common.serialization
Serde]
[java.time
Duration]
[org.apache.kafka.streams
KafkaStreams]
[org.apache.kafka.streams
Expand All @@ -19,8 +21,8 @@
JoinWindows KGroupedStream KGroupedTable KStream KTable
KeyValueMapper Materialized Merger Predicate Printed Produced
Reducer Serialized SessionWindowedKStream SessionWindows
TimeWindowedKStream ValueJoiner ValueMapper
ValueMapperWithKey ValueTransformerSupplier Windows]
Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner
ValueMapper ValueMapperWithKey ValueTransformerSupplier Windows]
[org.apache.kafka.streams.processor
StreamPartitioner]))

Expand All @@ -42,6 +44,21 @@
key-serde (.withKeySerde key-serde)
value-serde (.withValueSerde value-serde)))

(defn suppress-config->suppressed
[{:keys [max-records max-bytes until-time-limit-ms]}]
(let [config (cond
(not (nil? max-records))
(Suppressed$BufferConfig/maxRecords max-records)

(not (nil? max-bytes))
(Suppressed$BufferConfig/maxBytes max-bytes)

:else (Suppressed$BufferConfig/unbounded))]
(if-some [time-limit until-time-limit-ms]
(Suppressed/untilTimeLimit (Duration/ofMillis time-limit) config)
(-> (.shutDownWhenFull config)
Suppressed/untilWindowCloses))))

(declare clj-kstream clj-ktable clj-kgroupedtable clj-kgroupedstream
clj-global-ktable clj-session-windowed-kstream
clj-time-windowed-kstream)
Expand Down Expand Up @@ -396,6 +413,11 @@
^KTable (ktable* other-ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(suppress
[_ suppress-config]
(clj-ktable
(.suppress ^KTable ktable (suppress-config->suppressed suppress-config))))

(to-kstream
[_]
(clj-kstream
Expand Down
8 changes: 8 additions & 0 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@
[ktable key-value-mapper-fn]
"Converts a KTable to a KStream.")

(suppress
[ktable {:keys [max-records max-bytes until-time-limit-ms]}]
"Suppress some updates from this changelog stream.
You can either specify `max-records` or `max-bytes`. If an empty map is
passed, the suppress will be unbounded. If `until-time-limit-ms` is set,
this will override the `TimeWindow` interval. Note that when relying on the
configured `TimeWindow` the default `grace` period is `24h - window-size`.")

(ktable*
[ktable]
"Returns the underlying KTable object."))
Expand Down
11 changes: 11 additions & 0 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
(def streams-builder?
(partial satisfies? IStreamsBuilder))

(s/def ::max-records int?)
(s/def ::max-bytes int?)
(s/def ::until-time-limit-ms int?)
(s/def ::suppress-config (s/keys :opt-un [::max-records
::max-bytes
::until-time-limit-ms]))
(s/def ::topic-name string?)
(s/def ::key-serde any?)
(s/def ::value-serde any?)
Expand Down Expand Up @@ -257,6 +263,11 @@
:value-joiner-fn ifn?)
:ret ktable?)

(s/fdef k/suppress
:args (s/cat :ktable ktable?
:suppress ::suppress-config)
:ret ktable?)

(s/fdef k/to-kstream
:args (s/cat :ktable ktable?
:key-value-mapper-fn (s/? ifn?))
Expand Down
171 changes: 169 additions & 2 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
[jackdaw.streams.protocols
:refer [IKStream IKTable IStreamsBuilder]]
[jackdaw.streams.specs])
(:import [org.apache.kafka.streams.kstream
(:import [java.time Duration]
[org.apache.kafka.streams.kstream
JoinWindows SessionWindows TimeWindows Transformer
ValueTransformer]
org.apache.kafka.streams.StreamsBuilder
Expand Down Expand Up @@ -643,7 +644,173 @@
(is (= 3 (count keyvals)))
(is (= [1 1] (first keyvals)))
(is (= [1 3] (second keyvals)))
(is (= [1 6] (nth keyvals 2)))))))))
(is (= [1 6] (nth keyvals 2))))))))

(testing "suppress records per time window"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")

;; if we don't set the `grace` period of the `TimeWindows`, the
;; default is used: 24h - window
window (Duration/ofMillis 100)
grace (Duration/ofMillis 1)
time-windows (-> window
TimeWindows/of
(.grace grace))]

(with-open [driver (mock/build-driver (fn [builder]
(-> builder
(k/ktable topic-a)
(k/to-kstream)
(k/group-by-key)
(k/window-by-time time-windows)
(k/reduce + topic-b)
(k/suppress {})
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
(k/to topic-c))))]

(let [publish (partial mock/publish driver topic-a)]

(publish 1 1 3)
(publish 2 1 4)
(publish 201 1 1)
(publish 202 1 2)
(publish 303 1 1)

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 2 (count keyvals)))
(is (= [1 7] (first keyvals)))
(is (= [1 3] (second keyvals))))))))

(testing "suppress records with a bounded buffer"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")

window (Duration/ofMillis 100)
grace (Duration/ofMillis 1)
time-windows (-> window
TimeWindows/of
(.grace grace))
max-records 2]

(with-open [driver (mock/build-driver (fn [builder]
(-> builder
(k/ktable topic-a)
(k/to-kstream)
(k/group-by-key)
(k/window-by-time time-windows)
(k/reduce + topic-b)
(k/suppress {:max-records max-records})
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
(k/to topic-c))))]

(let [publish (partial mock/publish driver topic-a)]

(publish 1 1 3)
(publish 2 1 4)
(publish 201 1 1)
(publish 202 1 2)
(publish 210 1 3)
(publish 303 1 1)

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 2 (count keyvals)))
(is (= [1 7] (first keyvals)))
(is (= [1 6] (second keyvals))))))))

(testing "suppress records with a bounded buffer that will shutdown the app"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")

window (Duration/ofMillis 100)
grace (Duration/ofMillis 1)
time-windows (-> window
TimeWindows/of
(.grace grace))
max-records 2]

(with-open [driver (mock/build-driver (fn [builder]
(-> builder
(k/ktable topic-a)
(k/to-kstream)
(k/group-by-key)
(k/window-by-time time-windows)
(k/reduce + topic-b)
(k/suppress {:max-records max-records})
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
(k/to topic-c))))]

(let [publish (partial mock/publish driver topic-a)]

(publish 1 1 3)
(publish 2 2 4)
(try
(publish 2 3 4)
(catch org.apache.kafka.streams.errors.StreamsException e
(is "task [0_0] Failed to flush state store topic-b" (.getMessage e))))

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 0 (count keyvals))))))))

(testing "suppress records until time limit is reached"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")]

(with-open [driver (mock/build-driver (fn [builder]
(-> builder
(k/kstream topic-a)
(k/group-by-key)
(k/count topic-a)
(k/suppress {:until-time-limit-ms 10})
(k/to-kstream)
(k/to topic-b))))]

(let [publish (partial mock/publish driver topic-a)]

(publish 1 1 3)
(publish 2 1 4)
(publish 12 1 4)
(publish 201 1 1)
(publish 202 1 2)
(publish 222 1 3)

(let [keyvals (mock/get-keyvals driver topic-b)]
(is (= 2 (count keyvals)))
(is (= [1 3] (first keyvals)))
(is (= [1 6] (second keyvals))))))))

(testing "joining tables and suppressing records in between"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")]

(with-open [driver (mock/build-driver (fn [builder]
(let [left (k/ktable builder topic-a)
right (k/ktable builder topic-b)]
(-> (k/left-join left right safe-add)
(k/suppress {:until-time-limit-ms 10})
(k/to-kstream)
(k/to topic-c)))))]

(let [publish-left (partial mock/publish driver topic-a)
publish-right (partial mock/publish driver topic-b)]

(publish-left 10 1 1)
(publish-right 11 1 2)
(publish-right 20 1 3) ; new time-window, will be emitted 1+3
(publish-right 25 1 4)
(publish-right 35 1 5) ; new time-window, will be emitted 1+5

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 2 (count keyvals)))
(is (= [1 4] (first keyvals)))
(is (= [1 6] (second keyvals)))))))))

(deftest grouped-stream
(testing "count"
Expand Down

0 comments on commit 2907ca1

Please sign in to comment.