Conduit for AWS Kinesis.
Run make build
to build the connector.
Run make test-integration
to run the integration tests.
The Docker compose file at test/docker-compose.yml
can be used to run the required resource (AWS Kinesis via Localstack) locally.
The Source connector for AWS Kinesis opens subscriptions to each of the available shards in the stream and pushes records into the buffer until the subscription is up to date (all present source records read), at which point it switches to capturing the latest events in the stream. Every 5 minutes (the lifetime of the subscription), the subscription to the shard is refreshed.
name | description | required | default value |
---|---|---|---|
aws.accessKeyId |
Access Key ID associated with your AWS resources | true | "" |
aws.secretAccessKey |
Secret Access Key associated with your AWS resources | true | "" |
aws.region |
Region associated with your AWS resources | true | "" |
aws.url |
The URL for AWS (useful when testing the connector with localstack). | false | "" |
streamName |
The AWS Kinesis stream name | true | "" |
startFromLatest |
Set this value to true to ignore any records already in the stream | false | false |
Here's an example of a complete configuration pipeline for a Kinesis source connector.
The Destination connector for AWS Kinesis writes records to the stream either to a single shard or to multiple shards through partitionKey
. The size limit for a single record is 1MB, attempting to write a single record's data which is greater than 1MB will result in an error.
By default the partition key will consist of the record key. If the record key exceeds 256 unicode characters, the key will be trimmed at the end to fit the max partition key size.
If given a partition key go template, the key will be generated from the given template, with the record data as the main data context.
The destination supports multicollection mode, where the kinesis stream name is determined at runtime, based on the opencdc.collection
metadata field or a given go template. If no stream name is found, the destination will output a stream not found error.
name | description | required | default value |
---|---|---|---|
aws.accessKeyId |
Access Key ID associated with your AWS resources | true | "" |
aws.secretAccessKey |
Secret Access Key associated with your AWS resources | true | "" |
aws.region |
Region associated with your AWS resources | true | "" |
aws.url |
The URL for AWS (useful when testing the connector with localstack). | false | "" |
streamName |
streamName is the Kinesis stream name. It can contain a Go template that will be executed for each record to determine the stream name. By default, the stream name will come from the opencdc.collection record metadata field. |
false | {{ index .Metadata "opencdc.collection" }} |
partitionKeyTemplate |
The go template that will be used to generate partition keys. By default empty, which will generate partition keys from the record key string representation. | false | "" |
Here's an example of a complete configuration pipeline for a Kinesis destination connector.