Kafka 3.4 compatibility #353

Merged
merged 4 commits on Mar 31, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

### Unreleased

- Make compatible with Kafka 3.4

### [0.9.9] - [2023-01-30]

- Fix JSON_SR serde to correctly "Clojurize" deeply nested maps
57 changes: 21 additions & 36 deletions project.clj
@@ -8,43 +8,28 @@
:repositories [["confluent" {:url "https://packages.confluent.io/maven/"}]
["mulesoft" {:url "https://repository.mulesoft.org/nexus/content/repositories/public/"}]]

:dependencies [[aleph "0.4.6"]
:dependencies [[aleph "0.6.1"]
[danlentz/clj-uuid "0.1.9"
:exclusions [primitive-math]]

;; Confluent does paired releases with Kafka, this should tie
;; off with the kafka version.
;; See https://docs.confluent.io/current/release-notes.html
[io.confluent/kafka-schema-registry-client "6.1.1"
[io.confluent/kafka-schema-registry-client "7.3.2"
:exclusions [com.fasterxml.jackson.core/jackson-databind]]
[io.confluent/kafka-avro-serializer "6.1.1"]
[io.confluent/kafka-json-schema-serializer "6.1.1"]
[org.apache.kafka/kafka-clients "2.8.0"]
[org.apache.kafka/kafka-streams "2.8.0"]
[org.apache.kafka/kafka-streams-test-utils "2.8.0"]
[io.confluent/kafka-avro-serializer "7.3.2"]
[io.confluent/kafka-json-schema-serializer "7.3.2"]
[org.apache.kafka/kafka-clients "3.4.0"]
[org.apache.kafka/kafka-streams "3.4.0"]
[org.apache.kafka/kafka-streams-test-utils "3.4.0"]

[org.clojure/clojure "1.10.1" :scope "provided"]
[org.clojure/clojure "1.11.1" :scope "provided"]
[org.clojure/java.data "1.0.95"]
[org.clojure/data.json "0.2.6"]
[org.clojure/data.fressian "0.2.1"]
[org.clojure/tools.logging "0.4.1"]
[org.clojure/core.cache "0.7.2"]
[metosin/jsonista "0.3.3"]

;; Use specific netty version to avoid critical CVE
;; pulled by Aleph v0.4.6 (last stable version)
[io.netty/netty-transport "4.1.68.Final"]
[io.netty/netty-transport-native-epoll "4.1.68.Final"]
[io.netty/netty-codec "4.1.68.Final"]
[io.netty/netty-codec-http "4.1.68.Final"]
[io.netty/netty-handler "4.1.68.Final"]
[io.netty/netty-handler-proxy "4.1.68.Final"]
[io.netty/netty-resolver "4.1.68.Final"]
[io.netty/netty-resolver-dns "4.1.68.Final"]

;; Use specific commons-compress version to avoid
;; CVE-2021-36090 pulled by avro 1.9.2
[org.apache.commons/commons-compress "1.21"]]
[org.clojure/data.json "2.4.0"]
[org.clojure/data.fressian "1.0.0"]
[org.clojure/tools.logging "1.2.4"]
[org.clojure/core.cache "1.0.225"]
[metosin/jsonista "0.3.7"]]

:aliases {"kaocha" ["run" "-m" "kaocha.runner"]}
:aot [jackdaw.serdes.edn2 jackdaw.serdes.fressian jackdaw.serdes.fn-impl]
@@ -87,14 +72,14 @@

:resource-paths ["test/resources"]
:injections [(require 'io.aviso.logging.setup)]
:dependencies [[io.aviso/logging "0.3.2"]
[org.apache.kafka/kafka-streams-test-utils "2.8.0"]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "0.9.0"]
[org.apache.kafka/kafka_2.13 "2.8.0"]
[lambdaisland/kaocha "0.0-529"]
[lambdaisland/kaocha-cloverage "0.0-32"]
[lambdaisland/kaocha-junit-xml "0.0-70"]]}
:dependencies [[io.aviso/logging "1.0"]
[org.apache.kafka/kafka-streams-test-utils "3.4.0"]
[org.apache.kafka/kafka-clients "3.4.0" :classifier "test"]
[org.clojure/test.check "1.1.1"]
[org.apache.kafka/kafka_2.13 "3.4.0"]
[lambdaisland/kaocha "1.80.1274"]
[lambdaisland/kaocha-cloverage "1.1.89"]
[lambdaisland/kaocha-junit-xml "1.17.101"]]}

;; This is not in fact what lein defines repl to be
:repl
2 changes: 1 addition & 1 deletion src/jackdaw/data/common_record.clj
@@ -24,7 +24,7 @@
(defn ->TimestampType
"Given a keyword being a datafied Kafka `TimestampType`, return the
equivalent `TimestampType` instance."
[kw]
^TimestampType [kw]
(case kw
:timestamp-type/create TimestampType/CREATE_TIME
:timestamp-type/log-append TimestampType/LOG_APPEND_TIME
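The added ^TimestampType return hint lets callers of ->TimestampType do further interop on the result without reflection. A minimal REPL sketch of the keyword-to-enum mapping, not taken from the diff; it assumes the helper is loaded into the jackdaw.data namespace as in the library's layout:

    (require '[jackdaw.data :as jd])
    (import 'org.apache.kafka.common.record.TimestampType)

    (= TimestampType/CREATE_TIME (jd/->TimestampType :timestamp-type/create))         ;; => true
    (= TimestampType/LOG_APPEND_TIME (jd/->TimestampType :timestamp-type/log-append)) ;; => true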
11 changes: 5 additions & 6 deletions src/jackdaw/data/consumer.clj
@@ -7,7 +7,8 @@
(import '[org.apache.kafka.clients.consumer
ConsumerRecord OffsetAndTimestamp]
'org.apache.kafka.common.header.Headers
'org.apache.kafka.common.record.TimestampType)
'org.apache.kafka.common.record.TimestampType
'java.util.Optional)

(set! *warn-on-reflection* true)

@@ -17,18 +18,18 @@
Convenient for testing the consumer API and its helpers."
^ConsumerRecord [{:keys [:topic-name]} partition offset ts ts-type
key-size value-size key value ^Headers headers]
(ConsumerRecord. topic-name
(ConsumerRecord. ^String topic-name
(int partition)
(long offset)
(long ts)
(if (keyword? ts-type)
(->TimestampType ts-type)
^TimestampType ts-type)
nil ;; Deprecated checksum
(int key-size)
(int value-size)
key value
headers))
^Headers headers
(Optional/empty)))

(defn map->ConsumerRecord
"Given a `::consumer-record`, build an equivalent `ConsumerRecord`.
Expand Down Expand Up @@ -56,8 +57,6 @@
:headers (.headers r)
:partition (.partition r)
:timestamp (.timestamp r)
;; Deprecated field
;; :checksum (.checksum r)
:timestamp-type (TimestampType->data (.timestampType r))
:offset (.offset r)
:serialized-key-size (.serializedKeySize r)
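For reference, Kafka 3.x removed the deprecated checksum argument from this ConsumerRecord constructor and appended an Optional leader epoch, which is what the trailing (Optional/empty) satisfies. A minimal sketch of the updated helper in use; the topic name, offsets, and sizes are illustrative, and any org.apache.kafka.common.header.Headers implementation works for the final argument:

    (->ConsumerRecord {:topic-name "example-topic"} ;; topic config map
                      0                             ;; partition
                      42                            ;; offset
                      1680000000000                 ;; timestamp (ms)
                      :timestamp-type/create        ;; datafied TimestampType
                      3                             ;; serialized key size
                      5                             ;; serialized value size
                      "key" "value"
                      (org.apache.kafka.common.header.internals.RecordHeaders.))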
24 changes: 1 addition & 23 deletions src/jackdaw/data/producer.clj
@@ -76,17 +76,11 @@
previous abstraction that there's a \"offset\" field which is
absolute, an additional arity is provided which lets the user
construct a record with a base offset and a relative offset of 0 so
that the metadata's apparent offset is predictable.

Note that as the checksum is deprecated, by default it is not
required. The third arity allows a user to provide a checksum. This
arity may be removed in the future pending further breaking changes
to the Kafka APIs."
that the metadata's apparent offset is predictable."
([t partition offset timestamp key-size value-size]
(RecordMetadata. (->TopicPartition t partition)
offset 0 ;; Force absolute offset
timestamp
nil ;; No checksum, it's deprecated
^Integer (when key-size (int key-size))
^Integer (when value-size (int value-size))))
([t partition base-offset relative-offset timestamp
@@ -95,16 +89,6 @@
base-offset
relative-offset ;; Full offset control
timestamp
nil ;; No checksum, it's depreciated
^Integer (when key-size (int key-size))
^Integer (when value-size (int value-size))))
([t partition base-offset relative-offset timestamp checksum
key-size value-size]
(RecordMetadata. (->TopicPartition t partition)
base-offset
relative-offset ;; Full offset control
timestamp
checksum ;; Have fun I guess
^Integer (when key-size (int key-size))
^Integer (when value-size (int value-size)))))

@@ -124,12 +108,6 @@
{:topic-name (.topic rm)
:partition (.partition rm)
:timestamp (.timestamp rm)

;; As of Kafka 0.11.0 the checksum is deprecated. It is no longer
;; part of Kafka wire protocol, while the brokers may use
;; checksuming at reset to ensure message integrity.

;; :jackdaw.sent-record/checksum (.checksum rm)
:offset (.offset rm)
:serialized-key-size (.serializedKeySize rm)
:serialized-value-size (.serializedValueSize rm)})
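With the checksum arity removed, ->RecordMetadata is left with two arities: one that takes an absolute offset and forces the relative offset to 0, and one that takes an explicit base and relative offset. A minimal sketch of both calls, with illustrative values:

    ;; absolute offset: topic config, partition, offset, timestamp, key size, value size
    (->RecordMetadata {:topic-name "example-topic"} 0 100 1680000000000 3 5)

    ;; explicit base offset plus relative offset
    (->RecordMetadata {:topic-name "example-topic"} 0 100 7 1680000000000 3 5)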
10 changes: 5 additions & 5 deletions src/jackdaw/streams/interop.clj
@@ -14,7 +14,7 @@
[org.apache.kafka.streams
StreamsBuilder]
[org.apache.kafka.streams.kstream
Aggregator Consumed GlobalKTable Grouped Initializer Joined
Aggregator Consumed GlobalKTable Grouped Initializer Joined StreamJoined
JoinWindows KGroupedStream KGroupedTable KStream KTable
KeyValueMapper Materialized Merger Predicate Printed Produced
Reducer SessionWindowedKStream SessionWindows
@@ -266,7 +266,7 @@
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde this-value-serde other-value-serde))))
(StreamJoined/with key-serde this-value-serde other-value-serde))))

(left-join-windowed
[_ other-kstream value-joiner-fn windows]
@@ -285,7 +285,7 @@
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))
(StreamJoined/with key-serde value-serde other-value-serde))))

(map
[_ key-value-mapper-fn]
@@ -315,13 +315,13 @@
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))
(StreamJoined/with key-serde value-serde other-value-serde))))

(process!
[_ processor-supplier-fn state-store-names]
(.process ^KStream kstream
^ProcessorSupplier (processor-supplier processor-supplier-fn)
(into-array String state-store-names)))
^"[Ljava.lang.String;" (into-array String state-store-names)))

(select-key
[_ select-key-value-mapper-fn]
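Kafka Streams 2.4 introduced StreamJoined for windowed stream-stream joins and the Joined-based overloads were later removed, hence the substitutions above; Joined remains in use for stream-table joins. A rough sketch of the interop call these methods now produce, assuming the streams, serdes, and join window are in scope:

    (.join ^KStream left-kstream
           ^KStream right-kstream
           ^ValueJoiner (value-joiner (fn [left right] [left right]))
           ^JoinWindows join-windows
           (StreamJoined/with key-serde left-value-serde right-value-serde))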
10 changes: 6 additions & 4 deletions src/jackdaw/streams/lambdas.clj
@@ -7,7 +7,9 @@
Merger Predicate Reducer Transformer TransformerSupplier
ValueJoiner ValueMapper ValueTransformer ValueTransformerSupplier]
[org.apache.kafka.streams.processor
Processor ProcessorSupplier StreamPartitioner]))
StreamPartitioner]
[org.apache.kafka.streams.processor.api
Processor ProcessorSupplier]))

(set! *warn-on-reflection* true)

@@ -147,8 +149,8 @@
(close [_])
(init [_ processor-context]
(reset! context processor-context))
(process [_ key message]
(processor-fn @context key message)))
(process [_ record]
(processor-fn @context (.key record) (.value record))))

(defn processor
"Packages up a Clojure fn as a kstream processor."
@@ -162,7 +164,7 @@

(defn processor-supplier
"Packages up a Clojure fn in a kstream processor supplier."
[processor-fn]
^ProcessorSupplier [processor-fn]
(FnProcessorSupplier. processor-fn))

(deftype FnTransformerSupplier [transformer-supplier-fn]
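The processor here moves from org.apache.kafka.streams.processor.Processor to the newer processor.api interface, whose process method receives a single Record carrying key, value, timestamp, and headers instead of separate key and value arguments. A minimal standalone sketch of the new interface, independent of FnProcessor above; the processor name and behaviour are illustrative:

    (import '[org.apache.kafka.streams.processor.api Processor Record])

    (defn logging-processor
      "Illustrative Processor that prints each record it receives."
      []
      (reify Processor
        (init [_ _context])
        (process [_ record]
          (println (.key ^Record record) "->" (.value ^Record record)))
        (close [_])))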
3 changes: 1 addition & 2 deletions src/jackdaw/test/transports/kafka.clj
@@ -70,8 +70,7 @@
"Clojurize the ConsumerRecord returned from consuming a kafka record"
[^ConsumerRecord consumer-record]
(when consumer-record
{:checksum (.checksum consumer-record)
:key (.key consumer-record)
{:key (.key consumer-record)
:offset (.offset consumer-record)
:partition (.partition consumer-record)
:serializedKeySize (.serializedKeySize consumer-record)
55 changes: 34 additions & 21 deletions src/jackdaw/test/transports/mock.clj
@@ -2,15 +2,20 @@
(:require
[clojure.stacktrace :as stacktrace]
[clojure.tools.logging :as log]
[jackdaw.serdes.fn :as jfn]
[jackdaw.streams.mock :as smock]
[jackdaw.test.journal :as j]
[jackdaw.test.transports :as t :refer [deftransport]]
[jackdaw.test.serde :refer [byte-array-deserializer
[jackdaw.test.serde :refer [byte-array-serde
apply-serializers apply-deserializers serde-map]]
[manifold.stream :as s]
[manifold.deferred :as d])
(:import
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)))
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)
(org.apache.kafka.clients.producer ProducerRecord)
(org.apache.kafka.common.header.internals RecordHeaders)
(java.util Optional)))

(set! *warn-on-reflection* false)

@@ -40,20 +45,23 @@
node by the TopologyTestDriver"
[_topic-config]
(fn [m]
(let [record (ConsumerRecord. (get-in m [:topic :topic-name])
(let [record (ConsumerRecord. ^String (get-in m [:topic :topic-name])
(int -1)
(long -1)
(:timestamp m)
(long (:timestamp m))
TimestampType/CREATE_TIME,
(long ConsumerRecord/NULL_CHECKSUM)
(if-let [k (:key m)]
(count k)
0)
(if-let [v (:value m)]
(count v)
0)
(int
(if-let [k (:key m)]
(count k)
0))
(int
(if-let [v (:value m)]
(count v)
0))
(:key m)
(:value m))]
(:value m)
(RecordHeaders.)
(Optional/empty))]
(set-headers record (:headers m))
(assoc m :input-record record))))

@@ -79,13 +87,12 @@
[messages topic-config]
(fn [driver]
(let [fetch (fn [[k t]]
{:topic k
:output (loop [collected []]
(if-let [o (.readOutput driver (:topic-name t)
byte-array-deserializer
byte-array-deserializer)]
(recur (conj collected o))
collected))})
(let [topic-name (:topic-name t)]
{:topic k
:output (loop [collected []]
(if-let [{:keys [key value]} (smock/consume driver (assoc byte-array-serde :topic-name topic-name))]
(recur (conj collected (ProducerRecord. topic-name key value)))
collected))}))
topic-batches (->> topic-config
(map fetch)
(remove #(empty? (:output %)))
@@ -160,13 +167,19 @@
{:messages messages
:process process}))

(def identity-serializer (jfn/new-serializer {:serialize (fn [_ _ data] data)}))

(deftransport :mock
[{:keys [driver topics]}]
(let [serdes (serde-map topics)
test-consumer (mock-consumer driver topics (get serdes :deserializers))
record-fn (fn [input-record]
(try
(.pipeInput driver input-record)
(let [input-topic (.createInputTopic driver
(.topic input-record)
identity-serializer ;; already serialized in mock-producer
identity-serializer)]
(.pipeInput input-topic (.key input-record) (.value input-record)))
(catch Exception e
(let [trace (with-out-str
(stacktrace/print-cause-trace e))]
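The deprecated pipeInput/readOutput entry points on TopologyTestDriver are gone in Kafka 3.x; input now goes through a TestInputTopic and output is drained via createOutputTopic or, as above, jackdaw.streams.mock/consume. A minimal sketch of the input path used by record-fn, assuming driver is a TopologyTestDriver and the key and value bytes are already serialized:

    (import '[org.apache.kafka.streams TopologyTestDriver])

    (let [input-topic (.createInputTopic ^TopologyTestDriver driver
                                         "example-topic"
                                         identity-serializer
                                         identity-serializer)]
      (.pipeInput input-topic (.getBytes "some-key") (.getBytes "some-value")))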