This project contains common transformations for everyday use cases with Kafka Connect.
The following command can be used to install the plugin directly from the Confluent Hub using the Confluent Hub Client.
confluent-hub install jcustenborder/kafka-connect-transform-common:latest
The zip file that is deployed to the Confluent Hub is available under target/components/packages/. You can manually extract this zip file, which includes all dependencies. All the dependencies that are required to deploy the plugin are also available under target/kafka-connect-target. Make sure that you include all the dependencies that are required to run the plugin.
- Create a directory under the plugin.path on your Connect worker.
- Copy all of the dependencies under the newly created subdirectory.
- Restart the Connect worker.
Key
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value
The charset to use when creating the output string.
Importance: HIGH
Type: STRING
Default Value: UTF-8
The fields to transform.
Importance: HIGH
Type: LIST
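As an illustration of the decoding step, here is a minimal sketch that assumes the record value is modelled as a plain `Map<String, Object>` rather than a Kafka Connect `Struct`. The class `BytesToStringSketch` and its `apply` method are hypothetical and are not part of the plugin; only the behavior (decode each listed bytes field with the configured charset) comes from the settings above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a Map stands in for a Kafka Connect Struct.
final class BytesToStringSketch {
    // Returns a copy of the record in which every listed byte[] field is decoded with the charset.
    static Map<String, Object> apply(Map<String, Object> record, List<String> fields, Charset charset) {
        Map<String, Object> result = new LinkedHashMap<>(record);
        for (String field : fields) {
            Object value = result.get(field);
            if (value instanceof byte[]) {
                result.put(field, new String((byte[]) value, charset));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        record.put("payload", "hello".getBytes(StandardCharsets.UTF_8));
        // UTF-8 mirrors the documented default charset.
        System.out.println(apply(record, List.of("payload"), StandardCharsets.UTF_8));
        // prints {id=1, payload=hello}
    }
}
```

Fields that are not byte arrays are left untouched in this sketch; how the plugin treats them is not described here.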
Key
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value
The format to move from.
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
The format to move to.
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase
This transformation is used to change the case of a topic.
For example, it can convert a topic name like 'TOPIC_NAME' to 'topicName' or 'topic_name'.
The format of the incoming topic name:
- LOWER_CAMEL = Java variable naming convention, e.g., "lowerCamel".
- LOWER_HYPHEN = Hyphenated variable naming convention, e.g., "lower-hyphen".
- LOWER_UNDERSCORE = C++ variable naming convention, e.g., "lower_underscore".
- UPPER_CAMEL = Java and C++ class naming convention, e.g., "UpperCamel".
- UPPER_UNDERSCORE = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
The format of the outgoing topic name:
- LOWER_CAMEL = Java variable naming convention, e.g., "lowerCamel".
- LOWER_HYPHEN = Hyphenated variable naming convention, e.g., "lower-hyphen".
- LOWER_UNDERSCORE = C++ variable naming convention, e.g., "lower_underscore".
- UPPER_CAMEL = Java and C++ class naming convention, e.g., "UpperCamel".
- UPPER_UNDERSCORE = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
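The conversion between the formats listed above can be sketched as splitting the name into words and joining them again in the target format. This is a hypothetical, dependency-free illustration (`TopicCaseSketch` is not the plugin's code); the format names mirror the ones the plugin accepts, but edge cases such as digits, acronyms, or repeated separators may behave differently in the real transformation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of converting a name between the five case formats.
final class TopicCaseSketch {
    enum Format { LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE }

    // Splits a name written in the `from` format into lower-case words.
    static List<String> split(String name, Format from) {
        List<String> words = new ArrayList<>();
        switch (from) {
            case LOWER_HYPHEN:
                for (String w : name.split("-")) words.add(w.toLowerCase(Locale.ROOT));
                break;
            case LOWER_UNDERSCORE:
            case UPPER_UNDERSCORE:
                for (String w : name.split("_")) words.add(w.toLowerCase(Locale.ROOT));
                break;
            default:
                // LOWER_CAMEL and UPPER_CAMEL: each upper-case letter starts a new word.
                StringBuilder current = new StringBuilder();
                for (char c : name.toCharArray()) {
                    if (Character.isUpperCase(c) && current.length() > 0) {
                        words.add(current.toString().toLowerCase(Locale.ROOT));
                        current.setLength(0);
                    }
                    current.append(c);
                }
                if (current.length() > 0) words.add(current.toString().toLowerCase(Locale.ROOT));
        }
        return words;
    }

    // Joins lower-case words using the `to` format.
    static String join(List<String> words, Format to) {
        switch (to) {
            case LOWER_HYPHEN:
                return String.join("-", words);
            case LOWER_UNDERSCORE:
                return String.join("_", words);
            case UPPER_UNDERSCORE:
                return String.join("_", words).toUpperCase(Locale.ROOT);
            default:
                // LOWER_CAMEL keeps the first word lower-case; UPPER_CAMEL capitalizes every word.
                StringBuilder sb = new StringBuilder();
                for (String w : words) {
                    if (w.isEmpty()) continue;
                    boolean capitalize = to == Format.UPPER_CAMEL || sb.length() > 0;
                    sb.append(capitalize ? Character.toUpperCase(w.charAt(0)) + w.substring(1) : w);
                }
                return sb.toString();
        }
    }

    static String convert(String name, Format from, Format to) {
        return join(split(name, from), to);
    }

    public static void main(String[] args) {
        System.out.println(convert("TOPIC_NAME", Format.UPPER_UNDERSCORE, Format.LOWER_CAMEL));      // topicName
        System.out.println(convert("TOPIC_NAME", Format.UPPER_UNDERSCORE, Format.LOWER_UNDERSCORE)); // topic_name
    }
}
```

Routing every conversion through a neutral list of lower-case words keeps the number of code paths at one splitter and one joiner per format, instead of one converter per pair of formats.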
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value
The field on the child struct containing the field to be extracted. For example, if you wanted to extract address.state, you would use state.
Importance: HIGH
Type: STRING
The field on the parent struct containing the child struct. For example, if you wanted to extract address.state, you would use address.
Importance: HIGH
Type: STRING
The field to place the extracted value into.
Importance: HIGH
Type: STRING
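A minimal sketch of the extraction, using the address.state example from the settings above. Nested `Map`s stand in for Connect `Struct`s, and `ExtractNestedFieldSketch` is a hypothetical helper; whether the plugin keeps or drops the parent struct is not stated here, so the sketch simply leaves it in place.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: nested Maps stand in for Kafka Connect Structs.
final class ExtractNestedFieldSketch {
    // Copies record[outerField][innerField] into record[outputField]; the parent is kept as-is.
    static Map<String, Object> apply(Map<String, Object> record, String outerField,
                                     String innerField, String outputField) {
        Map<String, Object> result = new LinkedHashMap<>(record);
        Object outer = record.get(outerField);
        if (outer instanceof Map) {
            result.put(outputField, ((Map<?, ?>) outer).get(innerField));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Austin");
        address.put("state", "TX");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "example");
        record.put("address", address);
        // Extract address.state: outer field "address", inner field "state".
        Map<String, Object> out = apply(record, "address", "state", "state");
        System.out.println(out.get("state")); // prints TX
    }
}
```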
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value
This transformation uses a field from the input data to override the timestamp of the record.
The field to pull the timestamp from. This must be an int64 or a timestamp.
Importance: HIGH
Type: STRING
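The accepted field types can be sketched as follows. This hypothetical helper (`ExtractTimestampSketch`) assumes an int64 arrives as a `java.lang.Long` and a timestamp as a `java.util.Date`, which is how Kafka Connect represents its `Timestamp` logical type in Java; in a real transformation the returned value would then become the record's timestamp.

```java
import java.util.Date;
import java.util.Map;

// Hypothetical sketch of reading the timestamp field.
final class ExtractTimestampSketch {
    // Returns the field value as epoch milliseconds.
    // Accepts a Long (Connect int64) or a java.util.Date (Connect Timestamp logical type).
    static long extract(Map<String, ?> value, String field) {
        Object raw = value.get(field);
        if (raw instanceof Long) {
            return (Long) raw;
        }
        if (raw instanceof Date) {
            return ((Date) raw).getTime();
        }
        throw new IllegalArgumentException("Field '" + field + "' must be an int64 or a timestamp, but was "
                + (raw == null ? "null" : raw.getClass().getName()));
    }

    public static void main(String[] args) {
        long fromLong = extract(Map.of("created", 1_600_000_000_000L), "created");
        long fromDate = extract(Map.of("created", new Date(1_600_000_000_000L)), "created");
        System.out.println(fromLong == fromDate); // prints true
    }
}
```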
Key
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value
The mapping of the header to the field in the message.
Importance: HIGH
Type: LIST
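The mapping format is not spelled out above, so the following sketch invents a simple `headerName:fieldName` syntax purely for illustration; it is not the plugin's actual mapping syntax. Headers are modelled as a `Map` (real Kafka headers can repeat a key), and `HeaderToFieldSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: copies header values into fields of the record value.
final class HeaderToFieldSketch {
    // Each mapping uses an INVENTED "headerName:fieldName" syntax for illustration only.
    static Map<String, Object> apply(Map<String, Object> value, Map<String, ?> headers, List<String> mappings) {
        Map<String, Object> result = new LinkedHashMap<>(value);
        for (String mapping : mappings) {
            String[] parts = mapping.split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected headerName:fieldName but got: " + mapping);
            }
            // A missing header yields a null field in this sketch.
            result.put(parts[1], headers.get(parts[0]));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("id", 7);
        Map<String, Object> out = apply(value, Map.of("trace-id", "abc123"), List.of("trace-id:traceId"));
        System.out.println(out); // prints {id=7, traceId=abc123}
    }
}
```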
Key
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value
This transformation is used to convert older schema versions to the latest schema version. It works by keying all of the schemas that come into the transformation by their schema name and comparing the version() of each schema. Schemas are discovered as they flow through the transformation, and the latest version seen for each schema name is the one that is used.
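The version-tracking part of this description can be sketched as a map from schema name to the newest schema seen so far. `SchemaRef` is a hypothetical stand-in for a Connect `Schema` that keeps only the name and version, and `NormalizeSchemaSketch` is not the plugin's class; the step of rewriting the record's data to match the newer schema is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of tracking the latest schema version per schema name.
final class NormalizeSchemaSketch {
    // Stand-in for a Connect Schema: only the name and version are modelled.
    static final class SchemaRef {
        final String name;
        final int version;

        SchemaRef(String name, int version) {
            this.name = name;
            this.version = version;
        }

        @Override
        public String toString() {
            return name + " v" + version;
        }
    }

    // Newest schema seen so far, keyed by schema name.
    private final Map<String, SchemaRef> latest = new HashMap<>();

    // Records the incoming schema and returns the newest version seen for its name.
    SchemaRef normalize(SchemaRef incoming) {
        return latest.merge(incoming.name, incoming,
                (current, candidate) -> candidate.version > current.version ? candidate : current);
    }

    public static void main(String[] args) {
        NormalizeSchemaSketch normalizer = new NormalizeSchemaSketch();
        System.out.println(normalizer.normalize(new SchemaRef("com.example.User", 2))); // com.example.User v2
        System.out.println(normalizer.normalize(new SchemaRef("com.example.User", 1))); // com.example.User v2
        System.out.println(normalizer.normalize(new SchemaRef("com.example.User", 3))); // com.example.User v3
    }
}
```

Because schemas are only known once they have been seen, an older record that arrives before any newer one passes through with its own schema; only later records benefit from a newer version.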
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Value
The regex to test the message with.
Importance: HIGH
Type: STRING
Validator: com.github.jcustenborder.kafka.connect.utils.config.validators.PatternValidator
The fields to transform.
Importance: HIGH
Type: LIST
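A minimal sketch of the filtering step, assuming that a record is dropped when any of the listed string fields fully matches the regex. Both the "match means drop" behavior and the use of `Matcher.matches()` (rather than `find()`) are assumptions of this sketch, not confirmed plugin semantics. Returning `null` is how a Kafka Connect transformation signals that a record should be dropped; a `Map` stands in for the record value, and `PatternFilterSketch` is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the filtering step.
final class PatternFilterSketch {
    // ASSUMPTION: a record is dropped when any listed String field fully matches the pattern.
    // Returning null is how a Kafka Connect transformation signals that a record should be dropped.
    static Map<String, ?> apply(Map<String, ?> record, Pattern pattern, List<String> fields) {
        for (String field : fields) {
            Object value = record.get(field);
            if (value instanceof String && pattern.matcher((String) value).matches()) {
                return null;
            }
        }
        return record;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test-.*");
        System.out.println(apply(Map.of("name", "test-123"), pattern, List.of("name"))); // prints null
        System.out.println(apply(Map.of("name", "prod-1"), pattern, List.of("name")));   // prints {name=prod-1}
    }
}
```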
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value
Importance: HIGH
Type: STRING
Importance: HIGH
Type: STRING
Importance: LOW
Type: LIST
Default Value: [CASE_INSENSITIVE]
Validator: [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES]
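The settings above are listed without descriptions. Assuming the two STRING settings are a regular expression and its replacement applied to field names, and the LIST setting holds regex flags, the rename can be sketched as below. The flag names in the validator correspond one-to-one to constants on `java.util.regex.Pattern`, and `CASE_INSENSITIVE` mirrors the documented default; `PatternRenameSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of renaming fields with a regex and a replacement string.
final class PatternRenameSketch {
    // The flag names accepted by the validator map onto java.util.regex.Pattern constants.
    private static final Map<String, Integer> FLAGS = Map.of(
            "UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS,
            "CANON_EQ", Pattern.CANON_EQ,
            "UNICODE_CASE", Pattern.UNICODE_CASE,
            "DOTALL", Pattern.DOTALL,
            "LITERAL", Pattern.LITERAL,
            "MULTILINE", Pattern.MULTILINE,
            "COMMENTS", Pattern.COMMENTS,
            "CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE,
            "UNIX_LINES", Pattern.UNIX_LINES);

    // Compiles the regex with the bitwise OR of the named flags.
    static Pattern compile(String regex, List<String> flagNames) {
        int flags = 0;
        for (String name : flagNames) {
            Integer flag = FLAGS.get(name);
            if (flag == null) throw new IllegalArgumentException("Unknown flag: " + name);
            flags |= flag;
        }
        return Pattern.compile(regex, flags);
    }

    // Rewrites every occurrence of the pattern in each field name; names without a match are unchanged.
    // If two rewritten names collide, the later field overwrites the earlier one in this sketch.
    static Map<String, Object> rename(Map<String, Object> record, Pattern pattern, String replacement) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            result.put(pattern.matcher(e.getKey()).replaceAll(replacement), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Pattern pattern = compile("^prefix_", List.of("CASE_INSENSITIVE"));
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("PREFIX_id", 1);
        record.put("name", "x");
        System.out.println(rename(record, pattern, "")); // prints {id=1, name=x}
    }
}
```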
Key
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Value
This transformation takes the name from the schema of the key or value and uses it to replace the topic.
Key
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Value
This transformation is used to ensure that all decimal fields in a struct do not exceed the specified maximum precision.
The Confluent AvroConverter uses a default precision of 64, which can be too large for some database systems.
The maximum precision allowed.
Importance: HIGH
Type: INT
Validator: [1,...,64]
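A sketch of the capping rule, assuming the precision is carried in the `connect.decimal.precision` schema parameter (the parameter name used by Confluent's Avro converter) and that a missing parameter means the converter default of 64. `SetMaximumPrecisionSketch` is a hypothetical helper that only adjusts the schema parameters; it does not show how the decimal values themselves are handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of capping the precision recorded in a decimal schema's parameters.
final class SetMaximumPrecisionSketch {
    // ASSUMPTION: decimal precision lives in this schema parameter, as with Confluent's Avro converter.
    static final String PRECISION_PARAM = "connect.decimal.precision";

    // Returns a copy of the parameters with the precision capped at maxPrecision.
    // A missing parameter is treated as the converter default of 64.
    static Map<String, String> capPrecision(Map<String, String> params, int maxPrecision) {
        if (maxPrecision < 1 || maxPrecision > 64) {
            throw new IllegalArgumentException("maxPrecision must be in [1, 64]");
        }
        Map<String, String> result = new HashMap<>(params);
        String current = params.get(PRECISION_PARAM);
        if (current == null || Integer.parseInt(current) > maxPrecision) {
            result.put(PRECISION_PARAM, Integer.toString(maxPrecision));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("scale", "2");
        params.put(PRECISION_PARAM, "64");
        System.out.println(capPrecision(params, 38).get(PRECISION_PARAM)); // prints 38
    }
}
```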
Key
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Value
com.github.jcustenborder.kafka.connect.transform.common.TimestampNow
This transformation is used to override the timestamp of the incoming record to the time the record is being processed.
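The override can be sketched with an injected `java.time.Clock`, which makes the processing time controllable in tests; in production one would pass `Clock.systemUTC()`. `Rec` is a hypothetical stand-in for a Connect record that keeps only the topic and timestamp, and `TimestampNowSketch` is not the plugin's class.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical sketch of replacing a record's timestamp with the processing time.
final class TimestampNowSketch {
    // Minimal stand-in for a Connect record: only the topic and timestamp are modelled.
    static final class Rec {
        final String topic;
        final Long timestamp;

        Rec(String topic, Long timestamp) {
            this.topic = topic;
            this.timestamp = timestamp;
        }
    }

    // Returns a new record whose timestamp is the current time reported by the clock.
    static Rec apply(Rec record, Clock clock) {
        return new Rec(record.topic, clock.millis());
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(1_700_000_000_000L), ZoneOffset.UTC);
        Rec out = apply(new Rec("orders", 42L), fixed);
        System.out.println(out.timestamp); // prints 1700000000000
    }
}
```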
Key
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value
This transformation is used to set a field with the current timestamp of the system running the transformation.
The field(s) that will be inserted with the timestamp of the system.
Importance: HIGH
Type: LIST
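Inserting the processing time into the listed fields can be sketched with an injected `java.time.Clock`. The value is stored as a `java.util.Date`, the Java type behind Connect's `Timestamp` logical type; whether the plugin uses exactly this type, and the field name `processed_at` below, are assumptions for illustration. `TimestampNowFieldSketch` is hypothetical.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a Map stands in for a Kafka Connect Struct.
final class TimestampNowFieldSketch {
    // Sets each listed field to the clock's current time as a java.util.Date.
    static Map<String, Object> apply(Map<String, Object> record, List<String> fields, Clock clock) {
        Map<String, Object> result = new LinkedHashMap<>(record);
        long now = clock.millis();
        for (String field : fields) {
            // A separate Date per field, since Date is mutable.
            result.put(field, new Date(now));
        }
        return result;
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(0L), ZoneOffset.UTC);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        Map<String, Object> out = apply(record, List.of("processed_at"), fixed);
        System.out.println(((Date) out.get("processed_at")).getTime()); // prints 0
    }
}
```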
Key
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value
The Connect schema type to output the converted JSON as.
Importance: MEDIUM
Type: STRING
Default Value: STRING
Validator: [STRING, BYTES]
Flag to determine if the JSON data should include the schema.
Importance: MEDIUM
Type: BOOLEAN
Key
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Value
The fields to transform.
Importance: HIGH
Type: LIST
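A minimal sketch of the conversion, assuming each listed numeric field is converted with `Number.longValue()`. That choice is an assumption: it truncates fractional values (19.99 becomes 19) and can overflow silently for very large numbers, and the plugin may behave differently. `ToLongSketch` is hypothetical, and a `Map` stands in for a Connect `Struct`.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a Map stands in for a Kafka Connect Struct.
final class ToLongSketch {
    // ASSUMPTION: values are converted with Number.longValue(), which truncates fractions.
    // Listed fields that are not numbers are left unchanged in this sketch.
    static Map<String, Object> apply(Map<String, Object> record, List<String> fields) {
        Map<String, Object> result = new LinkedHashMap<>(record);
        for (String field : fields) {
            Object value = result.get(field);
            if (value instanceof Number) {
                result.put(field, ((Number) value).longValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("count", 3);
        record.put("price", new BigDecimal("19.99"));
        record.put("name", "x");
        System.out.println(apply(record, List.of("count", "price"))); // prints {count=3, price=19, name=x}
    }
}
```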
Key
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Value
The field to insert the topic name into.
Importance: HIGH
Type: STRING
Build the project with the following command:
mvn clean package
Contributions are always welcome! Before you start any development, please create an issue and
start a discussion. Create a pull request against your newly created issue, and we're happy to see
if we can merge your pull request. First and foremost, any time you're adding code to the code base,
you need to include test coverage. Make sure that you run mvn clean package before submitting your
pull request to ensure that all of the tests, checkstyle rules, and the package can be successfully built.