Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KStream-KTable inner join #297

Merged
merged 1 commit into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Added

* Add KStream-KTable inner join.

## [0.8.0] - [2021-05-13]
* Update Kafka to 2.8.0 (confluent 6.1.1) [#292](https://github.com/FundingCircle/jackdaw/pull/292)
* Improve test-machine documentation [#287](https://github.com/FundingCircle/jackdaw/pull/287)
Expand Down
12 changes: 6 additions & 6 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@

;; IKStreamBase

(defn join
"Combines the values of the KStream-or-KTable and the KTable that
share the same key using an inner join."
[kstream-or-ktable ktable value-joiner-fn]
(p/join kstream-or-ktable ktable value-joiner-fn))

(defn left-join
"Creates a KStream from the result of calling `value-joiner-fn` with
each element in the KStream and the value in the KTable with the same
Expand Down Expand Up @@ -221,12 +227,6 @@

;; IKTable

(defn join
"Combines the values of the two KTables that share the same key using an
inner join."
[ktable other-ktable value-joiner-fn]
(p/join ktable other-ktable value-joiner-fn))

(defn outer-join
"Combines the values of two KTables that share the same key using an outer
join."
Expand Down
18 changes: 12 additions & 6 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@

(deftype ConfiguredKStream [config kstream]
IKStreamBase
(join
[_ ktable value-joiner-fn]
(configured-kstream
config
(join kstream ktable value-joiner-fn)))

(left-join
[_ ktable value-joiner-fn]
(configured-kstream
Expand Down Expand Up @@ -301,6 +307,12 @@

(deftype ConfiguredKTable [config ktable]
IKStreamBase
(join
[_ other-ktable value-joiner-fn]
(configured-ktable
config
(join ktable other-ktable value-joiner-fn)))

(left-join
[_ other-ktable value-joiner-fn]
(configured-ktable
Expand Down Expand Up @@ -338,12 +350,6 @@
config
(group-by ktable key-value-mapper-fn topic-config)))

(join
[_ other-ktable value-joiner-fn]
(configured-ktable
config
(join ktable other-ktable value-joiner-fn)))

(outer-join
[_ other-ktable value-joiner-fn]
(configured-ktable
Expand Down
21 changes: 14 additions & 7 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@

(deftype CljKStream [^KStream kstream]
IKStreamBase
(join
[_ ktable value-joiner-fn]
(clj-kstream
(.join ^KStream kstream
^KTable (ktable* ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(left-join
[_ ktable value-joiner-fn]
(clj-kstream
Expand Down Expand Up @@ -363,6 +370,13 @@

(deftype CljKTable [^KTable ktable]
IKStreamBase
(join
[_ other-ktable value-joiner-fn]
(clj-ktable
(.join ^KTable ktable
^KTable (ktable* other-ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(left-join
[_ other-ktable value-joiner-fn]
(clj-ktable
Expand Down Expand Up @@ -400,13 +414,6 @@
^KeyValueMapper (key-value-mapper key-value-mapper-fn)
^Serialized (topic->serialized topic-config))))

(join
[_ other-ktable value-joiner-fn]
(clj-ktable
(.join ^KTable ktable
^KTable (ktable* other-ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(outer-join
[_ other-ktable value-joiner-fn]
(clj-ktable
Expand Down
9 changes: 5 additions & 4 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@

(defprotocol IKStreamBase
"Methods common to KStream and KTable."
(join
[kstream-or-ktable ktable value-joiner-fn]
"Combines the values of the KStream-or-KTable with the values of the
KTable that share the same key using an inner join.")

(left-join
[kstream-or-ktable ktable value-joiner-fn]
[kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config]
Expand Down Expand Up @@ -179,10 +184,6 @@

(defprotocol IKTable
"A Ktable is an abstraction of a changlog stream."
(join
[ktable other-ktable value-joiner-fn]
"Combines the values of the two KTables that share the same key using an
inner join.")

(outer-join
[ktable other-ktable value-joiner-fn]
Expand Down
12 changes: 6 additions & 6 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@

;; IKStreamBase

(s/fdef k/join
:args (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
:value-joiner-fn ifn?)
:ret ::kstream-or-ktable)

(s/fdef k/left-join
:args (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
Expand Down Expand Up @@ -254,12 +260,6 @@

;; IKTable

(s/fdef k/join
:args (s/cat :ktable ktable?
:other-ktable ktable?
:value-joiner-fn ifn?)
:ret ktable?)

(s/fdef k/outer-join
:args (s/cat :ktable ktable?
:other-ktable ktable?
Expand Down
27 changes: 27 additions & 0 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@
(apply + (filter some? args)))

(deftest KStream
(testing "join"
(let [topic-a (mock/topic "table-a")
topic-b (mock/topic "table-b")
topic-c (mock/topic "topic-c")]

(with-open [driver (mock/build-driver (fn [builder]
(let [left (k/kstream builder topic-a)
right (k/ktable builder topic-b)]
(-> (k/join left right +)
(k/to topic-c)))))]
(let [publish-left (partial mock/publish driver topic-a)
publish-right (partial mock/publish driver topic-b)]

(publish-left 1 1)
(publish-right 1 10)
(publish-left 1 100)
(publish-right 1 1000)
(publish-left 1 10000)
(publish-right 2 1)
(publish-left 2 10)

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 3 (count keyvals)))
(is (= [1 110] (first keyvals)))
(is (= [1 11000] (second keyvals)))
(is (= [2 11] (nth keyvals 2))))))))

(testing "left-join"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
Expand Down