Skip to content

Commit

Permalink
Adds helper functions for working with transducers and two examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles Reese committed Sep 25, 2019
1 parent 9b5bbd0 commit 4dffa12
Show file tree
Hide file tree
Showing 17 changed files with 719 additions and 547 deletions.
1 change: 1 addition & 0 deletions examples/simple-ledger/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
log/
3 changes: 3 additions & 0 deletions examples/simple-ledger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Simple Ledger

This example creates a simple accounting ledger using transducers.
41 changes: 25 additions & 16 deletions examples/simple-ledger/deps.edn
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
{:deps
{danlentz/clj-uuid {:mvn/version "0.1.7"}
fundingcircle/jackdaw {:mvn/version "0.6.4"}
org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
org.clojure/clojure {:mvn/version "1.10.0"}
org.clojure/tools.logging {:mvn/version "0.4.1"}}
{:paths
["src" "resources"]

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}

:paths
["src" "test" "dev" "../dev"]
:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:aliases
{:test {:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}}
{:dev
{:extra-paths ["dev" "../../dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}
danlentz/clj-uuid {:mvn/version "0.1.7"
:exclusions [primitive-math]}}}

:test
{:extra-paths ["test"]
:extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
68 changes: 0 additions & 68 deletions examples/simple-ledger/dev/system.clj

This file was deleted.

90 changes: 90 additions & 0 deletions examples/simple-ledger/dev/user.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
(ns user
"Use this namespace for interactive development.
This namespace requires libs needed to reset the app and helpers
from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or
equivalent) to clean this namespace since these tools cannot tell
which libs are actually required."
(:gen-class)
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[jackdaw.admin :as ja]
[jackdaw.serdes :as js]
[jackdaw.repl :refer :all]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf]
[simple-ledger :as sl]))


(def repl-config
"The development config.
When the 'dev' alias is active, this config will be used."
{:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"])
:topic-metadata {:entry-added
{:topic-name "entry-requested"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:transaction-pending
{:topic-name "transaction-pending"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:transaction-added
{:topic-name "transaction-added"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}}}

:topology {:topology-builder sl/topology-builder
:xforms [#'sl/xf-split-entries #'sl/xf-running-balances]
:swap-fn jxf/kv-store-swap-fn}

:app {:streams-config sl/streams-config
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly repl-config))


(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}]
(with-open [client (ja/->AdminClient client-config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata))

(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}]
(let [xform-map (reduce-kv (fn [m k v]
(let [k (keyword (str (:ns (meta v)))
(str (:name (meta v))))]
(assoc m k #(v % jxf/kv-store-swap-fn))))
{}
xforms)
streams-builder (j/streams-builder)]
((topology-builder topic-metadata xform-map) streams-builder)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology streams-config)]
(j/start streams-app)
(assoc opts :streams-app streams-app)))

(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}]
(let [re (re-pattern (str "(" (->> topic-metadata
keys
(map name)
(str/join "|"))
")"))]
(re-delete-topics client-config re)))

(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}]
(j/close streams-app)
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))
1 change: 1 addition & 0 deletions examples/simple-ledger/resources/logback.xml
Loading

0 comments on commit 4dffa12

Please sign in to comment.