This uses Docker Compose to run the Kafka Connect worker.

- Create the S3 bucket, and make a note of its region.
- Obtain your AWS access key pair.
- Update the aws_credentials file with your access key pair.
  - Alternatively, uncomment the environment lines for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the Docker Compose file, and set the values there instead.
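To script the credentials step, the sketch below writes a [default] profile in the standard AWS shared credentials format. It assumes that aws_credentials is mounted into the Kafka Connect container as the worker's AWS shared credentials file. By default, the S3 sink's credentials provider (the AWS default provider chain) reads that file, as well as the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables. Check the volume mapping in the Docker Compose file to confirm the mount. The function name write_aws_credentials is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not part of the original setup.
# Usage: write_aws_credentials <file> <access-key-id> <secret-access-key>
# ASSUMES the file is used as a standard AWS shared credentials file.
write_aws_credentials() {
  local file="$1" key_id="$2" secret="$3"
  # Subshell so the restrictive umask (owner-only access for a newly
  # created file, since it holds a secret) does not leak into the caller.
  (
    umask 077
    cat > "$file" <<EOF
[default]
aws_access_key_id = ${key_id}
aws_secret_access_key = ${secret}
EOF
  )
}

# Example:
#   write_aws_credentials aws_credentials "<your key id>" "<your secret key>"
```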

Bring up the Docker Compose stack:

    docker-compose up -d

Make sure everything is up and running:

    $ docker-compose ps
         Name                   Command                State                    Ports
    ---------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
    kafka-connect     bash -c #                        Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                      echo "Installing ...
    ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
    schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp

Wait until the kafka-connect container reports Up (healthy), as shown above, before you continue.
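Instead of re-running docker-compose ps by hand, you can poll the Kafka Connect REST API until it answers. The docker-compose ps output shows the API exposed on port 8083. The sketch below does this with curl. The function name wait_for_url and its retry defaults are arbitrary choices for illustration. The project does not provide them.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait_for_url <url> [max_attempts] [sleep_seconds]
# Polls <url> with curl until it gets a non-error HTTP response.
# Returns 0 once the URL responds, 1 if it never does.
wait_for_url() {
  local url="$1" attempts="${2:-30}" pause="${3:-5}" i=1
  while [ "$i" -le "$attempts" ]; do
    # -s: silent; -f: treat HTTP status >= 400 as failure;
    # --max-time: give up on a single hung request after 5 seconds
    if curl -sf --max-time 5 -o /dev/null "$url"; then
      echo "$url is up"
      return 0
    fi
    echo "Waiting for $url ($i/$attempts)..." >&2
    sleep "$pause"
    i=$((i + 1))
  done
  echo "Gave up waiting for $url" >&2
  return 1
}

# Example: the root of the Connect REST API returns version information
#   wait_for_url http://localhost:8083/ && echo "Kafka Connect is ready"
```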

Create the sink connector:

    curl -i -X PUT -H "Accept:application/json" \
         -H "Content-Type:application/json" \
         http://localhost:8083/connectors/sink-s3-voluble/config \
         -d '{
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "tasks.max": "1",
            "topics": "cats",
            "s3.region": "us-east-1",
            "s3.bucket.name": "rmoff-voluble-test",
            "flush.size": "65536",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
            "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
            "schema.compatibility": "NONE",
            "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
            "transforms": "AddMetadata",
            "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.AddMetadata.offset.field": "_offset",
            "transforms.AddMetadata.partition.field": "_partition"
        }'

Things to customise for your environment:

- s3.region: the region of the bucket you created
- s3.bucket.name: the name of that bucket
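s3.region and s3.bucket.name are the values most likely to differ from the example. One option is to generate the connector config from arguments instead of editing the JSON by hand. This sketch prints the same configuration as the curl command above, with those two values substituted. The function name s3_sink_config is hypothetical. The function does no JSON escaping, so it assumes plain names. S3 bucket naming rules already restrict bucket names to lowercase letters, digits, dots and hyphens.

```shell
#!/usr/bin/env bash
# Hypothetical helper: s3_sink_config <region> <bucket>
# Prints the sink connector config used in the curl example, with the
# region and bucket substituted. No JSON escaping is performed.
s3_sink_config() {
  # \$Value is escaped so the shell does not expand it inside the heredoc
  cat <<EOF
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "tasks.max": "1",
  "topics": "cats",
  "s3.region": "$1",
  "s3.bucket.name": "$2",
  "flush.size": "65536",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "schema.compatibility": "NONE",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "transforms": "AddMetadata",
  "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField\$Value",
  "transforms.AddMetadata.offset.field": "_offset",
  "transforms.AddMetadata.partition.field": "_partition"
}
EOF
}

# Example: pipe the config straight into the Connect REST API
#   s3_sink_config eu-west-2 my-bucket |
#     curl -i -X PUT -H "Content-Type:application/json" \
#       --data @- http://localhost:8083/connectors/sink-s3-voluble/config
```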
If you want to generate test data with Voluble and view it in ksqlDB, start the ksqlDB CLI:

    docker exec -it ksqldb ksql http://ksqldb:8088

At the ksql> prompt, create the Voluble source connector. It generates data for the owners, cats, diets and adopters topics:

    CREATE SOURCE CONNECTOR s WITH (
      'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
      'genkp.owners.with' = '#{Internet.uuid}',
      'genv.owners.name.with' = '#{Name.full_name}',
      'genv.owners.creditCardNumber.with' = '#{Finance.credit_card}',
      'genk.cats.name.with' = '#{FunnyName.name}',
      'genv.cats.owner.matching' = 'owners.key',
      'genk.diets.catName.matching' = 'cats.key.name',
      'genv.diets.dish.with' = '#{Food.vegetables}',
      'genv.diets.measurement.with' = '#{Food.measurements}',
      'genv.diets.size.with' = '#{Food.measurement_sizes}',
      'genk.adopters.name.sometimes.with' = '#{Name.full_name}',
      'genk.adopters.name.sometimes.matching' = 'adopters.key.name',
      'genv.adopters.jobTitle.with' = '#{Job.title}',
      'attrk.adopters.name.matching.rate' = '0.05',
      'topic.adopters.tombstone.rate' = '0.10',
      'global.history.records.max' = '100000'
    );

Then check that the topics exist, and print the cats topic that the sink connector reads from:

    SHOW TOPICS;
    PRINT cats;
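On the S3 side, objects appear only after enough data has arrived. With flush.size set to 65536 and no time-based rotation configured, the S3 sink commits a file only once 65536 records have accumulated for a topic partition. Expect a delay before the first object shows up. The sketch below computes the object key you should then look for. It assumes the connector defaults topics.dir=topics and filename.offset.zero.pad.width=10, together with the DefaultPartitioner and AvroFormat from the config above. The function name s3_object_key is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: s3_object_key <topic> <partition> <start-offset>
# Builds the key the S3 sink is expected to write, ASSUMING the defaults
# topics.dir=topics and filename.offset.zero.pad.width=10, plus
# DefaultPartitioner (partition=<n> directories) and AvroFormat (.avro).
s3_object_key() {
  local topic="$1" partition="$2" offset="$3"
  # The start offset of the file is zero-padded to 10 digits
  printf 'topics/%s/partition=%s/%s+%s+%010d.avro\n' \
    "$topic" "$partition" "$topic" "$partition" "$offset"
}

# Example: the first file for partition 0 of cats
#   s3_object_key cats 0 0
#   # -> topics/cats/partition=0/cats+0+0000000000.avro
# Compare with what is actually in the bucket:
#   aws s3 ls s3://<your-bucket>/topics/cats/ --recursive
```

Because each file holds flush.size records, the next file for the same partition would be expected to start at offset 65536.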
References