Skip to content

Commit

Permalink
Merge pull request #260 from FundingCircle/headers
Browse files Browse the repository at this point in the history
Add kafka message header support for test machine
  • Loading branch information
Matt Searle authored Jul 15, 2020
2 parents c99dfc0 + 716b394 commit daa8387
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 7 deletions.
6 changes: 4 additions & 2 deletions src/jackdaw/test/commands/write.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
partn (if-let [explicit-partition (:partition opts)]
explicit-partition
(partition-fn (:topic-name topic-map) k message (:partition-count topic-map)))
timestamp (:timestamp opts (System/currentTimeMillis))]
timestamp (:timestamp opts (System/currentTimeMillis))
headers (:headers opts)]
(if (or (< partn 0)
(> partn (dec (:partition-count topic-map))))
(throw (ex-info "Invalid partition number for topic"
Expand All @@ -45,7 +46,8 @@
:key k
:value message
:partition partn
:timestamp timestamp})))
:timestamp timestamp
:headers headers})))

(defn do-write
([machine topic-name message]
Expand Down
3 changes: 2 additions & 1 deletion src/jackdaw/test/serde.clj
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@
:key (deserialize-key (:key m) topic)
:value (deserialize-value (:value m) topic)
:partition (:partition m 0)
:offset (:offset m 0)}))
:offset (:offset m 0)
:headers (:headers m {})}))

(defn deserializers
"Returns a map of topics to the corresponding deserializer"
Expand Down
14 changes: 13 additions & 1 deletion src/jackdaw/test/transports/kafka.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
serde-map
byte-array-serde]])
(:import
org.apache.kafka.common.header.Header
org.apache.kafka.clients.consumer.Consumer
org.apache.kafka.streams.KafkaStreams$StateListener
org.apache.kafka.clients.consumer.ConsumerRecord
Expand Down Expand Up @@ -78,7 +79,11 @@
:serializedValueSize (.serializedValueSize consumer-record)
:timestamp (.timestamp consumer-record)
:topic (.topic consumer-record)
:value (.value consumer-record)}))
:value (.value consumer-record)
:headers (reduce (fn [header-map header]
(assoc header-map
(.key ^Header header)
(.value ^Header header))) {} (.headers consumer-record))}))

(defn ^ProducerRecord mk-producer-record
"Creates a kafka ProducerRecord for use with `send!`."
Expand Down Expand Up @@ -135,6 +140,12 @@
(reset! (:continue? consumer) false)
(deref (:process consumer)))

(defn set-headers [^ProducerRecord producer-record headers]
(let [record-headers (.headers producer-record)]
(doseq [[k v] headers]
(.add record-headers k v)))
producer-record)

(defn build-record
"Builds a Kafka Producer and assoc it onto the message map"
[m]
Expand All @@ -143,6 +154,7 @@
(:timestamp m)
(:key m)
(:value m))]
(set-headers rec (:headers m))
(assoc m :producer-record rec)))

(defn deliver-ack
Expand Down
17 changes: 14 additions & 3 deletions src/jackdaw/test/transports/mock.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@
;; For this reason, we try to make things a bit more meaningful by using
;; terms like "input-record" and "output-record".

(defn set-headers [consumer-record headers]
(let [record-headers (.headers consumer-record)]
(doseq [[k v] headers]
(.add record-headers k v)))
consumer-record)

(defn with-input-record
"Creates a kafka ConsumerRecord to be fed directly into a topology source
node by the TopologyTestDriver"
node by the TopologyTestDriver"
[_topic-config]
(fn [m]
(let [record (ConsumerRecord. (get-in m [:topic :topic-name])
Expand All @@ -48,15 +54,20 @@
0)
(:key m)
(:value m))]
(assoc m :input-record record))))
(set-headers record (:headers m))
(assoc m :input-record record))))

(defn with-output-record
[_topic-config]
(fn [r]
{:topic (.topic r)
:key (.key r)
:value (.value r)
:partition (or (.partition r) -1)}))
:partition (or (.partition r) -1)
:headers (reduce (fn [header-map header]
(assoc header-map
(.key header)
(.value header))) {} (.headers r))}))

(defn- poller
"Returns a function for polling the results of a TopologyTestDriver
Expand Down
1 change: 1 addition & 0 deletions src/jackdaw/test/transports/rest_proxy.clj
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@
[config topic-metadata deserializers]
(let [continue? (atom true)
xform (comp
#(assoc % :headers {}) ; cannot read headers over the Rest API
#(assoc % :topic (j/reverse-lookup topic-metadata (:topic %)))
#(apply-deserializers deserializers %)
#(undatafy-record topic-metadata %))
Expand Down
34 changes: 34 additions & 0 deletions test/jackdaw/test_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,40 @@
(-> ((by-id "foo" "msg1") journal)
:value)))))))))

(deftest test-write-then-read-with-headers
(testing "write then watch"
(fix/with-fixtures [(fix/topic-fixture kafka-config {"foo" foo-topic})]
(with-open [t (jd.test/test-machine (kafka-transport))]
(let [write (cmd/write! "foo" {:id "msg1" :payload "yolo"} {:headers
{"MESSAGE.DATE" (.getBytes "1970-01-01")
"MESSAGE.VERSION" (.getBytes "1.0.1")}})
watch (cmd/watch (by-id "foo" "msg1")
{:info "failed to find foo with id=msg1"})
{:keys [results journal]} (jd.test/run-test t [write watch])
[write-result watch-result] results]

(testing "write result"
(is (= :ok (:status write-result)))

(doseq [record-meta record-meta-fields]
(is (contains? (:result write-result) record-meta))))

(testing "watch result"
(is (= :ok (:status watch-result))))

(testing "written records are journalled"
(is (= {:id "msg1" :payload "yolo"}
(-> ((by-id "foo" "msg1") journal)
:value))))

(testing "written record headers are journalled"
(is (= {"MESSAGE.DATE" "1970-01-01"
"MESSAGE.VERSION" "1.0.1"}
(-> ((by-id "foo" "msg1") journal)
:headers
(update "MESSAGE.DATE" (fn [v] (String. v)))
(update "MESSAGE.VERSION" (fn [v] (String. v))))))))))))

(deftest test-reuse-machine
(fix/with-fixtures [(fix/topic-fixture kafka-config {"foo" foo-topic})]
(with-open [t (jd.test/test-machine (kafka-transport))]
Expand Down

0 comments on commit daa8387

Please sign in to comment.