Overview

  1. Demonstrate various ways, with and without Kafka Connect, to get data into Kafka topics and then load it into a Kafka Streams API KStream
  2. Show some basic usage of the stream processing API

Additional Reading

Prerequisites

  • Common demo prerequisites
  • Confluent Platform 5.4
  • Maven command mvn to compile Java code
  • The timeout command, which the bash scripts use to terminate consumer processes after a period of time, is available by default on most Linux distributions but not on macOS. To install it on macOS:
# Install coreutils
brew install coreutils

# Add a "gnubin" directory to your PATH
PATH="/usr/local/opt/coreutils/libexec/gnubin:$PATH"

Description

Example 1: Kafka console producer -> Key:String and Value:String

A detailed walkthrough of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples

  • The confluent local produce command produces String keys and String values to a Kafka topic.
  • The client application reads from the Kafka topic using Serdes.String() for both key and value.

Notes

KAFKA-2526: the --key-serializer argument of the confluent local produce command cannot be used to serialize the key as a Long, so in this example the key is serialized as a String. As a workaround, you could write your own kafka.common.MessageReader (for example, start from the default implementation, LineMessageReader) and pass it to confluent local produce with the --line-reader argument.

Example 2: JDBC source connector with Single Message Transformations -> Key:Long and Value:JSON

A detailed walkthrough of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples

  • Kafka Connect JDBC source connector produces JSON values, and inserts the key using single message transformations, also known as SMTs. This is helpful because by default the JDBC source connector does not insert a key.
  • Client application reads from the Kafka topic using Serdes.Long() for the key and a custom JSON Serde for the value.

Notes

This example uses a few SMTs including one to cast the key to an int64. The key uses the org.apache.kafka.connect.converters.LongConverter provided by KAFKA-6913.

Example 3: JDBC source connector with SpecificAvro -> Key:String(null) and Value:SpecificAvro

A detailed walkthrough of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples

  • Kafka Connect JDBC source connector produces Avro values, and null String keys, to a Kafka topic.
  • Client application reads from the Kafka topic using SpecificAvroSerde for the value and then the map function to convert the stream of messages to have Long keys and custom class values.

Notes

This example uses a single message transformation (SMT), SetSchemaMetadata, with code that has a fix for KAFKA-5164, allowing the connector to set the namespace in the schema. If you do not have the fix for KAFKA-5164, see Example 4, which uses GenericAvro instead of SpecificAvro.

Example 4: JDBC source connector with GenericAvro -> Key:String(null) and Value:GenericAvro

A detailed walkthrough of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples

  • Kafka Connect JDBC source connector produces Avro values, and null String keys, to a Kafka topic.
  • Client application reads from the Kafka topic using GenericAvroSerde for the value and then the map function to convert the stream of messages to have Long keys and custom class values.

Notes

This example uses GenericAvroSerde rather than SpecificAvroSerde because the JDBC source connector currently does not set a namespace when it generates a schema name for the data it produces to Kafka. For SpecificAvroSerde, the missing namespace is a problem when matching reader and writer schemas: Avro builds a class name from the writer schema's name and namespace and tries to load that class, and without a namespace the class cannot be found.

Example 5: Java client producer with SpecificAvro -> Key:Long and Value:SpecificAvro

A detailed walkthrough of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples

  • Java client produces Long keys and SpecificAvro values to a Kafka topic.
  • Client application reads from the Kafka topic using Serdes.Long() for key and SpecificAvroSerde for the value.

Example 6: JDBC source connector with Avro to KSQL -> Key:Long and Value:Avro

  • Kafka Connect JDBC source connector produces Avro values, and null keys, to a Kafka topic.
  • KSQL reads from the Kafka topic and then uses PARTITION BY to create a new stream of messages with BIGINT keys.

Stream processing

All examples in this repo demonstrate the Kafka Streams API methods count and reduce.

Notes

KAFKA-5245: one needs to provide the Serdes twice, (1) when calling StreamsBuilder#stream() and (2) when calling KStream#groupByKey()

PR-531: Confluent distribution provides packages for GenericAvroSerde and SpecificAvroSerde

KAFKA-2378: adds APIs to be able to embed Kafka Connect into client applications

Run

What should I see?

After you run ./start.sh:

  • You should see each of the examples run end-to-end
  • If you are running Confluent Platform, open your browser and navigate to the Control Center web interface Management -> Kafka Connect tab at http://localhost:9021/management/connect to see the two deployed connectors
  • Beyond that, the real value of this demo is to see the provided configurations and client code

Original Dataset

Dataset

1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700
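
Each record above is a pipe-delimited line with three fields. As a minimal sketch of turning such a line into a Long key plus a custom value class (the kind of conversion the client applications do with the map function), the plain-Java snippet below parses one line. The class name LocationRecord and the field names id, city, and amount are assumptions made for illustration; they are not taken from the demo code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class; the name and field names are assumptions.
final class LocationRecord {
    final long id;      // first field, usable as a Long key
    final String city;  // second field
    final long amount;  // third field

    LocationRecord(long id, String city, long amount) {
        this.id = id;
        this.city = city;
        this.amount = amount;
    }

    // Parse one "id|city|amount" line; reject lines with the wrong shape.
    static LocationRecord parse(String line) {
        String[] parts = line.split("\\|");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        return new LocationRecord(
                Long.parseLong(parts[0].trim()),
                parts[1].trim(),
                Long.parseLong(parts[2].trim()));
    }

    // Parse every line, preserving input order.
    static List<LocationRecord> parseAll(List<String> lines) {
        List<LocationRecord> records = new ArrayList<>();
        for (String line : lines) {
            records.add(parse(line));
        }
        return records;
    }
}
```

For example, LocationRecord.parse("3|Moscow|800") yields a record whose id is 3, which can then serve as the message key.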

Expected Results

Count

1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1
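
The counts above can be checked by hand with a small plain-Java stand-in for a per-key count. This sketch does not use the Kafka Streams API; it only mimics the grouping-and-counting behaviour over the original dataset, and the class and method names (CountSketch, countByKey) are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not the demo's Kafka Streams code.
final class CountSketch {
    // The original dataset from this README.
    static final List<String> DATASET = Arrays.asList(
            "1|Raleigh|300", "2|Dusseldorf|100", "1|Raleigh|600",
            "3|Moscow|800", "4|Sydney|200", "2|Dusseldorf|400",
            "5|Chennai|400", "3|Moscow|100", "3|Moscow|200",
            "1|Raleigh|700");

    // Count how many records share each key (the first field).
    static Map<Long, Long> countByKey(List<String> lines) {
        Map<Long, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            long key = Long.parseLong(line.split("\\|")[0]);
            counts.merge(key, 1L, Long::sum); // start at 1, then add 1 per record
        }
        return counts;
    }

    public static void main(String[] args) {
        countByKey(DATASET).forEach((k, v) -> System.out.println(k + "|" + v));
    }
}
```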

Sum

1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400
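
The sums above follow from a per-key reduce: the first value seen for a key becomes the running aggregate, and each later value is combined with it. The plain-Java sketch below imitates that behaviour with Map.merge rather than the Kafka Streams API; the names ReduceSketch and reduceByKey are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; not the demo's Kafka Streams code.
final class ReduceSketch {
    // The original dataset from this README.
    static final List<String> DATASET = Arrays.asList(
            "1|Raleigh|300", "2|Dusseldorf|100", "1|Raleigh|600",
            "3|Moscow|800", "4|Sydney|200", "2|Dusseldorf|400",
            "5|Chennai|400", "3|Moscow|100", "3|Moscow|200",
            "1|Raleigh|700");

    // Reduce the third field per key (the first field) with the given reducer.
    static Map<Long, Long> reduceByKey(List<String> lines, BinaryOperator<Long> reducer) {
        Map<Long, Long> aggregates = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.split("\\|");
            long key = Long.parseLong(parts[0]);
            long value = Long.parseLong(parts[2]);
            // First value for a key is stored as-is; later values are combined.
            aggregates.merge(key, value, reducer);
        }
        return aggregates;
    }

    public static void main(String[] args) {
        reduceByKey(DATASET, Long::sum).forEach((k, v) -> System.out.println(k + "|" + v));
    }
}
```

Passing a different reducer, such as Math::max, would give the largest value per key instead of the sum.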
