From 8536ff9232e95c30e13cc88d0cdf90a8bcb9eee2 Mon Sep 17 00:00:00 2001 From: Ted Kassen Date: Sun, 1 Mar 2020 17:40:21 -0800 Subject: [PATCH 1/2] Add name support for windowed joins --- src/jackdaw/streams.clj | 12 +++++++--- src/jackdaw/streams/configured.clj | 36 ++++++++++++++++++++++++++++++ src/jackdaw/streams/interop.clj | 36 ++++++++++++++++++++++++++++++ src/jackdaw/streams/protocols.clj | 3 +++ src/jackdaw/streams/specs.clj | 9 +++++--- test/jackdaw/streams_test.clj | 14 ++++++++++-- 6 files changed, 102 insertions(+), 8 deletions(-) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 8973039b..c26abdbd 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -144,7 +144,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn left-join-windowed "Combines the values of two streams that share the same key using a @@ -152,7 +154,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/left-join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn map "Creates a KStream that consists of the result of applying @@ -166,7 +170,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/outer-join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn process! "Applies `processor-fn` to each item in the input stream." diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 7cd5e0af..a7b74176 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -188,6 +188,18 @@ topic-config other-topic-config))) + (join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (left-join-windowed [_ other-kstream value-joiner-fn windows] (configured-kstream @@ -205,6 +217,18 @@ topic-config other-topic-config))) + (left-join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (left-join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (map [_ key-value-mapper-fn] (configured-kstream @@ -238,6 +262,18 @@ topic-config other-topic-config))) + (outer-join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (outer-join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (process! [_ processor-supplier-fn state-store-names] (process! kstream processor-supplier-fn state-store-names)) diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 1bb755c8..63f6d284 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -253,6 +253,18 @@ ^JoinWindows windows (Joined/with key-serde this-value-serde other-value-serde)))) + (join-windowed + [_ other-kstream value-joiner-fn windows + {key-serde :key-serde this-value-serde :value-serde} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.join kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde this-value-serde other-value-serde join-name)))) + (left-join-windowed [_ other-kstream value-joiner-fn windows] (clj-kstream @@ -272,6 +284,18 @@ ^JoinWindows windows (Joined/with key-serde value-serde other-value-serde)))) + (left-join-windowed + [_ other-kstream value-joiner-fn windows + {:keys [key-serde value-serde]} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.leftJoin kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde value-serde other-value-serde join-name)))) + (map [_ key-value-mapper-fn] (clj-kstream @@ -302,6 +326,18 @@ ^JoinWindows windows (Joined/with key-serde value-serde other-value-serde)))) + (outer-join-windowed + [_ other-kstream value-joiner-fn windows + {key-serde :key-serde value-serde :value-serde} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.outerJoin ^KStream kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde value-serde other-value-serde join-name)))) + (process! [_ processor-supplier-fn state-store-names] (.process ^KStream kstream diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index f6979b51..8450e478 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -115,12 +115,14 @@ (join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed inner join.") (left-join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed left join.") @@ -136,6 +138,7 @@ (outer-join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed outer join.") diff --git a/src/jackdaw/streams/specs.clj b/src/jackdaw/streams/specs.clj index 58bce5a8..4959150b 100644 --- a/src/jackdaw/streams/specs.clj +++ b/src/jackdaw/streams/specs.clj @@ -178,7 +178,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/left-join-windowed @@ -187,7 +188,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/map @@ -206,7 +208,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/process! diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 7c8b0943..38eaff39 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -310,6 +310,7 @@ (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b") topic-c (mock/topic "topic-c") + topic-d (mock/topic "topic-d") windows (JoinWindows/of 1000) driver (mock/build-driver (fn [builder] (let [left-kstream (k/kstream builder topic-a) @@ -320,14 +321,23 @@ windows topic-a topic-b) - (k/to topic-c))))) + (k/to topic-c)) + (-> left-kstream + (k/join-windowed right-kstream + + + windows + topic-a + topic-b + "a-b-windowed-join") + (k/to topic-d))))) publish-a (partial mock/publish driver topic-a) publish-b (partial mock/publish driver topic-b)] (publish-a 1 1 1) (publish-b 100 1 2) (publish-b 10000 1 4) ;; Outside of join window - (is (= [[1 3]] (mock/get-keyvals driver topic-c))))) + (is (= [[1 3]] (mock/get-keyvals driver topic-c))) + (is (= [[1 3]] (mock/get-keyvals driver topic-d))))) (testing "map" (let [topic-a (mock/topic "topic-a") From 5cec11161bf02fd65ef56da702bb061f289b9fd8 Mon Sep 17 00:00:00 2001 From: Ted Kassen Date: Sun, 1 Mar 2020 18:12:32 -0800 Subject: [PATCH 2/2] Updated CHANGELOG.md for named join addition --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a793a6f..fe80bad9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added * Start formalizing test-machine commands with fspec'd functions +* Added 7-arity signatures for `join-windowed`, `left-join-windowed` and `outer-join-windowed` allowing explicit join naming ## [0.7.2] - [2020-02-07]