diff --git a/CHANGELOG.md b/CHANGELOG.md index acb3473d..df52804c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Added + +* Add KStream-KTable inner join. + ## [0.8.0] - [2021-05-13] * Update Kafka to 2.8.0 (confluent 6.1.1) [#292](https://github.com/FundingCircle/jackdaw/pull/292) * Improve test-machine documentation [#287](https://github.com/FundingCircle/jackdaw/pull/287) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 62170a10..09f34a9e 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -53,6 +53,12 @@ ;; IKStreamBase +(defn join + "Combines the values of the KStream-or-KTable and the KTable that + share the same key using an inner join." + [kstream-or-ktable ktable value-joiner-fn] + (p/join kstream-or-ktable ktable value-joiner-fn)) + (defn left-join "Creates a KStream from the result of calling `value-joiner-fn` with each element in the KStream and the value in the KTable with the same @@ -221,12 +227,6 @@ ;; IKTable -(defn join - "Combines the values of the two KTables that share the same key using an - inner join." - [ktable other-ktable value-joiner-fn] - (p/join ktable other-ktable value-joiner-fn)) - (defn outer-join "Combines the values of two KTables that share the same key using an outer join." diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 72790a81..59b5c517 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -74,6 +74,12 @@ (deftype ConfiguredKStream [config kstream] IKStreamBase + (join + [_ ktable value-joiner-fn] + (configured-kstream + config + (join kstream ktable value-joiner-fn))) + (left-join [_ ktable value-joiner-fn] (configured-kstream @@ -301,6 +307,12 @@ (deftype ConfiguredKTable [config ktable] IKStreamBase + (join + [_ other-ktable value-joiner-fn] + (configured-ktable + config + (join ktable other-ktable value-joiner-fn))) + (left-join [_ other-ktable value-joiner-fn] (configured-ktable @@ -338,12 +350,6 @@ config (group-by ktable key-value-mapper-fn topic-config))) - (join - [_ other-ktable value-joiner-fn] - (configured-ktable - config - (join ktable other-ktable value-joiner-fn))) - (outer-join [_ other-ktable value-joiner-fn] (configured-ktable diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 6f4254bc..8780420b 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -138,6 +138,13 @@ (deftype CljKStream [^KStream kstream] IKStreamBase + (join + [_ ktable value-joiner-fn] + (clj-kstream + (.join ^KStream kstream + ^KTable (ktable* ktable) + ^ValueJoiner (value-joiner value-joiner-fn)))) + (left-join [_ ktable value-joiner-fn] (clj-kstream @@ -363,6 +370,13 @@ (deftype CljKTable [^KTable ktable] IKStreamBase + (join + [_ other-ktable value-joiner-fn] + (clj-ktable + (.join ^KTable ktable + ^KTable (ktable* other-ktable) + ^ValueJoiner (value-joiner value-joiner-fn)))) + (left-join [_ other-ktable value-joiner-fn] (clj-ktable @@ -400,13 +414,6 @@ ^KeyValueMapper (key-value-mapper key-value-mapper-fn) ^Serialized (topic->serialized topic-config)))) - (join - [_ other-ktable value-joiner-fn] - (clj-ktable - (.join ^KTable ktable - ^KTable (ktable* other-ktable) - ^ValueJoiner (value-joiner value-joiner-fn)))) - (outer-join [_ other-ktable value-joiner-fn] (clj-ktable diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index 90385198..aba97ebb 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -37,6 +37,11 @@ (defprotocol IKStreamBase "Methods common to KStream and KTable." + (join + [kstream-or-ktable ktable value-joiner-fn] + "Combines the values of the KStream-or-KTable with the values of the + KTable that share the same key using an inner join.") + (left-join [kstream-or-ktable ktable value-joiner-fn] [kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config] @@ -179,10 +184,6 @@ (defprotocol IKTable "A Ktable is an abstraction of a changlog stream." - (join - [ktable other-ktable value-joiner-fn] - "Combines the values of the two KTables that share the same key using an - inner join.") (outer-join [ktable other-ktable value-joiner-fn] diff --git a/src/jackdaw/streams/specs.clj b/src/jackdaw/streams/specs.clj index c14b677d..bffa3787 100644 --- a/src/jackdaw/streams/specs.clj +++ b/src/jackdaw/streams/specs.clj @@ -101,6 +101,12 @@ ;; IKStreamBase +(s/fdef k/join + :args (s/cat :kstream-or-ktable ::kstream-or-ktable + :ktable ktable? + :value-joiner-fn ifn?) + :ret ::kstream-or-ktable) + (s/fdef k/left-join :args (s/cat :kstream-or-ktable ::kstream-or-ktable :ktable ktable? @@ -254,12 +260,6 @@ ;; IKTable -(s/fdef k/join - :args (s/cat :ktable ktable? - :other-ktable ktable? - :value-joiner-fn ifn?) - :ret ktable?) - (s/fdef k/outer-join :args (s/cat :ktable ktable? :other-ktable ktable? diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 263658ba..a371844d 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -64,6 +64,33 @@ (apply + (filter some? args))) (deftest KStream + (testing "join" + (let [topic-a (mock/topic "table-a") + topic-b (mock/topic "table-b") + topic-c (mock/topic "topic-c")] + + (with-open [driver (mock/build-driver (fn [builder] + (let [left (k/kstream builder topic-a) + right (k/ktable builder topic-b)] + (-> (k/join left right +) + (k/to topic-c)))))] + (let [publish-left (partial mock/publish driver topic-a) + publish-right (partial mock/publish driver topic-b)] + + (publish-left 1 1) + (publish-right 1 10) + (publish-left 1 100) + (publish-right 1 1000) + (publish-left 1 10000) + (publish-right 2 1) + (publish-left 2 10) + + (let [keyvals (mock/get-keyvals driver topic-c)] + (is (= 3 (count keyvals))) + (is (= [1 110] (first keyvals))) + (is (= [1 11000] (second keyvals))) + (is (= [2 11] (nth keyvals 2)))))))) + (testing "left-join" (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b")