Skip to content

Commit

Permalink
Add KStream-KTable inner join
Browse files Browse the repository at this point in the history
The main changes are:

- Move the join method from IKTable to IKStreamBase. Also move the join
  implementations for KTable-KTable joins and the join function accordingly.
- Add join implementations for ConfiguredKStream and CljKStream.
- Update the spec for join.
  • Loading branch information
simon-katz committed Jun 30, 2021
1 parent 144748e commit f0e9fab
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Added

* Add KStream-KTable inner join.

## [0.8.0] - [2021-05-13]
* Update Kafka to 2.8.0 (confluent 6.1.1) [#292](https://github.com/FundingCircle/jackdaw/pull/292)
* Improve test-machine documentation [#287](https://github.com/FundingCircle/jackdaw/pull/287)
Expand Down
12 changes: 6 additions & 6 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@

;; IKStreamBase

(defn join
"Combines the values of the KStream-or-KTable and the KTable that
share the same key using an inner join."
[kstream-or-ktable ktable value-joiner-fn]
(p/join kstream-or-ktable ktable value-joiner-fn))

(defn left-join
"Creates a KStream from the result of calling `value-joiner-fn` with
each element in the KStream and the value in the KTable with the same
Expand Down Expand Up @@ -221,12 +227,6 @@

;; IKTable

(defn join
"Combines the values of the two KTables that share the same key using an
inner join."
[ktable other-ktable value-joiner-fn]
(p/join ktable other-ktable value-joiner-fn))

(defn outer-join
"Combines the values of two KTables that share the same key using an outer
join."
Expand Down
18 changes: 12 additions & 6 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@

(deftype ConfiguredKStream [config kstream]
IKStreamBase
(join
[_ ktable value-joiner-fn]
(configured-kstream
config
(join kstream ktable value-joiner-fn)))

(left-join
[_ ktable value-joiner-fn]
(configured-kstream
Expand Down Expand Up @@ -301,6 +307,12 @@

(deftype ConfiguredKTable [config ktable]
IKStreamBase
(join
[_ other-ktable value-joiner-fn]
(configured-ktable
config
(join ktable other-ktable value-joiner-fn)))

(left-join
[_ other-ktable value-joiner-fn]
(configured-ktable
Expand Down Expand Up @@ -338,12 +350,6 @@
config
(group-by ktable key-value-mapper-fn topic-config)))

(join
[_ other-ktable value-joiner-fn]
(configured-ktable
config
(join ktable other-ktable value-joiner-fn)))

(outer-join
[_ other-ktable value-joiner-fn]
(configured-ktable
Expand Down
21 changes: 14 additions & 7 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@

(deftype CljKStream [^KStream kstream]
IKStreamBase
(join
[_ ktable value-joiner-fn]
(clj-kstream
(.join ^KStream kstream
^KTable (ktable* ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(left-join
[_ ktable value-joiner-fn]
(clj-kstream
Expand Down Expand Up @@ -363,6 +370,13 @@

(deftype CljKTable [^KTable ktable]
IKStreamBase
(join
[_ other-ktable value-joiner-fn]
(clj-ktable
(.join ^KTable ktable
^KTable (ktable* other-ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(left-join
[_ other-ktable value-joiner-fn]
(clj-ktable
Expand Down Expand Up @@ -400,13 +414,6 @@
^KeyValueMapper (key-value-mapper key-value-mapper-fn)
^Serialized (topic->serialized topic-config))))

(join
[_ other-ktable value-joiner-fn]
(clj-ktable
(.join ^KTable ktable
^KTable (ktable* other-ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(outer-join
[_ other-ktable value-joiner-fn]
(clj-ktable
Expand Down
9 changes: 5 additions & 4 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@

(defprotocol IKStreamBase
"Methods common to KStream and KTable."
(join
[kstream-or-ktable ktable value-joiner-fn]
"Combines the values of the KStream-or-KTable with the values of the
KTable that share the same key using an inner join.")

(left-join
[kstream-or-ktable ktable value-joiner-fn]
[kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config]
Expand Down Expand Up @@ -179,10 +184,6 @@

(defprotocol IKTable
"A Ktable is an abstraction of a changlog stream."
(join
[ktable other-ktable value-joiner-fn]
"Combines the values of the two KTables that share the same key using an
inner join.")

(outer-join
[ktable other-ktable value-joiner-fn]
Expand Down
12 changes: 6 additions & 6 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@

;; IKStreamBase

(s/fdef k/join
:args (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
:value-joiner-fn ifn?)
:ret ::kstream-or-ktable)

(s/fdef k/left-join
:args (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
Expand Down Expand Up @@ -254,12 +260,6 @@

;; IKTable

(s/fdef k/join
:args (s/cat :ktable ktable?
:other-ktable ktable?
:value-joiner-fn ifn?)
:ret ktable?)

(s/fdef k/outer-join
:args (s/cat :ktable ktable?
:other-ktable ktable?
Expand Down
27 changes: 27 additions & 0 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@
(apply + (filter some? args)))

(deftest KStream
(testing "join"
(let [topic-a (mock/topic "table-a")
topic-b (mock/topic "table-b")
topic-c (mock/topic "topic-c")]

(with-open [driver (mock/build-driver (fn [builder]
(let [left (k/kstream builder topic-a)
right (k/ktable builder topic-b)]
(-> (k/join left right +)
(k/to topic-c)))))]
(let [publish-left (partial mock/publish driver topic-a)
publish-right (partial mock/publish driver topic-b)]

(publish-left 1 1)
(publish-right 1 10)
(publish-left 1 100)
(publish-right 1 1000)
(publish-left 1 10000)
(publish-right 2 1)
(publish-left 2 10)

(let [keyvals (mock/get-keyvals driver topic-c)]
(is (= 3 (count keyvals)))
(is (= [1 110] (first keyvals)))
(is (= [1 11000] (second keyvals)))
(is (= [2 11] (nth keyvals 2))))))))

(testing "left-join"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
Expand Down

0 comments on commit f0e9fab

Please sign in to comment.