Data pipelines continue to do the heavy lifting in data integration. To act on insights immediately, companies turn to Apache Kafka® to replace legacy, batch-based pipelines with streaming data pipelines. However, building reusable data pipelines on open-source Kafka still comes with major challenges, such as an overreliance on specialized Kafka engineering talent and cumbersome development cycles. These bottlenecks limit the speed at which teams can set data in motion.
Confluent's Stream Designer is a new visual canvas for rapidly building, testing, and deploying streaming data pipelines powered by Kafka. You can quickly and easily build pipelines graphically or with SQL, leveraging built-in integrations with fully managed connectors, ksqlDB for stream processing, and Kafka topics. This demo walks you through building streaming data pipelines in minutes!
Watch the demo from the Current 2022 keynote: Reimagining Data Pipelines for the Streaming Era.
This demo uses two data sources (Microsoft SQL Server database and clickstream data through a Python publisher) and streams enriched data to two destinations (MongoDB Atlas database and a different organization through Stream Sharing).
To successfully complete this demo, you need to install a few tools before getting started.
- This demo uses Stream Designer in Confluent Cloud. If you don't have a Confluent Cloud account, sign up for a free trial here.
- Install the Confluent Cloud CLI by following the instructions here.
- An AWS account with permissions to create resources. Sign up for an account here.
- (Optional) Install a database tool. This demo uses DBeaver Community.
- This demo uses Python 3.9.13.
- This demo uses the `pyodbc` module. You can install it through `pip` (a quick connection-test sketch follows this list).

  ```bash
  pip3 install pyodbc
  ```

- Download and install Terraform here.
Note: This demo was built and validated on a Mac (x86).
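Once your SQL Server instance is up (Terraform creates it later in this walkthrough), you can confirm that the Microsoft ODBC driver and `pyodbc` are wired up correctly with a quick connection test like the sketch below. This is illustrative only and not part of the repo: the server hostname, password, and driver name are placeholders, while the database name, user, and port match the connector configuration used later in this demo.

```python
import pyodbc

# Placeholder connection details -- replace with your own RDS SQL Server values.
# The driver name must match the Microsoft ODBC driver version you installed.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=<your-sql-server-hostname>,1433;"
    "DATABASE=public;"
    "UID=admin;"
    "PWD=<your-password>;"
    "TrustServerCertificate=yes;"
)

cursor = conn.cursor()
cursor.execute("SELECT @@VERSION;")
print(cursor.fetchone()[0])  # prints the SQL Server version string
conn.close()
```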
- Sign up for a Confluent Cloud account here.
- After verifying your email address, access the Confluent Cloud sign-in page by navigating here.
- When prompted for your username and password, fill in your credentials.

  Note: If you're logging in for the first time, you will see a wizard that walks you through some tutorials. Minimize this, as you will walk through those steps in this guide.

- Create Confluent Cloud API keys by following this guide.

  Note: This is different from Kafka cluster API keys.
- This demo uses Microsoft SQL Server Standard Edition hosted on AWS. Change Data Capture (CDC) is only supported on the Enterprise, Developer, Enterprise Evaluation, and Standard editions.
- This demo uses an Amazon RDS Microsoft SQL Server instance that is publicly accessible.
- Download and install the Microsoft ODBC driver for your operating system from here.
- Sign up for a free MongoDB Atlas account here.
- Create an API key pair so Terraform can create resources in the Atlas cluster. Follow the instructions here.
- Clone this repository and step into it.

  ```bash
  git clone https://github.com/confluentinc/demo-stream-designer.git
  cd demo-stream-designer
  ```
- Create an `.accounts` file by running the following command.

  ```bash
  echo "CONFLUENT_CLOUD_EMAIL=add_your_email\nCONFLUENT_CLOUD_PASSWORD=add_your_password\nexport TF_VAR_confluent_cloud_api_key=\"add_your_api_key\"\nexport TF_VAR_confluent_cloud_api_secret=\"add_your_api_secret\"\nexport TF_VAR_mongodbatlas_public_key=\"add_your_public_key\"\nexport TF_VAR_mongodbatlas_private_key=\"add_your_private_key\"\nexport TF_VAR_mongodbatlas_org_id=\"add_your_org_id\"" > .accounts
  ```

  Note: This repo ignores the `.accounts` file.

- Update the following variables in the `.accounts` file with your credentials.

  ```
  CONFLUENT_CLOUD_EMAIL=<replace>
  CONFLUENT_CLOUD_PASSWORD=<replace>
  export TF_VAR_confluent_cloud_api_key="<replace>"
  export TF_VAR_confluent_cloud_api_secret="<replace>"
  export TF_VAR_mongodbatlas_public_key="<replace>"
  export TF_VAR_mongodbatlas_private_key="<replace>"
  export TF_VAR_mongodbatlas_org_id="<replace>"
  ```
- Navigate to the home directory of the project and run the `create_env.sh` script. This bash script copies the content of the `.accounts` file into a new file called `.env` and appends additional variables to it.

  ```bash
  cd demo-stream-designer
  ./create_env.sh
  ```

- Source the `.env` file.

  ```bash
  source .env
  ```

  Note: If you don't source the `.env` file, you'll be prompted to provide the values manually on the command line when running Terraform commands.
- Navigate to the repo's terraform directory.

  ```bash
  cd terraform
  ```

- Log into your AWS account through the command line.
- Initialize Terraform within the directory.

  ```bash
  terraform init
  ```

- Create the Terraform plan.

  ```bash
  terraform plan
  ```

- Apply the plan to create the infrastructure.

  ```bash
  terraform apply
  ```

  Note: Read the `main.tf` configuration file to see what will be created.
- Write the output of `terraform` to a JSON file. The `setup.sh` script will parse this JSON file to update the `.env` file.

  ```bash
  terraform output -json > ../resources.json
  ```
- Run the `setup.sh` script.

  ```bash
  cd demo-stream-designer
  ./setup.sh
  ```

  The `setup.sh` script does the following:

  - Creates an API key pair for the Python clients, which is also used in the connectors' configuration files for authentication.
  - Creates an API key pair for Schema Registry.
  - Creates tags and business metadata.
  - Updates the `.env` file to replace the remaining variables with the newly generated values:
    - CCLOUD_API_KEY
    - CCLOUD_API_SECRET
    - CCLOUD_BOOTSTRAP_ENDPOINT
    - CCLOUD_SCHEMA_REGISTRY_API_KEY
    - CCLOUD_SCHEMA_REGISTRY_API_SECRET
    - CCLOUD_SCHEMA_REGISTRY_URL
- Source the `.env` file.

  ```bash
  source .env
  ```

- Run the script to enable change data capture (CDC) on all tables of the database.

  ```bash
  cd demo-stream-designer/sql_scripts
  python3 prepare_sqlserver.py
  ```
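For context, `prepare_sqlserver.py` is the repo script that turns on CDC; conceptually, that boils down to enabling CDC at the database level and then on each table the connector will capture. The sketch below is a simplified illustration of that idea, not a copy of the repo's script: the connection values mirror the connector configuration used later in this demo, and on Amazon RDS the database-level step uses the RDS-specific `msdb.dbo.rds_cdc_enable_db` procedure (a self-managed SQL Server would call `sys.sp_cdc_enable_db` instead).

```python
import pyodbc

# Simplified illustration only -- prepare_sqlserver.py in the repo is the
# authoritative version. Connection values mirror the connector config used later.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=<your-sql-server-hostname>,1433;"
    "DATABASE=public;UID=admin;PWD=<your-password>;TrustServerCertificate=yes;",
    autocommit=True,
)
cursor = conn.cursor()

# Enable CDC at the database level (RDS-specific procedure; on a self-managed
# SQL Server you would run "EXEC sys.sp_cdc_enable_db;" instead).
cursor.execute("EXEC msdb.dbo.rds_cdc_enable_db 'public';")

# Enable CDC on each table the connector will capture.
for table in ("orders", "products"):
    cursor.execute(
        f"EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', "
        f"@source_name = N'{table}', @role_name = NULL;"
    )

conn.close()
```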
- Log into Confluent Cloud, navigate to the ksqlDB tab, and step into your cluster.
- Change `auto.offset.reset` to `Earliest`.
- Create a ksqlDB stream off of the `click_stream` topic.

  ```sql
  CREATE STREAM click_stream (user_id VARCHAR, product_id VARCHAR, view_time INTEGER, page_url VARCHAR, ip_address VARCHAR)
    WITH (KAFKA_TOPIC='click_stream', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
  ```

- Verify the `click_stream` stream is populated correctly.

  ```sql
  SELECT * FROM click_stream EMIT CHANGES;
  ```

- We need to reserialize the `click_stream` stream so Schema Registry can track all changes to the schema.

  ```sql
  CREATE STREAM clickstreams_global
    WITH (KAFKA_TOPIC='clickstreams_global', PARTITIONS=1, REPLICAS=3, KEY_FORMAT='JSON', VALUE_FORMAT='JSON_SR') AS
    SELECT * FROM CLICK_STREAM EMIT CHANGES;
  ```

- Verify the `clickstreams_global` stream is populated correctly.

  ```sql
  SELECT * FROM clickstreams_global EMIT CHANGES;
  ```
- Use Stream Catalog to search for `clickstreams_global` and click on the topic.
- On the right side of the screen, add `prod` in the Tags section.
- Click on +Add business metadata, select Domain from the drop-down list, and add the following information:
  - Team_owner: Web
  - Slack_contact: #web-optimization
  - Name: user clickstreams
- Open a Terminal window and run the script to create new clickstream data (a minimal sketch of such a producer follows these two steps).

  ```bash
  cd demo-stream-designer/clickstreams_scripts
  python3 produce_clickstream.py
  ```

- Open a second Terminal window and run the script to create new purchase orders.

  ```bash
  cd demo-stream-designer/sql_scripts
  python3 produce_orders.py
  ```
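For context on what `produce_clickstream.py` does conceptually: it publishes JSON click events to the `click_stream` topic using the Kafka API key that `setup.sh` generated. The sketch below is a minimal stand-in for that idea, not the repo's script; it assumes the `confluent-kafka` Python package is installed, reads the `CCLOUD_*` values that `setup.sh` wrote to `.env` from the environment (they may need to be exported), and uses made-up field values shaped like the `click_stream` schema from the ksqlDB steps.

```python
import json
import os
import random
import time

from confluent_kafka import Producer

# Credentials come from the .env file created earlier; the variable names match
# the ones setup.sh writes (CCLOUD_*), but they must be exported to be visible here.
producer = Producer({
    "bootstrap.servers": os.environ["CCLOUD_BOOTSTRAP_ENDPOINT"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.environ["CCLOUD_API_KEY"],
    "sasl.password": os.environ["CCLOUD_API_SECRET"],
})

# Emit a handful of synthetic click events shaped like the click_stream schema
# (user_id, product_id, view_time, page_url, ip_address); values are made up.
for _ in range(10):
    event = {
        "user_id": f"u{random.randint(0, 9)}",
        "product_id": f"p{random.randint(100, 105)}",
        "view_time": random.randint(5, 120),
        "page_url": f"https://www.example-store.com/products/p{random.randint(100, 105)}",
        "ip_address": f"10.0.0.{random.randint(1, 254)}",
    }
    producer.produce("click_stream", value=json.dumps(event))
    time.sleep(0.5)

producer.flush()
```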
- Log into Confluent Cloud, navigate to Stream Designer, and step into the pipeline you created earlier.
- Click on Start with SQL to open the code editor and paste the following code.
- The code adds the following components to the canvas:
  - A SQL Server source connector, which captures all data changes in our source database and streams them to Confluent Cloud in near real time (doc).
  - `sql.dbo.orders` and `sql.dbo.products` as the connector's output topics.
  - `orders_stream` and `products_stream`, which are ksqlDB streams based on the output topics.
  - `products_table`, a ksqlDB table that holds the latest information for each product.
  - `orders_stream_productid_rekeyed`, which re-partitions `orders_stream` using `product_id` as the key so we can join the stream with `products_table`.
  - `orders_and_products`, the stream that results from joining `products_table` and `orders_stream_productid_rekeyed`.
```sql
CREATE SOURCE CONNECTOR "SqlServerCdcSourceConnector_0" WITH (
  "after.state.only"='true',
  "connector.class"='SqlServerCdcSource',
  "database.dbname"='public',
  "database.hostname"='<SQL_SERVER_HOSTNAME>',
  "database.password"='<SQL_SERVER_PASSWORD>',
  "database.port"='1433',
  "database.server.name"='sql',
  "database.user"='admin',
  "kafka.api.key"='<KAFKA_API_KEY>',
  "kafka.api.secret"='<KAFKA_API_SECRET>',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "max.batch.size"='1',
  "output.data.format"='JSON_SR',
  "output.key.format"='JSON',
  "poll.interval.ms"='1',
  "snapshot.mode"='initial',
  "table.include.list"='dbo.products, dbo.orders',
  "tasks.max"='1'
);

CREATE OR REPLACE STREAM "orders_stream" (CUSTOMER_ID STRING, ORDER_ID STRING KEY, PRODUCT_ID STRING, PURCHASE_TIMESTAMP STRING)
  WITH (kafka_topic='sql.dbo.orders', partitions=1, key_format='JSON', value_format='JSON_SR');

CREATE OR REPLACE STREAM "products_stream" (PRODUCT_ID STRING KEY, PRODUCT_NAME STRING, PRODUCT_RATING DOUBLE, SALE_PRICE INTEGER)
  WITH (kafka_topic='sql.dbo.products', partitions=1, key_format='JSON', value_format='JSON_SR');

CREATE OR REPLACE TABLE "products_table"
  WITH (kafka_topic='products_table', partitions=1, key_format='JSON', value_format='JSON_SR') AS
  SELECT
    EXTRACTJSONFIELD(PRODUCT_ID, '$.product_id') AS PRODUCT_ID,
    LATEST_BY_OFFSET(PRODUCT_NAME) AS PRODUCT_NAME,
    LATEST_BY_OFFSET(PRODUCT_RATING) AS PRODUCT_RATING,
    LATEST_BY_OFFSET(SALE_PRICE) AS SALE_PRICE
  FROM "products_stream"
  GROUP BY EXTRACTJSONFIELD(PRODUCT_ID, '$.product_id');

CREATE OR REPLACE STREAM "orders_stream_productid_rekeyed"
  WITH (kafka_topic='orders_stream_productid_rekeyed', partitions=1, key_format='JSON', value_format='JSON_SR') AS
  SELECT
    CUSTOMER_ID,
    EXTRACTJSONFIELD(ORDER_ID, '$.order_id') AS ORDER_ID,
    PRODUCT_ID,
    PURCHASE_TIMESTAMP
  FROM "orders_stream"
  PARTITION BY PRODUCT_ID;

CREATE OR REPLACE STREAM "orders_and_products"
  WITH (kafka_topic='orders_and_products', partitions=1, key_format='JSON', value_format='JSON_SR') AS
  SELECT *
  FROM "orders_stream_productid_rekeyed" O
  INNER JOIN "products_table" P
  ON O.PRODUCT_ID = P.PRODUCT_ID;
```
- Update the following variables to match your environment:

  ```
  database.hostname
  database.password
  kafka.api.key
  kafka.api.secret
  ```
- Click on Activate pipeline and wait until all components are activated and the source connector is in the Running state.

  Note: You might have to activate or re-activate the pipeline if your topics and operations were activated before your source connector was in the Running state.

- Click on each topic to verify they are populated correctly.
- We want to see how Big Bend Shoes are selling in our store. To do that, we need to apply a filter to the `orders_and_products` stream.
- Click on the right edge of the `orders_and_products` stream and hit Filter from the list of options.
- Create a new filter with the following properties and hit Save:

  ```
  query name: shoes
  filter name: shoes
  filter expression: LCASE(P_PRODUCT_NAME) LIKE '%big bend shoes%'
  ```

- Click on the right edge of the Filter component and create a new Kafka topic and ksqlDB stream with the following properties and hit Save:

  ```
  topic name: big_bend_shoes
  stream name: big_bend_shoes
  ```

- Re-activate the pipeline.
- `big_bend_shoes` is now a data product. We can share this stream with our partner retailers so they can see how their products are performing in our stores. We will use Stream Sharing to do that.
Stream Sharing
- Click on the `big_bend_shoes` topic and then click on the +Share icon in the top right corner of the side panel.
- Enter a valid recipient email address and double-check that `big_bend_shoes-value` is the selected schema subject.
- Hit the Invite button.
- The recipient can access the shared stream through the Access button in their email.
- Next, they pick the client and download a configuration file.
- Now they can use the configuration file and their consumer application to consume from the shared topic.
- In this demo we use `kafka-console-consumer` from Confluent Platform. An example configuration file is provided in `stream_sharing/client.properties`.
- Open a new terminal and run the following command to start consuming.

  ```bash
  kafka-console-consumer --consumer.config stream_sharing/client.properties --topic big_bend_shoes --from-beginning --bootstrap-server <REPLACE-WITH-YOUR-BOOTSTRAP-SERVER>
  ```
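If you prefer to consume the shared topic from Python rather than with `kafka-console-consumer`, a minimal consumer along these lines works too. This sketch is not part of the repo; it assumes the `confluent-kafka` Python package and takes the bootstrap server and API key/secret from the client configuration you downloaded through Stream Sharing, and the group id is arbitrary. Because the topic's values are serialized as JSON_SR, the raw bytes carry a small Schema Registry framing prefix; printing them is still enough to confirm data is flowing.

```python
from confluent_kafka import Consumer

# Fill these in from the client configuration downloaded via Stream Sharing.
consumer = Consumer({
    "bootstrap.servers": "<shared-cluster-bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<shared-api-key>",
    "sasl.password": "<shared-api-secret>",
    "group.id": "big-bend-shoes-consumer",  # arbitrary group id
    "auto.offset.reset": "earliest",
})

consumer.subscribe(["big_bend_shoes"])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Values are JSON_SR-encoded, so the payload starts with a few framing bytes.
        print(msg.value())
finally:
    consumer.close()
```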
- Next, we want to send promotional materials to our online users based on their order and browsing history on our website. To do so, we need to do data enrichment.
- We will use Stream Catalog to find the right clickstream data.
- Click on the Stream Catalog search bar, search for `clickstreams_global`, and click on the topic.
- Click on the Schema tab, expand the properties, and apply the PII tag to `IP_ADDRESS`.
- Verify the tags and business metadata listed on the right-hand side are correct.
- Go back to Stream Designer and step into your pipeline to continue adding more components.
- Add a new Topic to the canvas, click on the `configure` link in the topic box, click on the Configuration tab, and click on Choose an existing topic instead.
- Select `clickstreams_global` from the list of topics and hit Save.
- Click on the `configure` link in the stream name, add the following properties, and hit Save:

  ```
  Name: clickstreams_global
  Key format: JSON
  Value format: JSON_SR
  Columns for the stream: IP_ADDRESS STRING, PAGE_URL STRING, PRODUCT_ID STRING, USER_ID STRING, VIEW_TIME INTEGER
  ```

- Re-activate the pipeline.
- Now we can do our data enrichment by doing a stream-stream join on `orders_stream` and `clickstreams_global`.
- Initiate a join by clicking on the right edge of `orders_stream` and hit Join from the list of options.
- Add the second stream by initiating a connection from the right edge of the `clickstreams_global` stream.
- Create a new join with the following properties and hit Save:

  ```
  query name: orders_clickstreams
  join name: orders_clickstreams
  left input source: orders_stream
  alias of the left: o
  input source: clickstreams_global
  alias of the input source: c
  join type: INNER
  join on clause: o.customer_id = c.user_id
  window duration: 1
  duration unit: HOUR
  grace period duration: 1
  grace period unit: MINUTE
  ```

- Click on the right edge of the Join component, select Stream from the list, and create a new Kafka topic and ksqlDB stream with the following properties and hit Save:

  ```
  topic name: orders_enriched
  stream name: orders_enriched
  ```
- Re-activate the pipeline.
- The marketing team decided to use MongoDB Atlas as their cloud-native database, and we can easily send the `orders_enriched` stream to that database by leveraging our fully managed connector.
- Click on the right edge of the `orders_enriched` Kafka topic and hit Sink Connector.
- Look for and provision a MongoDB Atlas Sink Connector.
- Re-activate the pipeline and, once all components are activated, verify the data is showing up in the MongoDB database correctly.

For more information and detailed instructions, refer to our doc page.
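Besides checking the Atlas UI, you can spot-check the sink from Python with `pymongo` (assuming the `pymongo` package is installed). The connection string and the database and collection names below are placeholders: the MongoDB Atlas Sink Connector writes to whatever database and collection you configured, and the collection is commonly named after the topic, here assumed to be `orders_enriched`.

```python
from pymongo import MongoClient

# Placeholders -- use your Atlas connection string and the database/collection
# names you configured in the MongoDB Atlas Sink Connector.
client = MongoClient("mongodb+srv://<user>:<password>@<cluster-host>/")
collection = client["<database-name>"]["orders_enriched"]

print("documents so far:", collection.count_documents({}))
print("sample document:", collection.find_one())
```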
Congratulations on building your streaming data pipeline with Stream Designer. Your complete pipeline should resemble the following one.
- You can build the entire demo by pasting the complete ksqlDB code directly into the code editor. To do so, run

  ```bash
  cd demo-stream-designer
  ./generate_pipeline.sh
  ```

- The `generate_pipeline.sh` script first creates a `pipeline_template.sql` file, which doesn't include your credentials (such as Kafka API keys, database endpoints, etc.). Then, it creates `pipeline.sql` and fills in the credentials using the `.env` file.

  Note: This repo ignores `pipeline.sql`.

- Copy the content of `pipeline.sql`, paste it into Code Import, hit Apply Changes, and then Activate.
Ensure all the resources that were created for the demo are deleted so you don't incur additional charges.
- If you shared a stream through Stream Sharing, navigate to the Confluent Cloud web UI, click on Stream shares on the left-hand side, and follow the instructions to stop sharing streams.
- The following script will deactivate and delete the pipeline. Note that doing so will delete your topics, and you can't restore them afterwards.

  ```bash
  cd demo-stream-designer
  ./teardown_pipeline.sh
  ```

- You can delete the rest of the resources that were created during this demo by executing the following command.

  ```bash
  terraform destroy
  ```