Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add name support for windowed joins #233

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

## Unreleased

* Fix `ktable` constructor to use supplied `store-name`
* Fix `ktable` constructor to use supplied `store-name`
* Added 7-arity signatures for `join-windowed`, `left-join-windowed` and `outer-join-windowed` allowing explicit join naming

## [0.7.3] - [2020-04-08]

### Added

* Allow avro deserializaton via the resolver without a local copy of the schema

* Start formalizing test-machine commands with fspec'd functions
* Start formalizing test-machine commands with fspec'd functions
* Allow avro deserializaton via the resolver without a local copy of the schema

### Fixed

* Moved dependency on kafka_2.11 into the dev profile
* Moved dependency on kafka_2.11 into the dev profile

## [0.7.2] - [2020-02-07]

Expand Down
12 changes: 9 additions & 3 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@
([kstream other-kstream value-joiner-fn windows]
(p/join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
(p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name)))

(defn left-join-windowed
"Combines the values of two streams that share the same key using a
windowed left join."
([kstream other-kstream value-joiner-fn windows]
(p/left-join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
(p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name)))

(defn map
"Creates a KStream that consists of the result of applying
Expand All @@ -166,7 +170,9 @@
([kstream other-kstream value-joiner-fn windows]
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name)))

(defn process!
"Applies `processor-fn` to each item in the input stream."
Expand Down
36 changes: 36 additions & 0 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@
topic-config
other-topic-config)))

(join-windowed
[_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name]
(configured-kstream
config
(join-windowed kstream
other-kstream
value-joiner-fn
windows
topic-config
other-topic-config
join-name)))

(left-join-windowed
[_ other-kstream value-joiner-fn windows]
(configured-kstream
Expand All @@ -205,6 +217,18 @@
topic-config
other-topic-config)))

(left-join-windowed
[_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name]
(configured-kstream
config
(left-join-windowed kstream
other-kstream
value-joiner-fn
windows
topic-config
other-topic-config
join-name)))

(map
[_ key-value-mapper-fn]
(configured-kstream
Expand Down Expand Up @@ -238,6 +262,18 @@
topic-config
other-topic-config)))

(outer-join-windowed
[_ other-kstream value-joiner-fn windows topic-config other-topic-config join-name]
(configured-kstream
config
(outer-join-windowed kstream
other-kstream
value-joiner-fn
windows
topic-config
other-topic-config
join-name)))

(process!
[_ processor-supplier-fn state-store-names]
(process! kstream processor-supplier-fn state-store-names))
Expand Down
36 changes: 36 additions & 0 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@
^JoinWindows windows
(Joined/with key-serde this-value-serde other-value-serde))))

(join-windowed
[_ other-kstream value-joiner-fn windows
{key-serde :key-serde this-value-serde :value-serde}
{other-value-serde :value-serde}
join-name]
(clj-kstream
(.join kstream
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde this-value-serde other-value-serde join-name))))

(left-join-windowed
[_ other-kstream value-joiner-fn windows]
(clj-kstream
Expand All @@ -273,6 +285,18 @@
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))

(left-join-windowed
[_ other-kstream value-joiner-fn windows
{:keys [key-serde value-serde]}
{other-value-serde :value-serde}
join-name]
(clj-kstream
(.leftJoin kstream
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde join-name))))

(map
[_ key-value-mapper-fn]
(clj-kstream
Expand Down Expand Up @@ -303,6 +327,18 @@
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))

(outer-join-windowed
[_ other-kstream value-joiner-fn windows
{key-serde :key-serde value-serde :value-serde}
{other-value-serde :value-serde}
join-name]
(clj-kstream
(.outerJoin ^KStream kstream
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde join-name))))

(process!
[_ processor-supplier-fn state-store-names]
(.process ^KStream kstream
Expand Down
3 changes: 3 additions & 0 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@
(join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
"Combines the values of two streams that share the same key using a
windowed inner join.")

(left-join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
"Combines the values of two streams that share the same key using a
windowed left join.")

Expand All @@ -136,6 +138,7 @@
(outer-join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config join-name]
"Combines the values of two streams that share the same key using a
windowed outer join.")

Expand Down
9 changes: 6 additions & 3 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@
:value-joiner-fn ifn?
:windows join-windows?
:this-topic-config (s/? ::topic-config)
:other-topic-config (s/? ::topic-config))
:other-topic-config (s/? ::topic-config)
:join-name (s/? string?))
:ret kstream?)

(s/fdef k/left-join-windowed
Expand All @@ -188,7 +189,8 @@
:value-joiner-fn ifn?
:windows join-windows?
:this-topic-config (s/? ::topic-config)
:other-topic-config (s/? ::topic-config))
:other-topic-config (s/? ::topic-config)
:join-name (s/? string?))
:ret kstream?)

(s/fdef k/map
Expand All @@ -207,7 +209,8 @@
:value-joiner-fn ifn?
:windows join-windows?
:this-topic-config (s/? ::topic-config)
:other-topic-config (s/? ::topic-config))
:other-topic-config (s/? ::topic-config)
:join-name (s/? string?))
:ret kstream?)

(s/fdef k/process!
Expand Down
14 changes: 12 additions & 2 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")
topic-d (mock/topic "topic-d")
windows (JoinWindows/of 1000)
driver (mock/build-driver (fn [builder]
(let [left-kstream (k/kstream builder topic-a)
Expand All @@ -323,14 +324,23 @@
windows
topic-a
topic-b)
(k/to topic-c)))))
(k/to topic-c))
(-> left-kstream
(k/join-windowed right-kstream
+
windows
topic-a
topic-b
"a-b-windowed-join")
(k/to topic-d)))))
publish-a (partial mock/publish driver topic-a)
publish-b (partial mock/publish driver topic-b)]

(publish-a 1 1 1)
(publish-b 100 1 2)
(publish-b 10000 1 4) ;; Outside of join window
(is (= [[1 3]] (mock/get-keyvals driver topic-c)))))
(is (= [[1 3]] (mock/get-keyvals driver topic-c)))
(is (= [[1 3]] (mock/get-keyvals driver topic-d)))))

(testing "map"
(let [topic-a (mock/topic "topic-a")
Expand Down