diff --git a/README.md b/README.md
index 7fd4202..a7a494f 100644
--- a/README.md
+++ b/README.md
@@ -162,6 +162,33 @@
 Either `field.value` or `field.value.pattern` must be defined to apply filter. Only, `string`, `numeric` and `boolean` types are considered for matching purposes, other types are ignored.
 
+### `ExtractTopicFromValueSchema`
+
+This transformation checks the name of the record's value schema and, if it is set, uses it as the new topic name.
+
+- `io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name` - works on the value schema name.
+
+By default (if neither `schema.name.topic-map` nor `schema.name.regex` is set) the transformation uses the value schema name of the record as-is.
+
+The transformation defines the following optional configurations, which can be used to derive the topic name from the schema name:
+
+- `schema.name.topic-map` - Map of schema name values to the new topic name values that should be used instead. The format is "SchemaValue1:NewValue1,SchemaValue2:NewValue2", i.e. key:value pairs as a comma-separated list.
+- `schema.name.regex` - Regular expression used to parse the desired value from the schema name, for example `(?:[.]|^)([^.]*)$`, which extracts the part of the name after the last dot.
+
+Here is an example of this transformation configuration (using `schema.name.topic-map`):
+
+```properties
+transforms=ExtractTopicFromValueSchema
+transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
+transforms.ExtractTopicFromValueSchema.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2
+```
+
+And here is an example of this transformation configuration (using `schema.name.regex`):
+
+```properties
+transforms=ExtractTopicFromValueSchema
+transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
+transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
+```
+
 ## License
 
 This project is licensed under the [Apache License, Version 2.0](LICENSE).
@@ -169,3 +196,4 @@ This project is licensed under the [Apache License, Version 2.0](LICENSE).
 
 ## Trademarks
 
 Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
+
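For reference, here is a small standalone sketch (not part of this change; the class name is illustrative) showing what the example regex extracts from a fully qualified schema name:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaNameRegexDemo {
    public static void main(final String[] args) {
        // Same pattern as in the README example: capture everything after the last dot.
        final Pattern pattern = Pattern.compile("(?:[.]|^)([^.]*)$");
        final Matcher matcher = pattern.matcher("com.acme.schema.SchemaNameToTopic");
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // prints: SchemaNameToTopic
        }
    }
}
```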
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
index 5a83b5d..6943bbd 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
@@ -56,6 +56,11 @@ final class IntegrationTest {
     private final TopicPartition newTopicPartition0 =
         new TopicPartition(TestSourceConnector.NEW_TOPIC, 0);
 
+    private final TopicPartition originalTopicValueFromSchema =
+        new TopicPartition(TopicFromValueSchemaConnector.TOPIC, 0);
+
+    private final TopicPartition newTopicValueFromSchema =
+        new TopicPartition(TopicFromValueSchemaConnector.NAME, 0);
     private static File pluginsDir;
 
     @Container
@@ -92,7 +97,9 @@ static void setUpAll() throws IOException, InterruptedException {
         assert integrationTestClassesPath.exists();
 
         final Class<?>[] testConnectorClasses = new Class<?>[]{
-            TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
+            TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class,
+            TopicFromValueSchemaConnector.class,
+            TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask.class
         };
         for (final Class<?> clazz : testConnectorClasses) {
             final String packageName = clazz.getPackage().getName();
@@ -127,7 +134,12 @@ void setUp() throws ExecutionException, InterruptedException {
         final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
         final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
-        adminClient.createTopics(Arrays.asList(originalTopic, newTopic)).all().get();
+        final NewTopic originalTopicForExtractTopicFromValue =
+            new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
+        final NewTopic newTopicForExtractTopicFromValue =
+            new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
+        adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
+            newTopicForExtractTopicFromValue)).all().get();
 
         connectRunner = new ConnectRunner(pluginsDir, kafka.getBootstrapServers());
         connectRunner.start();
@@ -159,19 +171,42 @@ final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
         connectorConfig.put("tasks.max", "1");
         connectRunner.createConnector(connectorConfig);
 
+        checkMessageTopics(originalTopicPartition0, newTopicPartition0);
+    }
+
+    @Test
+    @Timeout(10)
+    final void testExtractTopicFromValueSchemaName() throws ExecutionException, InterruptedException, IOException {
+        final Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put("name", "test-source-connector");
+        connectorConfig.put("connector.class", TopicFromValueSchemaConnector.class.getName());
+        connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter.value.subject.name.strategy",
+            "io.confluent.kafka.serializers.subject.RecordNameStrategy");
+        connectorConfig.put("transforms",
+            "ExtractTopicFromValueSchema");
+        connectorConfig.put("transforms.ExtractTopicFromValueSchema.type",
+            "io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name");
+        connectorConfig.put("tasks.max", "1");
+        connectRunner.createConnector(connectorConfig);
+        checkMessageTopics(originalTopicValueFromSchema, newTopicValueFromSchema);
+
+    }
+
+    final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
+        throws InterruptedException {
         waitForCondition(
-            () -> consumer
-                .endOffsets(Arrays.asList(originalTopicPartition0, newTopicPartition0))
-                .values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
-                .orElse(false),
-            5000, "Messages appear in any topic"
+            () -> consumer.endOffsets(Arrays.asList(originalTopicPartition, newTopicPartition))
+                .values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
+                .orElse(false), 5000, "Messages appear in any topic"
         );
         final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
-            Arrays.asList(originalTopicPartition0, newTopicPartition0));
+            Arrays.asList(originalTopicPartition, newTopicPartition));
         // The original topic should be empty.
-        assertEquals(0, endOffsets.get(originalTopicPartition0));
+        assertEquals(0, endOffsets.get(originalTopicPartition));
         // The new topic should be non-empty.
-        assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition0));
+        assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition));
     }
 
     private void waitForCondition(final Supplier<Boolean> conditionChecker,
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java
new file mode 100644
index 0000000..cce12fa
--- /dev/null
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A connector needed for testing of ExtractTopicFromValueSchema.
+ *
+ * <p>It just produces a fixed number of struct records with value schema name set.
+ */
+public class TopicFromValueSchemaConnector extends SourceConnector {
+    static final int MESSAGES_TO_PRODUCE = 10;
+
+    private static final Logger log = LoggerFactory.getLogger(TopicFromValueSchemaConnector.class);
+    static final String TOPIC = "topic-for-value-schema-connector-test";
+    static final String FIELD = "field-0";
+
+    static final String NAME = "com.acme.schema.SchemaNameToTopic";
+
+    @Override
+    public void start(final Map<String, String> props) {
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return TopicFromValueSchemaConnectorTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(final int maxTasks) {
+        return Collections.singletonList(Collections.emptyMap());
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    public static class TopicFromValueSchemaConnectorTask extends SourceTask {
+        private int counter = 0;
+
+        private final Schema valueSchema = SchemaBuilder.struct()
+            .field(FIELD, SchemaBuilder.STRING_SCHEMA)
+            .name(NAME)
+            .schema();
+        private final Struct value = new Struct(valueSchema).put(FIELD, "Data");
+
+        @Override
+        public void start(final Map<String, String> props) {
+            log.info("Started TopicFromValueSchemaConnector!!!");
+        }
+
+        @Override
+        public List<SourceRecord> poll() throws InterruptedException {
+            if (counter >= MESSAGES_TO_PRODUCE) {
+                return null; // indicate pause
+            }
+
+            final Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put("partition", "0");
+            final Map<String, String> sourceOffset = new HashMap<>();
+            sourceOffset.put("offset", Integer.toString(counter));
+
+            counter += 1;
+
+            return Collections.singletonList(
+                new SourceRecord(sourcePartition, sourceOffset,
+                    TOPIC,
+                    valueSchema, value)
+            );
+        }
+
+        @Override
+        public void stop() {
+        }
+
+        @Override
+        public String version() {
+            return null;
+        }
+    }
+}
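Tying the test pieces together: the records this connector emits carry the value schema name `com.acme.schema.SchemaNameToTopic`, so `ExtractTopicFromValueSchema$Name` with no further configuration should reroute them from `topic-for-value-schema-connector-test` to a topic named after the schema. A minimal standalone sketch of that expectation (the class name is illustrative, not part of the change):

```java
import java.util.HashMap;

import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema;
import io.aiven.kafka.connect.transforms.TopicFromValueSchemaConnector;

public class RoutingSketch {
    public static void main(final String[] args) throws InterruptedException {
        final TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask task =
            new TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask();
        task.start(new HashMap<>());

        final ExtractTopicFromValueSchema.Name<SourceRecord> transform =
            new ExtractTopicFromValueSchema.Name<>();
        transform.configure(new HashMap<>()); // neither schema.name.topic-map nor schema.name.regex set

        // Each poll() returns one record; the SMT reroutes it from
        // "topic-for-value-schema-connector-test" to "com.acme.schema.SchemaNameToTopic".
        for (final SourceRecord record : task.poll()) {
            System.out.println(record.topic() + " -> " + transform.apply(record).topic());
        }
    }
}
```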
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchema.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchema.java
new file mode 100644
index 0000000..1aa3e22
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchema.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ExtractTopicFromValueSchema<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private static final Logger log = LoggerFactory.getLogger(ExtractTopicFromValueSchema.class);
+
+    private ExtractTopicFromValueSchemaConfig config;
+    private Optional<String> regex;
+
+    private Optional<Map<String, String>> schemaNameToTopicMap;
+    private Pattern pattern;
+
+    @Override
+    public ConfigDef config() {
+        return ExtractTopicFromValueSchemaConfig.config();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        this.config = new ExtractTopicFromValueSchemaConfig(configs);
+        schemaNameToTopicMap = config.schemaNameToTopicMap();
+        regex = config.regEx();
+        if (regex.isPresent()) {
+            pattern = Pattern.compile(regex.get());
+        }
+    }
+
+    @Override
+    public R apply(final R record) {
+
+        if (null == record.valueSchema() || null == record.valueSchema().name()) {
+            throw new DataException("Value schema name can't be null: " + record);
+        }
+        // If configure() was never called, there are no extra configs: use record.valueSchema().name() -> new topic.
+        if (null == config) {
+            return createConnectRecord(record, record.valueSchema().name());
+        }
+        // First check the schema name -> desired topic name mapping and use that if it is set.
+        if (schemaNameToTopicMap.isPresent() && schemaNameToTopicMap.get().containsKey(record.valueSchema().name())) {
+            return createConnectRecord(record, schemaNameToTopicMap.get().get(record.valueSchema().name()));
+        }
+        // Secondly check whether regex parsing of the schema name is set and use that.
+        if (regex.isPresent()) {
+            final Matcher matcher = pattern.matcher(record.valueSchema().name());
+            if (matcher.find() && matcher.groupCount() == 1) {
+                return createConnectRecord(record, matcher.group(1));
+            }
+            log.trace("No match with pattern {} from schema name {}", pattern.pattern(), record.valueSchema().name());
+        }
+        // If no other configuration applies, use the value schema name as the new topic name.
+        return createConnectRecord(record, record.valueSchema().name());
+    }
+
+    private R createConnectRecord(final R record, final String newTopicName) {
+        return record.newRecord(
+            newTopicName,
+            record.kafkaPartition(),
+            record.keySchema(),
+            record.key(),
+            record.valueSchema(),
+            record.value(),
+            record.timestamp(),
+            record.headers()
+        );
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public static class Name<R extends ConnectRecord<R>> extends ExtractTopicFromValueSchema<R> {
+    }
+}
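The `apply()` method above resolves the new topic name with a fixed precedence: an explicit `schema.name.topic-map` entry wins, then a `schema.name.regex` match, and finally the raw schema name. A minimal sketch of that behaviour (the class and topic names here are illustrative, not part of the change):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema;

public class PrecedenceSketch {
    public static void main(final String[] args) {
        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic").schema();
        final SinkRecord record = new SinkRecord("original_topic", 0, null, "key", valueSchema, "{}", 123L);

        final Map<String, String> configs = new HashMap<>();
        configs.put("schema.name.topic-map", "com.acme.schema.SchemaNameToTopic:MappedTopic");
        configs.put("schema.name.regex", "(?:[.]|^)([^.]*)$");

        final ExtractTopicFromValueSchema.Name<SinkRecord> transform = new ExtractTopicFromValueSchema.Name<>();
        transform.configure(configs);

        // The map wins over the regex: prints "MappedTopic", not "SchemaNameToTopic".
        System.out.println(transform.apply(record).topic());
    }
}
```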
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaConfig.java
new file mode 100644
index 0000000..e22f179
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+public class ExtractTopicFromValueSchemaConfig extends AbstractConfig {
+    public static final String SCHEMA_NAME_TO_TOPIC = "schema.name.topic-map";
+    public static final String REGEX_SCHEMA_NAME_TO_TOPIC = "schema.name.regex";
+
+    public ExtractTopicFromValueSchemaConfig(final Map<?, ?> originals) {
+        super(config(), originals);
+    }
+
+    static ConfigDef config() {
+        return new ConfigDef().define(
+            SCHEMA_NAME_TO_TOPIC,
+            ConfigDef.Type.STRING,
+            null,
+            ConfigDef.Importance.LOW,
+            "Map of schema name (key) to new topic name (value) in the string format "
+                + "\"key1:value1,key2:value2\"")
+            .define(REGEX_SCHEMA_NAME_TO_TOPIC,
+                ConfigDef.Type.STRING,
+                null,
+                ConfigDef.Importance.LOW,
+                "Regular expression that is used to parse the desired new topic name "
+                    + "from the value schema name (for example (?:[.\\t]|^)([^.\\t]*)$, "
+                    + "which extracts the part of the name after the last dot)."
+            );
+    }
+
+    Optional<Map<String, String>> schemaNameToTopicMap() {
+        final String schemaNameToTopicValue = getString(SCHEMA_NAME_TO_TOPIC);
+        if (null == schemaNameToTopicValue) {
+            return Optional.empty();
+        }
+        final Map<String, String> schemaNameToTopicMap = Arrays.stream(schemaNameToTopicValue.split(","))
+            .map(entry -> entry.split(":"))
+            .collect(Collectors.toMap(parts -> parts[0], parts -> parts[1]));
+        if (schemaNameToTopicMap.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(schemaNameToTopicMap);
+    }
+
+    Optional<String> regEx() {
+        final String rawRegex = getString(REGEX_SCHEMA_NAME_TO_TOPIC);
+        if (null == rawRegex || rawRegex.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(rawRegex);
+    }
+}
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaTest.java
new file mode 100644
index 0000000..c8528f4
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicFromValueSchemaTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ExtractTopicFromValueSchemaTest {
+
+    @Test
+    void nullConfigsValueSchemaNameToTopicTest() {
+        final Schema keySchema = Schema.STRING_SCHEMA;
+        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic").schema();
+        final SinkRecord originalRecord = record(keySchema, "key", valueSchema, "{}");
+        final SinkRecord transformedRecord = transformation(null).apply(originalRecord);
+        assertEquals("com.acme.schema.SchemaNameToTopic", transformedRecord.topic());
+    }
+
+    @Test
+    void emptyConfigsValueSchemaNameToTopicTest() {
+        final Schema keySchema = Schema.STRING_SCHEMA;
+        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic").schema();
+        final SinkRecord originalRecord = record(keySchema, "key", valueSchema, "{}");
+        final SinkRecord transformedRecord = transformation(new HashMap<>()).apply(originalRecord);
+        assertEquals("com.acme.schema.SchemaNameToTopic", transformedRecord.topic());
+    }
+
+    @Test
+    void configMapValueSchemaNameToTopicTest() {
+        final Map<String, String> configs = new HashMap<>();
+        configs.put(ExtractTopicFromValueSchemaConfig.SCHEMA_NAME_TO_TOPIC,
+            "com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,"
+                + "com.acme.schema.SchemaNameToTopic2:TheNameToReplace2,"
+                + "com.acme.schema.SchemaNameToTopic3:TheNameToReplace3"
+        );
+        final Schema keySchema = Schema.STRING_SCHEMA;
+        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic1").schema();
+        final SinkRecord originalRecord = record(keySchema, "key", valueSchema, "{}");
+        final SinkRecord transformedRecord = transformation(configs).apply(originalRecord);
+        assertEquals("TheNameToReplace1", transformedRecord.topic());
+
+        final Schema valueSchema2 = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic3").schema();
+        final SinkRecord originalRecord2 = record(keySchema, "key", valueSchema2, "{}");
+        final SinkRecord transformedRecord2 = transformation(configs).apply(originalRecord2);
+        assertEquals("TheNameToReplace3", transformedRecord2.topic());
+    }
+
+    @Test
+    void regexConfigValueAfterLastDotToTopicTest() {
+        final Map<String, String> configs = new HashMap<>();
+        // Pass a regex that parses the part of the class name after the last dot.
+        configs.put(ExtractTopicFromValueSchemaConfig.REGEX_SCHEMA_NAME_TO_TOPIC,
+            "(?:[.]|^)([^.]*)$");
+        final Schema keySchema = Schema.STRING_SCHEMA;
+        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic").schema();
+        final SinkRecord originalRecord = record(keySchema, "key", valueSchema, "{}");
+        final SinkRecord transformedRecord = transformation(configs).apply(originalRecord);
+        assertEquals("SchemaNameToTopic", transformedRecord.topic());
+    }
+
+    @Test
+    void regexNoMatchToTopicTest() {
+        final Map<String, String> configs = new HashMap<>();
+        // Pass a regex that does not match the schema name, so the name is used as-is.
+        configs.put(ExtractTopicFromValueSchemaConfig.REGEX_SCHEMA_NAME_TO_TOPIC,
+            "/[^;]*/");
+        final Schema keySchema = Schema.STRING_SCHEMA;
+        final Schema valueSchema = SchemaBuilder.struct().name("com.acme.schema.SchemaNameToTopic").schema();
+        final SinkRecord originalRecord = record(keySchema, "key", valueSchema, "{}");
+        final SinkRecord transformedRecord = transformation(configs).apply(originalRecord);
+        assertEquals("com.acme.schema.SchemaNameToTopic", transformedRecord.topic());
+    }
+
+    private ExtractTopicFromValueSchema<SinkRecord> transformation(final Map<String, String> configs) {
+        final ExtractTopicFromValueSchema<SinkRecord> transform = createTransformationObject();
+        if (configs != null) {
+            transform.configure(configs);
+        }
+        return transform;
+    }
+
+    protected ExtractTopicFromValueSchema<SinkRecord> createTransformationObject() {
+        return new ExtractTopicFromValueSchema.Name<>();
+    }
+
+    protected SinkRecord record(final Schema keySchema,
+                                final Object key,
+                                final Schema valueSchema,
+                                final Object value) {
+        return new SinkRecord("original_topic", 0,
+            keySchema, key,
+            valueSchema, value,
+            123L,
+            456L, TimestampType.CREATE_TIME);
+    }
+}
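One caveat worth noting on the `schema.name.topic-map` format: the parser splits on `,` and `:` without any escaping, so schema names or topic names containing those characters cannot be expressed. A small sketch of the parsing behaviour (the class name is illustrative; it must live in the `io.aiven.kafka.connect.transforms` package because `schemaNameToTopicMap()` is package-private):

```java
package io.aiven.kafka.connect.transforms;

import java.util.HashMap;
import java.util.Map;

public class ConfigParsingSketch {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put(ExtractTopicFromValueSchemaConfig.SCHEMA_NAME_TO_TOPIC, "com.acme.A:TopicA,com.acme.B:TopicB");

        final ExtractTopicFromValueSchemaConfig config = new ExtractTopicFromValueSchemaConfig(props);
        // Prints the parsed map, e.g. Optional[{com.acme.A=TopicA, com.acme.B=TopicB}] (iteration order may vary).
        System.out.println(config.schemaNameToTopicMap());
    }
}
```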