Skip to content

Commit

Permalink
Rewrite Word Count example using FakeTopicMetadata and Integrant
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles Reese committed Sep 13, 2019
1 parent f404699 commit 132c60f
Show file tree
Hide file tree
Showing 9 changed files with 605 additions and 433 deletions.
148 changes: 0 additions & 148 deletions examples/dev/user.clj

This file was deleted.

40 changes: 24 additions & 16 deletions examples/word-count/deps.edn
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
{:deps
{fundingcircle/jackdaw {:mvn/version "0.6.4" :exclusions [org.apache.zookeeper/zookeeper]}
org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
org.clojure/clojure {:mvn/version "1.10.0"}
org.clojure/algo.generic {:mvn/version "0.1.3"}
{:paths
["src" "resources"]

:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-word_count_modernization-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}
:aliases
{:dev
{:extra-paths ["dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}}}

:paths
["src" "test" "dev" "../dev" "resources" "../resources"]
:test
{:extra-paths ["test"]
:extra-deps {

:aliases
{:test {:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}}
com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
70 changes: 0 additions & 70 deletions examples/word-count/dev/system.clj

This file was deleted.

118 changes: 118 additions & 0 deletions examples/word-count/dev/user.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
(ns user
(:require [clojure.java.shell :refer [sh]]
[clojure.string :as str]
[jackdaw.admin :as ja]
[jackdaw.repl :refer :all]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.describe :as jsd]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[word-count])
(:import [clojure.lang ILookup Associative]))


(def config
{:streams-config {:application-id "word-count"
:bootstrap-servers "localhost:9092"
:default-key-serde "jackdaw.serdes.EdnSerde"
:default-value-serde "jackdaw.serdes.EdnSerde"
:cache-max-bytes-buffering "0"}

:topology {:topology-builder-fn :word-count/topology-builder}

:topics {:streams-config (ig/ref :streams-config)
:topology (ig/ref :topology)}

:app {:streams-config (ig/ref :streams-config)
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly config))


(deftype FakeTopicMetadata []
ILookup
(valAt [this key]
{:topic-name (name key)
:partition-count 1
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)})

Associative
(assoc [this key val]
this))

(defn new-fake-topic-metadata []
(FakeTopicMetadata.))

(def topic-metadata
(new-fake-topic-metadata))


(defn topology->topic-metadata
[topology streams-config]
(->> (jsd/describe-topology (.build (j/streams-builder* topology)) streams-config)
(map :nodes)
(reduce concat)
(filter #(= :topic (:type %)))
(remove (fn [x] (re-matches #".*STATE-STORE.*" (:name x))))
(map :name)
(reduce (fn [acc x] (assoc acc (keyword x) (get (new-fake-topic-metadata) x))) {})))

(defn re-delete-topics
"Takes an instance of java.util.regex.Pattern and deletes any Kafka
topics that match."
[config re]
(with-open [client (ja/->AdminClient config)]
(let [topics-to-delete (->> (ja/list-topics client)
(filter #(re-find re (:topic-name %))))]
(ja/delete-topics! client topics-to-delete))))

(defn destroy-state-stores
"Takes an application config and deletes local files associated with
internal state."
[streams-config]
(sh "rm" "-rf" (str "/tmp/kafka-streams/" (:application-id streams-config))))


(defmethod ig/init-key :streams-config [_ streams-config]
(let [bootstrap-servers (or (System/getenv "BOOTSTRAP_SERVERS")
(:bootstrap-servers streams-config))]
(assoc streams-config :bootstrap-servers bootstrap-servers)))

(defmethod ig/init-key :topology [_ {:keys [topology-builder-fn]}]
(let [topology-builder (deref (find-var (symbol topology-builder-fn)))
builder (j/streams-builder)]
((topology-builder (new-fake-topic-metadata)) builder)))

(defmethod ig/init-key :topics [_ {:keys [streams-config topology] :as opts}]
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))
topic-metadata (topology->topic-metadata topology streams-config)]
(with-open [client (ja/->AdminClient config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology (word-count/propertize streams-config))]
(j/start streams-app)
(assoc opts :streams-app streams-app)))


(defmethod ig/halt-key! :topics [_ {:keys [streams-config topic-metadata]}]
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))]
(re-delete-topics config (re-pattern (str "(" (->> topic-metadata
keys
(map name)
(str/join "|"))
")")))))

(defmethod ig/halt-key! :app [_ {:keys [streams-config streams-app]}]
(j/close streams-app)
(destroy-state-stores streams-config)
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))]
(re-delete-topics config (re-pattern (str "("
(:application-id streams-config)
")")))))
1 change: 1 addition & 0 deletions examples/word-count/resources/logback.xml
Loading

0 comments on commit 132c60f

Please sign in to comment.