diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a58c2fa..a8debf5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,19 +2,19 @@ ## Unreleased -* Fix `ktable` constructor to use supplied `store-name` +* Fix `ktable` constructor to use supplied `store-name` +* Added 7-arity signatures for `join-windowed`, `left-join-windowed` and `outer-join-windowed` allowing explicit join naming ## [0.7.3] - [2020-04-08] ### Added - * Allow avro deserializaton via the resolver without a local copy of the schema - - * Start formalizing test-machine commands with fspec'd functions +* Start formalizing test-machine commands with fspec'd functions +* Allow avro deserializaton via the resolver without a local copy of the schema ### Fixed - * Moved dependency on kafka_2.11 into the dev profile +* Moved dependency on kafka_2.11 into the dev profile ## [0.7.2] - [2020-02-07] diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 8973039b..c26abdbd 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -144,7 +144,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn left-join-windowed "Combines the values of two streams that share the same key using a @@ -152,7 +154,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/left-join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn map "Creates a KStream that consists of the result of applying @@ -166,7 +170,9 @@ ([kstream other-kstream value-joiner-fn windows] (p/outer-join-windowed kstream other-kstream value-joiner-fn windows)) ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] - (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))) + (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)) + ([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] + (p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name))) (defn process! "Applies `processor-fn` to each item in the input stream." diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 7cd5e0af..a7b74176 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -188,6 +188,18 @@ topic-config other-topic-config))) + (join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (left-join-windowed [_ other-kstream value-joiner-fn windows] (configured-kstream @@ -205,6 +217,18 @@ topic-config other-topic-config))) + (left-join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (left-join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (map [_ key-value-mapper-fn] (configured-kstream @@ -238,6 +262,18 @@ topic-config other-topic-config))) + (outer-join-windowed + [_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name] + (configured-kstream + config + (outer-join-windowed kstream + other-kstream + value-joiner-fn + windows + topic-config + other-topic-config + join-name))) + (process! [_ processor-supplier-fn state-store-names] (process! kstream processor-supplier-fn state-store-names)) diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 6f4254bc..54e8f9d6 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -254,6 +254,18 @@ ^JoinWindows windows (Joined/with key-serde this-value-serde other-value-serde)))) + (join-windowed + [_ other-kstream value-joiner-fn windows + {key-serde :key-serde this-value-serde :value-serde} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.join kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde this-value-serde other-value-serde join-name)))) + (left-join-windowed [_ other-kstream value-joiner-fn windows] (clj-kstream @@ -273,6 +285,18 @@ ^JoinWindows windows (Joined/with key-serde value-serde other-value-serde)))) + (left-join-windowed + [_ other-kstream value-joiner-fn windows + {:keys [key-serde value-serde]} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.leftJoin kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde value-serde other-value-serde join-name)))) + (map [_ key-value-mapper-fn] (clj-kstream @@ -303,6 +327,18 @@ ^JoinWindows windows (Joined/with key-serde value-serde other-value-serde)))) + (outer-join-windowed + [_ other-kstream value-joiner-fn windows + {key-serde :key-serde value-serde :value-serde} + {other-value-serde :value-serde} + join-name] + (clj-kstream + (.outerJoin ^KStream kstream + ^KStream (kstream* other-kstream) + ^ValueJoiner (value-joiner value-joiner-fn) + ^JoinWindows windows + (Joined/with key-serde value-serde other-value-serde join-name)))) + (process! [_ processor-supplier-fn state-store-names] (.process ^KStream kstream diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index f6979b51..8450e478 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -115,12 +115,14 @@ (join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed inner join.") (left-join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed left join.") @@ -136,6 +138,7 @@ (outer-join-windowed [kstream other-kstream value-joiner-fn windows] [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config] + [kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name] "Combines the values of two streams that share the same key using a windowed outer join.") diff --git a/src/jackdaw/streams/specs.clj b/src/jackdaw/streams/specs.clj index 9bd56f91..4610c1ee 100644 --- a/src/jackdaw/streams/specs.clj +++ b/src/jackdaw/streams/specs.clj @@ -179,7 +179,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/left-join-windowed @@ -188,7 +189,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/map @@ -207,7 +209,8 @@ :value-joiner-fn ifn? :windows join-windows? :this-topic-config (s/? ::topic-config) - :other-topic-config (s/? ::topic-config)) + :other-topic-config (s/? ::topic-config) + :join-name (s/? string?)) :ret kstream?) (s/fdef k/process! diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 9c4d7d10..476c0013 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -313,6 +313,7 @@ (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b") topic-c (mock/topic "topic-c") + topic-d (mock/topic "topic-d") windows (JoinWindows/of 1000) driver (mock/build-driver (fn [builder] (let [left-kstream (k/kstream builder topic-a) @@ -323,14 +324,23 @@ windows topic-a topic-b) - (k/to topic-c))))) + (k/to topic-c)) + (-> left-kstream + (k/join-windowed right-kstream + + + windows + topic-a + topic-b + "a-b-windowed-join") + (k/to topic-d))))) publish-a (partial mock/publish driver topic-a) publish-b (partial mock/publish driver topic-b)] (publish-a 1 1 1) (publish-b 100 1 2) (publish-b 10000 1 4) ;; Outside of join window - (is (= [[1 3]] (mock/get-keyvals driver topic-c))))) + (is (= [[1 3]] (mock/get-keyvals driver topic-c))) + (is (= [[1 3]] (mock/get-keyvals driver topic-d))))) (testing "map" (let [topic-a (mock/topic "topic-a")