diff --git a/CHANGELOG.md b/CHANGELOG.md index c1770e71..8b9b04d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [0.6.7] - [2019-07-24] + +### Fixed + + * Supply `key-serde` as well as `value-serde` in `aggregate` methods + ## [0.6.6] - [2019-06-20] ### Added diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 7e26c348..f4de565e 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -430,14 +430,13 @@ (deftype CljKGroupedTable [^KGroupedTable kgroupedtable] IKGroupedBase (aggregate - [_ initializer-fn adder-fn subtractor-fn - {:keys [topic-name value-serde]}] + [_ initializer-fn adder-fn subtractor-fn topic] (clj-ktable (.aggregate ^KGroupedTable kgroupedtable ^Initializer (initializer initializer-fn) ^Aggregator (aggregator adder-fn) ^Aggregator (aggregator subtractor-fn) - (doto (Materialized/as ^String topic-name) (.withValueSerde value-serde))))) + ^Materialized (topic->materialized topic)))) (aggregate [_ initializer-fn adder-fn subtractor-fn] @@ -486,12 +485,12 @@ (deftype CljKGroupedStream [^KGroupedStream kgroupedstream] IKGroupedBase (aggregate - [_ initializer-fn aggregator-fn {:keys [topic-name value-serde]}] + [_ initializer-fn aggregator-fn topic] (clj-ktable (.aggregate ^KGroupedStream kgroupedstream ^Initializer (initializer initializer-fn) ^Aggregator (aggregator aggregator-fn) - (doto (Materialized/as ^String topic-name) (.withValueSerde value-serde))))) + ^Materialized (topic->materialized topic)))) (aggregate [_ initializer-fn aggregator-fn] @@ -547,12 +546,12 @@ (deftype CljTimeWindowedKStream [^TimeWindowedKStream windowed-kstream] IKGroupedBase (aggregate - [_ initializer-fn aggregator-fn {:keys [topic-name value-serde]}] + [_ initializer-fn aggregator-fn topic] (clj-ktable (.aggregate ^TimeWindowedKStream windowed-kstream ^Initializer (initializer initializer-fn) ^Aggregator (aggregator aggregator-fn) - (doto (Materialized/as ^String topic-name) (.withValueSerde value-serde))))) + ^Materialized (topic->materialized topic)))) (count [_] @@ -585,13 +584,13 @@ (deftype CljSessionWindowedKStream [^SessionWindowedKStream windowed-kstream] IKGroupedBase (aggregate - [_ initializer-fn aggregator-fn merger-fn {:keys [topic-name value-serde]}] + [_ initializer-fn aggregator-fn merger-fn topic] (clj-ktable (.aggregate ^SessionWindowedKStream windowed-kstream ^Initializer (initializer initializer-fn) ^Aggregator (aggregator aggregator-fn) ^Merger (merger merger-fn) - (doto (Materialized/as ^String topic-name) (.withValueSerde value-serde))))) + ^Materialized (topic->materialized topic)))) (count [_] diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index f4042b68..e94247f9 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -3,6 +3,7 @@ (:require [clojure.spec.test.alpha :as stest] [clojure.string :as string] [clojure.test :refer :all] + [jackdaw.serdes.edn :as jse] [jackdaw.streams :as k] [jackdaw.streams.configurable :as cfg] [jackdaw.streams.lambdas :as lambdas :refer [key-value]] @@ -734,6 +735,33 @@ (is (= [0 -7] (second keyvals))) (is (= [1 -8] (nth keyvals 2)))))) + (testing "aggregate: explicit kv store with custom serdes" + (let [serdes {:key-serde (jse/serde) + :value-serde (jse/serde)} + topic-a (merge (mock/topic "topic-a") serdes) + topic-b (merge (mock/topic "topic-b") serdes) + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + (k/group-by (fn [[_k v]] (:uuid v)) topic-a) + (k/aggregate (constantly 0) + (fn [acc [_k v]] (+ acc (:i v))) + topic-a) + (k/to-kstream) + (k/to topic-b)))) + publish (partial mock/publish driver topic-a) + uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149" + uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"] + + (publish uuid-a {:uuid uuid-a :i 1}) + (publish uuid-b {:uuid uuid-a :i 3}) + (publish uuid-b {:uuid uuid-b :i 5}) + + (let [keyvals (mock/get-keyvals driver topic-b)] + (is (= [uuid-a 1] (nth keyvals 0))) + (is (= [uuid-a 4] (nth keyvals 1))) + (is (= [uuid-b 5] (nth keyvals 2)))))) + (testing "aggregate: implicit kv store" (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b") @@ -898,6 +926,34 @@ (is (= [0 1] (nth keyvals 2))) (is (= [1 3] (nth keyvals 3)))))))) + (testing "aggregate: explicit kv store with custom serdes" + (let [serdes {:key-serde (jse/serde) + :value-serde (jse/serde)} + topic-a (merge (mock/topic "topic-a") serdes) + topic-b (merge (mock/topic "topic-b") serdes) + driver (mock/build-driver (fn [builder] + (-> (k/ktable builder topic-a) + (k/group-by (fn [[_k v]] [(:uuid v) v]) topic-a) + (k/aggregate (constantly 0) + (fn [acc [_k v]] (+ acc (:i v))) + (fn [acc [_k v]] (- acc (:i v))) + topic-b) + (k/to-kstream) + (k/to topic-b)))) + publish (partial mock/publish driver topic-a) + uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149" + uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"] + + (publish uuid-a {:uuid uuid-a :i 1}) + (publish uuid-b {:uuid uuid-a :i 3}) + (publish uuid-b {:uuid uuid-b :i 5}) + + (let [keyvals (mock/get-keyvals driver topic-b)] + (is (= [uuid-a 1] (nth keyvals 0))) + (is (= [uuid-a 4] (nth keyvals 1))) + (is (= [uuid-a 1] (nth keyvals 2))) + (is (= [uuid-b 5] (nth keyvals 3)))))) + (testing "aggregate: implicit kv store" (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b")]