event-driven-microservices-workshop

Workshop — Developing Event-driven Microservices with Spring Boot, Confluent Cloud, and Java.

Workshop prerequisites and setup

Prerequisites

Ensure you install the following toolset on your computer:

Setup

Before you proceed, be sure to complete the following steps:

Getting code
git clone https://github.com/confluentinc/demo-scene    #(1)
cd demo-scene/event-driven-microservices-workshop           #(2)
  1. Clone the repository.

  2. Change directory to the workshop folder.

Getting only what you need

If you follow the steps below, you will check out only the directory that contains the source code relevant to this workshop.

mkdir -p ~/temp/demo-scene
cd ~/temp/demo-scene
git init .
git remote add origin -f https://github.com/confluentinc/demo-scene/
git config core.sparsecheckout true
echo "event-driven-microservices-workshop/*" >> .git/info/sparse-checkout
git pull --depth=2 origin master
cd event-driven-microservices-workshop
ls -lh

0️⃣ Provisioning Confluent Cloud cluster

$ cd scripts/ccloud
$ ccloud login --save       #(1)
$ ./ccloud_stack_create.sh  #(2)
  1. Log in to your Confluent Cloud account.

  2. The CCloud Stack script will ask you to log in to your CCloud account.

It will automatically provision Kafka and ksqlDB clusters.

Among other things, this script generates a config file that we need to pass to the docker-compose start command so that the connector container can connect to the cloud Kafka cluster.

When ready, move to the next section, where you will generate some reference data.

1️⃣ Loading referential data with Kafka Connect

To leverage the full power of stream processing, it is best to preload the required data in topics. Kafka Streams and ksqlDB will allow you to join your events with any other topic and look up data from it.

This section of the workshop will set up a Kafka Connect JDBC Source connector instance that will synchronize any data from a PostgreSQL instance to an account topic in Kafka.

This exercise simulates a Change Data Capture pattern where we bridge an existing data source to Kafka in real time.

JDBC Source Connector

Start JDBC connector in Docker

./start_connect.sh stack-configs/java-service-account-103523.config #(1)
  1. Replace with the actual service account ID you received during the «Provisioning Confluent Cloud cluster» step.

Start the Data Generator application

Within the workshop project, you will find a data-generator folder containing an application designed to generate some random accounts in our PostgreSQL Account DB. This utility application will generate about 1000 test accounts. The Data Generator also contains a REST endpoint to help us submit transaction requests to Kafka later during the workshop.

Data Generator
Note
Open a new terminal window in the workshop project folder.
The data generator can be launched by running the following commands:
$ source ./scripts/ccloud/delta_configs/env.delta
$ ./gradlew :data-generator:build                        #(1)
$ java -jar data-generator/build/libs/data-generator-0.0.1-SNAPSHOT.jar      #(2)
  1. To build.

  2. To run after build.

Note
To run the Data Generator application in your IDE launch the main method from src/main/java/io/confluent/developer/ccloud/demo/kstream/DataGeneratorApplication.java. Make sure you have environment variables set according to the delta_configs/env.delta file.

After the dataset is generated, you should see the following output:

2020-08-26 22:58:44.507  INFO 15959 --- [unt-Generator-1] Account Service                          : Generated account number 1000.

Start the connector

Open a new terminal window and run the following command from the root of the workshop project folder:

./scripts/connect/deploy-jdbc-connector.sh   #(1)
  1. This command will start a connector instance.

Note

To validate the status of the connector, you can run

./scripts/connect/connector-status.sh

Monitor the account data flowing in Kafka from Confluent Cloud user interface

  1. Access Confluent Cloud user interface from https://confluent.cloud.

  2. From the main screen, navigate to the environment that looks like demo-env-<some-number>.

  3. Inside of this environment, you should see a cluster that looks like demo-kafka-cluster-<some-number>. On the left side, click on Topics.

  4. Click on the account topic and access the messages tab.

  5. Click on the offset textbox, type 0, and press Enter to load all messages from partition 0 starting from offset 0.

With the connector running, you should see account events in the user interface.

Figure 1. Messages explorer in Confluent Cloud user interface

In the next section, we will implement a highly scalable stream processing application using Kafka Streams.

2️⃣ Implementing a Stream Processor with Kafka Streams

Now is the time to get into the heart of the action. We will implement a Kafka Streams topology that atomically processes every request submitted to the transaction-request topic.

Within the workshop project folder, you will find a kstreams-demo subfolder representing a Kafka Streams application. Spring Boot and the spring-kafka project handle the boilerplate code required to connect to Kafka. This workshop will focus on writing a Kafka Streams topology with the processing function for our use case.

Atomic transaction processing with Kafka Streams

Our business requirement states that we must check whether the funds are sufficient for every request received before updating the balance of the account being processed. We should never have two transactions being processed at the same time for the same account. This would create a race condition for which we have no guarantee we can enforce the balance check before withdrawing funds.

The Data Generator writes transaction requests to the Kafka topic with a key equal to the transaction’s account number. Therefore, we can be sure all messages of an account will be processed by a single thread for our Transaction Service no matter how many instances are concurrently running.
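The keying guarantee above can be illustrated with a minimal sketch. It assumes a simplified hash: Kafka's default partitioner hashes the serialized key with murmur2, whereas this example uses String.hashCode() purely for illustration. The class name PartitionSketch and the partition count of 6 are hypothetical.

```java
import java.util.List;

// Simplified model of key-based partitioning (not Kafka's actual partitioner).
final class PartitionSketch {

    // Maps a record key to a partition. The same key always yields the same
    // partition, so all requests for one account are processed in order.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical partition count
        for (String account : List.of("1", "2", "1", "2", "1")) {
            System.out.printf("account %s -> partition %d%n",
                              account, partitionFor(account, numPartitions));
        }
    }
}
```

Because each partition is consumed by exactly one stream task at a time, two requests for the same account cannot be processed concurrently.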

Kafka Streams won’t commit any message offset until it completes our business logic of managing a transaction request.

Transaction Service

Implement the Transaction Transformer

Because of our stream processor’s transactional nature, we require a specific component from Kafka Streams named a Transformer. This utility allows us to process events one by one while interacting with a State Store, another Kafka Streams component that helps us persist our account balance in a local instance of an embedded database, RocksDB.

Open the io.confluent.developer.ccloud.demo.kstream.TransactionTransformer Java class and implement the transform function to return a TransactionResult based on the validity of the transaction request. The TransactionResult contains a success flag set to true if the funds were successfully updated.

The transform method also updates the funds State Store. The class already has utility functions to help you execute our business logic.

TransactionTransformer.transform()
  @Override
  public TransactionResult transform(Transaction transaction) {

    if (transaction.getType().equals(Transaction.Type.DEPOSIT)) {
      return new TransactionResult(transaction,
                                   depositFunds(transaction),
                                   true,
                                   null);
    }

    if (hasEnoughFunds(transaction)) {
      return new TransactionResult(transaction, withdrawFunds(transaction), true, null);
    }

    log.info("Not enough funds for account {}.", transaction.getAccount());

    return new TransactionResult(transaction,
                                 getFunds(transaction.getAccount()),
                                 false,
                                 TransactionResult.ErrorType.INSUFFICIENT_FUNDS);
  }
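To make the balance check easier to reason about, here is a plain-Java sketch of the same decision logic, assuming an in-memory HashMap as a stand-in for the funds State Store. The class FundsSketch and its method names are hypothetical and are not the workshop's utility functions.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// In-memory model of the deposit/withdraw rules (no Kafka Streams involved).
final class FundsSketch {

    // Stand-in for the funds State Store: account number -> balance.
    private final Map<String, BigDecimal> store = new HashMap<>();

    BigDecimal balance(String account) {
        return store.getOrDefault(account, BigDecimal.ZERO);
    }

    // A deposit always succeeds and adds to the current balance.
    boolean deposit(String account, BigDecimal amount) {
        store.merge(account, amount, BigDecimal::add);
        return true;
    }

    // A withdrawal succeeds only when the balance covers the amount;
    // otherwise the balance is left untouched.
    boolean withdraw(String account, BigDecimal amount) {
        BigDecimal current = balance(account);
        if (current.compareTo(amount) < 0) {
            return false;
        }
        store.put(account, current.subtract(amount));
        return true;
    }

    public static void main(String[] args) {
        FundsSketch funds = new FundsSketch();
        funds.deposit("1", new BigDecimal("100"));
        funds.deposit("1", new BigDecimal("200"));
        System.out.println("withdraw 250 accepted: " + funds.withdraw("1", new BigDecimal("250")));
        System.out.println("withdraw 10000 accepted: " + funds.withdraw("1", new BigDecimal("10000")));
        System.out.println("balance: " + funds.balance("1"));
    }
}
```

The check and the update happen in one method call on one thread, which is the property the Transformer relies on for each account.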

Implement the Streaming Topology

In Kafka Streams, a Topology is the definition of your data flow. It’s a manifest for all operations and transformations to be applied to your data.

To start a stream processor, Kafka Streams only requires you to build a Topology and hand it over. Kafka Streams will take care of managing the underlying consumers and producers.

The io.confluent.developer.ccloud.demo.kstream.KStreamConfig Java class already contains all the boilerplate code required by Kafka Streams to start our processor. In this exercise, we will leverage a StreamsBuilder to define and instantiate a Topology that will handle our transaction processing.

Open the io.confluent.developer.ccloud.demo.kstream.KStreamConfig.defineStreams method and get ready to write your first Kafka Streams Topology.

Create a KStream from the source topic.

Use the stream method of streamsBuilder to turn a topic into a KStream.

KStream<String, Transaction> transactionStream = streamsBuilder.stream("transaction-request");

Leverage the Transformer to process our requests

To inform Kafka Streams that we want to update the funds State Store for all incoming requests atomically, we can leverage the transformValues operator to plug in our TransactionTransformer. This operator requires us to specify the funds State Store that the Transformer will use. This also instructs Kafka Streams to keep track of events from our transaction-request topic since they will result in a change of state for our store.

KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);

Redirect the transaction result to the appropriate topic.

With a new derived stream containing TransactionResult, we can now use the information contained in the payload to feed a success or failure topic.

We will achieve this by deriving two streams from our resultStream. Each stream will be built by applying a filter and filterNot operator with a predicate on the success flag from our TransactionResult payload. With the two derived streams, we can explicitly call the to operator to instruct Kafka Streams to write the mutated events to their respective topics.

resultStream
  .filter(this::success)
  .to("transaction-success");

resultStream
  .filterNot(this::success)
  .to("transaction-failed");
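The effect of the filter and filterNot pair can be modeled in plain Java, assuming a trimmed-down, hypothetical Result class in place of TransactionResult. The sketch uses Collectors.partitioningBy, which is not what Kafka Streams does internally but produces the same two groups: every result lands in exactly one of them.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Models routing results to a "success" or "failed" destination by one predicate.
final class RoutingSketch {

    // Hypothetical stand-in for TransactionResult, reduced to two fields.
    static final class Result {
        final String account;
        final boolean success;

        Result(String account, boolean success) {
            this.account = account;
            this.success = success;
        }
    }

    // Key true holds what filter(success) would keep,
    // key false holds what filterNot(success) would keep.
    static Map<Boolean, List<Result>> route(List<Result> results) {
        return results.stream().collect(Collectors.partitioningBy(r -> r.success));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Result>> routed = route(List.of(
            new Result("1", true), new Result("1", false), new Result("2", true)));
        System.out.println("to transaction-success: " + routed.get(true).size());
        System.out.println("to transaction-failed: " + routed.get(false).size());
    }
}
```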

The implemented defineStreams method

Use this reference implementation to validate you have the right stream definition.

protected void defineStreams(StreamsBuilder streamsBuilder) {

    KStream<String, Transaction> transactionStream = streamsBuilder.stream(transactionRequestConfiguration.getName());

    final String storeName = fundsStoreConfig.getName();
    KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);

    resultStream
        .filter(this::success)
        .to(transactionSuccessConfiguration.getName());

    resultStream
        .filterNot(this::success)
        .to(transactionFailedConfiguration.getName());
  }

Running the Kafka Streams application

Note
If you are running the application from your IDE, launch the main method from io.confluent.developer.ccloud.demo.kstream.KStreamDemoApplication.

If you want to run with the CLI, you must build the application before launching it.

To build the application, run the following command:
./gradlew :kstreams-demo:build
To run the application, run the following command:
java -jar kstreams-demo/build/libs/kstreams-demo-0.0.1-SNAPSHOT.jar

Generate some transactions using the Data Generator endpoint

Ensure your Data Generator application is still running from the previous section.

The utility script scripts/generate-transaction.sh will let you generate transactions. Generate a few transactions using the following commands:

scripts/generate-transaction.sh 1 DEPOSIT 100 CAD
scripts/generate-transaction.sh 1 DEPOSIT 200 CAD
scripts/generate-transaction.sh 1 DEPOSIT 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 10000 CAD

scripts/generate-transaction.sh 2 DEPOSIT 100 CAD
scripts/generate-transaction.sh 2 DEPOSIT 50 CAD
scripts/generate-transaction.sh 2 DEPOSIT 300 CAD
scripts/generate-transaction.sh 2 WITHDRAW 300 CAD

The script takes the following arguments, in this order:

  • The account number.

  • The type of operation (DEPOSIT or WITHDRAW).

  • The amount.

  • The currency.

Monitor the successful transaction results

  1. Access Confluent Cloud user interface from https://confluent.cloud.

  2. From the main screen, navigate to the environment that looks like demo-env-<some-number>.

  3. Inside of the environment, you should see a cluster that looks like demo-kafka-cluster-<some-number>. On the left side, click on Topics.

  4. Click on the transaction-success topic and access the messages tab.

  5. Click on the offset textbox and type 0 and press enter to load all messages from partition 0 starting from offset 0.

You should see transaction-success events in the user interface. If you don’t see any messages, try your luck with partition 1 starting from offset 0.

Monitor the failed transaction results in the Confluent Cloud user interface

  1. Click on the topic tab from the cluster navigation menu.

  2. Select the transaction-failed topic and access the messages tab.

  3. Click on the offset textbox and type 0 and press enter to load all messages from partition 0 starting from offset 0.

You should see transaction-failed events in the user interface. If you don’t see any messages, try your luck with partition 1 starting from offset 0.

In the next section, we will explore how writing a Stream Processor can be simplified with ksqlDB.

3️⃣ Enrich transaction results with ksqlDB

In the first section of this workshop, we configured a JDBC Source Connector to load all account details into an account topic. In the next exercise, we will write a second Stream Processor to generate a detailed transaction statement enriched with account details.

Rather than writing this new service as another Kafka Streams application, we will leverage ksqlDB to declare a stream processor that will enrich our transaction data in real time with our referential data coming from the account topic. The objective of this section is to show how you can use an SQL-like query language to create stream processors like those built with Kafka Streams, without having to compile and run any custom software.

Transaction Statements
Tip
Connect to ksqlDB with CLI

In this exercise, we’re going to use the ksqlDB Cloud UI, but you can also run the CLI using Docker.

docker run -it confluentinc/ksqldb-cli:0.11.0 ksql -u $KSQL_API_KEY -p $KSQL_API_SECRET $KSQLDB_ENDPOINT

Create the account table

ksqlDB is built on top of Kafka Streams. As such, the KStream and KTable are both key constructs for defining stream processors.

The first step requires us to instruct ksqlDB that we wish to turn the account topic into a Table. This table will allow us to join each transaction-success event with the latest account event of the underlying topic. Run the following command in your ksqlDB CLI terminal:

CREATE TABLE ACCOUNT (
  numkey string PRIMARY KEY,
  number INT,
  cityAddress STRING,
  countryAddress STRING,
  creationDate BIGINT,
  firstName STRING,
  lastName STRING,
  numberAddress STRING,
  streetAddress STRING,
  updateDate BIGINT
) WITH (
  KAFKA_TOPIC = 'account',
  VALUE_FORMAT='JSON'
);
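The "latest event per key" behavior of a table can be sketched in plain Java. This is only a conceptual model under simplifying assumptions: events are (key, value) string pairs, and the class TableSketch and the sample names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Models how a table view is derived from a topic: replay events in order
// and keep only the most recent value for each key.
final class TableSketch {

    // Each event is a two-element array: {key, value}.
    static Map<String, String> materialize(String[][] events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] event : events) {
            table.put(event[0], event[1]); // a later event overwrites an earlier one
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"1", "Alice"}, {"2", "Bob"}, {"1", "Alicia"} // account 1 is updated
        };
        System.out.println(materialize(events));
    }
}
```

A stream, by contrast, would keep all three events as independent facts.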
Note

If you get an error about accessing the account topic, you need to grant the ksqlDB server access to that Kafka topic with the following commands:

CCLOUD_KSQL_ID=`ccloud ksql app list -ojson | jq -r '.[0].id'`
CCLOUD_KAFKA_ID=`ccloud kafka cluster list -ojson | jq -r '.[0].id'`
ccloud ksql app configure-acls ${CCLOUD_KSQL_ID} "*" --cluster ${CCLOUD_KAFKA_ID}

# or
ccloud ksql app configure-acls `ccloud ksql app list -ojson | jq -r '.[0].id'` "*" --cluster `ccloud kafka cluster list -ojson | jq -r '.[0].id'`

where ccloud ksql app list -ojson | jq -r '.[0].id' gives you the ID of your ksqlDB cluster and ccloud kafka cluster list -ojson | jq -r '.[0].id' gives you the ID of your Kafka cluster.

Create the transaction-success stream

Before we create the Transaction Statement stream processor, we must also inform ksqlDB that we wish to turn the transaction-success topic into a Stream. Run the following command in your ksqlDB CLI terminal:

CREATE STREAM TRANSACTION_SUCCESS (
  numkey string KEY,
  transaction STRUCT<guid STRING, account STRING, amount DOUBLE, type STRING, currency STRING, country STRING>,
  funds STRUCT<account STRING, balance DOUBLE>,
  success boolean,
  errorType STRING
) WITH (
  kafka_topic='transaction-success',
  value_format='json'
);

Create the transaction statement stream

Now that we have all the ingredients of our Transaction Statement stream processor, we can create a new stream derived from our transaction-success events paired with the latest data from the account topic. We will instruct ksqlDB to create a new stream as a query. By default, ksqlDB will publish any output to a new TRANSACTION_STATEMENT topic. The select query specifies which events to subscribe to and which table to join them with. The output of this new stream processor will be a mix of the transaction details coupled with all the matching account details. The key from transaction-success and account will be used as the matching criterion for the LEFT JOIN command. EMIT CHANGES informs ksqlDB that the query is long-running and should be kept alive, as if it were a Kafka Streams application that must stay available to process all events. Run the following command in your ksqlDB CLI prompt:

CREATE STREAM TRANSACTION_STATEMENT AS
  SELECT *
  FROM TRANSACTION_SUCCESS
  LEFT JOIN ACCOUNT ON TRANSACTION_SUCCESS.numkey = ACCOUNT.numkey
  EMIT CHANGES;
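The semantics of this stream-table LEFT JOIN can be approximated in plain Java. The sketch below is a conceptual model only: it assumes the table is a Map from key to last name and each event is a {key, amount} pair, and the class LeftJoinSketch and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models a stream-table LEFT JOIN: every stream event produces one output row,
// enriched with the table row for the same key when one exists.
final class LeftJoinSketch {

    // events: {key, amount} pairs; accounts: key -> last name (the "table").
    static List<String> leftJoin(List<String[]> events, Map<String, String> accounts) {
        List<String> out = new ArrayList<>();
        for (String[] event : events) {
            String lastName = accounts.get(event[0]); // null when no account matches
            out.add(event[0] + ":" + event[1] + ":" + lastName);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> accounts = new HashMap<>();
        accounts.put("1", "Smith");
        List<String[]> events = List.of(new String[] {"1", "100"}, new String[] {"9", "50"});
        leftJoin(events, accounts).forEach(System.out::println);
    }
}
```

With a LEFT JOIN, the event for key 9 is still emitted even though no account matches; its account columns are simply null. An inner join would drop that event.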

Monitor the Transaction Statements in Cloud user interface

  1. Access Confluent Cloud user interface from https://confluent.cloud

  2. From the main screen, navigate to the environment that looks like demo-env-<some-number>.

  3. Inside of this environment, you should see a cluster that looks like demo-kafka-cluster-<some-number>.

  4. On the left side, click on Topics.

  5. Click on the TRANSACTION_STATEMENT topic and access the messages tab.

  6. Click on the offset textbox and type 0 and press enter to load all messages from partition 0 starting from offset 0.

✅ It’s a wrap!

Congratulations! Now you know how to build event-driven microservices using Spring Boot, Kafka Streams, and ksqlDB.

The next section is very important!

Important
Don’t forget to clean up
$ cd scripts/ccloud
$ docker-compose down -v    #(1)
$ ./ccloud_stack_destroy.sh  stack-configs/java-service-account-103523.config #(2)
  1. Stop the connector and the database.

  2. Destroy ccloud stack to avoid unexpected charges.

Special Thanks!

This workshop is based on the work of Daniel Lavoie. Much ♥️!