Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Word Count Modernization #191

Merged
merged 4 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 0 additions & 148 deletions examples/dev/user.clj

This file was deleted.

40 changes: 24 additions & 16 deletions examples/word-count/deps.edn
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
{:deps
{fundingcircle/jackdaw {:mvn/version "0.6.4" :exclusions [org.apache.zookeeper/zookeeper]}
org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
org.clojure/clojure {:mvn/version "1.10.0"}
org.clojure/algo.generic {:mvn/version "0.1.3"}
{:paths
["src" "resources"]

:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-word_count_modernization-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}
:aliases
{:dev
{:extra-paths ["dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}}}

:paths
["src" "test" "dev" "../dev" "resources" "../resources"]
:test
{:extra-paths ["test"]
:extra-deps {

:aliases
{:test {:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}}
com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
70 changes: 0 additions & 70 deletions examples/word-count/dev/system.clj

This file was deleted.

118 changes: 118 additions & 0 deletions examples/word-count/dev/user.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
(ns user
(:require [clojure.java.shell :refer [sh]]
[clojure.string :as str]
[jackdaw.admin :as ja]
[jackdaw.repl :refer :all]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.describe :as jsd]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[word-count])
(:import [clojure.lang ILookup Associative]))


(def config
{:streams-config {:application-id "word-count"
:bootstrap-servers "localhost:9092"
:default-key-serde "jackdaw.serdes.EdnSerde"
:default-value-serde "jackdaw.serdes.EdnSerde"
:cache-max-bytes-buffering "0"}

:topology {:topology-builder-fn :word-count/topology-builder}

:topics {:streams-config (ig/ref :streams-config)
:topology (ig/ref :topology)}

:app {:streams-config (ig/ref :streams-config)
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly config))


(deftype FakeTopicMetadata []
ILookup
(valAt [this key]
{:topic-name (name key)
:partition-count 1
:replication-factor 1
:key-serde (js/edn-serde)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should use a string serde for the key to make it easier to join with other topics. See https://github.com/FundingCircle/jackdaw-repl/commit/1ddd902e3ec325c862bdb2ef06f01050255b665e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind this is just for prototyping. We shouldn't use this or EDN keys in production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we could add a new arity that takes an argument so the user can set their own default. Would that address your use case?

:value-serde (js/edn-serde)})

Associative
(assoc [this key val]
this))

(defn new-fake-topic-metadata []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just use a single instance of FakeTopicMetadata, so we can drop this function and update topic-metadata to be:

(def topic-metadata (FakeTopicMetadata.))

Then we can just use topic-metadata where new-fake-topic-metadata is currently used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do. Does this still make sense if we add an arity to allow the user to set their own default?

(FakeTopicMetadata.))

(def topic-metadata
(new-fake-topic-metadata))


(defn topology->topic-metadata
[topology streams-config]
(->> (jsd/describe-topology (.build (j/streams-builder* topology)) streams-config)
(map :nodes)
(reduce concat)
(filter #(= :topic (:type %)))
(remove (fn [x] (re-matches #".*STATE-STORE.*" (:name x))))
(map :name)
(reduce (fn [acc x] (assoc acc (keyword x) (get (new-fake-topic-metadata) x))) {})))

(defn re-delete-topics
"Takes an instance of java.util.regex.Pattern and deletes any Kafka
topics that match."
[config re]
(with-open [client (ja/->AdminClient config)]
(let [topics-to-delete (->> (ja/list-topics client)
(filter #(re-find re (:topic-name %))))]
(ja/delete-topics! client topics-to-delete))))

(defn destroy-state-stores
"Takes an application config and deletes local files associated with
internal state."
[streams-config]
(sh "rm" "-rf" (str "/tmp/kafka-streams/" (:application-id streams-config))))


(defmethod ig/init-key :streams-config [_ streams-config]
(let [bootstrap-servers (or (System/getenv "BOOTSTRAP_SERVERS")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this logic to where the config map is instantiated, and get rid of this init-key method

(:bootstrap-servers streams-config))]
(assoc streams-config :bootstrap-servers bootstrap-servers)))

(defmethod ig/init-key :topology [_ {:keys [topology-builder-fn]}]
(let [topology-builder (deref (find-var (symbol topology-builder-fn)))
builder (j/streams-builder)]
((topology-builder (new-fake-topic-metadata)) builder)))

(defmethod ig/init-key :topics [_ {:keys [streams-config topology] :as opts}]
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))
topic-metadata (topology->topic-metadata topology streams-config)]
(with-open [client (ja/->AdminClient config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology (word-count/propertize streams-config))]
(j/start streams-app)
(assoc opts :streams-app streams-app)))


(defmethod ig/halt-key! :topics [_ {:keys [streams-config topic-metadata]}]
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))]
(re-delete-topics config (re-pattern (str "(" (->> topic-metadata
keys
(map name)
(str/join "|"))
")")))))

(defmethod ig/halt-key! :app [_ {:keys [streams-config streams-app]}]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started with that, and then separated out the cleanup or resources because sometimes you just want to stop the app but not delete everything so you can inspect the topics and state stores. https://github.com/FundingCircle/jackdaw-repl/commit/0ba5f6f70ab2a08f8f4d11819c5540d3e093574c

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you'd want a full reset to be able to start clean. Could we halt just the app with a different config structure?

(j/close streams-app)
(destroy-state-stores streams-config)
(let [config (word-count/propertize (select-keys streams-config [:bootstrap-servers]))]
(re-delete-topics config (re-pattern (str "("
(:application-id streams-config)
")")))))
1 change: 1 addition & 0 deletions examples/word-count/resources/logback.xml
Loading