Add flat-transform wrappers for Kafka Streams #245

Closed
wants to merge 3 commits into from

Contributor

@99-not-out 99-not-out commented Apr 27, 2020

Spotted flatTransform for a KStream and it looks worth adding. Also added a few of the common bits of wrapper / helper / sugar code which pop up from uses of transformers.

  • Add support for flat-transform and flat-transform-values
  • Add some helpers for adding state stores
  • Add some sugar for common transform use cases
  • Use some of the above to provide simple de-duping stream support in extras

Note that with flatTransformValues, returning an empty list or nil halts processing for that record: nothing is forwarded downstream. With transformValues, returning nil does not halt processing, and the next processor in the stream sees [k nil]. This difference is useful when you do mean to drop the record, as it avoids a subsequent filter.
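
To make the difference concrete, here is a minimal sketch in plain Clojure that models the two behaviours over a sequence of [k v] pairs. It does not touch Kafka Streams or Jackdaw, and the names `model-transform-values` and `model-flat-transform-values` are made up for illustration only.

```clojure
;; Pure-Clojure model of the semantics described above; no Kafka involved.
;; Both function names are hypothetical.

(defn model-transform-values
  "Every input record yields exactly one output record, even when f returns nil."
  [f records]
  (map (fn [[k v]] [k (f v)]) records))

(defn model-flat-transform-values
  "Every input record yields zero or more output records. When f returns nil
  or an empty collection, nothing is emitted for that record."
  [f records]
  (mapcat (fn [[k v]] (map (fn [out] [k out]) (f v))) records))

(def records [["a" 1] ["b" 2] ["c" 3]])

;; Keep odd values only, multiplied by 10.
(model-transform-values (fn [v] (when (odd? v) (* 10 v))) records)
;; => (["a" 10] ["b" nil] ["c" 30])

(model-flat-transform-values (fn [v] (when (odd? v) [(* 10 v)])) records)
;; => (["a" 10] ["c" 30])
```

In the first case a downstream step still has to filter out the nil value; in the second the record for "b" never reaches it.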

@creese We really need to get your transducers changes merged too!

@99-not-out 99-not-out requested a review from a team as a code owner April 27, 2020 22:47
@99-not-out
Contributor Author

Coverage doesn't seem to like jackdaw/streams/protocols.clj anymore 😞

Contributor

@kidpollo kidpollo left a comment

This is great. Some comments, nothing blocking.

@@ -200,6 +215,15 @@
([kstream value-transformer-supplier-fn state-store-names]
(p/transform-values kstream value-transformer-supplier-fn state-store-names)))

(defn flat-transform-values
Contributor

Why not flat-transform-keys?

@@ -323,7 +333,20 @@
(clj-kstream
(.transform ^KStream kstream
^TransformerSupplier (transformer-supplier transformer-supplier-fn)
^"[Ljava.lang.String;" (into-array String state-store-names))))
^"[Ljava.lang.String;" (into-array String
Contributor

Is that how a string array is annotated? 🤯

Contributor Author

Yeah. Arrays are objects in Java, but their classes are created by the JVM rather than declared in source, so they have a slightly funky type signature :-)
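
The type hint in the diff above, `^"[Ljava.lang.String;"`, is simply the JVM's name for the class of a `String` array. A short REPL sketch shows where the name comes from; `store-names` is an illustrative value, not taken from the PR.

```clojure
;; "[" marks one array dimension and "L...;" wraps the element class name.
(def store-names (into-array String ["store-a" "store-b"]))

(.isArray (class store-names))
;; => true

(.getName (class store-names))
;; => "[Ljava.lang.String;"

;; The same class can be looked up by that name.
(= (class store-names) (Class/forName "[Ljava.lang.String;"))
;; => true
```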

(k/with-state-store {:store-name "test-store"})
(k/kstream topic-a)
(k/transform
(lambdas/transformer-with-ctx
Contributor

Wondering if a macro could make this feel more 'Clojurey', like:

(transformer-with-context [test-store k v]
  (update test-store k some-fn))

Contributor Author

Yeah, there is possibly a macro here, as the state store names have to be repeated. However, I try to avoid reaching for macros until the basic functions are there.

We could perhaps start a macros namespace, as there are probably several bits of syntax you could add over the basic functions for use in streams.
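
As a rough sketch of the kind of macro being discussed, the following binds each store symbol to the store of the same name, so the name is written once. It is hypothetical and not part of Jackdaw: `with-named-stores` is an invented name, and a plain map of atoms stands in for the processor context (in a real transformer the lookup would presumably go through the context's `getStateStore`).

```clojure
;; Hypothetical macro, not part of Jackdaw. `get-store` is any form that
;; evaluates to a function from a store name (a string) to a store.
(defmacro with-named-stores
  "Binds each symbol in `store-syms` to the store whose name is the symbol's
  own name, so the name is written once instead of twice."
  [get-store store-syms & body]
  `(let [~@(mapcat (fn [sym] [sym `(~get-store ~(name sym))]) store-syms)]
     ~@body))

;; Stand-in for a processor context: a map from store name to an atom.
(def stores {"test-store" (atom {})})

(with-named-stores stores [test-store]
  (swap! test-store update "k" (fnil inc 0))
  @test-store)
;; => {"k" 1}
```

Keeping the macro as a thin layer over an ordinary lookup function fits the approach above of having the basic functions in place first.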

@@ -52,6 +52,10 @@
[_]
(source-topics streams-builder))

(with-state-store
Contributor Author

I am not 100% sure on the naming here; maybe add-state-store. However, this name makes the code using it read nicer IMHO: (-> builder (k/with-state-store ...) (k/kstream ...)) etc.

(kstream* [_]
kstream)

ITransformingKStream
Contributor Author

Added a new protocol for the transform fns on a stream, which is always used wherever IKStream is used. This had to be done to allow Coverage to run over this IKStream protocol.

It's not ideal, and perhaps a newer or different code coverage tool would fix this.

The change is however invisible to users of Jackdaw.
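
A minimal sketch of why such a split can stay invisible to callers: the public functions delegate to protocol methods, so moving a method from one protocol to another changes no call site. All names here (`IStreamLike`, `ITransformingStreamLike`, `VecStream`, `map-vals`, `flat-vals`) are invented for illustration and are not Jackdaw's actual protocols.

```clojure
;; Hypothetical names throughout; a vector of [k v] pairs stands in for a stream.
(defprotocol IStreamLike
  (map-vals* [s f]))

(defprotocol ITransformingStreamLike
  (flat-vals* [s f]))

;; One type implements both protocols.
(defrecord VecStream [records]
  IStreamLike
  (map-vals* [_ f]
    (->VecStream (mapv (fn [[k v]] [k (f v)]) records)))
  ITransformingStreamLike
  (flat-vals* [_ f]
    (->VecStream (vec (mapcat (fn [[k v]] (for [out (f v)] [k out])) records)))))

;; Public wrappers: callers never name a protocol.
(defn map-vals [s f] (map-vals* s f))
(defn flat-vals [s f] (flat-vals* s f))

(:records (flat-vals (->VecStream [["a" 2]]) (fn [v] (range 1 (inc v)))))
;; => [["a" 1] ["a" 2]]
```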

@codecov

codecov bot commented Apr 29, 2020

Codecov Report

Merging #245 into master will increase coverage by 0.70%.
The diff coverage is 90.12%.

@@            Coverage Diff             @@
##           master     #245      +/-   ##
==========================================
+ Coverage   80.82%   81.53%   +0.70%     
==========================================
  Files          41       41              
  Lines        2519     2583      +64     
  Branches      149      149              
==========================================
+ Hits         2036     2106      +70     
+ Misses        334      328       -6     
  Partials      149      149              
Impacted Files                        Coverage Δ
src/jackdaw/streams/configured.clj    69.23% <ø> (ø)
src/jackdaw/streams/interop.clj       80.28% <ø> (ø)
src/jackdaw/streams/mock.clj          74.24% <25.00%> (-3.18%) ⬇️
src/jackdaw/streams.clj               83.33% <85.71%> (+1.07%) ⬆️
src/jackdaw/streams/specs.clj         97.04% <91.30%> (+0.15%) ⬆️
src/jackdaw/streams/extras.clj        35.29% <95.45%> (+28.77%) ⬆️
src/jackdaw/streams/lambdas.clj       75.71% <100.00%> (+7.78%) ⬆️
src/jackdaw/streams/protocols.clj     100.00% <100.00%> (ø)
src/jackdaw/serdes/edn2.clj           90.90% <0.00%> (+45.45%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update d53da3f...0d7f9e1.

Matt Searle added 3 commits November 11, 2021 12:28
* Add support for flat-transform and flat-transform-values
* Add some helpers for adding state stores
* Add some sugar for common transform use cases
* Use some of the above to provide simple de-duping stream support in extras
* It cannot process the interop.clj namespace, which is otherwise fine.
@kidpollo
Contributor

This PR and the #305 seem to have related/similar changes. @99-not-out Is there any insight on which will win hahaha or if both would be merged. I kinda want some of these stuff. Would love this merged and a new release cut sometime soon :P

@kidpollo
Contributor

@99-not-out would it make sense to add a simple interop to set message headers?

Right now one would have to do something like:

...

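(defn set-headers
  "Adds each entry of `headers` (a map of string keys to string values) to the
  record headers of the processor `context`, then returns `context`."
  [context headers]
  (let [record-headers (.headers context)]
    (doseq [[k v] headers]
      ;; Kafka header values are raw bytes, so each string is encoded first
      (.add record-headers k (.getBytes v))))
  context)
...
(-> stream
    (j/transform-values
     (jl/value-transformer-with-ctx
      (fn [ctx v]
        (set-headers ctx {"foo" "bar"})
        v))))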

@99-not-out
Contributor Author

Going to close this PR in favour of #305
(which takes the core parts of this PR and will hopefully be merged soon!)

@99-not-out
Contributor Author

This PR and the #305 seem to have related/similar changes. @99-not-out Is there any insight on which will win hahaha or if both would be merged. I kinda want some of these stuff. Would love this merged and a new release cut sometime soon :P

#305 will win :-)

@99-not-out
Contributor Author

@99-not-out would it make sense to add a simple interop to set message headers?

I did have a bit of a think about headers a while ago; I think there is even a PR somewhere to add support in test machine.

Let's start a new PR specifically for headers @kidpollo. There are some nuances in there (like supporting encodings, so the interop isn't all byte[] based ...). Will tag you when we start one (or feel free to start one yourself!)
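
One way the encoding nuance could look, as a sketch only: a helper that accepts more than strings and always uses an explicit charset. The names `header-bytes` and `encode-headers` and the fallback to `pr-str` are assumptions made for illustration; nothing here reflects what a future headers PR would actually do.

```clojure
(import 'java.nio.charset.StandardCharsets)

;; Hypothetical helpers, not part of Jackdaw.
(defn header-bytes
  "Encodes a header value as bytes: byte arrays pass through unchanged,
  strings are encoded as UTF-8, anything else is printed with pr-str first."
  [v]
  (cond
    (bytes? v)  v
    (string? v) (.getBytes ^String v StandardCharsets/UTF_8)
    :else       (.getBytes ^String (pr-str v) StandardCharsets/UTF_8)))

(defn encode-headers
  "Turns a map of header name -> value into a map of name -> byte array.
  Keys may be strings or keywords."
  [headers]
  (into {} (map (fn [[k v]] [(name k) (header-bytes v)])) headers))

;; Decoding one value back shows the round trip for a string header.
(String. ^bytes (get (encode-headers {:foo "bar"}) "foo") StandardCharsets/UTF_8)
;; => "bar"
```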

@99-not-out 99-not-out closed this Nov 17, 2021
@gphilipp
Contributor

gphilipp commented Nov 17, 2021

Superseded by #305.

3 participants