From 01979424119e9645e01e3d7b150438959b5721be Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 11:04:24 +0100 Subject: [PATCH 01/31] Add lint job to CI --- .circleci/config.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index efdee78d..899cafb3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,6 +100,14 @@ jobs: key: v1-jackdaw-repo-{{ .Branch }}-{{ .Revision }} paths: - . + lint: + executor: machine + working_directory: /home/circleci/jackdaw + steps: + - checkout + - run: ls -la + - run: docker run --volume `pwd`:/project --rm --workdir /project cljkondo/clj-kondo sh -c 'clj-kondo --lint src test' + deps: <<: *build_config steps: @@ -111,6 +119,8 @@ jobs: key: *mvn_cache_key paths: - /home/circleci/.m2 + + test: <<: *test_config steps: @@ -160,6 +170,9 @@ workflows: build_and_test: jobs: - checkout_code + - lint: + requires: + - checkout_code - deps: requires: - checkout_code From 9174bebe120dc9cdf8a256310fb635be43929113 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 11:20:41 +0100 Subject: [PATCH 02/31] Ignore lint step failure --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 899cafb3..38845f2c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -106,7 +106,7 @@ jobs: steps: - checkout - run: ls -la - - run: docker run --volume `pwd`:/project --rm --workdir /project cljkondo/clj-kondo sh -c 'clj-kondo --lint src test' + - run: docker run --volume `pwd`:/project --rm --workdir /project cljkondo/clj-kondo sh -c 'clj-kondo --lint src test' || true deps: <<: *build_config From c540649af5430c2ea77ae08121e074475e9ecfb3 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 11:23:06 +0100 Subject: [PATCH 03/31] Don't need checkout to run lint --- .circleci/config.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 38845f2c..63ac6e43 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -169,10 +169,8 @@ workflows: version: 2 build_and_test: jobs: + - lint - checkout_code - - lint: - requires: - - checkout_code - deps: requires: - checkout_code From e5ca2a9da8c9253ce8dffe6ffdbc3552b0fa147d Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 11:37:10 +0100 Subject: [PATCH 04/31] ignore unused :as binding when linting --- .clj-kondo/config.edn | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .clj-kondo/config.edn diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 00000000..e3152f9b --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1,2 @@ +{:linters {:unused-binding { ;; ignore unused :as binding. + :exclude-destructured-as true}}} \ No newline at end of file From c6a53fb58656dcf889e19f7abc6c51a11c76412d Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 12:02:32 +0100 Subject: [PATCH 05/31] [lint] Remove unused imports --- src/jackdaw/serdes/avro.clj | 4 ++-- src/jackdaw/serdes/edn.clj | 1 - src/jackdaw/serdes/edn2.clj | 4 +--- src/jackdaw/serdes/fn_impl.clj | 2 +- src/jackdaw/serdes/fressian.clj | 4 +--- src/jackdaw/streams/interop.clj | 11 ++--------- src/jackdaw/streams/protocols.clj | 4 +--- src/jackdaw/test.clj | 1 - src/jackdaw/test/fixtures.clj | 1 - src/jackdaw/test/serde.clj | 7 ++----- src/jackdaw/test/transports/kafka.clj | 1 - test/jackdaw/data_test.clj | 3 +-- test/jackdaw/serdes/avro/integration_test.clj | 4 +--- test/jackdaw/test/fixtures_test.clj | 2 +- test/jackdaw/test/transports/kafka_test.clj | 4 +--- test/jackdaw/test/transports/rest_proxy_test.clj | 4 +--- test/jackdaw/test_test.clj | 5 +---- 17 files changed, 16 insertions(+), 46 deletions(-) diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index 53caaff8..6ee04a1b 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -68,12 +68,12 @@ KafkaAvroSerializer KafkaAvroDeserializer] java.lang.CharSequence java.nio.ByteBuffer - [java.io ByteArrayOutputStream ByteArrayInputStream] + [java.io ByteArrayOutputStream] [java.util Collection Map UUID] [org.apache.avro AvroTypeException Schema$Parser Schema$ArraySchema Schema Schema$Field] [org.apache.avro.io - EncoderFactory DecoderFactory JsonEncoder] + EncoderFactory DecoderFactory] [org.apache.avro.generic GenericDatumWriter GenericDatumReader GenericContainer GenericData$Array GenericData$EnumSymbol diff --git a/src/jackdaw/serdes/edn.clj b/src/jackdaw/serdes/edn.clj index 03e7191c..36d6de89 100644 --- a/src/jackdaw/serdes/edn.clj +++ b/src/jackdaw/serdes/edn.clj @@ -11,7 +11,6 @@ (:require [clojure.edn] [jackdaw.serdes.fn :as jsfn]) (:import java.nio.charset.StandardCharsets - org.apache.kafka.common.serialization.Serde org.apache.kafka.common.serialization.Serdes)) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/serdes/edn2.clj b/src/jackdaw/serdes/edn2.clj index f4d12bb0..01590730 100644 --- a/src/jackdaw/serdes/edn2.clj +++ b/src/jackdaw/serdes/edn2.clj @@ -2,9 +2,7 @@ "Implements an EDN SerDes (Serializer/Deserializer)." (:require [clojure.edn] [jackdaw.serdes.fn :as jsfn]) - (:import java.nio.charset.StandardCharsets - org.apache.kafka.common.serialization.Serde - org.apache.kafka.common.serialization.Serdes) + (:import java.nio.charset.StandardCharsets) (:gen-class :implements [org.apache.kafka.common.serialization.Serde] :prefix "EdnSerde-" diff --git a/src/jackdaw/serdes/fn_impl.clj b/src/jackdaw/serdes/fn_impl.clj index 4c1ec51f..ffdb361d 100644 --- a/src/jackdaw/serdes/fn_impl.clj +++ b/src/jackdaw/serdes/fn_impl.clj @@ -2,7 +2,7 @@ "FIXME" {:license "BSD 3-Clause License "} (:import [org.apache.kafka.common.serialization - Deserializer Serdes Serializer])) + Deserializer Serializer])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/serdes/fressian.clj b/src/jackdaw/serdes/fressian.clj index bfccd243..edb1fff2 100644 --- a/src/jackdaw/serdes/fressian.clj +++ b/src/jackdaw/serdes/fressian.clj @@ -3,9 +3,7 @@ {:license "BSD 3-Clause License "} (:require [clojure.data.fressian :as fressian] [jackdaw.serdes.fn :as jsfn]) - (:import org.apache.kafka.common.serialization.Serde - org.apache.kafka.common.serialization.Serdes - [java.io ByteArrayOutputStream Closeable] + (:import java.io.ByteArrayOutputStream org.fressian.FressianWriter) (:gen-class :implements [org.apache.kafka.common.serialization.Serde] diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 5a59bfd2..3d04852d 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -8,12 +8,8 @@ Collection] [java.util.regex Pattern] - [org.apache.kafka.common.serialization - Serde] [java.time Duration] - [org.apache.kafka.streams - KafkaStreams] [org.apache.kafka.streams StreamsBuilder] [org.apache.kafka.streams.kstream @@ -22,11 +18,8 @@ KeyValueMapper Materialized Merger Predicate Printed Produced Reducer SessionWindowedKStream SessionWindows Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner - ValueMapper ValueMapperWithKey ValueTransformerSupplier Windows] - [org.apache.kafka.streams.processor - StreamPartitioner] - [org.apache.kafka.streams.state - KeyValueStore Stores] + ValueMapper ValueTransformerSupplier Windows] + [org.apache.kafka.streams.state Stores] (org.apache.kafka.streams.processor.api ProcessorSupplier))) diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index 77bb06af..271f8d68 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -1,9 +1,7 @@ (ns jackdaw.streams.protocols "Kafka streams protocols." {:license "BSD 3-Clause License "} - (:refer-clojure :exclude [count map merge reduce group-by filter peek]) - (:import org.apache.kafka.streams.KafkaStreams - org.apache.kafka.streams.StreamsBuilder)) + (:refer-clojure :exclude [count map merge reduce group-by filter peek])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/test.clj b/src/jackdaw/test.clj index 0e75b1bc..d2f39965 100644 --- a/src/jackdaw/test.clj +++ b/src/jackdaw/test.clj @@ -33,7 +33,6 @@ [jackdaw.test.journal :refer [with-journal]] [jackdaw.test.middleware :refer [with-timing with-status with-journal-snapshots]]) (:import - (java.io Closeable) (java.util Properties) (org.apache.kafka.streams Topology TopologyTestDriver StreamsBuilder))) diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 137a8065..09ed3c8c 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -6,7 +6,6 @@ [clojure.tools.logging :as log] [clojure.reflect :refer [resolve-class]] [jackdaw.streams :as k] - [jackdaw.streams.interop :refer [streams-builder]] [jackdaw.test.transports.kafka :as kt] [jackdaw.test.serde :refer [byte-array-serializer byte-array-deserializer]] [manifold.deferred :as d] diff --git a/src/jackdaw/test/serde.clj b/src/jackdaw/test/serde.clj index 67449a9d..23933509 100644 --- a/src/jackdaw/test/serde.clj +++ b/src/jackdaw/test/serde.clj @@ -1,14 +1,11 @@ (ns jackdaw.test.serde (:require - [clojure.tools.logging :as log] [jackdaw.serdes.edn :as edn-serde] [jackdaw.serdes.json :as json-serde]) (:import - (org.apache.kafka.clients.consumer ConsumerRecord) - (org.apache.kafka.common.serialization Deserializer Serdes Serializer + (org.apache.kafka.common.serialization Serdes ByteArraySerializer - ByteArrayDeserializer) - (org.apache.kafka.common.errors SerializationException))) + ByteArrayDeserializer))) (set! *warn-on-reflection* false) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index 9f506863..c9b08845 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -14,7 +14,6 @@ (:import org.apache.kafka.common.header.Header org.apache.kafka.clients.consumer.Consumer - org.apache.kafka.streams.KafkaStreams$StateListener org.apache.kafka.clients.consumer.ConsumerRecord org.apache.kafka.clients.producer.Producer org.apache.kafka.clients.producer.ProducerRecord)) diff --git a/test/jackdaw/data_test.clj b/test/jackdaw/data_test.clj index a9553a41..726c9d98 100644 --- a/test/jackdaw/data_test.clj +++ b/test/jackdaw/data_test.clj @@ -3,8 +3,7 @@ [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] [jackdaw.data :as data]) - (:import [org.apache.kafka.clients.producer - ProducerRecord RecordMetadata] + (:import org.apache.kafka.clients.producer.ProducerRecord [org.apache.kafka.common.header Headers Header])) diff --git a/test/jackdaw/serdes/avro/integration_test.clj b/test/jackdaw/serdes/avro/integration_test.clj index 2eeb591d..4e875b5e 100644 --- a/test/jackdaw/serdes/avro/integration_test.clj +++ b/test/jackdaw/serdes/avro/integration_test.clj @@ -9,9 +9,7 @@ [jackdaw.serdes.avro :as avro] [jackdaw.serdes.avro.schema-registry :as reg] [jackdaw.test.fixtures :as fix]) - (:import [org.apache.avro Schema$Parser] - [org.apache.avro.generic GenericData$Record] - [org.apache.kafka.common.serialization Serde Serdes])) + (:import [org.apache.kafka.common.serialization Serde Serdes])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 0107825e..fe1d7c2e 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -4,7 +4,7 @@ [clojure.test :refer :all] [jackdaw.test.fixtures :refer :all]) (:import - (org.apache.kafka.clients.admin AdminClient NewTopic))) + (org.apache.kafka.clients.admin AdminClient))) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index 66d358e1..5cf407b0 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -8,9 +8,7 @@ [jackdaw.test.journal :refer [with-journal watch-for]] [jackdaw.test.serde :as serde] [jackdaw.test.transports.kafka] - [manifold.stream :as s]) - (:import - (java.util Properties))) + [manifold.stream :as s])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index 92e169c7..e8624c67 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -12,9 +12,7 @@ [jackdaw.test.transports :as trns] [jackdaw.test.transports.rest-proxy :as proxy] [manifold.stream :as s] - [manifold.deferred :as d]) - (:import - (java.util Properties))) + [manifold.deferred :as d])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index 481271ac..d7a006f7 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -7,10 +7,7 @@ [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] [jackdaw.test.transports :as trns] - [jackdaw.test.middleware :refer [with-status]]) - (:import - (java.util Properties) - (org.apache.kafka.streams TopologyTestDriver))) + [jackdaw.test.middleware :refer [with-status]])) (set! *warn-on-reflection* false) From 067364b6f49cafa2ad7d714295ca0449a1dc7500 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 12:23:57 +0100 Subject: [PATCH 06/31] [lint] Fix namespace required but never used --- src/jackdaw/serdes/resolver.clj | 4 ++-- src/jackdaw/test/commands/watch.clj | 3 +-- src/jackdaw/test/transports/identity.clj | 3 +-- test/jackdaw/data_test.clj | 2 -- test/jackdaw/serdes/avro_test.clj | 1 - test/jackdaw/streams_test.clj | 1 - test/jackdaw/test/commands/write_test.clj | 1 - test/jackdaw/test/fixtures_test.clj | 1 - 8 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/jackdaw/serdes/resolver.clj b/src/jackdaw/serdes/resolver.clj index 7cd1b6c3..29284e1b 100644 --- a/src/jackdaw/serdes/resolver.clj +++ b/src/jackdaw/serdes/resolver.clj @@ -2,8 +2,8 @@ "Helper function for creating serdes." (:require [clojure.java.io :as io] [clojure.spec.alpha :as s] - [jackdaw.serdes.avro.confluent :as c-avro] - [jackdaw.serdes.json-schema.confluent :as c-json] + [jackdaw.serdes.avro.confluent] + [jackdaw.serdes.json-schema.confluent] [jackdaw.serdes.edn] [jackdaw.serdes.json] [jackdaw.serdes] diff --git a/src/jackdaw/test/commands/watch.clj b/src/jackdaw/test/commands/watch.clj index 5b8d81ae..c780025d 100644 --- a/src/jackdaw/test/commands/watch.clj +++ b/src/jackdaw/test/commands/watch.clj @@ -1,7 +1,6 @@ (ns jackdaw.test.commands.watch (:require - [jackdaw.test.journal :as j] - [clojure.tools.logging :as log])) + [jackdaw.test.journal :as j])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/test/transports/identity.clj b/src/jackdaw/test/transports/identity.clj index 95787877..0a923fb2 100644 --- a/src/jackdaw/test/transports/identity.clj +++ b/src/jackdaw/test/transports/identity.clj @@ -1,8 +1,7 @@ (ns jackdaw.test.transports.identity (:require - [clojure.tools.logging :as log] [manifold.stream :as s] - [jackdaw.test.transports :as t :refer [deftransport]])) + [jackdaw.test.transports :refer [deftransport]])) (set! *warn-on-reflection* true) diff --git a/test/jackdaw/data_test.clj b/test/jackdaw/data_test.clj index 726c9d98..6a52efca 100644 --- a/test/jackdaw/data_test.clj +++ b/test/jackdaw/data_test.clj @@ -1,7 +1,5 @@ (ns jackdaw.data-test (:require [clojure.test :refer :all] - [jackdaw.test.fixtures :as fix] - [jackdaw.test.serde :as serde] [jackdaw.data :as data]) (:import org.apache.kafka.clients.producer.ProducerRecord [org.apache.kafka.common.header diff --git a/test/jackdaw/serdes/avro_test.clj b/test/jackdaw/serdes/avro_test.clj index 154caaf0..7f4d4a2b 100644 --- a/test/jackdaw/serdes/avro_test.clj +++ b/test/jackdaw/serdes/avro_test.clj @@ -3,7 +3,6 @@ [clj-uuid :as uuid] [clojure.data :refer [diff]] [clojure.data.json :as json] - [clojure.pprint :refer [pprint]] [jackdaw.serdes.avro :as avro] [jackdaw.serdes.avro.schema-registry :as reg]) (:import [java.nio ByteBuffer] diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index ae8c4e0b..940df531 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -4,7 +4,6 @@ [clojure.test :refer :all] [jackdaw.serdes.edn :as jse] [jackdaw.streams :as k] - [jackdaw.streams.configurable :as cfg] [jackdaw.streams.interop :as interop] [jackdaw.streams.lambdas :as lambdas :refer [key-value]] [jackdaw.streams.lambdas.specs] diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index 99e3db12..a7ef2c16 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -4,7 +4,6 @@ [jackdaw.test.transports :as trns] [jackdaw.test.transports.kafka] [jackdaw.test.serde :as serde] - [jackdaw.test :refer [test-machine]] [clojure.test :refer :all]) (:import [clojure.lang ExceptionInfo])) diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index fe1d7c2e..20332759 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -1,6 +1,5 @@ (ns jackdaw.test.fixtures-test (:require - [clojure.java.io :as io] [clojure.test :refer :all] [jackdaw.test.fixtures :refer :all]) (:import From 33e7d4452a0af2d7f928f6bde40681db0be7fb8e Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 21:51:26 +0100 Subject: [PATCH 07/31] [lint] Fix Prefer placing return type hint on arg vector as recommended per https://clojure.org/reference/java_interop#typehints --- src/jackdaw/client.clj | 20 ++++++++++---------- src/jackdaw/data/common.clj | 4 ++-- src/jackdaw/data/consumer.clj | 8 ++++---- src/jackdaw/data/producer.clj | 12 ++++++------ src/jackdaw/serdes/avro.clj | 4 ++-- src/jackdaw/test/transports/kafka.clj | 10 +++++----- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index bc5d08bc..232e316b 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -26,16 +26,16 @@ ;;;; Producer -(defn ^KafkaProducer producer +(defn producer "Return a producer with the supplied properties and optional Serdes." - ([config] + (^KafkaProducer [config] (KafkaProducer. ^java.util.Properties (jd/map->Properties config))) - ([config {:keys [^Serde key-serde ^Serde value-serde]}] + (^KafkaProducer [config {:keys [^Serde key-serde ^Serde value-serde]}] (KafkaProducer. ^java.util.Properties (jd/map->Properties config) (.serializer key-serde) (.serializer value-serde)))) -(defn ^Callback callback +(defn callback "Return a kafka `Callback` function out of a clojure `fn`. The fn must be of 2-arity, being `[record-metadata?, ex?]` where the @@ -44,7 +44,7 @@ the record. Callbacks are `void`, so the return value is ignored." - [on-completion] + ^Callback [on-completion] (reify Callback (onCompletion [this record-meta exception] (on-completion record-meta exception)))) @@ -89,11 +89,11 @@ ;;;; Consumer -(defn ^KafkaConsumer consumer +(defn consumer "Return a consumer with the supplied properties and optional Serdes." - ([config] + (^KafkaConsumer [config] (KafkaConsumer. ^java.util.Properties (jd/map->Properties config))) - ([config {:keys [^Serde key-serde ^Serde value-serde] :as t}] + (^KafkaConsumer [config {:keys [^Serde key-serde ^Serde value-serde] :as t}] (when-not (or key-serde (get config "key.deserializer")) @@ -134,7 +134,7 @@ topic-configs)) consumer) -(defn ^KafkaConsumer subscribed-consumer +(defn subscribed-consumer "Given a broker configuration and topics, returns a consumer that is subscribed to all of the given topic descriptors. @@ -142,7 +142,7 @@ single pair of key and value serde instances. The serdes of the first requested topic are used, and all other topics are expected to be able to use same serdes." - [config topic-configs] + ^KafkaConsumer [config topic-configs] (when-not (sequential? topic-configs) (throw (ex-info "subscribed-consumer takes a seq of topics!" {:topic-configs topic-configs}))) diff --git a/src/jackdaw/data/common.clj b/src/jackdaw/data/common.clj index ec42833a..4ecfb01c 100644 --- a/src/jackdaw/data/common.clj +++ b/src/jackdaw/data/common.clj @@ -40,9 +40,9 @@ ;;; Topic partition tuples -(defn ^TopicPartition ->TopicPartition +(defn ->TopicPartition "Given unrolled ctor-style arguments, create a Kafka `TopicPartition`." - [{:keys [:topic-name]} partition] + ^TopicPartition [{:keys [:topic-name]} partition] (TopicPartition. topic-name (int partition))) (defn map->TopicPartition diff --git a/src/jackdaw/data/consumer.clj b/src/jackdaw/data/consumer.clj index 8a451274..fc0e48a8 100644 --- a/src/jackdaw/data/consumer.clj +++ b/src/jackdaw/data/consumer.clj @@ -10,11 +10,11 @@ (set! *warn-on-reflection* true) -(defn ^ConsumerRecord ->ConsumerRecord +(defn ->ConsumerRecord "Given unrolled ctor-style arguments create a Kafka `ConsumerRecord`. Convenient for testing the consumer API and its helpers." - [{:keys [:topic-name]} partition offset ts ts-type + ^ConsumerRecord [{:keys [:topic-name]} partition offset ts ts-type key-size value-size key value ^Headers headers] (ConsumerRecord. topic-name (int partition) @@ -72,8 +72,8 @@ ;;; OffsetAndTimestamp tuples -(defn ^OffsetAndTimestamp ->OffsetAndTimestamp - [{:keys [offset timestamp]}] +(defn ->OffsetAndTimestamp + ^OffsetAndTimestamp [{:keys [offset timestamp]}] (OffsetAndTimestamp. offset (long timestamp))) (defn->data OffsetAndTimestamp->data [^OffsetAndTimestamp ots] diff --git a/src/jackdaw/data/producer.clj b/src/jackdaw/data/producer.clj index b79464da..83a4b3ae 100644 --- a/src/jackdaw/data/producer.clj +++ b/src/jackdaw/data/producer.clj @@ -13,25 +13,25 @@ ;;; Producer record -(defn ^ProducerRecord ->ProducerRecord +(defn ->ProducerRecord "Given unrolled ctor-style arguments creates a Kafka `ProducerRecord`." - ([{:keys [topic-name]} value] + (^ProducerRecord [{:keys [topic-name]} value] (ProducerRecord. ^String topic-name value)) - ([{:keys [topic-name]} key value] + (^ProducerRecord [{:keys [topic-name]} key value] (ProducerRecord. ^String topic-name key value)) - ([{:keys [topic-name]} partition key value] + (^ProducerRecord [{:keys [topic-name]} partition key value] (let [partition-or-nil (if partition (int partition))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil key value))) - ([{:keys [topic-name]} partition timestamp key value] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value] (let [partition-or-nil (if partition (int partition)) timestamp-or-nil (if timestamp (long timestamp))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil ^Long timestamp-or-nil key value))) - ([{:keys [topic-name]} partition timestamp key value headers] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value headers] (let [partition-or-nil (if partition (int partition)) timestamp-or-nil (if timestamp (long timestamp))] (ProducerRecord. ^String topic-name diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index 6ee04a1b..1f52026f 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -91,10 +91,10 @@ (when schema-str (.parse (Schema$Parser.) ^String schema-str))))) -(defn- ^String mangle [^String n] +(defn- mangle ^String [^String n] (str/replace n #"-" "_")) -(defn- ^String unmangle [^String n] +(defn- unmangle ^String [^String n] (str/replace n #"_" "-")) (defn- dispatch-on-type-fields diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index c9b08845..5fb53bff 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -84,15 +84,15 @@ (.key ^Header header) (.value ^Header header))) {} (.headers consumer-record))})) -(defn ^ProducerRecord mk-producer-record +(defn mk-producer-record "Creates a kafka ProducerRecord for use with `send!`." - ([{:keys [topic-name]} value] + (^ProducerRecord [{:keys [topic-name]} value] (ProducerRecord. ^String topic-name value)) - ([{:keys [topic-name]} key value] + (^ProducerRecord [{:keys [topic-name]} key value] (ProducerRecord. ^String topic-name key value)) - ([{:keys [topic-name]} partition key value] + (^ProducerRecord [{:keys [topic-name]} partition key value] (ProducerRecord. ^String topic-name ^Integer (int partition) key value)) - ([{:keys [topic-name]} partition timestamp key value] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value] (ProducerRecord. ^String topic-name ^Integer (int partition) ^Long timestamp key value))) (defn consumer From 2fc57ae42694b4c09b3ee85e7ee8e45ab7935e06 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 22:03:54 +0100 Subject: [PATCH 08/31] Fix wrongly named namespace --- test/jackdaw/client/partitioning_test.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/jackdaw/client/partitioning_test.clj b/test/jackdaw/client/partitioning_test.clj index b807cb1e..ed2478a2 100644 --- a/test/jackdaw/client/partitioning_test.clj +++ b/test/jackdaw/client/partitioning_test.clj @@ -1,4 +1,4 @@ -(ns jackdaw.client-test +(ns jackdaw.client.partitioning-test (:require [clojure.test :refer :all] [jackdaw.client :as client] From 1de71d1c7c03b97444ca4b2f6c0f50378b462a8c Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 22:20:43 +0100 Subject: [PATCH 09/31] [lint] Remove all usage of :refer :all --- .clj-kondo/config.edn | 4 +++- src/jackdaw/streams/configured.clj | 11 ++++++++++- src/jackdaw/streams/interop.clj | 10 ++++++++-- src/jackdaw/test/transports/rest_proxy.clj | 2 +- test/jackdaw/admin_test.clj | 2 +- test/jackdaw/client/partitioning_test.clj | 2 +- test/jackdaw/client_test.clj | 2 +- test/jackdaw/data_test.clj | 2 +- test/jackdaw/serdes/edn2_test.clj | 2 +- test/jackdaw/serdes/edn_test.clj | 2 +- test/jackdaw/serdes/fressian_test.clj | 2 +- test/jackdaw/serdes/json_test.clj | 2 +- test/jackdaw/specs_test.clj | 2 +- test/jackdaw/streams_test.clj | 2 +- test/jackdaw/test/commands/base_test.clj | 2 +- test/jackdaw/test/commands/write_test.clj | 2 +- test/jackdaw/test/commands_test.clj | 2 +- test/jackdaw/test/fixtures_test.clj | 4 ++-- test/jackdaw/test/journal_test.clj | 2 +- test/jackdaw/test/middleware_test.clj | 2 +- test/jackdaw/test/transports/kafka_test.clj | 2 +- test/jackdaw/test/transports/mock_test.clj | 2 +- test/jackdaw/test/transports/rest_proxy_test.clj | 2 +- test/jackdaw/test_test.clj | 2 +- 24 files changed, 43 insertions(+), 26 deletions(-) diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index e3152f9b..fef1ff61 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -1,2 +1,4 @@ {:linters {:unused-binding { ;; ignore unused :as binding. - :exclude-destructured-as true}}} \ No newline at end of file + :exclude-destructured-as true}} + + :lint-as {jackdaw.data/defn->data clojure.core/defn}} \ No newline at end of file diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 59b5c517..306b3f91 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -2,7 +2,16 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) - (:require [jackdaw.streams.protocols :refer :all] + (:require [jackdaw.streams.protocols :refer [IGlobalKTable IKGroupedBase IKGroupedStream IKGroupedTable IKStream + IKStreamBase IKTable ISessionWindowedKStream IStreamsBuilder + ITimeWindowedKStream aggregate branch count filter filter-not flat-map + flat-map-values for-each! global-ktable global-ktable* group-by + group-by-key join join-global join-windowed kgroupedtable* kstream + kstream* kstreams ktable ktable* left-join left-join-global + left-join-windowed map map-values merge outer-join outer-join-windowed + peek print! process! reduce select-key source-topics streams-builder* + suppress through to! to-kstream transform transform-values + windowed-by-session windowed-by-time]] [jackdaw.streams.configurable :refer [config IConfigurable]])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 3d04852d..d963cf0c 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -2,8 +2,14 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) - (:require [jackdaw.streams.protocols :refer :all] - [jackdaw.streams.lambdas :refer :all]) + (:require [jackdaw.streams.protocols :refer [IGlobalKTable IKGroupedBase IKGroupedStream IKGroupedTable IKStream + IKStreamBase IKTable ISessionWindowedKStream IStreamsBuilder + ITimeWindowedKStream flat-transform flat-transform-values global-ktable* + kstream* ktable* transform transform-values]] + [jackdaw.streams.lambdas :refer [->FnStreamPartitioner aggregator foreach-action initializer + key-value-flatmapper key-value-mapper merger predicate processor-supplier + reducer select-key-value-mapper transformer-supplier value-joiner + value-mapper value-transformer-supplier]]) (:import [java.util Collection] [java.util.regex diff --git a/src/jackdaw/test/transports/rest_proxy.clj b/src/jackdaw/test/transports/rest_proxy.clj index 4cbd5894..b53afdfe 100644 --- a/src/jackdaw/test/transports/rest_proxy.clj +++ b/src/jackdaw/test/transports/rest_proxy.clj @@ -7,7 +7,7 @@ [clojure.stacktrace :as stacktrace] [jackdaw.test.journal :as j] [jackdaw.test.transports :as t :refer [deftransport]] - [jackdaw.test.serde :refer :all] + [jackdaw.test.serde :refer [apply-deserializers apply-serializers serde-map]] [manifold.stream :as s] [manifold.deferred :as d]) (:import diff --git a/test/jackdaw/admin_test.clj b/test/jackdaw/admin_test.clj index 122db45e..eeb4ab09 100644 --- a/test/jackdaw/admin_test.clj +++ b/test/jackdaw/admin_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.admin-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.admin :as admin] [jackdaw.data :as data] [manifold.deferred :as d]) diff --git a/test/jackdaw/client/partitioning_test.clj b/test/jackdaw/client/partitioning_test.clj index ed2478a2..166335cb 100644 --- a/test/jackdaw/client/partitioning_test.clj +++ b/test/jackdaw/client/partitioning_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.client.partitioning-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.client :as client] [jackdaw.client.partitioning :as part])) diff --git a/test/jackdaw/client_test.clj b/test/jackdaw/client_test.clj index c000886a..b786c37e 100644 --- a/test/jackdaw/client_test.clj +++ b/test/jackdaw/client_test.clj @@ -1,5 +1,5 @@ (ns jackdaw.client-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [are deftest is testing]] [jackdaw.admin :as admin] [jackdaw.client :as client] [jackdaw.test.fixtures :as fix] diff --git a/test/jackdaw/data_test.clj b/test/jackdaw/data_test.clj index 6a52efca..ae017567 100644 --- a/test/jackdaw/data_test.clj +++ b/test/jackdaw/data_test.clj @@ -1,5 +1,5 @@ (ns jackdaw.data-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [are deftest]] [jackdaw.data :as data]) (:import org.apache.kafka.clients.producer.ProducerRecord [org.apache.kafka.common.header diff --git a/test/jackdaw/serdes/edn2_test.clj b/test/jackdaw/serdes/edn2_test.clj index 7e9685f7..13d92a74 100644 --- a/test/jackdaw/serdes/edn2_test.clj +++ b/test/jackdaw/serdes/edn2_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.edn2-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/serdes/edn_test.clj b/test/jackdaw/serdes/edn_test.clj index 9f2ce1af..a6eb70be 100644 --- a/test/jackdaw/serdes/edn_test.clj +++ b/test/jackdaw/serdes/edn_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.edn-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/serdes/fressian_test.clj b/test/jackdaw/serdes/fressian_test.clj index 2045c719..d52b2489 100644 --- a/test/jackdaw/serdes/fressian_test.clj +++ b/test/jackdaw/serdes/fressian_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.fressian-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/serdes/json_test.clj b/test/jackdaw/serdes/json_test.clj index 60cec73d..9bfa598c 100644 --- a/test/jackdaw/serdes/json_test.clj +++ b/test/jackdaw/serdes/json_test.clj @@ -1,7 +1,7 @@ (ns jackdaw.serdes.json-test (:require [clojure.data.json :as json] [clojure.java.io :as io] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.test.check.clojure-test :as ct :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/specs_test.clj b/test/jackdaw/specs_test.clj index 22ffd2a8..2618a86e 100644 --- a/test/jackdaw/specs_test.clj +++ b/test/jackdaw/specs_test.clj @@ -1,5 +1,5 @@ (ns jackdaw.specs-test - (:require [jackdaw.specs :refer :all] + (:require [jackdaw.specs :refer [exactly-one-true?]] [clojure.test :refer [deftest are]])) (deftest exactly-one-true?-test diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 940df531..330a8cd6 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -1,7 +1,7 @@ (ns jackdaw.streams-test "Tests of the kafka streams wrapper." (:require [clojure.spec.test.alpha :as stest] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.serdes.edn :as jse] [jackdaw.streams :as k] [jackdaw.streams.interop :as interop] diff --git a/test/jackdaw/test/commands/base_test.clj b/test/jackdaw/test/commands/base_test.clj index 168b5b31..2b9ceabb 100644 --- a/test/jackdaw/test/commands/base_test.clj +++ b/test/jackdaw/test/commands/base_test.clj @@ -2,7 +2,7 @@ (:require [jackdaw.test.commands.base :as cmd] [clojure.pprint :as pprint] - [clojure.test :refer :all])) + [clojure.test :refer [deftest is testing]])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index a7ef2c16..d8812135 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -4,7 +4,7 @@ [jackdaw.test.transports :as trns] [jackdaw.test.transports.kafka] [jackdaw.test.serde :as serde] - [clojure.test :refer :all]) + [clojure.test :refer [deftest is testing]]) (:import [clojure.lang ExceptionInfo])) diff --git a/test/jackdaw/test/commands_test.clj b/test/jackdaw/test/commands_test.clj index a5f72103..e2ad492d 100644 --- a/test/jackdaw/test/commands_test.clj +++ b/test/jackdaw/test/commands_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.commands-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.test.commands :as cmd])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 20332759..9684fd52 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -1,7 +1,7 @@ (ns jackdaw.test.fixtures-test (:require - [clojure.test :refer :all] - [jackdaw.test.fixtures :refer :all]) + [clojure.test :refer [deftest is]] + [jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]]) (:import (org.apache.kafka.clients.admin AdminClient))) diff --git a/test/jackdaw/test/journal_test.clj b/test/jackdaw/test/journal_test.clj index 4932cfe7..4221f2a7 100644 --- a/test/jackdaw/test/journal_test.clj +++ b/test/jackdaw/test/journal_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.journal-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.test.journal :as jrnl])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/middleware_test.clj b/test/jackdaw/test/middleware_test.clj index 565bc4bd..5a41fcc3 100644 --- a/test/jackdaw/test/middleware_test.clj +++ b/test/jackdaw/test/middleware_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.middleware-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.tools.logging :as log] [jackdaw.test.middleware :as middle] [jackdaw.test.transports :as trns] diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index 5cf407b0..031702d5 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -1,7 +1,7 @@ (ns jackdaw.test.transports.kafka-test (:require [clojure.tools.logging :as log] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.fixtures :as fix] diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 59d7b817..31aaf8bf 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.transports.mock-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.tools.logging :as log] [jackdaw.streams :as k] [jackdaw.test.journal :refer [with-journal watch-for]] diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index e8624c67..a41e11e0 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -2,7 +2,7 @@ (:require [byte-streams :as bs] [clojure.tools.logging :as log] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.data.json :as json] [jackdaw.streams :as k] [jackdaw.test :as jd.test] diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index d7a006f7..05db7e55 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.commands :as cmd] From 6b89c6727c6f4afe306bacd65a7ec86c58d3684a Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:02:55 +0100 Subject: [PATCH 10/31] [lint] Add linter aliases for jackdaw.test.transports/deftransport and manifold.deferred/loop --- .clj-kondo/config.edn | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index fef1ff61..53114ed7 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -1,4 +1,6 @@ {:linters {:unused-binding { ;; ignore unused :as binding. :exclude-destructured-as true}} - :lint-as {jackdaw.data/defn->data clojure.core/defn}} \ No newline at end of file + :lint-as {jackdaw.data/defn->data clojure.core/defn + jackdaw.test.transports/deftransport clojure.core/defn + manifold.deferred/loop clojure.core/let}} \ No newline at end of file From 6b8f3a56927642563216cc41821932b20c50f321 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:07:59 +0100 Subject: [PATCH 11/31] [lint] Fix redundant let expression --- test/jackdaw/serdes/avro/integration_test.clj | 30 +++++++++---------- .../test/transports/rest_proxy_test.clj | 28 ++++++++--------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/test/jackdaw/serdes/avro/integration_test.clj b/test/jackdaw/serdes/avro/integration_test.clj index 4e875b5e..36815e04 100644 --- a/test/jackdaw/serdes/avro/integration_test.clj +++ b/test/jackdaw/serdes/avro/integration_test.clj @@ -38,26 +38,26 @@ (let [serde ^Serde (avro/serde +type-registry+ +mock-schema-registry+ +topic-config+)] (let [msg {:customer-id (uuid/v4) :address {:value "foo" - :key-path "foo.bar.baz"}}] - (let [serialized (-> (.serializer serde) - (.serialize "foo" msg)) - deserialized (-> (.deserializer serde) - (.deserialize "foo" serialized))] - (is (= deserialized msg))))))) + :key-path "foo.bar.baz"}} + serialized (-> (.serializer serde) + (.serialize "foo" msg)) + deserialized (-> (.deserializer serde) + (.deserialize "foo" serialized))] + (is (= deserialized msg)))))) (deftest ^:integration real-schema-registry (fix/with-fixtures [(fix/service-ready? {:http-url +real-schema-registry-url+ :http-timeout 5000})] (testing "schema registry set in config" - (let [serde ^Serde (avro/serde +type-registry+ +real-schema-registry+ +topic-config+)] - (let [msg {:customer-id (uuid/v4) - :address {:value "foo" - :key-path "foo.bar.baz"}}] - (let [serialized (-> (.serializer serde) - (.serialize "foo" msg)) - deserialized (-> (.deserializer serde) - (.deserialize "foo" serialized))] - (is (= deserialized msg)))))))) + (let [serde ^Serde (avro/serde +type-registry+ +real-schema-registry+ +topic-config+) + msg {:customer-id (uuid/v4) + :address {:value "foo" + :key-path "foo.bar.baz"}} + serialized (-> (.serializer serde) + (.serialize "foo" msg)) + deserialized (-> (.deserializer serde) + (.deserialize "foo" serialized))] + (is (= deserialized msg)))))) ;;;; Client integration tests against real Kafka through a real topic diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index a41e11e0..53b3a9fd 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -173,17 +173,17 @@ (assoc :group-config {:auto.offset.reset "earliest" :fetch.min.bytes 100 :consumer.fetch.timeout.ms 200}))) - (proxy/with-consumer))] - (let [[url options] (first @http-reqs)] - (is (= "http://localhost:8082/consumers/test-group-config" url)) - (is (= {"Accept" "application/vnd.kafka.v2+json" - "Content-Type" "application/vnd.kafka.v2+json"} - (:headers options))) - (is (= {"auto.offset.reset" "earliest" - "fetch.min.bytes" 100 - "consumer.fetch.timeout.ms" 200} - (-> (:body options) - (json/read-str) - (select-keys ["auto.offset.reset" - "fetch.min.bytes" - "consumer.fetch.timeout.ms"]))))))))) + (proxy/with-consumer)) + [url options] (first @http-reqs)] + (is (= "http://localhost:8082/consumers/test-group-config" url)) + (is (= {"Accept" "application/vnd.kafka.v2+json" + "Content-Type" "application/vnd.kafka.v2+json"} + (:headers options))) + (is (= {"auto.offset.reset" "earliest" + "fetch.min.bytes" 100 + "consumer.fetch.timeout.ms" 200} + (-> (:body options) + (json/read-str) + (select-keys ["auto.offset.reset" + "fetch.min.bytes" + "consumer.fetch.timeout.ms"])))))))) From 33a73e4d91c283308a41fbfad251c5466fb0afde Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:20:13 +0100 Subject: [PATCH 12/31] [lint] Fix empty or misplaced docstrings --- src/jackdaw/data/admin.clj | 6 ------ src/jackdaw/data/common.clj | 3 --- src/jackdaw/data/common_config.clj | 8 -------- src/jackdaw/streams/specs.clj | 1 - src/jackdaw/test/commands.clj | 1 - src/jackdaw/test/fixtures.clj | 1 - src/jackdaw/test/journal.clj | 12 +++++++----- test/jackdaw/test/fixtures_test.clj | 1 - 8 files changed, 7 insertions(+), 26 deletions(-) diff --git a/src/jackdaw/data/admin.clj b/src/jackdaw/data/admin.clj index 2e753ddc..61ef07df 100644 --- a/src/jackdaw/data/admin.clj +++ b/src/jackdaw/data/admin.clj @@ -16,7 +16,6 @@ (ConfigEntry. k (:value v)))) (defn->data ConfigEntry->data - "" [^ConfigEntry e] {:name (.name e) :value (.value e) @@ -27,13 +26,11 @@ ;;; Config (defn map->Config - "" ^Config [m] (Config. (map (partial apply ->ConfigEntry) m))) (defn->data Config->data - "" [^Config c] (into {} (comp (map ConfigEntry->data) @@ -44,7 +41,6 @@ ;;; TopicDescription (defn->data TopicDescription->data - "" [^TopicDescription td] {:is-internal? (.isInternal td) :partition-info (map datafy (.partitions td))}) @@ -52,7 +48,6 @@ ;;; NewTopic (defn map->NewTopic - "" [{:keys [:topic-name :partition-count :replication-factor @@ -71,7 +66,6 @@ ;;;; Result types (defn->data DescribeClusterResult->data - "" [^DescribeClusterResult dcr] {:cluster-id (-> dcr .clusterId .get) :controller (-> dcr .controller .get datafy) diff --git a/src/jackdaw/data/common.clj b/src/jackdaw/data/common.clj index 4ecfb01c..1cf23934 100644 --- a/src/jackdaw/data/common.clj +++ b/src/jackdaw/data/common.clj @@ -10,7 +10,6 @@ ;;; Node (defn->data Node->data - "" [^Node node] {:host (.host node) :port (.port node) @@ -31,7 +30,6 @@ ;;; TopicPartitionInfo (defn->data TopicPartitionInfo->data - "" [^TopicPartitionInfo tpi] {:isr (mapv datafy (.isr tpi)) :leader (datafy (.leader tpi)) @@ -59,7 +57,6 @@ :partition (.partition tp)}) (defn as-TopicPartition - "" ^TopicPartition [o] (cond (instance? TopicPartition o) o diff --git a/src/jackdaw/data/common_config.clj b/src/jackdaw/data/common_config.clj index d6044f4e..e4d6bb24 100644 --- a/src/jackdaw/data/common_config.clj +++ b/src/jackdaw/data/common_config.clj @@ -8,15 +8,12 @@ ;;; ConfigResource.Type (def +broker-config-resource-type+ - "" ConfigResource$Type/BROKER) (def +topic-config-resource-type+ - "" ConfigResource$Type/TOPIC) (def +unknown-config-resource-type+ - "" ConfigResource$Type/UNKNOWN) (defn ->ConfigResourceType [o] @@ -26,7 +23,6 @@ +unknown-config-resource-type+)) (defn->data ConfigResourceType->data - "" [^ConfigResource$Type crt] (cond (= +broker-config-resource-type+ crt) :config-resource/broker @@ -40,22 +36,18 @@ ;;; ConfigResource (defn ->ConfigResource - "" [^ConfigResource$Type type ^String name] (ConfigResource. type name)) (defn ->topic-resource - "" [name] (->ConfigResource +topic-config-resource-type+ name)) (defn ->broker-resource - "" [name] (->ConfigResource +broker-config-resource-type+ name)) (defn->data ConfigResource->data - "" [^ConfigResource cr] {:name (.name cr) :type (datafy (.type cr))}) diff --git a/src/jackdaw/streams/specs.clj b/src/jackdaw/streams/specs.clj index 593cf0b9..bb3ae095 100644 --- a/src/jackdaw/streams/specs.clj +++ b/src/jackdaw/streams/specs.clj @@ -1,5 +1,4 @@ (ns jackdaw.streams.specs - "" {:license "BSD 3-Clause License "} (:require [clojure.spec.alpha :as s] [jackdaw.specs] diff --git a/src/jackdaw/test/commands.clj b/src/jackdaw/test/commands.clj index 728249ca..8ae9c498 100644 --- a/src/jackdaw/test/commands.clj +++ b/src/jackdaw/test/commands.clj @@ -1,5 +1,4 @@ (ns jackdaw.test.commands - "" (:require [clojure.spec.alpha :as s] [jackdaw.test.commands.base :as base] diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 09ed3c8c..67c03a16 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -1,5 +1,4 @@ (ns jackdaw.test.fixtures - "" (:require [aleph.http :as http] [clojure.java.io :as io] diff --git a/src/jackdaw/test/journal.clj b/src/jackdaw/test/journal.clj index 459414a7..d703ee84 100644 --- a/src/jackdaw/test/journal.clj +++ b/src/jackdaw/test/journal.clj @@ -1,5 +1,4 @@ (ns jackdaw.test.journal - "" (:require [clojure.set :refer [subset?]] [clojure.tools.logging :as log] @@ -56,9 +55,9 @@ (get m topic))) (defn journal-result - [machine record] "Journals the `record` in the appropriate place in the supplied test machine's `:journal`" + [machine record] (let [journal (:journal machine)] (if-let [err (agent-error journal)] (throw err) @@ -122,27 +121,30 @@ [journal topic-name ks value] (messages-by-kv-fn journal topic-name ks #(= value %))) -(defn by-key [topic-name ks value] +(defn by-key "Returns the first message in the topic where attribute 'ks' is equal to 'value'. Can be combined with the :watch command to assert that a message has been published: [:watch (j/by-key :result-topic [:object :color] \"red\")]" + [topic-name ks value] (fn [journal] (first (messages-by-kv journal topic-name ks value)))) -(defn by-keys [topic-name ks values] +(defn by-keys "Returns all of the messages in the topic where attribute 'ks' is equal to one of the values. Can be combined with the :watch command to assert that messages have been published: [:watch (j/by-key :result-topic [:object :color] #{\"red\" \"green\" \"blue\"})]" + [topic-name ks values] (fn [journal] (messages-by-kv-fn journal topic-name ks (set values)))) -(defn by-id [topic-name value] +(defn by-id "Returns all of the messages in the topic with an id of `value`. Can be combined with the :watch command to assert that a message with the supplied id has been published: [:watch (j/by-id :result-topic 123)]" + [topic-name value] (by-key topic-name [:id] value)) (defn all-keys-present diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 9684fd52..9e6875ad 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -34,7 +34,6 @@ (is (topic-exists? client topic-foo))))) (defn test-resetter - "" {:style/indent 1} [{:keys [app-config reset-params reset-fn]} assertion-fn] (let [reset-args (atom []) From c7164c4f7135e06318b49812fa3d76eb0bcdf95a Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:25:02 +0100 Subject: [PATCH 13/31] [lint] Fix unresolved symbol --- src/jackdaw/data/consumer.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/jackdaw/data/consumer.clj b/src/jackdaw/data/consumer.clj index fc0e48a8..c6010f66 100644 --- a/src/jackdaw/data/consumer.clj +++ b/src/jackdaw/data/consumer.clj @@ -6,7 +6,8 @@ (import '[org.apache.kafka.clients.consumer ConsumerRecord OffsetAndTimestamp] - 'org.apache.kafka.common.header.Headers) + 'org.apache.kafka.common.header.Headers + 'org.apache.kafka.common.record.TimestampType) (set! *warn-on-reflection* true) From a6fc7fafd53fe59d033084ed40ed5ebd097c06af Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:30:33 +0100 Subject: [PATCH 14/31] [lint] Fix missing else branch --- src/jackdaw/data/producer.clj | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/jackdaw/data/producer.clj b/src/jackdaw/data/producer.clj index 83a4b3ae..770fd990 100644 --- a/src/jackdaw/data/producer.clj +++ b/src/jackdaw/data/producer.clj @@ -20,20 +20,20 @@ (^ProducerRecord [{:keys [topic-name]} key value] (ProducerRecord. ^String topic-name key value)) (^ProducerRecord [{:keys [topic-name]} partition key value] - (let [partition-or-nil (if partition (int partition))] + (let [partition-or-nil (when partition (int partition))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil key value))) (^ProducerRecord [{:keys [topic-name]} partition timestamp key value] - (let [partition-or-nil (if partition (int partition)) - timestamp-or-nil (if timestamp (long timestamp))] + (let [partition-or-nil (when partition (int partition)) + timestamp-or-nil (when timestamp (long timestamp))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil ^Long timestamp-or-nil key value))) (^ProducerRecord [{:keys [topic-name]} partition timestamp key value headers] - (let [partition-or-nil (if partition (int partition)) - timestamp-or-nil (if timestamp (long timestamp))] + (let [partition-or-nil (when partition (int partition)) + timestamp-or-nil (when timestamp (long timestamp))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil ^Long timestamp-or-nil @@ -87,8 +87,8 @@ offset 0 ;; Force absolute offset timestamp nil ;; No checksum, it's deprecated - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size)))) + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size)))) ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) @@ -96,8 +96,8 @@ relative-offset ;; Full offset control timestamp nil ;; No checksum, it's depreciated - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size)))) + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size)))) ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp checksum key-size value-size] (RecordMetadata. (->TopicPartition t partition) @@ -105,8 +105,8 @@ relative-offset ;; Full offset control timestamp checksum ;; Have fun I guess - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size))))) + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size))))) (defn map->RecordMetadata "Given a `::record-metdata`, build an equivalent `RecordMetadata`. From 431448b458a1f4d556aa6f25ac85f27de4b448a5 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:48:03 +0100 Subject: [PATCH 15/31] [lint] Fix unresolved symbols --- src/jackdaw/streams/interop.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index d963cf0c..e1570f39 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -24,7 +24,7 @@ KeyValueMapper Materialized Merger Predicate Printed Produced Reducer SessionWindowedKStream SessionWindows Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner - ValueMapper ValueTransformerSupplier Windows] + ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier] [org.apache.kafka.streams.state Stores] (org.apache.kafka.streams.processor.api ProcessorSupplier))) From bcc4c56f2f1fb04afba28d2a8c0dcb2ef64cfbc2 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:48:44 +0100 Subject: [PATCH 16/31] [lint] Add linter alias for clojure.test.check.clojure-test/defspec --- .clj-kondo/config.edn | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index 53114ed7..9196f567 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -1,6 +1,7 @@ {:linters {:unused-binding { ;; ignore unused :as binding. :exclude-destructured-as true}} - :lint-as {jackdaw.data/defn->data clojure.core/defn + :lint-as {clojure.test.check.clojure-test/defspec clojure.core/def + jackdaw.data/defn->data clojure.core/defn jackdaw.test.transports/deftransport clojure.core/defn manifold.deferred/loop clojure.core/let}} \ No newline at end of file From d35d6524de74efd29f6c68d136513b9104e6eeef Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:53:40 +0100 Subject: [PATCH 17/31] [lint] Fix incorrect format string --- src/jackdaw/test/transports/kafka.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index 5fb53bff..f4aa376a 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -201,7 +201,7 @@ :else (do (.close ^Producer producer) - (log/infof "stopped kafka producer: " + (log/infof "stopped kafka producer: %s" (select-keys kafka-config ["bootstrap.servers" "group.id"])))))))] {:producer producer From ac6b51c1dbdb80e731f49dccf5eb374e1200ddf4 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:57:52 +0100 Subject: [PATCH 18/31] [lint] Fix redundant do --- src/jackdaw/test/transports/mock.clj | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index 955b57e4..4b29c020 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -155,8 +155,7 @@ :offset (.offset input-record)}) (d/recur (s/take! messages))) - :else (do - (log/infof "stopped mock producer: %s" {:driver driver}))))))] + :else (log/infof "stopped mock producer: %s" {:driver driver})))))] {:messages messages :process process})) From 32753ed40721856b2ed35a899134bffc8eddf45d Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 1 Jun 2022 23:58:12 +0100 Subject: [PATCH 19/31] [lint] Fix redundant let --- test/jackdaw/serdes/avro/integration_test.clj | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/test/jackdaw/serdes/avro/integration_test.clj b/test/jackdaw/serdes/avro/integration_test.clj index 36815e04..1349777f 100644 --- a/test/jackdaw/serdes/avro/integration_test.clj +++ b/test/jackdaw/serdes/avro/integration_test.clj @@ -35,15 +35,15 @@ (deftest mock-schema-registry (testing "schema can be serialized by registry client" - (let [serde ^Serde (avro/serde +type-registry+ +mock-schema-registry+ +topic-config+)] - (let [msg {:customer-id (uuid/v4) - :address {:value "foo" - :key-path "foo.bar.baz"}} - serialized (-> (.serializer serde) - (.serialize "foo" msg)) - deserialized (-> (.deserializer serde) - (.deserialize "foo" serialized))] - (is (= deserialized msg)))))) + (let [serde ^Serde (avro/serde +type-registry+ +mock-schema-registry+ +topic-config+) + msg {:customer-id (uuid/v4) + :address {:value "foo" + :key-path "foo.bar.baz"}} + serialized (-> (.serializer serde) + (.serialize "foo" msg)) + deserialized (-> (.deserializer serde) + (.deserialize "foo" serialized))] + (is (= deserialized msg))))) (deftest ^:integration real-schema-registry (fix/with-fixtures [(fix/service-ready? {:http-url +real-schema-registry-url+ From df11e1cdb4e3672cfeafb3f58bfa4227a17f3d1d Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Thu, 2 Jun 2022 00:03:38 +0100 Subject: [PATCH 20/31] [lint] Remove unused referred symbol --- src/jackdaw/streams/configured.clj | 2 +- src/jackdaw/test/transports/mock.clj | 2 +- test/jackdaw/test/transports/kafka_test.clj | 2 +- test/jackdaw/test/transports/mock_test.clj | 2 +- test/jackdaw/test/transports/rest_proxy_test.clj | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 306b3f91..1bbebf92 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -12,7 +12,7 @@ peek print! process! reduce select-key source-topics streams-builder* suppress through to! to-kstream transform transform-values windowed-by-session windowed-by-time]] - [jackdaw.streams.configurable :refer [config IConfigurable]])) + [jackdaw.streams.configurable :refer [IConfigurable]])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index 4b29c020..c7d13cea 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -4,7 +4,7 @@ [clojure.tools.logging :as log] [jackdaw.test.journal :as j] [jackdaw.test.transports :as t :refer [deftransport]] - [jackdaw.test.serde :refer [byte-array-serializer byte-array-deserializer + [jackdaw.test.serde :refer [byte-array-deserializer apply-serializers apply-deserializers serde-map]] [manifold.stream :as s] [manifold.deferred :as d]) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index 031702d5..c5d5cf4c 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -5,7 +5,7 @@ [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.fixtures :as fix] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.serde :as serde] [jackdaw.test.transports.kafka] [manifold.stream :as s])) diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 31aaf8bf..f53ddf3e 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -3,7 +3,7 @@ [clojure.test :refer [deftest is testing]] [clojure.tools.logging :as log] [jackdaw.streams :as k] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test :as jd.test] [jackdaw.test.transports :as trns] [jackdaw.test.serde :as serde] diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index 53b3a9fd..5377ed47 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -8,7 +8,7 @@ [jackdaw.test :as jd.test] [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.transports :as trns] [jackdaw.test.transports.rest-proxy :as proxy] [manifold.stream :as s] From 5e92ee353eec5041a18c16d6e18de77ce808eae5 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Thu, 2 Jun 2022 00:07:30 +0100 Subject: [PATCH 21/31] [lint] Add missing require --- src/jackdaw/client/partitioning.clj | 9 +++++---- src/jackdaw/test/transports/rest_proxy.clj | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/jackdaw/client/partitioning.clj b/src/jackdaw/client/partitioning.clj index 63634d32..234af265 100644 --- a/src/jackdaw/client/partitioning.clj +++ b/src/jackdaw/client/partitioning.clj @@ -34,7 +34,8 @@ in `jackdaw.client` but backed by the partitioning machinery." {:license "BSD 3-Clause License "} - (:require [jackdaw.client :as jc] + (:require [clojure.string :as str] + [jackdaw.client :as jc] [jackdaw.data :as jd]) (:import org.apache.kafka.clients.producer.Producer org.apache.kafka.common.serialization.Serde @@ -49,9 +50,9 @@ [{:keys [record-key] :as t}] (let [record-key (as-> record-key % (-> % - (clojure.string/replace "$." "") - (clojure.string/replace "_" "-") - (clojure.string/split #"\.")) + (str/replace "$." "") + (str/replace "_" "-") + (str/split #"\.")) (mapv keyword %))] (assoc t ::key-fn #(get-in % record-key)))) diff --git a/src/jackdaw/test/transports/rest_proxy.clj b/src/jackdaw/test/transports/rest_proxy.clj index b53afdfe..570b9be8 100644 --- a/src/jackdaw/test/transports/rest_proxy.clj +++ b/src/jackdaw/test/transports/rest_proxy.clj @@ -5,6 +5,7 @@ [clojure.data.json :as json] [clojure.tools.logging :as log] [clojure.stacktrace :as stacktrace] + [clojure.string :as str] [jackdaw.test.journal :as j] [jackdaw.test.transports :as t :refer [deftransport]] [jackdaw.test.serde :refer [apply-deserializers apply-serializers serde-map]] @@ -85,7 +86,7 @@ (json/read-str (:body %) :key-fn (comp keyword (fn [x] - (clojure.string/replace x "_" "-")))))) + (str/replace x "_" "-")))))) #(if-not (ok? (:status %)) (assoc % :error :proxy-error) %)))) From 6a2aa1d3d79b4a8ec8bc4f0e0c963830447b31d4 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Thu, 2 Jun 2022 00:08:07 +0100 Subject: [PATCH 22/31] [lint] Remove unused require --- src/jackdaw/test/commands/write.clj | 1 - 1 file changed, 1 deletion(-) diff --git a/src/jackdaw/test/commands/write.clj b/src/jackdaw/test/commands/write.clj index 9a79abee..aed0eb21 100644 --- a/src/jackdaw/test/commands/write.clj +++ b/src/jackdaw/test/commands/write.clj @@ -1,7 +1,6 @@ (ns jackdaw.test.commands.write (:require [manifold.stream :as s] - [clojure.tools.logging :as log] [jackdaw.client.partitioning :as partitioning])) (set! *warn-on-reflection* true) From aff8e0e122f556913fe1e640dbe9fdd9fb19b64d Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Tue, 7 Jun 2022 12:43:33 +0100 Subject: [PATCH 23/31] [lint] Remove unused bindings Only left the more tricky ones which will be tackled in a later commit --- src/jackdaw/admin.clj | 2 +- src/jackdaw/client.clj | 2 +- src/jackdaw/client/partitioning.clj | 16 +++--- src/jackdaw/data/common.clj | 6 +-- src/jackdaw/data/consumer.clj | 3 +- src/jackdaw/data/producer.clj | 6 +-- src/jackdaw/serdes/avro.clj | 25 ++++----- src/jackdaw/streams/extras.clj | 8 +-- src/jackdaw/streams/lambdas.clj | 42 +++++++-------- src/jackdaw/test.clj | 2 +- src/jackdaw/test/commands/base.clj | 6 +-- src/jackdaw/test/commands/write.clj | 2 +- src/jackdaw/test/fixtures.clj | 17 +++--- src/jackdaw/test/journal.clj | 2 +- src/jackdaw/test/serde.clj | 4 +- src/jackdaw/test/transports/kafka.clj | 4 +- src/jackdaw/test/transports/mock.clj | 2 +- src/jackdaw/test/transports/rest_proxy.clj | 12 ++--- test/jackdaw/admin_test.clj | 13 +++-- test/jackdaw/client_test.clj | 54 +++++++++---------- test/jackdaw/serdes/fressian_test.clj | 2 +- test/jackdaw/serdes/json_test.clj | 3 +- test/jackdaw/streams_test.clj | 50 ++++++++--------- test/jackdaw/test/commands/write_test.clj | 1 - test/jackdaw/test/fixtures_test.clj | 2 +- test/jackdaw/test/transports/kafka_test.clj | 2 - test/jackdaw/test/transports/mock_test.clj | 4 +- .../test/transports/rest_proxy_test.clj | 2 - test/jackdaw/test_test.clj | 12 ++--- 29 files changed, 141 insertions(+), 165 deletions(-) diff --git a/src/jackdaw/admin.clj b/src/jackdaw/admin.clj index fab15554..7d0ca4c1 100644 --- a/src/jackdaw/admin.clj +++ b/src/jackdaw/admin.clj @@ -158,7 +158,7 @@ {:pre [(client? client) (sequential? topics)]} (->> @(describe-topics* client (map :topic-name topics)) - (every? (fn [[topic-name {:keys [partition-info]}]] + (every? (fn [[_topic-name {:keys [partition-info]}]] (every? (fn [part-info] (and (boolean (:leader part-info)) (seq (:isr part-info)))) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index 232e316b..cbcd0bd7 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -46,7 +46,7 @@ Callbacks are `void`, so the return value is ignored." ^Callback [on-completion] (reify Callback - (onCompletion [this record-meta exception] + (onCompletion [_this record-meta exception] (on-completion record-meta exception)))) (defn send! diff --git a/src/jackdaw/client/partitioning.clj b/src/jackdaw/client/partitioning.clj index 234af265..2b07550a 100644 --- a/src/jackdaw/client/partitioning.clj +++ b/src/jackdaw/client/partitioning.clj @@ -64,7 +64,7 @@ (defn default-partition "The kafka default partitioner. As a `::partition-fn`" - [{:keys [topic-name key-serde]} key value partitions] + [{:keys [topic-name key-serde]} key _value partitions] (let [key-bytes (.serialize (.serializer ^Serde key-serde) topic-name key)] (default-partitioner* key-bytes partitions))) @@ -92,11 +92,11 @@ (partition-fn t key value %) (->ProducerRecord producer t % key value)) (jd/->ProducerRecord t key value))) - ([^Producer producer topic partition key value] + ([^Producer _producer topic partition key value] (jd/->ProducerRecord topic (int partition) key value)) - ([^Producer producer topic partition timestamp key value] + ([^Producer _producer topic partition timestamp key value] (jd/->ProducerRecord topic partition timestamp key value)) - ([^Producer producer topic partition timestamp key value headers] + ([^Producer _producer topic partition timestamp key value headers] (jd/->ProducerRecord topic partition timestamp key value headers))) (defn produce! @@ -108,15 +108,15 @@ ([producer topic value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic key value] + ([producer topic _key value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic partition key value] + ([producer topic partition _key value] (jc/send! producer (->ProducerRecord producer topic partition topic value))) - ([producer topic partition timestamp key value] + ([producer topic partition timestamp _key value] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value))) - ([producer topic partition timestamp key value headers] + ([producer topic partition timestamp _key value headers] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value headers)))) diff --git a/src/jackdaw/data/common.clj b/src/jackdaw/data/common.clj index 1cf23934..054d1d57 100644 --- a/src/jackdaw/data/common.clj +++ b/src/jackdaw/data/common.clj @@ -44,12 +44,10 @@ (TopicPartition. topic-name (int partition))) (defn map->TopicPartition - "Given a `::topic-parititon`, build an equivalent `TopicPartition`. + "Given a `topic-partition`, build an equivalent `TopicPartition`. Inverts `(datafy ^TopicPartition tp)`." - [{:keys [topic-name - partition] - :as m}] + [{:keys [partition] :as m}] (->TopicPartition m partition)) (defn->data TopicPartition->data [^TopicPartition tp] diff --git a/src/jackdaw/data/consumer.clj b/src/jackdaw/data/consumer.clj index c6010f66..04a8c485 100644 --- a/src/jackdaw/data/consumer.clj +++ b/src/jackdaw/data/consumer.clj @@ -81,8 +81,7 @@ {:offset (.offset ots) :timestamp (.timestamp ots)}) -(defn map->OffsetAndTimestamp - [{:keys [offset timestamp] :as m}] +(defn map->OffsetAndTimestamp [m] (->OffsetAndTimestamp m)) (defn as-OffsetAndTimestamp diff --git a/src/jackdaw/data/producer.clj b/src/jackdaw/data/producer.clj index 770fd990..87c61f32 100644 --- a/src/jackdaw/data/producer.clj +++ b/src/jackdaw/data/producer.clj @@ -82,14 +82,14 @@ required. The third arity allows a user to provide a checksum. This arity may be removed in the future pending further breaking changes to the Kafka APIs." - ([{:keys [:topic-name] :as t} partition offset timestamp key-size value-size] + ([t partition offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) offset 0 ;; Force absolute offset timestamp nil ;; No checksum, it's deprecated ^Integer (when key-size (int key-size)) ^Integer (when value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp + ([t partition base-offset relative-offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset @@ -98,7 +98,7 @@ nil ;; No checksum, it's depreciated ^Integer (when key-size (int key-size)) ^Integer (when value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp checksum + ([t partition base-offset relative-offset timestamp checksum key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index 1f52026f..b53616ca 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -198,7 +198,7 @@ (try (and (number? x) (coercion-fn (bigint x))) - (catch RuntimeException e + (catch RuntimeException _e false))) (defrecord DoubleType [] @@ -260,12 +260,14 @@ (defrecord SchemalessType [] SchemaCoercion - (match-clj? [_ x] + (match-clj? [_schema-type _clj-data] true) - (match-avro? [_ x] + (match-avro? [_schema-type _avro-data] true) - (avro->clj [_ x] x) - (clj->avro [_ x path] x)) + (avro->clj [_schema-type avro-data] + avro-data) + (clj->avro [_schema-type clj-data _path] + clj-data)) ;; UUID :disapprove: @@ -278,7 +280,7 @@ (avro->clj [_ uuid-utf8] (try (UUID/fromString (str uuid-utf8)) - (catch Exception e + (catch Exception _e (str uuid-utf8)))) (clj->avro [this uuid path] (validate-clj! this uuid path "uuid") @@ -639,17 +641,14 @@ (get @coercion-cache avro-schema))))) (defn- coercion-type - [avro-schema {:keys [type-registry - coercion-cache] :as coercion-stack}] + [avro-schema coercion-stack] ((schema->coercion coercion-stack) avro-schema)) (defn as-json "Returns the json representation of the supplied `edn+avro` `edn+avro` is an avro object represented as an edn object (compatible with the jackdaw avro serde)" - [{:keys [type-registry - avro-schema - coercion-cache] :as coercion-stack} edn+avro] + [{:keys [avro-schema] :as coercion-stack} edn+avro] (let [schema (parse-schema-str avro-schema) record (clj->avro (coercion-type schema coercion-stack) edn+avro []) out-stream (ByteArrayOutputStream.) @@ -665,9 +664,7 @@ "Returns the edn representation of the supplied `json+avro` `json+avro` is an avro object represented as a json string" - [{:keys [type-registry - coercion-cache - avro-schema] :as coercion-stack} json+avro] + [{:keys [avro-schema] :as coercion-stack} json+avro] (let [schema (parse-schema-str avro-schema) decoder (.jsonDecoder ^DecoderFactory (DecoderFactory.) ^Schema schema diff --git a/src/jackdaw/streams/extras.clj b/src/jackdaw/streams/extras.clj index 1bf75ed9..7646702c 100644 --- a/src/jackdaw/streams/extras.clj +++ b/src/jackdaw/streams/extras.clj @@ -46,15 +46,15 @@ (^void onBatchRestored [_ ^TopicPartition topicPartition - ^String storeName - ^long batchEndOffset - ^long numRestored] + ^String _storeName + ^long _batchEndOffset + ^long _numRestored] (log/warnf "Restored a batch from (%s.%d)" (.topic topicPartition) (.partition topicPartition))) (^void onRestoreEnd [_ ^TopicPartition topicPartition ^String storeName - ^long totalRestored] + ^long _totalRestored] (let [start-date (get @restore-tracker storeName) elapsed-sec (.getSeconds (Duration/between start-date (Instant/now)))] diff --git a/src/jackdaw/streams/lambdas.clj b/src/jackdaw/streams/lambdas.clj index 7f2341e9..a6f4d0b6 100644 --- a/src/jackdaw/streams/lambdas.clj +++ b/src/jackdaw/streams/lambdas.clj @@ -18,7 +18,7 @@ (deftype FnAggregator [aggregator-fn] Aggregator - (apply [this agg-key value aggregate] + (apply [_this agg-key value aggregate] (aggregator-fn aggregate [agg-key value]))) (defn aggregator @@ -28,7 +28,7 @@ (deftype FnForeachAction [foreach-action-fn] ForeachAction - (apply [this key value] + (apply [_this key value] (foreach-action-fn [key value]) nil)) @@ -39,7 +39,7 @@ (deftype FnInitializer [initializer-fn] Initializer - (apply [this] + (apply [_this] (initializer-fn))) (defn initializer @@ -49,7 +49,7 @@ (deftype FnKeyValueMapper [key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (key-value (key-value-mapper-fn [key value])))) (defn key-value-mapper @@ -59,7 +59,7 @@ (deftype FnSelectKeyValueMapper [select-key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (select-key-value-mapper-fn [key value]))) (defn select-key-value-mapper @@ -70,7 +70,7 @@ (deftype FnKeyValueFlatMapper [key-value-flatmapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (mapv key-value (key-value-flatmapper-fn [key value])))) (defn key-value-flatmapper @@ -83,7 +83,7 @@ (deftype FnMerger [merger-fn] Merger - (apply [this agg-key aggregate1 aggregate2] + (apply [_this agg-key aggregate1 aggregate2] (merger-fn agg-key aggregate1 aggregate2))) (defn merger @@ -93,7 +93,7 @@ (deftype FnPredicate [predicate-fn] Predicate - (test [this key value] + (test [_this key value] (boolean (predicate-fn [key value])))) (defn predicate @@ -103,7 +103,7 @@ (deftype FnReducer [reducer-fn] Reducer - (apply [this value1 value2] + (apply [_this value1 value2] (reducer-fn value1 value2))) (defn reducer @@ -113,7 +113,7 @@ (deftype FnValueJoiner [value-joiner-fn] ValueJoiner - (apply [this value1 value2] + (apply [_this value1 value2] (value-joiner-fn value1 value2))) (defn value-joiner @@ -123,7 +123,7 @@ (deftype FnValueMapper [value-mapper-fn] ValueMapper - (apply [this value] + (apply [_this value] (value-mapper-fn value))) (defn value-mapper @@ -133,7 +133,7 @@ (deftype FnStreamPartitioner [stream-partitioner-fn] StreamPartitioner - (partition [this topic-name key val partition-count] + (partition [_this topic-name key val partition-count] (stream-partitioner-fn topic-name key val partition-count))) (defn stream-partitioner @@ -157,7 +157,7 @@ (deftype FnProcessorSupplier [processor-supplier-fn] ProcessorSupplier - (get [this] + (get [_this] (processor processor-supplier-fn))) (defn processor-supplier @@ -167,7 +167,7 @@ (deftype FnTransformerSupplier [transformer-supplier-fn] TransformerSupplier - (get [this] + (get [_this] (transformer-supplier-fn))) (defn transformer-supplier @@ -177,7 +177,7 @@ (deftype FnValueTransformerSupplier [value-transformer-supplier-fn] ValueTransformerSupplier - (get [this] + (get [_this] (value-transformer-supplier-fn))) (defn value-transformer-supplier @@ -187,10 +187,10 @@ (deftype FnTransformer [context xfm-fn] Transformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this k v] + (close [_this]) + (transform [_this k v] (xfm-fn @context k v))) (defn transformer-with-ctx @@ -211,10 +211,10 @@ (deftype FnValueTransformer [context xfm-fn] ValueTransformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this v] + (close [_this]) + (transform [_this v] (xfm-fn @context v))) (defn value-transformer-with-ctx diff --git a/src/jackdaw/test.clj b/src/jackdaw/test.clj index d2f39965..93133d3b 100644 --- a/src/jackdaw/test.clj +++ b/src/jackdaw/test.clj @@ -76,7 +76,7 @@ consumer producer] java.io.Closeable - (close [this] + (close [_this] (doseq [hook exit-hooks] (hook)) (log/info "destroyed test machine"))) diff --git a/src/jackdaw/test/commands/base.clj b/src/jackdaw/test/commands/base.clj index 579c1185..502feed6 100644 --- a/src/jackdaw/test/commands/base.clj +++ b/src/jackdaw/test/commands/base.clj @@ -7,13 +7,13 @@ (def command-map {:stop (constantly true) - :sleep (fn [machine [sleep-ms]] + :sleep (fn [_machine [sleep-ms]] (Thread/sleep sleep-ms)) - :println (fn [machine params] + :println (fn [_machine params] (println (apply str params))) - :pprint (fn [machine params] + :pprint (fn [_machine params] (pprint/pprint params)) :do (fn [machine [do-fn]] diff --git a/src/jackdaw/test/commands/write.clj b/src/jackdaw/test/commands/write.clj index aed0eb21..6f77440a 100644 --- a/src/jackdaw/test/commands/write.clj +++ b/src/jackdaw/test/commands/write.clj @@ -5,7 +5,7 @@ (set! *warn-on-reflection* true) -(defn default-partition-fn [topic-map topic-name k v partition-count] +(defn default-partition-fn [topic-map _topic-name k _v partition-count] (int (partitioning/default-partition topic-map k nil partition-count))) (defn create-message [topic-map message opts] diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 67c03a16..354904ad 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -30,27 +30,27 @@ (defn- create-topics "Creates " - [client kafka-config topic-config] + [client topic-config] (let [required (->> topic-config - (filter (fn [[k v]] + (filter (fn [[_k v]] (not (.contains (-> (list-topics client) .names .get) (:topic-name v))))) - (map (fn [[k v]] + (map (fn [[_k v]] (new-topic v))))] (-> (.createTopics client required) (.all)))) (defn- delete-topics - [client kafka-config topic-config] + [client topic-config] (let [deletable (->> topic-config - (filter (fn [[k v]] + (filter (fn [[_k v]] (.contains (-> (list-topics client) .names .get) (:topic-name v)))) - (map (fn [[k v]] + (map (fn [[_k v]] (:topic-name v))))] (-> (.deleteTopics client deletable) (.all)))) @@ -64,7 +64,7 @@ ([kafka-config topic-config timeout-ms] (fn [t] (with-open [client (AdminClient/create kafka-config)] - (-> (create-topics client kafka-config topic-config) + (-> (create-topics client topic-config) (.get timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)) (log/info "topic-fixture: created topics: " (keys topic-config)) (t))))) @@ -111,7 +111,7 @@ (defn- set-error [error] (reify Thread$UncaughtExceptionHandler - (uncaughtException [_ t e] + (uncaughtException [_this _thread e] (log/error e (.getMessage e)) (reset! error e)))) @@ -218,7 +218,6 @@ (if-not (class-exists? 'kafka.tools.StreamsResetter) (throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool")) (let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter")) - app-id (get app-config "application.id") args (concat ["--application-id" (get app-config "application.id") "--bootstrap-servers" (get app-config "bootstrap.servers")] reset-args) diff --git a/src/jackdaw/test/journal.clj b/src/jackdaw/test/journal.clj index d703ee84..ae1a10d7 100644 --- a/src/jackdaw/test/journal.clj +++ b/src/jackdaw/test/journal.clj @@ -23,7 +23,7 @@ (remove-watch journal id) (deliver p {:result :found :info result})))] - (add-watch journal id (fn [k r old new] + (add-watch journal id (fn [_k _r _old new] (check-condition new))) ;; don't rely on watcher to 'check-condition' ;; in case journal is already in a final, good state diff --git a/src/jackdaw/test/serde.clj b/src/jackdaw/test/serde.clj index 23933509..e2d7f9e5 100644 --- a/src/jackdaw/test/serde.clj +++ b/src/jackdaw/test/serde.clj @@ -90,7 +90,7 @@ "Returns a map of topics to the corresponding deserializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (deserializer v)])) (into {}))) @@ -99,7 +99,7 @@ "Returns a map of topic to the corresponding serializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (serializer v)])) (into {}))) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index f4aa376a..55f9598a 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -116,7 +116,7 @@ {:process (d/loop [consumer (subscription kafka-config (vals topic-metadata))] (d/chain (d/future consumer) - (fn [c] + (fn [_c] (when-not (realized? started?) (deliver started? true) (log/infof "started kafka consumer: %s" @@ -174,7 +174,7 @@ (defn producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([kafka-config topic-config serializers] + ([kafka-config _topic-config serializers] (let [producer (kafka/producer kafka-config byte-array-serde) messages (s/stream 1 (map (fn [x] (try diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index c7d13cea..a2440fa5 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -109,7 +109,7 @@ poll (poller messages topic-config)] {:process (d/loop [cont? @continue?] - (d/chain cont? (fn [d] + (d/chain cont? (fn [_d] (when-not (realized? started?) (log/info "started mock consumer: %s" {:driver driver}) (deliver started? true)) diff --git a/src/jackdaw/test/transports/rest_proxy.clj b/src/jackdaw/test/transports/rest_proxy.clj index 570b9be8..dd6716d3 100644 --- a/src/jackdaw/test/transports/rest_proxy.clj +++ b/src/jackdaw/test/transports/rest_proxy.clj @@ -164,13 +164,11 @@ (assoc client :base-uri base-uri, :instance-id instance-id)))))))) (defn with-subscription - [{:keys [base-uri group-id instance-id] :as client} topic-metadata] + [{:keys [base-uri] :as client} topic-metadata] (let [url (format "%s/subscription" base-uri) topics (map :topic-name (vals topic-metadata)) headers {"Accept" (content-types :json) - "Content-Type" (content-types :json)} - body {:topics topics}] - + "Content-Type" (content-types :json)}] (d/chain (handle-proxy-request (:post *http-client*) url headers {:topics topics}) (fn [response] (if (:error response) @@ -182,7 +180,7 @@ "Returns a function that takes a consumer and puts any messages retrieved by polling it onto the supplied `messages` channel" [consumer] - (let [{:keys [base-uri group-id instance-id]} consumer + (let [{:keys [base-uri]} consumer url (format "%s/records" base-uri) headers {"Accept" (content-types :byte-array)} body nil] @@ -237,7 +235,7 @@ (s/close! messages) (destroy-consumer client) (log/infof "stopped rest-proxy consumer: %s" (proxy-client-info client)))) - (d/chain client (fn [client] + (d/chain client (fn [_client] (s/put-all! messages msgs) (log/infof "collected %s messages from kafka" (count msgs)) (Thread/sleep 500) @@ -271,7 +269,7 @@ (defn rest-proxy-producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([config topics serializers] + ([config _topics serializers] (let [producer (rest-proxy-client config) messages (s/stream 1 (map (fn [x] (try diff --git a/test/jackdaw/admin_test.clj b/test/jackdaw/admin_test.clj index eeb4ab09..ba5dba8b 100644 --- a/test/jackdaw/admin_test.clj +++ b/test/jackdaw/admin_test.clj @@ -13,9 +13,9 @@ (extend MockAdminClient admin/Client (-> admin/client-impl - (merge {:alter-topics* (fn [this topics] + (merge {:alter-topics* (fn [_this topics] (d/future [:altered topics])) - :describe-configs* (fn [this configs] + :describe-configs* (fn [_this configs] (d/future (into {} (map #(vector % {"some-key" "some-value"}) configs))))}))) @@ -45,12 +45,11 @@ (def test-cluster (take 3 (node-seq 0 "test-host"))) (defn with-mock-admin-client [cluster f] - (let [effects (atom []) - client (MockAdminClient. cluster (first cluster))] + (let [client (MockAdminClient. cluster (first cluster))] (f client))) (deftest test-new-topic - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (let [t (data/map->NewTopic info) msg (format "cannot create topic from %s" t)] (is (instance? org.apache.kafka.clients.admin.NewTopic t) msg)))) @@ -73,7 +72,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (is (admin/topic-exists? client info)))))) (deftest test-retry-exists? @@ -99,7 +98,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[topic-name topic-info] (admin/describe-topics client)] + (doseq [[_topic-name topic-info] (admin/describe-topics client)] (is (set= [:is-internal? :partition-info] (keys topic-info))))))) diff --git a/test/jackdaw/client_test.clj b/test/jackdaw/client_test.clj index b786c37e..9e2eee14 100644 --- a/test/jackdaw/client_test.clj +++ b/test/jackdaw/client_test.clj @@ -86,7 +86,7 @@ (testing "producer callbacks" (testing "success" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok))))] @@ -96,7 +96,7 @@ (testing "failure" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok)))) @@ -117,7 +117,7 @@ (testing "send with callback" (let [msg (data/->ProducerRecord {:topic-name "foo"} "1" "one") on-callback (promise) - result (client/send! producer msg (fn [meta ex] + result (client/send! producer msg (fn [_meta ex] (if ex (deliver on-callback ex) (deliver on-callback :ok))))] @@ -195,47 +195,44 @@ (deftest ^:integration partitions-for-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (testing "partition info" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "partition info" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (let [[pinfo] (-> (client/partitions-for consumer bar-topic) (data/datafy))] (is (response-ok? :partitions-for pinfo)))))) - (testing "single-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "single-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (is (= 1 (client/num-partitions consumer bar-topic)))))) - (testing "multi-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [high-partition-topic])) + (testing "multi-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [high-partition-topic])) (fn [consumer] (is (= 15 (client/num-partitions consumer high-partition-topic)))))) - (testing "single-partition producer" - (with-producer (client/producer (producer-config)) + (testing "single-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] (is (= 1 (client/num-partitions producer bar-topic)))))) - (testing "multi-partition producer" - (with-producer (client/producer (producer-config)) + (testing "multi-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] - (is (= 15 (client/num-partitions producer high-partition-topic))))))))) + (is (= 15 (client/num-partitions producer high-partition-topic)))))))) (defn mock-consumer "Returns a consumer that will return the supplied items (as ConsumerRecords) in response to successive calls of the `poll` method" [queue] (reify Consumer - (^ConsumerRecords poll [this ^long ms] + (^ConsumerRecords poll [_this ^long ms] (.poll queue ms TimeUnit/MILLISECONDS)) - (^ConsumerRecords poll [this ^Duration duration] + (^ConsumerRecords poll [_this ^Duration duration] (.poll queue (.toMillis duration) TimeUnit/MILLISECONDS)))) (defn poll-result [topic data] @@ -251,27 +248,24 @@ consumer (mock-consumer q)] (.put q (poll-result "test-topic" [[1 1] [2 2]])) (let [results (client/poll consumer 1000)] - (are [k v] (first results) + (are [_k _v] (first results) :topic "test-topic" :key 1 :value 1) - (are [k v] (second results) + (are [_k _v] (second results) :topic "test-topic" :key 2 :value 2)))) (deftest ^:integration position-all-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] ;; without an initial `poll`, there is no position info (client/poll consumer 0) (is (= {{:topic-name "bar" :partition 0} 0} - (client/position-all consumer)))))))) + (client/position-all consumer))))))) (defn with-topic-data "Helper for creating a randomly named topic and seeding it with data diff --git a/test/jackdaw/serdes/fressian_test.clj b/test/jackdaw/serdes/fressian_test.clj index d52b2489..86d2bae2 100644 --- a/test/jackdaw/serdes/fressian_test.clj +++ b/test/jackdaw/serdes/fressian_test.clj @@ -38,7 +38,7 @@ (def read-handlers (-> (merge {uri-tag (reify ReadHandler - (read [_ reader tag component-count] + (read [_ reader _tag _component-count] (URI. (.readObject reader))))} fressian/clojure-read-handlers) fressian/associative-lookup)) diff --git a/test/jackdaw/serdes/json_test.clj b/test/jackdaw/serdes/json_test.clj index 9bfa598c..c0a4e357 100644 --- a/test/jackdaw/serdes/json_test.clj +++ b/test/jackdaw/serdes/json_test.clj @@ -26,8 +26,7 @@ (deftest reverse-json-roundtrip-test (testing "JSON bytes are the same after deserialization and serialization." - (let [s (slurp (io/resource "resources/pass1.json")) - b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] + (let [b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] (is (= (into [] b) (into [] (->> (.deserialize (jsj/deserializer) nil b) (.serialize (jsj/serializer) nil)))))))) diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 330a8cd6..94a95e3e 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -146,7 +146,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter (fn [[k v]] (> v 1))) + (k/filter (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -161,7 +161,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter-not (fn [[k v]] (> v 1))) + (k/filter-not (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -261,7 +261,7 @@ driver (mock/build-driver (fn [builder] (let [[pos-stream neg-stream] (-> builder (k/kstream topic-a) - (k/branch [(fn [[k v]] + (k/branch [(fn [[_k v]] (<= 0 v)) (constantly true)]))] (k/to pos-stream topic-pos) @@ -397,7 +397,7 @@ records (atom []) driver (mock/build-driver (fn [builder] (-> (k/kstream builder topic-a) - (k/process! (fn [ctx k v] + (k/process! (fn [_ctx _k v] (swap! records conj v)) [])))) publish-a (partial mock/publish driver topic-a)] @@ -411,7 +411,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/select-key (fn [[k v]] + (k/select-key (fn [[k _v]] (inc k))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -564,7 +564,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter (fn [[k v]] + (k/filter (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -584,7 +584,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter-not (fn [[k v]] + (k/filter-not (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -911,7 +911,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce + topic-a) (k/to-kstream) (k/to topic-b)))) @@ -933,7 +933,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce +) (k/to-kstream) (k/to topic-b)))) @@ -955,9 +955,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v)) + (fn [acc [_k v]] (+ acc v)) topic-a) (k/to-kstream) (k/to topic-b)))) @@ -1006,9 +1006,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v))) + (fn [acc [_k v]] (+ acc v))) (k/to-kstream) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -1032,7 +1032,7 @@ (-> in (k/group-by-key) (k/aggregate (constantly []) - (fn [acc [k v]] + (fn [acc [_k v]] (concat [(last acc)] [v])) (assoc topic-in @@ -1059,7 +1059,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-time (TimeWindows/of 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1107,7 +1107,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-session (SessionWindows/with 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1137,10 +1137,10 @@ (k/group-by-key) (k/window-by-session (SessionWindows/with 1000)) (k/aggregate (constantly 0) - (fn [agg [k v]] + (fn [agg [_k v]] (+ agg v)) ;; Merger - (fn [k agg1 agg2] + (fn [_k agg1 agg2] (+ agg1 agg2)) topic-a) (k/to-kstream) @@ -1175,8 +1175,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v)) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v)) topic-b) (k/to-kstream) (k/to topic-b))))] @@ -1232,8 +1232,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v))) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v))) (k/to-kstream) (k/to topic-b))))] (let [publish (partial mock/publish driver topic-a)] @@ -1336,7 +1336,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/join-global k-table - (fn [[k v]] + (fn [[k _v]] k) +) (k/to topic-c)))))] @@ -1359,7 +1359,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/left-join-global k-table - (fn [[k v]] + (fn [[k _v]] k) safe-add) (k/to topic-c)))))] @@ -1392,7 +1392,7 @@ (publisher 100 {:val 10}) - (let [[[k v]] (mock/get-keyvals driver output-t)] + (let [[[_k v]] (mock/get-keyvals driver output-t)] (is (= 11 (:new-val v))) (is (= "input-topic" (:topic v))))))))) diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index d8812135..51eae7d1 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -114,7 +114,6 @@ :partition-count 5 :key-serde :long :value-serde :json}) - opts {} msg {:id 1 :a 2 :b 3 :payload "yolo"}] (testing "partition must be >= 0" diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 9e6875ad..5825623d 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -80,7 +80,7 @@ (.write *err* "helpful error message\n") (.write *out* "essential application info\n") 1)} - (fn [{:keys [resetter reset-args error-data]}] + (fn [{:keys [resetter error-data]}] (is (instance? kafka.tools.StreamsResetter resetter)) (is (= 1 (:status error-data))) (is (= "helpful error message\n" (:err error-data))) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index c5d5cf4c..aa100db5 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -81,7 +81,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -107,7 +106,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index f53ddf3e..04cd7ae3 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -74,7 +74,7 @@ (deftest test-driver-closed-after-use (let [driver-closed? (atom false) driver (reify java.io.Closeable - (close [this] + (close [_this] (reset! driver-closed? true))) transport (trns/transport {:type :mock :driver driver @@ -90,7 +90,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -112,7 +111,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index 5377ed47..d741132d 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -101,7 +101,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -127,7 +126,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index 05db7e55..b188b145 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -57,7 +57,7 @@ (deftest test-run-test (testing "the run test machinery" - (let [m {:executor (-> (fn [m c] + (let [m {:executor (-> (fn [_machine c] (let [[cmd & params] c] (apply ({:min (fn [v] {:result {:result (apply min v)}}) :max (fn [v] {:result {:result (apply max v)}}) @@ -70,7 +70,7 @@ :journal (atom {})}] (testing "works properly" - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:max [1 2 3]] [:is-1 1]])] @@ -88,7 +88,7 @@ (testing "execution stops on an unknown command" (is (thrown? NullPointerException - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:foo 2] [:max [1 2 3]]])] @@ -216,17 +216,17 @@ [in out] (fn [builder] (let [in (-> (k/kstream builder in) - (k/map (fn [[k v]] + (k/map (fn [_record] (throw (ex-info "bad topology" {})))))] (k/to in out) builder))) (defn bad-key-fn - [msg] + [_msg] (throw (ex-info "bad-key-fn" {}))) (defn bad-watch-fn - [journal] + [_journal] (throw (ex-info "bad-watch-fn" {}))) (deftest test-machine-happy-path From ac04de9c15a519a7f383d196524fc5ac40ebe05c Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Tue, 7 Jun 2022 13:28:21 +0100 Subject: [PATCH 24/31] [lint] Remove trickier unused bindings --- test/jackdaw/test/middleware_test.clj | 9 +++------ test/jackdaw/test/transports/mock_test.clj | 2 +- test/jackdaw/test/transports/rest_proxy_test.clj | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/test/jackdaw/test/middleware_test.clj b/test/jackdaw/test/middleware_test.clj index 5a41fcc3..b11ffdf8 100644 --- a/test/jackdaw/test/middleware_test.clj +++ b/test/jackdaw/test/middleware_test.clj @@ -16,12 +16,9 @@ (defn with-identity-transport [{:keys [test-id transport]} f] (with-open [machine (jd.test/test-machine (transport))] - (when test-id - (log/info "begin" test-id)) - - (let [result (f machine)] - (when test-id - (log/info "end" test-id))))) + (when test-id (log/info "begin" test-id)) + (f machine) + (when test-id (log/info "end" test-id)))) (deftest test-with-status diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 04cd7ae3..05115a4a 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -79,7 +79,7 @@ transport (trns/transport {:type :mock :driver driver :topics {}})] - (with-open [machine (jd.test/test-machine transport)] + (with-open [_machine (jd.test/test-machine transport)] (is (not @driver-closed?))) (is @driver-closed?))) diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index d741132d..fd0b916a 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -167,7 +167,7 @@ (deftest test-rest-proxy-group-config (let [http-reqs (atom [])] (binding [proxy/*http-client* {:post (mock-http-client http-reqs)}] - (let [client (-> (proxy/rest-proxy-client (-> (rest-proxy-config "test-group-config") + (let [_client (-> (proxy/rest-proxy-client (-> (rest-proxy-config "test-group-config") (assoc :group-config {:auto.offset.reset "earliest" :fetch.min.bytes 100 :consumer.fetch.timeout.ms 200}))) From 3c84e084d0ec6034daedd429f6818218d5a6a47a Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 10:37:38 +0100 Subject: [PATCH 25/31] [lint] Locally ignore unresolved namespace for Datafy backport --- src/jackdaw/data.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/jackdaw/data.clj b/src/jackdaw/data.clj index cb438d4a..8b610094 100644 --- a/src/jackdaw/data.clj +++ b/src/jackdaw/data.clj @@ -34,7 +34,8 @@ (datafy [o] o))))) ;;; Just vendor this - not worth the footwork to import the "real" one - +;; Ignore clj-kondo's warning: Unresolved namespace clojure.core.protocols. Are you missing a require? +#_{:clj-kondo/ignore [:unresolved-namespace]} (defn datafy "Attempts to return x as data. From 335f55b7f4755822f26e4148cfa23d56ffd8c3ab Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 11:27:03 +0100 Subject: [PATCH 26/31] [lint] Remove unused private var The private function jackdaw.test.fixtures/delete-topics isn't used --- src/jackdaw/test/fixtures.clj | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 354904ad..63c73032 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -42,19 +42,6 @@ (-> (.createTopics client required) (.all)))) -(defn- delete-topics - [client topic-config] - (let [deletable (->> topic-config - (filter (fn [[_k v]] - (.contains (-> (list-topics client) - .names - .get) - (:topic-name v)))) - (map (fn [[_k v]] - (:topic-name v))))] - (-> (.deleteTopics client deletable) - (.all)))) - (defn topic-fixture "Returns a fixture function that creates all the topics named in the supplied topic config before running a test function." From 8b53e303ca0452bfeb9c2bce88592ce32c5ade94 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 14:35:01 +0100 Subject: [PATCH 27/31] [lint] Fix unresolved symbol Ignore thrown-with-msg-and-data which extends the `clojure.test/is` macro --- .clj-kondo/config.edn | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index 9196f567..93403c2c 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -1,5 +1,8 @@ {:linters {:unused-binding { ;; ignore unused :as binding. - :exclude-destructured-as true}} + :exclude-destructured-as true} + :unresolved-symbol { ;; `thrown-with-msg-and-data?` is a legit extension to the `is` macro + ;; via an `assert-expr` defmethod (see clojure.test doc) + :exclude [(clojure.test/is [thrown-with-msg-and-data?])]}} :lint-as {clojure.test.check.clojure-test/defspec clojure.core/def jackdaw.data/defn->data clojure.core/defn From 033fa9cda952bd7cfd34bddfbf1df39224f15971 Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 14:36:15 +0100 Subject: [PATCH 28/31] [lint] Fix redefined vars We purposely redefine the fns created by defmacro. --- src/jackdaw/serdes/avro.clj | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index b53616ca..66ae89ae 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -308,6 +308,8 @@ (clj->avro element-coercion x (conj path i))) clj-seq)))) + +#_{:clj-kondo/ignore [:redefined-var]} (defn ->ArrayType "Wrapper by which to construct a `ArrayType` which handles the structural recursion of building the handler stack so that the @@ -341,6 +343,7 @@ (mangle) (GenericData$EnumSymbol. schema)))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->EnumType [_schema->coercion ^Schema schema] (EnumType. schema)) @@ -382,6 +385,8 @@ [k (clj->avro value-coercion v (conj path k))])) clj-map))) + +#_{:clj-kondo/ignore [:redefined-var]} (defn ->MapType "Wrapper by which to construct a `MapType` which handles the structural recursion of building the handler stack so that the @@ -454,6 +459,7 @@ (throw (ex-info (str (.getMessage e)) {:path path, :clj-data clj-map} e))))))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->RecordType "Wrapper by which to construct a `RecordType` which handles the structural recursion of building the handler stack so that the @@ -492,6 +498,7 @@ {:path path, :clj-data clj-data} (AvroTypeException. "Type Error")))))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->UnionType "Wrapper by which to construct a `UnionType` which handles the structural recursion of building the handler stack so that the From 385560a5d7c846a122d31d4a148397b1704847dd Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 14:58:36 +0100 Subject: [PATCH 29/31] Update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff993e86..63ca63c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +### Unreleased + +- Add clj-kondo and fix all lint warnings and errors [#323](https://github.com/FundingCircle/jackdaw/pull/323) + ### [0.9.5] - [2022-05-26] * Move away from deprecated class ConsumerRecordFactory (to prepare migration to Kafka Streams 3.2.0) From a360d0908439e4cd7a217c01e1c78388ae27ce0b Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Wed, 8 Jun 2022 17:43:50 +0100 Subject: [PATCH 30/31] Remove extra whitespace to trigger another build and see if the FC checks are still there --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ca63c5..d474a367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Unreleased -- Add clj-kondo and fix all lint warnings and errors [#323](https://github.com/FundingCircle/jackdaw/pull/323) +- Add clj-kondo and fix all lint warnings and errors [#323](https://github.com/FundingCircle/jackdaw/pull/323) ### [0.9.5] - [2022-05-26] From 1de72498879416a34faa6b4b0d2ea88acced87db Mon Sep 17 00:00:00 2001 From: Gilles Philippart Date: Fri, 10 Jun 2022 11:54:21 +0100 Subject: [PATCH 31/31] [lint] allow :refer :all in specific places --- src/jackdaw/streams/configured.clj | 14 +++----------- src/jackdaw/streams/interop.clj | 11 +++-------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 1bbebf92..ac646437 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -2,17 +2,9 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) - (:require [jackdaw.streams.protocols :refer [IGlobalKTable IKGroupedBase IKGroupedStream IKGroupedTable IKStream - IKStreamBase IKTable ISessionWindowedKStream IStreamsBuilder - ITimeWindowedKStream aggregate branch count filter filter-not flat-map - flat-map-values for-each! global-ktable global-ktable* group-by - group-by-key join join-global join-windowed kgroupedtable* kstream - kstream* kstreams ktable ktable* left-join left-join-global - left-join-windowed map map-values merge outer-join outer-join-windowed - peek print! process! reduce select-key source-topics streams-builder* - suppress through to! to-kstream transform transform-values - windowed-by-session windowed-by-time]] - [jackdaw.streams.configurable :refer [IConfigurable]])) + #_{:clj-kondo/ignore [:refer-all]} + (:require [jackdaw.streams.protocols :refer :all] + [jackdaw.streams.configurable :refer :all])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index e1570f39..e853087b 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -2,14 +2,9 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) - (:require [jackdaw.streams.protocols :refer [IGlobalKTable IKGroupedBase IKGroupedStream IKGroupedTable IKStream - IKStreamBase IKTable ISessionWindowedKStream IStreamsBuilder - ITimeWindowedKStream flat-transform flat-transform-values global-ktable* - kstream* ktable* transform transform-values]] - [jackdaw.streams.lambdas :refer [->FnStreamPartitioner aggregator foreach-action initializer - key-value-flatmapper key-value-mapper merger predicate processor-supplier - reducer select-key-value-mapper transformer-supplier value-joiner - value-mapper value-transformer-supplier]]) + #_{:clj-kondo/ignore [:refer-all]} + (:require [jackdaw.streams.protocols :refer :all] + [jackdaw.streams.lambdas :refer :all]) (:import [java.util Collection] [java.util.regex