Welcome to the source code for Kafka Streams in Action. Here you'll find directions for running the example code from the book. If any of the examples fail to produce output make sure
you have created the topics needed. For those running on Max/*nix there is the create-topic.sh
file in the bin
directory which creates all required topics
ahead of time.
If you have any issues with the examples you can post a question in the Manning Authors forum at https://forums.manning.com/forums/kafka-streams-in-action
The examples in Chapter 9 are more involved and require some extra steps to run. The first example we'll go over is the Kafka-Connect and Kafka Streams integration.
To run the Kafka Connect and Kafka Streams example you'll need to do the following:
-
Update the
plugin.path
property in theconnect-standalone.properties
file to the path where you cloned this repository. Theplugin.path
property contains the path to the upber-jar file with the Confluent JDBC connector and the H2 database classes. Make sure just to update the base location of where you installed the source code, but leave the rest of the path in place. -
Copy both the
connector-jdbc.properties
andconnect-standalone.properties
files to the<kafka install dir>/config
directory. -
Open a terminal window and cd into the base directory of the source code, the run
./gradlew runDatabaseInserts
this will start the H2 database servers and start inserting data into a table that Kafka-Connect monitors. -
In another terminal window cd into
<kafka install dir>/bin
and run./connect-standalone.sh ../config/connect-standalone.properties ../config/connector-jdbc.properties
this will start Kafka Connect and it will start pulling data from the database table into Kafka. -
Open a third terminal window from the base of the source code install and run
./gradlew runStreamsConnectIntegration_Chapter_9
and this will start the Kafka Streams application that will start stream processing data from a database table via Connect!
For this example to work properly you must start the database server/insert process before starting Kafka-Connect.
To clean up or start the example over remove the Connect offsets (stored in the file /tmp/connect.offsets
by default) and remove the H2 database file
file (findata.mv.db
) stored in your home directory.
To prepare for the interactive queries, you'll need to increase the partitions on the stock-transactions
topic to two partitions with the following command:
<kafka base dir>/bin/kafka-topics.sh --alter --partitions 2 --zookeeper localhost:2181 --topic stock-transactions
NOTE
The Interactive Query example makes use of the stock-transactions
topic which is used in previous examples. If you go back to any of the
earlier examples that use the stock-transactions
topic, you'll need to delete it topic an create it again.
The to run the interactive query examples you'll execute 3 commands, each in a separate terminal, from the base directory of the source code install:
./gradlew runProducerInteractiveQueries
runs the producer sending./gradlew runInteractiveQueryApplicationOne
starts a Streams instance with an embedded web server onlocalhost:4567
./gradlew runInteractiveQueryApplicationTwo
starts a Streams instance with an embedded web server onlocalhost:4568
After all three are running you can try out some of the REST calls from your browser. It does'nt matter which port you choose, you'll retrieve the same results. Here are some examples:
http://localhost:4567/kv/TransactionsBySector
shows aggregated transactions by market sectorhttp://localhost:4567/window/NumberSharesPerPeriod/XXXX
shows the number of shares traded over 10 second windows for the last 30 seconds for a given stock symbol. Just replace theXXXX
with one of the followingAEBB, VABC, ALBC, EABC, BWBC, BNBC, MASH, BARX, WNBC, WKRP
http://localhost:4567/session/CustomerPurchaseSessions/NNNNNNNNN
displays the average dollar amount spent per share by customers over one hour sessions. Just replace theNNNNNNNNN
with one of the following customer ids12345678, 222333444, 33311111, 55556666, 4488990011, 77777799, 111188886,98765432, 665552228, 660309116
However the best way to watch Interactive Queries in action is to point your browser to localhost:4568/iq
or localhost:4567/iq
.
This will launch a web application that updates results for all parameters over all three stores TransactionsBySector, NumberSharesPerPeriod, CustomerPurchaseSessions
every
7 seconds via ajax requests to the REST endpoints and are displayed in the web application.
Again it does not matter which port you point to you'll get the same results.
Here are some notes regarding the source code:
-
Since chapter 8 is all about testing, all examples run as tests from within your IDE, although if you want you can run tests from the command line via a
./gradlew clean test
command. -
All unit tests use JUnit 5.0, so it's good opportunity to learn the changes that have come with the new version.
-
Chapter 7 examples are concerned with observing performance so at least one of the examples will continue to run until you explicitly stop the program from running. Most of the related results are found in log files or viewed JMX.
-
For the examples in Chapters 5 and 6 since they rely more on timestamps and potential joins, sometimes it takes a few seconds for data to show up. Additionally random data is generated for each run of an example so example runs produce better data than others. Please be patient.
-
If at first you don't get any results, re-run the example. It could be a I missed adding a topic name to the create-topics.sh script and the topic does not exist yet, but Kafka is configured to create topics automatically.
This project assumes and requires the following
- Java 8
- Gradle
If you don't have gradle installed, that's ok, this project uses the gradle wrapper. This means the first time you run the ./gradlew or gradlew command gradle will be installed for you.
- kafka_2.12-1.0.0.tgz
Kafka itself (version 2.12-1.0.0) is included as a convenience.
All other dependencies are taken care of via gradle.
The gradle eclipse and intellij plugins are included in the build.gradle file.
- To set up for eclipse run ./gradlew eclipse (for windows gradlew eclipse) from the base directory of this repo.
- To set up for intellij run ./gradlew idea (for windows gradlew idea) from the base directory of this repo.
Run tar xvzf kafka_2.12-1.0.0.tgz some where on your computer.
- To start kafka go to /kafka_2.12-1.0.0/bin
- Run zookeeper-server-start.sh
- Run kafka-server-start.sh
If you are on windows, go to the /kafka_2.12-1.0.0/bin/windows directory and run the .bat files with the same name and in the same order.
- To start kafka go to /kafka_2.12-1.0.0/bin
- Run kafka-server-stop.sh
- Run zookeeper-server-stop.sh
If you are on windows, go to the /kafka_2.12-1.0.0/bin/windows directory and run the .bat files with the same name and in the same order.
All the code from the book can be found in the directory corresponding to the chapter where the book introduced or demonstrated the concept/code. Code that is not in a directory named "chapter_N" is either common code used across all chapters, or utility code.
All of the example programs can be run from within an IDE or from the command line. There are gradle tasks for each of the examples we have so far. The provided Kafka will need to be running before you can start any of the examples. Also there is a script in the bin directory (create-topics.sh) that creates all topics required (I think I've added all topics, but may have missed one or two). If you don't run the script that's fine, Kafka auto-creates topics by default. For the purposes of our examples that is fine.
All examples should print to the console by default. Some may write out to topics and print to standard-out but if you don't see anything in the console you should check the source code to make sure I did'nt miss adding a print statement.
To run any of the example programs, I recommend running them through the set gradle tasks. Remember if you are windows use gradlew.bat instead ./gradlew to run the program. All the example programs are located in the build.gradle file. For your convenience here are the commands to run sample programs for chapters 1-7:
- ./gradlew runYellingApp (Kafka Streams version of Hello World)
- ./gradlew runZmartFirstAppChapter_3
- ./gradlew runZmartAdvancedChapter_3
- ./gradlew runAddStateAppChapter_4
- ./gradlew runJoinsExampleAppChapter_4
- ./gradlew runAggregationsChapter_5
- ./gradlew runCountingWindowingChapter_5
- ./gradlew runGlobalKtableChapter_5
- ./gradlew runKStreamKTableChapter_5
- ./gradlew runPopsHopsChapter_6
- ./gradlew runStockPerformance_Chapter_6
- ./gradlew runStockPerformanceStreamsProcess_Chapter_6
- ./gradlew runCoGrouping_Chapter_6
- ./gradlew runCoGroupinStateRetoreListener_Chapter_7
- ./gradlew runStockPerformanceConsumerInterceptor_Chapter_7
- ./gradlew runZmartJmxAndProducerInterecptor_Chapter_7
When running the examples, the program will generate data to flow through Kafka and into the sample streams program. The data generation occurs in the background. The Kafka Streams programs will run for approximately one minute each. The sample programs write results to the console as well as topics. While you are free to use the ConsoleConsumer or your own Consumer, it's much easier to view the results flowing to the console.