Skip to content

Commit

Permalink
Merge pull request #296 from FundingCircle/louiseklodt/fix-consumer-d…
Browse files Browse the repository at this point in the history
…ocs-for-poll

Fix Client API docs to use poll instead of log and add sample app for Client API
  • Loading branch information
LouiseKlodt authored Jun 17, 2021
2 parents 2d1b0a0 + 0f7f73a commit 144748e
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pom.xml.asc
.lein-*
.cpcache
.nrepl-port
/examples/*/target
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ You can find all the documentation on [cljdoc](https://cljdoc.org/d/fundingcircl
- [Pipe](https://github.com/FundingCircle/jackdaw/tree/master/examples/pipe)
- [Word Count](https://github.com/FundingCircle/jackdaw/tree/master/examples/word-count)
- [Simple Ledger](https://github.com/FundingCircle/jackdaw/tree/master/examples/simple-ledger)
- [Roll Dice](https://github.com/FundingCircle/jackdaw/tree/master/examples/rolldice)

## Contributing

Expand All @@ -38,7 +39,7 @@ Anyone with the appropriate credentials can "cut a release" of jackdaw using the
5. In the "describe this release" field, enter the contents of the Changelog and add a credit to the contributors of the release
6. When happy, use the "Publish Release" button to publish the release in github which creates a corresponding git tag
7. Once the tag is seen by circleci, a deployment build is triggered which builds the project and deploys a release to clojars

Steps 2 to 6 is essentially `git tag $version -m "$title\n\n$description" && git push --tags`


Expand Down
61 changes: 48 additions & 13 deletions doc/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The Jackdaw Client API wraps the core Kafka `Producer`<sup>[1](#producerapi)</su
`Consumer`<sup>[2](#consumerapi)</sup> APIs and provides functions for building or
unpacking some of the supporting objects like Callbacks, Serdes, ConsumerRecords etc.

Higher level concepts in the kafka ecosystem like Kafka Streams, Kafka Connect, and KSQL all
Higher level concepts in the Kafka ecosystem like Kafka Streams, Kafka Connect, and KSQL all
build on these core APIs so acquiring a deep understanding will be rewarded with increased
understanding of the many associated technologies.

Expand Down Expand Up @@ -83,12 +83,51 @@ this example, the Consumer is minimally configured just to illustrate a few impo
Consumers are usually created using the `with-open` macro so that they are automatically
closed either when evaluation reaches the end of the body or an exception is thrown. By default
the StringDeserializer is used to deserialize the key and value before being made available
in the ConsumerRecord. In this example, the consumer will see all records written to the "foo"
topic due to the use of `jc/subscribe`.
in the ConsumerRecord.

The `jackdaw.client.log/log` function takes a consumer instance that has already been subscribed
The first step is to create a consumer and subscribe it to a list of topics. We can use the `jc/subscribed-consumer` function:
```
(with-open [consumer (jc/subscribed-consumer consumer-config [topic-config-1 topic-config-2 ...])
```
`subscribed-consumer` takes a `consumer-config` and a vector of `topic-configs` and returns a `consumer` that is subscribed to all of the given topics.

To create a polling loop for the consumer, the main body of a consumer loop might look as follows:

```
(ns consumer-example
(:require
[jackdaw.client :as jc]))
(def consumer-config
{"bootstrap.servers" "localhost:9092"
"group.id" "com.foo.my-consumer"})
(def topic-config
{:topic-name "foo"})
(defn poll-and-loop!
"Continuously fetches records every `poll-ms`, processes them with `processing-fn` and commits offset after each poll."
[consumer processing-fn continue?]
(let [poll-ms 5000]
(loop []
(if @continue?
(let [records (jc/poll consumer poll-ms)]
(when (seq records)
(processing-fn records)
(.commitSync consumer))
(recur))))))
(defn process-messages! [topic-config processing-fn]
(let [continue? (atom true)]
(with-open [consumer (jc/subscribed-consumer consumer-config [topic-config])]
(poll-and-loop! consumer processing-fn continue?))))
```
Here, we create a consumer and subscribe it to the "foo" topic. The `poll-and-loop` function continuously fetches records every `poll-ms`, processes them with `processing-fn` (app specific) and commits offset after each poll. For a simple sample app using the Client API see examples/rolldice<sup>[7](#clientapiexample)</sup>).

The `jackdaw.client.log/log` function can be useful for testing. It takes a consumer instance that has already been subscribed
to one or more topics, a polling interval in ms, and optionally a `fuse-fn`, and returns a lazy infinite sequence of "datafied" records in the order
that they were received by calls to the Consumer's `.poll` method. If `fuse-fn` was provided, it stops after `fuse-fn` returns false. In this example, we just
that they were received by calls to the Consumer's `.poll` method. If `fuse-fn` was provided, it stops after `fuse-fn` returns false, otherwise it keeps polling. In this example, the consumer will see all records written to the "foo"
topic due to the use of `jc/subscribe`. We just
write the record to standard out to demonstrate the keys that are available in each record. To
see what other keys are available, see data/consumer.clj<sup>[6](#consumerdata)</sup>

Expand All @@ -99,9 +138,7 @@ provide more detailed information about how the consumer works behind the scenes
(ns consumer-example
(:require
[jackdaw.client :as jc]
[jackdaw.client.log :as jl])
(:import
(org.apache.kafka.common.serialization Serdes)))
[jackdaw.client.log :as jl]))
(def consumer-config
{"bootstrap.servers" "localhost:9092"
Expand All @@ -120,10 +157,7 @@ provide more detailed information about how the consumer works behind the scenes
(println "offset: " offset)))
```

There is also a convenience function `subscribed-consumer` which takes a `consumer-config` and a vector of `topic-configs` and returns a `consumer` that is subscribed to all of the given topics. Note that in this case all topics subscribed to by a single consumer must use the same pair of key and value serde instances. This is because the serdes of the first topic from `topic-configs` are used (or if none are provided those from the `consumer-config`), and therefore all other topics are expected to be able to use same serdes.
```
(with-open [my-consumer (jc/subscribed-consumer consumer-config [topic-config-1 topic-config-2 ...])
```
Note that when using `subscribed-consumer` all topics subscribed to by the consumer must use the same pair of key and value serde instances. This is because the serdes of the first topic from the `topic-configs` vector are used (or if none are provided those from the `consumer-config`), and therefore all topics are expected to be able to use same serdes.

## References

Expand All @@ -132,4 +166,5 @@ There is also a convenience function `subscribed-consumer` which takes a `consum
<a name="producerconfig">3</a>: https://kafka.apache.org/documentation/#producerconfigs <br />
<a name="serdesdirectory">4</a>: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/serdes <br />
<a name="consumerconfig">5</a>: https://kafka.apache.org/documentation/#consumerconfigs <br />
<a name="consumerdata">6</a>: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/data/consumer.clj
<a name="consumerdata">6</a>: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/data/consumer.clj <br />
<a name="clientapiexample">7</a>: https://github.com/FundingCircle/jackdaw/blob/master/examples/rolldice <br />
13 changes: 13 additions & 0 deletions examples/rolldice/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Roll dice

This repo contains a simple example app to demonstrate usage of Jackdaw's Client API (Consumer and Producer API). The app rolls a dice `n` number of times where `n` is provided by the user. A Jackdaw Producer writes the numbers to the input topic `rolldice`. A Jackdaw consumer continuously reads from the topic, adds up the numbers and prints out the result. To exit the consumer loop, press Ctr+c.

## Installation

Clone this repo.

## Usage

1. Bring up the Kafka services by running `docker-compose up -d`. Alternatively, run the Kafka services (broker and zookeeper) locally, following instructions in the [Apache Kafka Quickstart](https://kafka.apache.org/quickstart).

2. From the root directory of the repo, run: `lein run`
32 changes: 32 additions & 0 deletions examples/rolldice/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-enterprise-kafka:latest
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
16 changes: 16 additions & 0 deletions examples/rolldice/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
(defproject rolldice "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.10.1"]
[fundingcircle/jackdaw "0.7.8"]
[proto-repl "0.3.1"]
[pjstadig/humane-test-output "0.11.0"]
[com.taoensso/timbre "4.2.0"]
[org.slf4j/slf4j-nop "1.7.30"]]
:main ^:skip-aot roll-dice.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all
:jvm-opts ["-Dclojure.compiler.direct-linking=true"]}
:dev {:plugins [[com.jakemccrary/lein-test-refresh "0.24.1"]]}})
38 changes: 38 additions & 0 deletions examples/rolldice/src/roll_dice/core.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
(ns roll-dice.core
"This sample app is a simple Kafka application to demonstrate how to use
Jackdaw's Client API (Producer and Consumer).
It rolls a dice `n` number of times (input by the user).
A Jackdaw Producer then writes the numbers to the input topic `rolldice`.
A Jackdaw Consumer reads from the topic and adds up the numbers
and prints out the result."
(:require
[roll-dice.kafka :as kafka]))

(defn roll-dice [n]
(repeatedly n
#(let [rnd-side (fn [] (inc (rand-int 6)))]
[(rnd-side) (rnd-side)])))

(defn process-records [records]
(let [numbers (:value (first records))
sums (mapv #(apply + %) numbers)]
(println "numbers: " numbers)
(println "sums: " sums)))

(defn start-consumer-thread! [topic group-id]
(-> (Thread. #(kafka/process-messages! topic group-id process-records))
(.start)))

(defn -main []
(let [topic "rolldice"
group-id "rolldice-consumer"]
(start-consumer-thread! topic group-id)
(loop []
(println "How many times would you like to roll the dice? \nPlease enter a positive integer:")
(let [n (Integer/parseInt (read-line))
roll-n-times! #(roll-dice n)]
(kafka/produce-message! topic roll-n-times!)
(Thread/sleep 2000)
(println "\nPress Ctr+c to exit or keep rolling.\n")
(recur)))))
81 changes: 81 additions & 0 deletions examples/rolldice/src/roll_dice/kafka.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
(ns roll-dice.kafka
(:require
[jackdaw.client :as jc]
[jackdaw.serdes :refer [string-serde edn-serde]]
[taoensso.timbre :refer [error info]])
(:import [org.apache.kafka.common.errors WakeupException]))

(def kafka-config {"bootstrap.servers" "localhost:9092"})

(defn producer-config [topic]
(merge kafka-config
{"acks" "all"
"client.id" (str "producer-" (name topic))}))

(defn consumer-config [topic group-id]
(merge kafka-config
{"group.id" group-id
"client.id" (str "consumer-" (name topic))
"auto.offset.reset" "earliest"}))

(defn topic-config [topic]
{:topic-name topic
:key-serde (string-serde)
:value-serde (edn-serde)})

(defn poll-and-loop!
"Continuously fetches records every `poll-ms`, processes them and commits offset after each poll."
[consumer processing-fn continue?]
(let [poll-ms 5000]
(loop []
(if @continue?
(let [records (jc/poll consumer poll-ms)]
(when (seq records)
(processing-fn records)
(info "commit sync at offset" (-> records last :offset inc))
(.commitSync consumer))
(recur))))))

(defn stop-and-close-consumer!
"Stops the consumer polling loop and closes the consumer."
[consumer continue?]
(reset! continue? false)
(.close consumer)
(info "Closed Kafka Consumer"))

(defn start-consumer!
"Starts consumer loop to process events read from `topic`"
[consumer processing-fn continue?]
(try
(poll-and-loop! consumer processing-fn continue?)
(catch WakeupException e) ;; ignore for shutdown
(finally
(stop-and-close-consumer! consumer continue?))))

(defn add-shutdown-hook-consumer!
"Registers a shutdown hook to exit the consumer cleanly"
[consumer continue?]
(.addShutdownHook (Runtime/getRuntime)
(Thread. (fn []
(info "Stopping Kafka Consumer...")
(reset! continue? false)
(.wakeup consumer))))) ; wakeup causes consumer to break out of polling

(defn process-messages!
"Creates Kafka Consumer and shutdown hook, and starts the consumer"
[topic group-id processing-fn]
(let [topic-config (topic-config topic)
consumer-config (consumer-config topic group-id)
continue? (atom true)
consumer (jc/subscribed-consumer consumer-config [topic-config])]
(add-shutdown-hook-consumer! consumer continue?)
(start-consumer! consumer processing-fn continue?)))

(defn produce-message!
"Creates a Kafka Producer and writes message to `topic` by calling `producer-fn`"
[topic producer-fn]
(let [topic-config (topic-config topic)
producer-config (producer-config topic)]
(with-open [producer (jc/producer producer-config topic-config)]
(let [values (producer-fn)]
@(jc/produce! producer topic-config values)))))
4 changes: 4 additions & 0 deletions examples/rolldice/test/roll_dice/core_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(ns roll-dice.core-test
(:require
[clojure.test :refer [deftest is testing]]
[roll-dice.core :as core]))

0 comments on commit 144748e

Please sign in to comment.