Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supply key-serde as well as value-serde in aggregate methods #172

Merged
merged 1 commit into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [0.6.7] - [2019-07-24]

### Fixed

* Supply `key-serde` as well as `value-serde` in `aggregate` methods

## [0.6.6] - [2019-06-20]

### Added
Expand Down
17 changes: 8 additions & 9 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,13 @@
(deftype CljKGroupedTable [^KGroupedTable kgroupedtable]
IKGroupedBase
(aggregate
[_ initializer-fn adder-fn subtractor-fn
{:keys [topic-name value-serde]}]
[_ initializer-fn adder-fn subtractor-fn topic]
(clj-ktable
(.aggregate ^KGroupedTable kgroupedtable
^Initializer (initializer initializer-fn)
^Aggregator (aggregator adder-fn)
^Aggregator (aggregator subtractor-fn)
(doto (Materialized/as ^String topic-name) (.withValueSerde value-serde)))))
^Materialized (topic->materialized topic))))

(aggregate
[_ initializer-fn adder-fn subtractor-fn]
Expand Down Expand Up @@ -486,12 +485,12 @@
(deftype CljKGroupedStream [^KGroupedStream kgroupedstream]
IKGroupedBase
(aggregate
[_ initializer-fn aggregator-fn {:keys [topic-name value-serde]}]
[_ initializer-fn aggregator-fn topic]
(clj-ktable
(.aggregate ^KGroupedStream kgroupedstream
^Initializer (initializer initializer-fn)
^Aggregator (aggregator aggregator-fn)
(doto (Materialized/as ^String topic-name) (.withValueSerde value-serde)))))
^Materialized (topic->materialized topic))))

(aggregate
[_ initializer-fn aggregator-fn]
Expand Down Expand Up @@ -547,12 +546,12 @@
(deftype CljTimeWindowedKStream [^TimeWindowedKStream windowed-kstream]
IKGroupedBase
(aggregate
[_ initializer-fn aggregator-fn {:keys [topic-name value-serde]}]
[_ initializer-fn aggregator-fn topic]
(clj-ktable
(.aggregate ^TimeWindowedKStream windowed-kstream
^Initializer (initializer initializer-fn)
^Aggregator (aggregator aggregator-fn)
(doto (Materialized/as ^String topic-name) (.withValueSerde value-serde)))))
^Materialized (topic->materialized topic))))

(count
[_]
Expand Down Expand Up @@ -585,13 +584,13 @@
(deftype CljSessionWindowedKStream [^SessionWindowedKStream windowed-kstream]
IKGroupedBase
(aggregate
[_ initializer-fn aggregator-fn merger-fn {:keys [topic-name value-serde]}]
[_ initializer-fn aggregator-fn merger-fn topic]
(clj-ktable
(.aggregate ^SessionWindowedKStream windowed-kstream
^Initializer (initializer initializer-fn)
^Aggregator (aggregator aggregator-fn)
^Merger (merger merger-fn)
(doto (Materialized/as ^String topic-name) (.withValueSerde value-serde)))))
^Materialized (topic->materialized topic))))

(count
[_]
Expand Down
56 changes: 56 additions & 0 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(:require [clojure.spec.test.alpha :as stest]
[clojure.string :as string]
[clojure.test :refer :all]
[jackdaw.serdes.edn :as jse]
[jackdaw.streams :as k]
[jackdaw.streams.configurable :as cfg]
[jackdaw.streams.lambdas :as lambdas :refer [key-value]]
Expand Down Expand Up @@ -734,6 +735,33 @@
(is (= [0 -7] (second keyvals)))
(is (= [1 -8] (nth keyvals 2))))))

(testing "aggregate: explicit kv store with custom serdes"
(let [serdes {:key-serde (jse/serde)
:value-serde (jse/serde)}
topic-a (merge (mock/topic "topic-a") serdes)
topic-b (merge (mock/topic "topic-b") serdes)
driver (mock/build-driver (fn [builder]
(-> builder
(k/kstream topic-a)
(k/group-by (fn [[_k v]] (:uuid v)) topic-a)
(k/aggregate (constantly 0)
(fn [acc [_k v]] (+ acc (:i v)))
topic-a)
(k/to-kstream)
(k/to topic-b))))
publish (partial mock/publish driver topic-a)
uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149"
uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"]

(publish uuid-a {:uuid uuid-a :i 1})
(publish uuid-b {:uuid uuid-a :i 3})
(publish uuid-b {:uuid uuid-b :i 5})

(let [keyvals (mock/get-keyvals driver topic-b)]
(is (= [uuid-a 1] (nth keyvals 0)))
(is (= [uuid-a 4] (nth keyvals 1)))
(is (= [uuid-b 5] (nth keyvals 2))))))

(testing "aggregate: implicit kv store"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
Expand Down Expand Up @@ -898,6 +926,34 @@
(is (= [0 1] (nth keyvals 2)))
(is (= [1 3] (nth keyvals 3))))))))

(testing "aggregate: explicit kv store with custom serdes"
(let [serdes {:key-serde (jse/serde)
:value-serde (jse/serde)}
topic-a (merge (mock/topic "topic-a") serdes)
topic-b (merge (mock/topic "topic-b") serdes)
driver (mock/build-driver (fn [builder]
(-> (k/ktable builder topic-a)
(k/group-by (fn [[_k v]] [(:uuid v) v]) topic-a)
(k/aggregate (constantly 0)
(fn [acc [_k v]] (+ acc (:i v)))
(fn [acc [_k v]] (- acc (:i v)))
topic-b)
(k/to-kstream)
(k/to topic-b))))
publish (partial mock/publish driver topic-a)
uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149"
uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"]

(publish uuid-a {:uuid uuid-a :i 1})
(publish uuid-b {:uuid uuid-a :i 3})
(publish uuid-b {:uuid uuid-b :i 5})

(let [keyvals (mock/get-keyvals driver topic-b)]
(is (= [uuid-a 1] (nth keyvals 0)))
(is (= [uuid-a 4] (nth keyvals 1)))
(is (= [uuid-a 1] (nth keyvals 2)))
(is (= [uuid-b 5] (nth keyvals 3))))))

(testing "aggregate: implicit kv store"
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")]
Expand Down