This is a Proof of Concept showing different use cases of Apache Kafka and Kafka Streams, with real-world examples.
The point is to show how to use Kafka Streams to transform data in real time.
GitHub: https://github.com/davamigo/kafka-streams-poc
We have three entities: `commercial-orders`, `products` and `members`.

- Each `commercial-order` has the `member-uuid`, the `sell-date` and one or more `commercial-order-lines`.
- Each `commercial-order-line` contains the `product-uuid`, the `sell-price` and the `quantity`.
We want to create `purchase-orders` per country and day.

- Each `purchase-order` will have the `country`, the `day` and a list of `purchase-order-lines`.
- Each `purchase-order-line` will have the `product-uuid`, the `product-name`, the `product-price` and the sum of the `quantities`.
We also want to create `warehouse-orders` per country and day.

- Each `warehouse-order` will have the `country`, the `day` and a list of `warehouse-order-lines`.
- Each `warehouse-order-line` will have the `product-legacy-id`, the `product-barcode` and the sum of the `quantities`.
- The `product-legacy-id` is not in the `product` entity, but it can be recovered from a REST API.
This PoC consists of a producer that generates random data and sends it to Kafka topics, and several Kafka Streams processes that convert the generated data into something else. The whole project is designed to have a separate microservice for each process, but it is programmed as a monolith because this is just a PoC.
Produces a commercial order with random data (a publishing sketch follows the list):

- Random member: creates a new member or uses an existing one.
- Random number of order lines (1 to 10).
- Random products: creates a new product or uses an existing one.
- Random price for new products (1 to 100).
- Random quantity for each order line (1 to 5).
- Publishes the new commercial order to the `t.commercial-orders.new` topic.
- Publishes new members to the `t.members.new` topic.
- Publishes new products to the `t.products.new` topic.
There are also some consumers that write to a MongoDB database, and a small front end to show the contents of the MongoDB collections.
Joins each commercial order with the member data. It also computes some fields, like the total amount of the order. The target stream won't have the billing address or the order lines. The key of the new stream will be the same (the `uuid` of the commercial order).

- From `t.commercial-orders.new` (KStream).
- Join with `t.members.new` (GlobalKTable).
- To `t.commercial-orders.converted` (KStream).
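A minimal sketch of this topology, assuming Avro-generated `CommercialOrder` and `Member` classes and a hypothetical `convert()` helper (serde configuration omitted):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, CommercialOrder> orders =
        builder.stream("t.commercial-orders.new");

GlobalKTable<String, Member> members =
        builder.globalTable("t.members.new");

orders
        // Key the stream by the uuid of the commercial order
        .selectKey((key, order) -> order.getUuid())
        // Look up the member by the memberUuid stored in the order value
        .join(members,
              (orderUuid, order) -> order.getMemberUuid(),
              (order, member) -> convert(order, member))
        .to("t.commercial-orders.converted");
```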
Extracts all the order lines from the commercial orders and joins each commercial order line with the product data. Each commercial order will generate one or more messages in the target stream (one per order line). The key of the new stream will be the same (the `uuid` of the commercial order) to allow grouping.

- From `t.commercial-orders.new` (KStream).
- Join with `t.products.new` (GlobalKTable).
- To `t.commercial-order-lines.split` (KStream).
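The split step could be sketched like this, reusing the `builder` and imports from the previous sketch; `convertLine()` is a hypothetical helper:

```java
KStream<String, CommercialOrder> orders =
        builder.stream("t.commercial-orders.new");

GlobalKTable<String, Product> products =
        builder.globalTable("t.products.new");

orders
        // Keep the uuid of the commercial order as the key to allow grouping
        .selectKey((key, order) -> order.getUuid())
        // Emit one record per order line
        .flatMapValues(CommercialOrder::getLines)
        // Look up the product of each line
        .join(products,
              (orderUuid, line) -> line.getProductUuid(),
              (line, product) -> convertLine(line, product))
        .to("t.commercial-order-lines.split");
```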
Reduces the commercial order lines by adding the quantities for the same country, product and day to generate the purchase order lines. One purchase order line will be generated per country, product and day. The key of the new stream will be the concatenation of `country-code`, `date(yyyy-mm-dd)` and `product-uuid`.

- From `t.commercial-order-lines.split`.
- To `t.purchase-order-lines.aggregated`.
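A sketch of this aggregation, with hypothetical `dayOf()` and accumulator helpers (state-store serdes omitted):

```java
builder.<String, CommercialOrderLineSplit>stream("t.commercial-order-lines.split")
        // Re-key by country-code + date(yyyy-mm-dd) + product-uuid
        .selectKey((orderUuid, line) -> line.getShippingCountry()
                + "-" + dayOf(line.getCommercialOrderDatetime())
                + "-" + line.getProductUuid())
        .groupByKey()
        // Sum the quantities of all the lines sharing the same key
        .aggregate(PurchaseOrderLine::new,
                   (key, line, aggregated) -> aggregated.add(line))
        .toStream()
        .to("t.purchase-order-lines.aggregated");
```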
Generates one purchase order per country and day by aggregating the purchase order lines. The purchase order will have a list of all its order lines. The key of the new stream will be the concatenation of `country-code` and `date(yyyy-mm-dd)`.

- From `t.purchase-order-lines.aggregated`.
- To `t.purchase-orders.generated`.
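The same pattern, one level up; the `PurchaseOrder` accumulator and `dayOf()` are again hypothetical:

```java
builder.<String, PurchaseOrderLine>stream("t.purchase-order-lines.aggregated")
        // Re-key by country-code + date(yyyy-mm-dd)
        .selectKey((lineKey, line) -> line.getCountry() + "-" + dayOf(line.getDate()))
        .groupByKey()
        // Collect all the purchase order lines of the same country and day
        .aggregate(PurchaseOrder::new,
                   (key, line, order) -> order.addLine(line))
        .toStream()
        .to("t.purchase-orders.generated");
```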
Generates the warehouse order lines from the purchase order lines. The key will be the `uuid` of the new warehouse order line.

- From `t.purchase-order-lines.aggregated`.
- To `t.warehouse-order-lines.generated`.
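A sketch of this mapping, assuming a hypothetical `toWarehouseOrderLine()` converter:

```java
import org.apache.kafka.streams.KeyValue;

builder.<String, PurchaseOrderLine>stream("t.purchase-order-lines.aggregated")
        .map((key, purchaseLine) -> {
            WarehouseOrderLine line = toWarehouseOrderLine(purchaseLine);
            // The key of the new stream is the uuid of the warehouse order line
            return KeyValue.pair(line.getUuid(), line);
        })
        .to("t.warehouse-order-lines.generated");
```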
In this example we assume the WMS (Warehouse Management System) needs a legacy product id which is stored in another topic, but it's possible this `legacy_id` is not there. So we do a `leftJoin` operation to not lose any product. The output goes to one of two topics (matched or unmatched), depending on whether the legacy product id was found or not.

- From `t.warehouse-order-lines.generated`.
- Left join with `t.product-legacy-ids.cache`.
- To `t.warehouse-order-lines.matched`.
- To `t.warehouse-order-lines.unmatched`.
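A sketch of the left join and the branching, where `withLegacyId()` is a hypothetical copy-with-field helper:

```java
KStream<String, WarehouseOrderLine> generated =
        builder.stream("t.warehouse-order-lines.generated");

GlobalKTable<String, Integer> legacyIds =
        builder.globalTable("t.product-legacy-ids.cache");

KStream<String, WarehouseOrderLine>[] branches = generated
        // leftJoin keeps the line even when no legacy id is found
        .leftJoin(legacyIds,
                  (lineUuid, line) -> line.getProductUuid(),
                  (line, legacyId) -> line.withLegacyId(legacyId))
        .branch((lineUuid, line) -> line.getProductLegacyId() != null,
                (lineUuid, line) -> true);

branches[0].to("t.warehouse-order-lines.matched");
branches[1].to("t.warehouse-order-lines.unmatched");
```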
Takes the unmatched warehouse order lines and tries to recover the product legacy id from an external API. The output goes to one of two topics (recovered or failed), depending on whether the legacy product id was found or not.

- From `t.warehouse-order-lines.unmatched`.
- To `t.warehouse-order-lines.recovered`.
- To `t.warehouse-order-lines.failed`.
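A sketch of the recovery step; `legacyIdClient` stands for a hypothetical REST client, and the blocking call inside `mapValues` just keeps the sketch short:

```java
KStream<String, WarehouseOrderLine>[] recovered = builder
        .<String, WarehouseOrderLine>stream("t.warehouse-order-lines.unmatched")
        // Try to recover the legacy id from the external API
        .mapValues(line -> line.withLegacyId(legacyIdClient.lookup(line.getProductUuid())))
        .branch((lineUuid, line) -> line.getProductLegacyId() != null,
                (lineUuid, line) -> true);

recovered[0].to("t.warehouse-order-lines.recovered");
recovered[1].to("t.warehouse-order-lines.failed");
```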
Merges the matched warehouse order lines and the recovered warehouse order lines streams into one larger stream. The order is not guaranteed!

- From `t.warehouse-order-lines.matched`.
- Merge `t.warehouse-order-lines.recovered`.
- To `t.warehouse-order-lines.new`.
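The merge itself is a one-liner in the Kafka Streams DSL:

```java
KStream<String, WarehouseOrderLine> matched =
        builder.stream("t.warehouse-order-lines.matched");
KStream<String, WarehouseOrderLine> recoveredLines =
        builder.stream("t.warehouse-order-lines.recovered");

// merge() interleaves the two streams with no ordering guarantee
matched.merge(recoveredLines).to("t.warehouse-order-lines.new");
```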
Generates the warehouse orders by aggregating the warehouse order lines.

- From `t.warehouse-order-lines.new`.
- To `t.warehouse-orders.generated`.
This Kafka Streams process fills the product legacy id topic with the relation between the product uuid and the product legacy id. This topic is used by the Generate warehouse order lines process described above.

- From `t.products.new`.
- To `t.product-legacy-ids.cache`.
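Under the assumption that the feeder reads the products and derives the legacy id from them, a sketch could look like this; `extractLegacyId()` is hypothetical:

```java
builder.<String, Product>stream("t.products.new")
        // The key is the product uuid and the value is the legacy id
        .map((key, product) -> KeyValue.pair(product.getUuid(), extractLegacyId(product)))
        .to("t.product-legacy-ids.cache");
```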
- `t.members.new`: All data of the member. The key is the member uuid.
- `t.products.new`: All data of the product. The key is the product uuid.
- `t.commercial-orders.new`: All the commercial orders. No key.
- `t.commercial-orders.converted`: Commercial orders with member data, but without order lines. The key is the commercial order uuid.
- `t.commercial-order-lines.split`: Commercial order lines. The key is the commercial order uuid.
- `t.purchase-order-lines.aggregated`: Purchase order lines created from the aggregation of commercial order lines per country, day and product. The key is the concatenation of `country-code`, `date(yyyy-mm-dd)` and `product-uuid`.
- `t.purchase-orders.generated`: Purchase orders with all the purchase order lines per country and day. The key is the concatenation of `country-code` and `date(yyyy-mm-dd)`.
- `t.warehouse-order-lines.generated`: Warehouse order lines generated from the purchase order lines. The key is the uuid of the warehouse order line.
- `t.warehouse-order-lines.matched`: Warehouse order lines matched with the `product-legacy-id`. The key is the uuid of the warehouse order line.
- `t.warehouse-order-lines.unmatched`: Warehouse order lines not matched with the `product-legacy-id`. The key is the uuid of the warehouse order line.
- `t.warehouse-order-lines.recovered`: Warehouse order lines not matched, but whose `product-legacy-id` was recovered. The key is the uuid of the warehouse order line.
- `t.warehouse-order-lines.failed`: Warehouse order lines neither matched nor recovered. The key is the uuid of the warehouse order line.
- `t.warehouse-order-lines.new`: Merge result of the matched and recovered warehouse order lines. The key is the uuid of the warehouse order line.
- `t.product-legacy-ids.cache`: Cache topic for product legacy ids. The key is the `product-uuid` and the value is the `product-legacy-id`.
The schemas of the messages published in these topics are the following.

`Member`:

- `uuid`: `string`
- `firstName`: `string`
- `lastName`: `string`
- `addresses`: `array[MemberAddress]`
`MemberAddress`:

- `country`: `string`
- `state`: `string`, nullable, default `null`
- `city`: `string`
- `zipCode`: `string`
- `street`: `string`, nullable, default `null`
- `number`: `string`, nullable, default `null`
- `extra`: `string`, nullable, default `null`
`Product`:

- `uuid`: `string`
- `name`: `string`
- `type`: `string`
- `barCode`: `string`, nullable, default `null`
- `price`: `float`
`CommercialOrder`:

- `uuid`: `string`
- `datetime`: `long`, logicalType `timestamp-millis`
- `memberUuid`: `string`
- `shippingAddress`: `CommercialOrderAddress`
- `billingAddress`: `CommercialOrderAddress`, nullable, default `null`
- `lines`: `array[CommercialOrderLine]`
`CommercialOrderLine`:

- `uuid`: `string`
- `commercialOrderUuid`: `string`
- `productUuid`: `string`
- `price`: `float`
- `quantity`: `int`, default `1`
`CommercialOrderAddress`:

- `country`: `string`
- `state`: `string`, nullable, default `null`
- `city`: `string`
- `zipCode`: `string`
- `street`: `string`, nullable, default `null`
- `number`: `string`, nullable, default `null`
- `extra`: `string`, nullable, default `null`
`CommercialOrderConverted`:

- `uuid`: `string`
- `datetime`: `long`, logicalType `timestamp-millis`
- `memberUuid`: `string`
- `memberFirstName`: `string`
- `memberLastName`: `string`
- `shippingCountry`: `string`
- `shippingCity`: `string`
- `shippingZipCode`: `string`
- `totalAmount`: `float`
- `totalQuantity`: `int`
`CommercialOrderLineSplit`:

- `uuid`: `string`
- `commercialOrderUuid`: `string`
- `commercialOrderDatetime`: `long`, logicalType `timestamp-millis`
- `shippingCountry`: `string`
- `memberUuid`: `string`
- `productUuid`: `string`
- `productName`: `string`
- `productType`: `string`
- `productBarCode`: `string`, nullable, default `null`
- `productPrice`: `float`
- `orderLinePrice`: `float`
- `quantity`: `int`, default `1`
`PurchaseOrder`:

- `uuid`: `string`
- `aggregationKey`: `string`
- `country`: `string`
- `date`: `long`, logicalType `timestamp-millis`
- `lines`: `array[PurchaseOrderLineCondensed]`
- `totalAmount`: `float`
- `totalQuantity`: `int`
`PurchaseOrderLineCondensed`:

- `uuid`: `string`
- `aggregationKey`: `string`
- `productUuid`: `string`
- `price`: `float`
- `quantity`: `int`, default `1`
`PurchaseOrderLine`:

- `uuid`: `string`
- `aggregationKey`: `string`
- `country`: `string`
- `date`: `long`, logicalType `timestamp-millis`
- `productUuid`: `string`
- `productName`: `string`
- `productType`: `string`
- `productBarCode`: `string`, nullable, default `null`
- `productPrice`: `float`
- `quantity`: `int`, default `1`
`WarehouseOrder`:

- `uuid`: `string`
- `aggregationKey`: `string`
- `country`: `string`
- `date`: `long`, logicalType `timestamp-millis`
- `lines`: `array[WarehouseOrderLineCondensed]`
`WarehouseOrderLineCondensed`:

- `uuid`: `string`
- `productUuid`: `string`
- `productLegacyId`: `int`
- `productName`: `string`
- `productBarCode`: `string`, nullable, default `null`
- `quantity`: `int`, default `1`
`WarehouseOrderLine`:

- `uuid`: `string`
- `country`: `string`
- `date`: `long`, logicalType `timestamp-millis`
- `productUuid`: `string`
- `productLegacyId`: `int`, nullable, default `null`
- `productName`: `string`
- `productBarCode`: `string`, nullable, default `null`
- `quantity`: `int`, default `1`
You can run the project from the command line on a Linux or macOS computer. The prerequisites are:

- Docker installed.
- docker-compose installed.
- Java 8 or later installed.
> Go to project root folder
$ cd kafka-streams-poc
> Start the Kafka and MongoDB containers
$ docker/docker-compose.sh up -d
> Create all the topics with the right configuration
$ docker/kafka-topics-create-all.sh
> Build the project
$ ./gradlew build
> Start the project
$ ./gradlew bootRun
Then open http://localhost:8080/demo/long in the browser.