diff --git a/examples/streams-examples/.gitignore b/examples/streams-examples/.gitignore
new file mode 100644
index 00000000..1f6abb76
--- /dev/null
+++ b/examples/streams-examples/.gitignore
@@ -0,0 +1,17 @@
+pom.xml
+pom.xml.asc
+*.jar
+*.class
+/lib/
+/classes/
+/target/
+/checkouts/
+/test-results/
+/logs/
+.lein-deps-sum
+.lein-repl-history
+.lein-plugins/
+.lein-failures
+.lein-env
+.nrepl-port
+.cpcache/
diff --git a/examples/streams-examples/README.md b/examples/streams-examples/README.md
new file mode 100644
index 00000000..330e1efb
--- /dev/null
+++ b/examples/streams-examples/README.md
@@ -0,0 +1,2 @@
+# Kafka Streams Jackdaw
+A typical example of the use of Kafka Streams, Jackdaw, Test Machine et al
diff --git a/examples/streams-examples/deps.edn b/examples/streams-examples/deps.edn
new file mode 100644
index 00000000..e92ae4b4
--- /dev/null
+++ b/examples/streams-examples/deps.edn
@@ -0,0 +1,31 @@
+{:paths ["resources" "src"]
+
+ :deps {org.clojure/clojure {:mvn/version "1.10.3"}
+
+ danlentz/clj-uuid {:mvn/version "0.1.9"}
+ org.clojure/tools.logging {:mvn/version "1.1.0"}
+ ch.qos.logback/logback-classic {:mvn/version "1.2.7"}
+
+ ;; Kafka Helpers
+ fundingcircle/topology-grapher {:mvn/version "0.1.3"
+ :exclusions [org.clojure/data.priority-map]}
+
+ ;; Kafka
+ ;; Explicitly bring in Kafka, rather than transitively
+ org.apache.kafka/kafka-clients {:mvn/version "2.8.1"}
+ org.apache.kafka/kafka-streams {:mvn/version "2.8.1"}
+ fundingcircle/jackdaw {:mvn/version "0.9.1"
+ :exclusions [com.fasterxml.jackson.core/jackson-annotations
+ com.fasterxml.jackson.core/jackson-databind
+ com.thoughtworks.paranamer/paranamer
+ joda-time/joda-time
+ org.apache.kafka/kafka-clients
+ org.apache.kafka/kafka-streams
+ org.apache.zookeeper/zookeeper
+ org.slf4j/slf4j-log4j12]}}
+
+ :aliases {:test {:extra-paths ["test"]}
+ :dev {:extra-paths ["test"]}}
+
+ :mvn/repos {"confluent" {:url "https://packages.confluent.io/maven/"}
+ "mulesoft" {:url "https://repository.mulesoft.org/nexus/content/repositories/public/"}}}
diff --git a/examples/streams-examples/project.clj b/examples/streams-examples/project.clj
new file mode 100644
index 00000000..8f881bad
--- /dev/null
+++ b/examples/streams-examples/project.clj
@@ -0,0 +1,44 @@
+(defproject streams "0.1.0-SNAPSHOT"
+ :description "streams-examples"
+
+ :target-path "target/%s"
+ :uberjar-name "streams-examples.jar"
+ :main streams.main
+
+ :url "https://github.com/FundingCircle/jackdaw"
+
+ :dependencies [[org.clojure/clojure "1.10.3"]
+
+ [danlentz/clj-uuid "0.1.9"]
+ [org.clojure/tools.logging "1.1.0"]
+ [ch.qos.logback/logback-classic "1.2.7"]
+
+ ;; Kafka Helpers
+ [fundingcircle/topology-grapher "0.1.3"
+ :exclusions [org.clojure/data.priority-map]]
+
+ ;; Kafka
+ ;; Explicitly bring in Kafka, rather than transitively
+ [org.apache.kafka/kafka-clients "2.8.1"]
+ [org.apache.kafka/kafka-streams "2.8.1"]
+ [fundingcircle/jackdaw "0.9.1"
+ :exclusions [com.fasterxml.jackson.core/jackson-annotations
+ com.fasterxml.jackson.core/jackson-databind
+ com.thoughtworks.paranamer/paranamer
+ joda-time
+ org.apache.kafka/kafka-clients
+ org.apache.kafka/kafka-streams
+ org.apache.zookeeper/zookeeper
+ org.slf4j/slf4j-log4j12]]]
+
+ :resource-paths ["resources" "resources/avro-schemas"]
+
+ :repl-options {:init-ns streams.main}
+
+ :profiles
+ {:dev {:resource-paths ["test/resources"]
+ :dependencies [[org.apache.kafka/kafka-streams-test-utils "2.8.1"]
+ [org.apache.kafka/kafka_2.13 "2.8.1"]]}}
+
+ :repositories
+ {"confluent" {:url "https://packages.confluent.io/maven/"}})
diff --git a/examples/streams-examples/resources/logback.xml b/examples/streams-examples/resources/logback.xml
new file mode 100644
index 00000000..935487f1
--- /dev/null
+++ b/examples/streams-examples/resources/logback.xml
@@ -0,0 +1,13 @@
+
+
+
+ %d %level %logger: %msg%n
+ true
+
+
+
+
+
+
+
+
diff --git a/examples/streams-examples/src/streams/config.clj b/examples/streams-examples/src/streams/config.clj
new file mode 100644
index 00000000..8c841f18
--- /dev/null
+++ b/examples/streams-examples/src/streams/config.clj
@@ -0,0 +1,40 @@
+(ns streams.config)
+
+(def ^:private base-event-topic-config
+ {:partition-count 3
+ :key-serde {:serde-keyword :jackdaw.serdes/string-serde}
+ :value-serde {:serde-keyword :jackdaw.serdes.json/serde}})
+
+(def ^:private base-changelog-topic-config
+ {:partition-count 3
+ :key-serde {:serde-keyword :jackdaw.serdes/string-serde}
+ :value-serde {:serde-keyword :jackdaw.serdes.json/serde}
+ ;; Changelog
+ :config {"cleanup.policy" "compact"
+ "segment.ms" "86400000"}})
+
+(defn config []
+ {:app-name "streams"
+
+ :streams-settings
+ {:schema-registry-url "http://localhost:8081"
+ :bootstrap-servers "localhost:9092"
+ :commit-interval "1"
+ :num-stream-threads "3"
+ :cache-max-bytes-buffering "0"
+ :replication-factor "1"
+ :state-dir "/tmp/kafka-streams"
+ :sidecar-port "9090"
+ :sidecar-host "0.0.0.0"}
+
+ :topics
+ {:input (merge {:topic-name "input-1"} base-event-topic-config)
+ :output (merge {:topic-name "output-1"} base-event-topic-config)
+ :state (merge {:topic-name "state-1"} base-changelog-topic-config)}
+
+ :stores
+ {:state-store {:store-name "state-store-1"
+ :key-serde {:serde-keyword :jackdaw.serdes/string-serde}
+ :value-serde {:serde-keyword :jackdaw.serdes.json/serde}}}
+
+ :global-stores {}})
diff --git a/examples/streams-examples/src/streams/core.clj b/examples/streams-examples/src/streams/core.clj
new file mode 100644
index 00000000..77926906
--- /dev/null
+++ b/examples/streams-examples/src/streams/core.clj
@@ -0,0 +1,173 @@
+(ns streams.core
+ (:require
+ [clojure.tools.logging :as log]
+ [clojure.java.io :as io]
+ [clojure.string :as string]
+ [jackdaw.serdes.resolver :as jd-resolver]
+ [jackdaw.streams :as js]
+ [streams.config :refer [config]]
+ [topology-grapher.describe :as td]
+ [topology-grapher.render :as tr])
+ (:import
+ (jackdaw.streams.interop CljStreamsBuilder)
+ (java.time Duration)
+ (java.util Properties)
+ (org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler)
+ (org.apache.kafka.streams KafkaStreams KafkaStreams$State KafkaStreams$StateListener StreamsBuilder Topology)))
+
+
+;; Config
+
+(defn load-config []
+ (config))
+
+(defn props-for ^Properties [config-map]
+ (doto (Properties.)
+ (.putAll (reduce-kv (fn [m k v]
+ (assoc m (-> (name k)
+ (string/replace "-" "."))
+ (str v)))
+ {} config-map))))
+
+;; Some Jackdaw serdes sugar
+
+(defn- resolve-serde
+ [schema-reg-url schema-reg-client serde-config]
+ (reduce (fn [conf serdes-key]
+ (assoc conf serdes-key ((jd-resolver/serde-resolver
+ :schema-registry-url schema-reg-url
+ :schema-registry-client schema-reg-client)
+ (serdes-key serde-config))))
+ serde-config [:key-serde :value-serde]))
+
+(defn reify-serdes-config
+ "Converts the serdes references in the topic & store configs into
+ actual jackdaw serdes implementations, so the config can be used
+ in the jackdaw API calls."
+ ([config]
+ (reify-serdes-config config nil))
+ ([config schema-reg-client]
+ ;; If no schema-reg-client is provided, Jackdaw will construct
+ ;; one as needed using the :streams-settings :schema-registry-url in the kafka config
+ ;; for the app (see config.edn).
+ ;; Typically a schema-reg-client is passed here for testing (to pass a mock)
+ (let [schema-reg-url (get-in config [:streams-settings :schema-registry-url])
+ topics (into {} (map (fn [[k t]]
+ [k (resolve-serde schema-reg-url schema-reg-client t)])
+ (:topics config)))
+ stores (into {} (map (fn [[k s]]
+ [k (resolve-serde schema-reg-url schema-reg-client s)])
+ (:stores config)))
+ global-stores (into {} (map (fn [[k v]]
+ (if-let [resolved-topic (get topics (:source-topic v))]
+ [k (assoc v :source-topic resolved-topic)]
+ (throw (ex-info (str "Source topic not found for global store " k) v))))
+ (:global-stores config)))]
+ (assoc config
+ :topics topics
+ :stores stores
+ :global-stores global-stores))))
+
+;; Some Running-a-topology Sugar
+
+(defn exit-on-error-handler ^StreamsUncaughtExceptionHandler []
+ (reify StreamsUncaughtExceptionHandler
+ (handle [_ ex]
+ (try
+ (future (System/exit 1))
+ (finally (throw ex))))))
+
+(defn logging-error-handler ^StreamsUncaughtExceptionHandler []
+ (reify StreamsUncaughtExceptionHandler
+ (handle [_ ex]
+ (log/error ex "Kafka Streams error!"))))
+
+(defn startup-listener [startup-result]
+ (reify KafkaStreams$StateListener
+ (onChange [_ new-state old-state]
+ (when-not (realized? startup-result)
+ (cond
+ (= KafkaStreams$State/RUNNING new-state)
+ (deliver startup-result :running)
+ (or (= KafkaStreams$State/ERROR new-state)
+ (= KafkaStreams$State/PENDING_SHUTDOWN new-state))
+ (deliver startup-result :failed)
+ :else nil)))))
+
+;; Main interfaces to build and run a topology
+
+(defn build-topology
+ "Given one of the various ways of building a topology object for
+ kafka streams, returns a built Topology."
+ [stream-build-fn config]
+ (let [configured-stream (stream-build-fn config)]
+ (condp instance? configured-stream
+ ;; If we built a Topology already, e.g use of the Processor API, just return it
+ Topology configured-stream
+ ;; Jackdaw Streams DSL, wrapped streams builder
+ CljStreamsBuilder (.build ^StreamsBuilder (js/streams-builder* configured-stream))
+ ;; Kafka Streams DSL
+ StreamsBuilder (.build ^StreamsBuilder configured-stream)
+ ;; Erk
+ (throw (Exception. (str "Unknown builder type: " (type configured-stream)))))))
+
+(defn start-topology
+ "Given a topology and the streams config to run it, create and run a
+ KafkaStreams for ever and ever. Accepts optional state listener and
+ exception handler."
+ (^KafkaStreams [^Topology topology config]
+ (start-topology topology config nil))
+ (^KafkaStreams [^Topology topology config extra-setup-fn]
+ (let [kafka-streams (KafkaStreams. ^Topology topology
+ (props-for (:streams-settings config)))]
+ (when extra-setup-fn
+ (extra-setup-fn kafka-streams))
+ (.start kafka-streams)
+ (.addShutdownHook (Runtime/getRuntime)
+ (Thread. (let [main-thread (Thread/currentThread)]
+ (fn []
+ (.close kafka-streams (Duration/ofSeconds 60))
+ (.join main-thread 45000)))
+ "Shutdown thread"))
+ kafka-streams)))
+
+;;
+;; topology rendering
+;;
+(def topology-domain-meta {:domain "jackdaw"
+ :subdomain "examples"
+ :application "stream"})
+
+(defn render-topology [stream-build-fn stream-config]
+ (let [topology [{:application-name (get-in stream-config [:streams-settings :application-id])
+ :topology (build-topology stream-build-fn stream-config)}]
+ graph (td/gen-topologies topology topology-domain-meta)]
+ {:topic-level (tr/render-graph (vals graph) {:fmt "pdf" :mode "topics" :cache false})
+ :detail (tr/render-graph (vals graph) {:fmt "pdf" :mode "detail" :cache false})}))
+
+;; The whole app config is passed through the application here.
+;; This config contains:
+;; - the kafka streams configuration
+;; - the topic configurations, in jackdaw format
+;; - the state store configurations, in jackdaw format
+;; All serdes should be reified here
+
+(defn run-topology ^KafkaStreams [stream-build-fn stream-config]
+ (let [startup-result (promise)
+ error-handler-fn (if (true? (:hard-exit-on-error stream-config))
+ exit-on-error-handler
+ logging-error-handler)
+ stream (-> (build-topology stream-build-fn stream-config)
+ (start-topology stream-config
+ (fn [^KafkaStreams kafka-streams]
+ (doto kafka-streams
+ (.setUncaughtExceptionHandler
+ ^StreamsUncaughtExceptionHandler (error-handler-fn))
+ (.setStateListener
+ (startup-listener startup-result))))))]
+ (let [startup-state @startup-result]
+ (if (= :running startup-state)
+ (log/info (:streams-settings stream-config) "Started topology")
+ (throw (ex-info "Failed to start topology :(" {:state startup-state
+ :config stream-config}))))
+ stream))
diff --git a/examples/streams-examples/src/streams/main.clj b/examples/streams-examples/src/streams/main.clj
new file mode 100644
index 00000000..584ddea4
--- /dev/null
+++ b/examples/streams-examples/src/streams/main.clj
@@ -0,0 +1,34 @@
+(ns streams.main
+ (:require
+ [clojure.tools.logging :as log]
+ [streams.stack-calculator :as stack-calculator]
+ [streams.core :as core]))
+
+(defn execute!
+ [topology-config-fn stream-build-fn action-fn]
+ (let [stream-config (-> (core/load-config)
+ (assoc :hard-exit-on-error true)
+ (topology-config-fn)
+ (core/reify-serdes-config))]
+ (action-fn stream-build-fn stream-config)))
+
+(defn render-basic-stream-dsl [topology-name]
+ (case topology-name
+ "stack-calculator"
+ (execute! stack-calculator/configure-topology
+ stack-calculator/build-stream
+ core/render-topology)
+ (str "Unknown topology " topology-name)))
+
+(defn -main [& args]
+ (log/info {:args args} "Running application ...")
+ (let [base-config (assoc (core/load-config)
+ :hard-exit-on-error true)]
+ (doseq [topology-name args]
+ (log/info {:topology-name topology-name} "Starting topology")
+ (case topology-name
+ "stack-calculator"
+ (execute! stack-calculator/configure-topology
+ stack-calculator/build-stream
+ core/run-topology)
+ (throw (ex-info (str "Unknown topology " topology-name) {:args args}))))))
diff --git a/examples/streams-examples/src/streams/stack_calculator.clj b/examples/streams-examples/src/streams/stack_calculator.clj
new file mode 100644
index 00000000..b6737fe7
--- /dev/null
+++ b/examples/streams-examples/src/streams/stack_calculator.clj
@@ -0,0 +1,48 @@
+(ns streams.stack-calculator
+ (:require
+ [clojure.tools.logging :as log]
+ [jackdaw.streams :as js]))
+
+(def application-id "basic-stream-dsl")
+
+(def op-map {"+" +
+ "-" -
+ "*" *
+ "/" /})
+
+(defn- apply-opfn [op [a b & vl]]
+ (concat (list (op b a)) vl))
+
+(defn stack-reducer [acc [_ v]]
+ (if-let [opfn (op-map v)]
+ (apply-opfn opfn acc)
+ (concat [v] acc)))
+
+(defn stack-calculator-topology [builder
+ {:keys [input
+ output] :as topics}
+ stores]
+ (-> builder
+ (js/kstream input)
+ (js/peek (fn [m]
+ (log/info ::received m)))
+ (js/group-by-key)
+ (js/aggregate list
+ stack-reducer
+ output)
+ (js/to-kstream)
+ (js/peek (fn [m]
+ (log/info ::produced m)))
+ (js/to output))
+ builder)
+
+(defn configure-topology [config]
+ (assoc-in config
+ [:streams-settings :application-id]
+ application-id))
+
+(defn build-stream
+ ([config]
+ (build-stream (js/streams-builder) config))
+ ([builder {:keys [topics stores] :as config}]
+ (stack-calculator-topology builder topics stores)))
diff --git a/examples/streams-examples/test/resources/journal.edn b/examples/streams-examples/test/resources/journal.edn
new file mode 100644
index 00000000..60b68647
--- /dev/null
+++ b/examples/streams-examples/test/resources/journal.edn
@@ -0,0 +1,15 @@
+{:topics
+ {:input
+ ({:topic :input, :key "key-1", :value 1, :partition 0, :offset 37, :headers {}}
+ {:topic :input, :key "key-1", :value 2, :partition 0, :offset 38, :headers {}}
+ {:topic :input, :key "key-1", :value "+", :partition 0, :offset 39, :headers {}}
+ {:topic :input, :key "key-2", :value 3, :partition 0, :offset 40, :headers {}}
+ {:topic :input, :key "key-2", :value 4, :partition 0, :offset 41, :headers {}}
+ {:topic :input, :key "key-2", :value "+", :partition 0, :offset 42, :headers {}}),
+ :output
+ ({:topic :output, :key "key-1", :value [1], :partition 0, :offset 7, :headers {}}
+ {:topic :output, :key "key-1", :value [2 1], :partition 0, :offset 8, :headers {}}
+ {:topic :output, :key "key-1", :value [3], :partition 0, :offset 9, :headers {}}
+ {:topic :output, :key "key-2", :value [3], :partition 0, :offset 10, :headers {}}
+ {:topic :output, :key "key-2", :value [4 3], :partition 0, :offset 11, :headers {}}
+ {:topic :output, :key "key-2", :value [7], :partition 0, :offset 12, :headers {}})}}
diff --git a/examples/streams-examples/test/resources/logback-test.xml b/examples/streams-examples/test/resources/logback-test.xml
new file mode 100644
index 00000000..cd54da3d
--- /dev/null
+++ b/examples/streams-examples/test/resources/logback-test.xml
@@ -0,0 +1,21 @@
+
+
+ logs/application.log
+
+
+ logs/application.%d{yyyy-MM-dd}.log
+
+ 10
+ 1GB
+
+
+
+ %d %level %logger: %msg%n
+ true
+
+
+
+
+
+
+
diff --git a/examples/streams-examples/test/streams/core_test.clj b/examples/streams-examples/test/streams/core_test.clj
new file mode 100644
index 00000000..fbca8db5
--- /dev/null
+++ b/examples/streams-examples/test/streams/core_test.clj
@@ -0,0 +1,43 @@
+(ns streams.core-test
+ (:require
+ [clj-uuid :as uuid]
+ [clojure.test :refer :all]
+ [jackdaw.test.journal :as j]
+ [streams.core :as core]
+ [streams.framework.integration-test :as itc]))
+
+(deftest props-for-test
+ (testing "Empty map to java Properties"
+ (let [p (core/props-for {})]
+ (is (= java.util.Properties (type p)))
+ (is (.isEmpty p))))
+ (testing "Config map to java Properties"
+ (let [p (core/props-for {:name-one 42
+ :name_two "foo"
+ "string-key" true
+ "key_string" 0
+ :foo.bar :baz
+ "hello.there" 12})]
+ (is (= java.util.Properties (type p)))
+ (is (not (.isEmpty p)))
+
+ (is (= "42" (.getProperty p "name.one")))
+ (is (= "foo" (.getProperty p "name_two")))
+ (is (= "true" (.getProperty p "string.key")))
+ (is (= "0" (.getProperty p "key_string")))
+ (is (= ":baz" (.getProperty p "foo.bar")))
+ (is (= "12" (.getProperty p "hello.there")))
+ (is (nil? (.getProperty p "not.found"))))))
+
+(deftest journal-test
+ (let [j (itc/slurp-journal "test/resources/journal.edn")]
+ (testing "Journal messages by key"
+ (let [mi (map :value (itc/msg-for-key j :input "key-1"))
+ mo (map :value (itc/msg-for-key j :output "key-2"))]
+ (is (= [1 2 "+"] mi))
+ (is (= [[3] [4 3] [7]] mo))))
+ (testing "Journal watchers"
+ (is (true? ((itc/watch-msg-count :input 6) j)))
+ (is (false? ((itc/watch-msg-count :input 5) j)))
+ (is (true? ((itc/watch-msg-count-for-key :output "key-2" 3) j)))
+ (is (false? ((itc/watch-msg-count-for-key :output "key-2" 4) j))))))
diff --git a/examples/streams-examples/test/streams/framework/admin_tools.clj b/examples/streams-examples/test/streams/framework/admin_tools.clj
new file mode 100644
index 00000000..47636979
--- /dev/null
+++ b/examples/streams-examples/test/streams/framework/admin_tools.clj
@@ -0,0 +1,70 @@
+(ns streams.framework.admin-tools
+ (:require
+ [clojure.tools.logging :as log]
+ [clojure.reflect :refer [resolve-class]])
+ (:import
+ (org.apache.kafka.clients.admin AdminClient ListTopicsResult NewTopic)
+ (org.apache.kafka.common KafkaFuture)))
+
+(defn new-topic ^NewTopic [t]
+ (doto
+ (NewTopic. (:topic-name t)
+ (int (:partition-count t))
+ (short 1)) ;; Replication factor 1
+ (.configs (:config t))))
+
+(defn list-topics ^ListTopicsResult [^AdminClient client]
+ (.listTopics client))
+
+(defn create-topics-future
+ "Returns a KafkaFuture which wraps the async creation of the topics in the
+ supplied map of topics"
+ ^KafkaFuture [^AdminClient client topic-config]
+ (let [required (->> topic-config
+ (filter (fn [[_ v]]
+ (not (.contains (.get (.names (list-topics client)))
+ (:topic-name v)))))
+ (map (fn [[_ v]]
+ (new-topic v))))]
+ (.all (.createTopics client required))))
+
+(defn create-topics
+ "Creates the topics listed in the config against a (remote) Kafka Broker from the config"
+ [{:keys [streams-settings topics] :as config}]
+ (log/info "Creating topics")
+ (with-open [client (AdminClient/create
+ {"bootstrap.servers" (:bootstrap-servers streams-settings)
+ "request.timeout.ms" "10000"
+ "client.id" "topic-fixture-admin"})]
+ (-> (create-topics-future client topics)
+ (.get 10000 java.util.concurrent.TimeUnit/MILLISECONDS))
+ (log/info "Created topics OK!")))
+
+(defn- reset-fn
+ [^kafka.tools.StreamsResetter rt args]
+ (.run rt (into-array String args)))
+
+(defn- class-exists? [c]
+ (resolve-class (.getContextClassLoader (Thread/currentThread)) c))
+
+(defn reset-application
+ "Runs the kafka.tools.StreamsResetter with the supplied `reset-args` as parameters"
+ [{:keys [streams-settings] :as config}]
+ (if-not (class-exists? 'kafka.tools.StreamsResetter)
+ (throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool"))
+ (let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter"))
+ args ["--application-id" (:application-id streams-settings)
+ "--bootstrap-servers" (:bootstrap-servers streams-settings)]
+ _ (log/info "Resetting application for test")
+ result (with-open [out-str (java.io.StringWriter.)
+ err-str (java.io.StringWriter.)]
+ (binding [*out* out-str
+ *err* err-str]
+ (let [status (reset-fn rt args)]
+ (flush)
+ {:status status
+ :out (str out-str)
+ :err (str err-str)})))]
+ (if (zero? (:status result))
+ (log/info "Application reset OK!")
+ (throw (ex-info "failed to reset application. check logs for details" result))))))
diff --git a/examples/streams-examples/test/streams/framework/integration_test.clj b/examples/streams-examples/test/streams/framework/integration_test.clj
new file mode 100644
index 00000000..bce236db
--- /dev/null
+++ b/examples/streams-examples/test/streams/framework/integration_test.clj
@@ -0,0 +1,203 @@
+(ns streams.framework.integration-test
+ (:require
+ [clj-uuid :as uuid]
+ [clojure.edn :as edn]
+ [clojure.java.io :as io]
+ [clojure.pprint :as pp]
+ [clojure.test :as c.t]
+ [jackdaw.test :as jd.test]
+ [jackdaw.test.commands.watch :as watch]
+ [jackdaw.test.journal :as j]
+ [streams.core :as core]
+ [streams.framework.admin-tools :as admin])
+ (:import
+ (io.confluent.kafka.schemaregistry.client MockSchemaRegistryClient)
+ (java.io File)
+ (java.time LocalDateTime)
+ (org.apache.kafka.streams Topology TopologyTestDriver)))
+
+;; Journal Helpers
+
+(defn msg-for-key [journal topic k]
+ (filter (fn [m]
+ (= (:key m) k)) (j/raw-messages journal topic)))
+
+(defn watch-msg-count [topic c]
+ (fn [journal]
+ (let [ms (j/messages journal topic)]
+ (= c (count ms)))))
+
+(defn watch-msg-count-for-key [topic k c]
+ (fn [journal]
+ (let [ms (msg-for-key journal topic k)]
+ (= c (count ms)))))
+
+(defn result-ok? [tm-result]
+ (doseq [err (remove (fn [r]
+ (= :ok (:status r))) tm-result)]
+ (pp/pprint err))
+ (every? #(= :ok %) (map :status tm-result)))
+
+(defn export-journal
+ ([journal] (export-journal journal "./test-results"))
+ ([journal ^String directory]
+ (let [rdir (File. directory)
+ rfile (File. rdir (str "journal-" (LocalDateTime/now)))]
+ (when (or (.exists rdir) (.mkdir rdir))
+ (println (str "writing results to \"" rfile "\""))
+ (println)
+ (with-open [f (io/writer rfile)]
+ (pp/pprint journal f))))))
+
+(defn slurp-journal [file]
+ (edn/read-string (slurp file)))
+
+(defn summarise [journal]
+ (pp/print-table
+ (sort-by :topic
+ (map (fn [[k v]]
+ {:topic k :messages (count v)}) (:topics journal)))))
+
+(defn summarise-and-export [journal]
+ (summarise journal)
+ (export-journal journal))
+
+;; Fixtures
+
+(defn topic-fixture
+ "Returns a fixture function that creates all the topics named in the app's topics
+ config before running a test function."
+ [config]
+ (fn [t]
+ (admin/create-topics config)
+ (t)))
+
+(defn reset-application-fixture
+ "Returns a fixture that runs the kafka.tools.StreamsResetter with the supplied
+ `reset-args` as parameters"
+ [config]
+ (fn [t]
+ (admin/reset-application config)
+ (t)))
+
+(def ^:dynamic *test-machine-transport-fn* nil)
+
+(defn- kafka-transport [{:keys [topics streams-settings] :as config}]
+ (jd.test/kafka-transport
+ {"bootstrap.servers" (:bootstrap-servers streams-settings)
+ "group.id" (str "test-machine-consumer-" (uuid/v4))}
+ topics))
+
+(defn run-topology-fixture
+ "Returns a fixture which runs the passed topology as a kafka stream, and binds
+ a remote broker test machine transport to *test-machine-transport-fn*"
+ [stream-build-fn config]
+ (fn [t]
+ (let [stream (core/run-topology stream-build-fn (assoc config :hard-exit-on-error false))]
+ (try
+ (binding [*test-machine-transport-fn* (fn []
+ (kafka-transport config))]
+ (t))
+ (finally
+ (.close stream))))))
+
+(defn- mock-transport [^Topology topology
+ {:keys [topics streams-settings] :as config}]
+ (jd.test/mock-transport
+ {:driver (TopologyTestDriver. topology
+ (core/props-for streams-settings))}
+ topics))
+
+(defn mock-topology-fixture
+ "Returns a fixture which creates a Topology Test Driver and binds a test machine
+ transport using the driver to *test-machine-transport-fn*"
+ [stream-build-fn config]
+ (fn [t]
+ (let [topology (core/build-topology stream-build-fn config)]
+ (binding [*test-machine-transport-fn* (fn [] (mock-transport topology config))
+ watch/*default-watch-timeout* 5000]
+ (t)))))
+
+;; mode of test
+;; :mock - run with TopologyTestDriver (requires no running kafka)
+;; :integrated - creates required topics and starts topology, runs tests
+;; against kafka. Requires externally runnign kafka platform.
+;; :remote - assumes the app is already running, and just runs the tests
+;; against kafka.
+(def test-machine-mode (atom :mock))
+
+;; All-in-one integration test fixture
+
+(defn integration-fixture
+ "Runs a topology for a test. Will run in one of two modes based on
+ `test-machine-mode`. It will set up the config passed in accordingly.
+ Best used as a :once fixture.
+
+ :mock
+ Runs the topology using the TopologyTestDriver
+
+ :integrated
+ Runs the topology using the core streams running code, i.e. as an
+ actual independent KafkaStreams instance. For this, a separately
+ running instance of the confluent platform is required (broker, schema-reg)
+ This mode will create any missing topics for the test
+ and reset the topology it it already exists. The confluent platform
+ must be started separately somehow (the fixture does not start it).
+
+ :remote
+ Assumes the topology under test is already running somehow else, and
+ only sets up the test machine for writign to and observing Kafka. Not
+ strictly useful for tests - but would allow a CI integration test to
+ then also be run against a remote UAT env etc."
+ [topology-build-fn config]
+ (c.t/join-fixtures
+ (condp = @test-machine-mode
+ :mock
+ ;; Resolve serdes with MockSchemaRegistryClient
+ ;; Set the backing state stores location to a unique place
+ (let [config-for-test (-> config
+ (core/reify-serdes-config (MockSchemaRegistryClient.))
+ (update-in [:streams-settings :state-dir]
+ (fn [dir]
+ (str dir "/" (uuid/v4)))))]
+ [(mock-topology-fixture topology-build-fn config-for-test)])
+
+ :integrated
+ ;; Resolve serdes with real SchemaRegistryClient
+ (let [config-for-test (core/reify-serdes-config config)]
+ [(topic-fixture config-for-test)
+ (reset-application-fixture config-for-test)
+ (run-topology-fixture topology-build-fn config-for-test)])
+
+ :remote
+ ;; Resolve serdes with real SchemaRegistryClient, and bind a remote kafka
+ ;; transport with a fixture
+ (let [config-for-test (core/reify-serdes-config config)]
+ [(fn [t]
+ (binding [*test-machine-transport-fn* (fn [] (kafka-transport config-for-test))]
+ (t)))])
+
+ ;; unknown mode
+ (throw (ex-info (str "Unknown integration mode " @test-machine-mode)
+ {:mode @test-machine-mode})))))
+
+;; Running Tests
+
+(defn run-test
+ "Runs a test machine test. Must be called wrapped with one of the fixtures
+ which binds *test-machine-transport-fn*, either:
+ - mock-topology-fixture
+ - run-topology-fixture
+ The `integration-fixture` will call one of those also, based on the value
+ of *test-machine-mode*"
+ ([commands] (run-test commands c.t/*testing-contexts* ))
+ ([commands descriptions]
+ (when (not *test-machine-transport-fn*)
+ (throw (ex-info "run-test must be called wrapped by a fixture which binds *test-machine-transport-fn*, see docs" {})))
+ (doseq [m descriptions] (println "*" m))
+ (jd.test/with-test-machine
+ (*test-machine-transport-fn*)
+ (fn [machine]
+ (let [result (jd.test/run-test machine commands)]
+ (summarise-and-export (:journal result))
+ result)))))
diff --git a/examples/streams-examples/test/streams/stack_calculator_test.clj b/examples/streams-examples/test/streams/stack_calculator_test.clj
new file mode 100644
index 00000000..649788de
--- /dev/null
+++ b/examples/streams-examples/test/streams/stack_calculator_test.clj
@@ -0,0 +1,73 @@
+(ns streams.stack-calculator-test
+ (:require
+ [clj-uuid :as uuid]
+ [clojure.test :refer :all]
+ [jackdaw.test.journal :as j]
+ [streams.stack-calculator :as sut]
+ [streams.core :as app]
+ [streams.framework.integration-test :as itc]))
+
+;; alter this to run against a local kafka broker etc.
+(reset! itc/test-machine-mode :mock)
+
+(use-fixtures :once (itc/integration-fixture sut/build-stream
+ (sut/configure-topology (app/load-config))))
+
+
+(deftest stack-reducer-fn-test
+ (testing "the stack calculator topologies reducer fn"
+ (let [to-topic (fn [l]
+ (map (fn [v]
+ [:k v]) l))]
+ (is (= [3 2 1] (reduce sut/stack-reducer [] (to-topic [1 2 3]))))
+ (is (= [3] (reduce sut/stack-reducer [] (to-topic [1 2 "+"]))))
+ (is (= [-2] (reduce sut/stack-reducer [] (to-topic [2 4 "-"]))))
+ (is (= [8] (reduce sut/stack-reducer [] (to-topic [2 4 "*"]))))
+ (is (= [1/2] (reduce sut/stack-reducer [] (to-topic [2 4 "/"]))))
+ (is (= [6] (reduce sut/stack-reducer [] (to-topic [1 2 3 "+" "+"])))))))
+
+(deftest calculation-test
+ (testing "Test the stack calculator"
+ (let [test-id (str (uuid/v4))
+ {:keys [results journal]}
+ (itc/run-test
+ [[:write! :input 1 {:key test-id}]
+ [:write! :input 2 {:key test-id}]
+ [:write! :input "+" {:key test-id}]
+
+ [:write! :input 3 {:key test-id}]
+ [:write! :input 4 {:key test-id}]
+ [:write! :input "+" {:key test-id}]
+
+ [:write! :input "*" {:key test-id}]
+ [:watch (itc/watch-msg-count-for-key :output test-id 7)]])]
+ (is (itc/result-ok? results))
+ (is (= [21] (-> journal
+ (j/messages :output)
+ (last)))))))
+
+;; TODO more dynamic key mapping / counts
+(deftest regression-test
+ (testing "Assert the calculator using a journal from a previous run"
+ (let [test-id-1 (str (uuid/v4))
+ test-id-2 (str (uuid/v4))
+ input-journal (itc/slurp-journal "test/resources/journal.edn")
+ {:keys [results journal]}
+ (itc/run-test
+ ;; Build the input from the loaded journals input topic
+ ;; alter the keys though so they are unique to this test run
+ (concat
+ (map (fn [v]
+ [:write! :input (:value v) {:key test-id-1}])
+ (itc/msg-for-key input-journal :input "key-1"))
+ (map (fn [v]
+ [:write! :input (:value v) {:key test-id-2}])
+ (itc/msg-for-key input-journal :input "key-2"))
+ [[:watch (itc/watch-msg-count-for-key :output test-id-1 3)]
+ [:watch (itc/watch-msg-count-for-key :output test-id-2 3)]]))]
+ (is (itc/result-ok? results))
+ ;; assert the output is the same as the loaded journals output
+ (is (= (map :value (itc/msg-for-key input-journal :output "key-1"))
+ (map :value (itc/msg-for-key journal :output test-id-1))))
+ (is (= (map :value (itc/msg-for-key input-journal :output "key-2"))
+ (map :value (itc/msg-for-key journal :output test-id-2)))))))