Tested with Confluent Platform 6.1, ksqlDB 0.15, Kafka Connect JDBC connector 5.5.3
This uses Docker Compose to run the Kafka Connect worker.
- Bring the Docker Compose stack up
docker-compose up -d
- Make sure everything is up and running

➜ docker-compose ps
     Name                   Command                  State                       Ports
---------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
kafka-connect     bash -c cd /usr/share/conf ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat          /bin/sh -c apk add jq; wh ...    Up
ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
mysql             docker-entrypoint.sh mysqld      Up             0.0.0.0:3306->3306/tcp, 33060/tcp
schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp
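The Kafka Connect worker can take a minute or two before it starts accepting REST requests. A small sketch for waiting on it (assuming the worker's REST port is published on localhost:8083, as in this Compose stack) is to poll the REST API until it returns 200:

until [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -eq 200 ]; do
  # Keep polling until the Connect REST API responds with HTTP 200
  echo "Waiting for Kafka Connect to start listening on localhost:8083"
  sleep 5
done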
- Create a test topic + data using ksqlDB (but it’s still just a Kafka topic under the covers)

docker exec -it ksqldb ksql http://ksqldb:8088

CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',2,'BAR');

SHOW TOPICS;
PRINT test01 FROM BEGINNING LIMIT 2;
- Create the Sink connector

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "topics"                             : "test01",
          "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
          "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081",
          "connection.user"                    : "connect_user",
          "connection.password"                : "asgard",
          "auto.create"                        : true,
          "auto.evolve"                        : true,
          "insert.mode"                        : "insert",
          "pk.mode"                            : "record_key",
          "pk.fields"                          : "MESSAGE_KEY"
          }'
Things to customise for your environment:

- topics: the source topic(s) you want to send to the target database
- key.converter: match the serialisation of your source data (see here)
- value.converter: match the serialisation of your source data (see here)

- Check the data in the target database
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
mysql> SHOW TABLES;
+----------------+
| Tables_in_demo |
+----------------+
| test01         |
+----------------+
1 row in set (0.00 sec)

mysql> SELECT * FROM test01;
+------+------+-------------+
| COL1 | COL2 | MESSAGE_KEY |
+------+------+-------------+
|    1 | FOO  | X           |
|    2 | BAR  | Y           |
+------+------+-------------+
2 rows in set (0.00 sec)
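If the table doesn't show up, it's worth confirming that the connector and its task are RUNNING before digging any further (a quick check against the Connect REST API; jq is just for readability):

curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.'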
- Insert some more data. Notice what happens if you re-use a key.

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Z',1,'WOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',4,'PFF');
The Kafka Connect task status shows that there’s a duplicate key error:

$ curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/status | jq '.'
{
  "id": 0,
  "state": "FAILED",
  "worker_id": "kafka-connect:8083",
  "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\nCaused by: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n\t... 11 more\n"
}
- Update the Sink connector to use UPSERT mode

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "topics"              : "test01",
          "key.converter"       : "org.apache.kafka.connect.storage.StringConverter",
          "connection.user"     : "connect_user",
          "connection.password" : "asgard",
          "auto.create"         : true,
          "auto.evolve"         : true,
          "insert.mode"         : "upsert",
          "pk.mode"             : "record_key",
          "pk.fields"           : "MESSAGE_KEY"
          }'
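If the task is still reporting FAILED after the config change, it can be restarted explicitly through the Connect REST API (task id 0, as shown in the status output above):

curl -s -X POST http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/restart

# Confirm it's back to RUNNING
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/status | jq '.state'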
Now the row for key Y updates, whilst Z is added:

mysql> SELECT * FROM test01;
+------+------+-------------+
| COL1 | COL2 | MESSAGE_KEY |
+------+------+-------------+
|    1 | FOO  | X           |
|    4 | PFF  | Y           |
|    1 | WOO  | Z           |
+------+------+-------------+
3 rows in set (0.00 sec)
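Under the covers, the connector's MySQL dialect implements upsert mode with INSERT ... ON DUPLICATE KEY UPDATE. The generated statement looks roughly like this (illustrative only, not the connector's exact SQL):

-- Roughly what the JDBC sink issues for the record with key 'Y' in upsert mode
INSERT INTO test01 (MESSAGE_KEY, COL1, COL2)
VALUES ('Y', 4, 'PFF')
ON DUPLICATE KEY UPDATE COL1 = VALUES(COL1), COL2 = VALUES(COL2);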
- Drop fields, add fields - note how the target schema evolves in-place

# Drop the existing connector
curl -X DELETE http://localhost:8083/connectors/sink-jdbc-mysql-01

# Create a new one, reading from the same topic with new config.
# Because it's got a new name, the connector will re-read all the messages
# from the topic.
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02/config \
     -H "Content-Type: application/json" -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "topics"                             : "test01",
          "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
          "connection.user"                    : "connect_user",
          "connection.password"                : "asgard",
          "auto.create"                        : true,
          "auto.evolve"                        : true,
          "insert.mode"                        : "upsert",
          "pk.mode"                            : "record_key",
          "pk.fields"                          : "MESSAGE_KEY",
          "transforms"                         : "dropSome,addSome",
          "transforms.dropSome.type"           : "org.apache.kafka.connect.transforms.ReplaceField$Value",
          "transforms.dropSome.blacklist"      : "COL2",
          "transforms.addSome.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.addSome.partition.field" : "_partition",
          "transforms.addSome.timestamp.field" : "RECORD_TS"
          }'

mysql> describe test01;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| COL1        | int          | YES  |     | NULL    |       |
| COL2        | varchar(256) | YES  |     | NULL    |       |
| MESSAGE_KEY | varchar(256) | NO   | PRI | NULL    |       |
| _partition  | int          | YES  |     | NULL    |       |
| RECORD_TS   | datetime(3)  | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select * from test01;
+------+------+-------------+------------+-------------------------+
| COL1 | COL2 | MESSAGE_KEY | _partition | RECORD_TS               |
+------+------+-------------+------------+-------------------------+
|    1 | FOO  | X           |          0 | 2021-03-11 11:50:00.759 |
|    4 | PFF  | Y           |          0 | 2021-03-11 11:50:47.761 |
|    1 | WOO  | Z           |          0 | 2021-03-11 11:50:47.682 |
+------+------+-------------+------------+-------------------------+
3 rows in set (0.00 sec)
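One thing to watch: in newer versions of Kafka Connect the ReplaceField SMT's blacklist/whitelist settings have been renamed, so if your worker logs a deprecation warning, the equivalent configuration (assuming a Connect version that supports the newer names) is:

"transforms.dropSome.type"    : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropSome.exclude" : "COL2",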
- Write some CSV and JSON to new topics

docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_json_data <<EOF
{ "ID": 1, "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" }
{ "ID": 2, "Artist": "asdfasd", "Song": "dsfjfghg" }
EOF

docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_json_data_with_a_schema <<EOF
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true } ] }, "payload": { "ID": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" } }
EOF

docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_csv_data <<EOF
1,Rick Astley,Never Gonna Give You Up
EOF
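To double-check what landed on the topics you can consume them straight back with kafkacat (-C consumes, -e exits at the end of the partition):

docker exec -i kafkacat kafkacat -b broker:29092 -C -t some_json_data -e
docker exec -i kafkacat kafkacat -b broker:29092 -C -t some_csv_data -e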
- Stream the JSON data that has a schema to the DB:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
     -H "Content-Type: application/json" -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "topics"                        : "some_json_data_with_a_schema",
          "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
          "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "true",
          "connection.user"               : "connect_user",
          "connection.password"           : "asgard",
          "auto.create"                   : true,
          "auto.evolve"                   : true,
          "insert.mode"                   : "insert"
          }'
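The JDBC sink names the target table after the topic by default (table.name.format), so assuming those defaults the new table can be checked with:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo -e "SELECT * FROM some_json_data_with_a_schema;"'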
- Use ksqlDB to apply a schema to the CSV and schemaless JSON, and show off INSERT INTO for merging two topics into one with a common schema

CREATE STREAM SOME_JSON (ID INT, ARTIST VARCHAR, SONG VARCHAR)
  WITH (KAFKA_TOPIC='some_json_data', VALUE_FORMAT='JSON');

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM SOME_JSON_AS_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM SOME_JSON;

CREATE STREAM SOME_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR)
  WITH (KAFKA_TOPIC='some_csv_data', VALUE_FORMAT='DELIMITED');

INSERT INTO SOME_JSON_AS_AVRO SELECT * FROM SOME_CSV;
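Before wiring up a sink it can be worth eyeballing the merged topic from the ksqlDB prompt (the LIMIT here is arbitrary):

PRINT SOME_JSON_AS_AVRO FROM BEGINNING LIMIT 3;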
- Create a sink for the reserialised data

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
     -H "Content-Type: application/json" -d '{
          "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
          "topics"                             : "SOME_JSON_AS_AVRO",
          "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
          "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081",
          "connection.user"                    : "connect_user",
          "connection.password"                : "asgard",
          "auto.create"                        : true,
          "auto.evolve"                        : true,
          "insert.mode"                        : "insert"
          }'
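Once that connector is running, rows from both the JSON and CSV source topics should land in MySQL. Again assuming the default table naming, the result can be checked with:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo -e "SELECT * FROM SOME_JSON_AS_AVRO;"'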