Commit 08c0396

Implementation for new SMT ExtractTopicFromValueSchema and tests

mikaka-paf committed Jun 8, 2023
1 parent 9e9456e commit 08c0396

Showing 6 changed files with 502 additions and 10 deletions.
28 changes: 28 additions & 0 deletions README.md
@@ -162,10 +162,38 @@ Either `field.value` or `field.value.pattern` must be defined to apply the filter.

Only `string`, `numeric` and `boolean` types are considered for matching purposes; other types are ignored.

### `ExtractTopicFromValueSchema`

This transformation checks the record value schema name and, if it is set, uses it as the topic name.

- `io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name` - works on value schema name.

By default (if neither `schema.name.topic-map` nor `schema.name.regex` is set), the transformation uses the value schema name as the new topic name.
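
For example, a minimal configuration that relies on this default routes each record to a topic named after its value schema name:

```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
```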

The transformation defines the following optional configurations, which can be used to modify the schema name before it is used as the topic name:

- `schema.name.topic-map` - Map from schema name values to the new topic name values that should be used instead. The format is a comma-separated list of `key:value` pairs, for example `SchemaValue1:NewValue1,SchemaValue2:NewValue2`.
- `schema.name.regex` - Regular expression used to extract the desired topic name from the schema name. For example, `(?:[.]|^)([^.]*)$` captures the part of the name after the last dot.

Here is an example of this transformation configuration (using `schema.name.topic-map`):

```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
transforms.ExtractTopicFromValueSchema.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2

```
And here is an example of this transformation configuration (using `schema.name.regex`):
```properties
transforms=ExtractTopicFromValueSchema
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name
transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
```

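As an illustration of what the example regex captures, here is a small standalone sketch (illustration only; the class `SchemaNameRegexDemo` is hypothetical and not part of the connector code). Given the schema name `com.acme.schema.SchemaNameToTopic1`, the pattern yields `SchemaNameToTopic1`, which becomes the new topic name:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: shows what the example regex captures from a schema name.
public class SchemaNameRegexDemo {
    public static void main(final String[] args) {
        final Pattern pattern = Pattern.compile("(?:[.]|^)([^.]*)$");
        final Matcher matcher = pattern.matcher("com.acme.schema.SchemaNameToTopic1");
        // Same match conditions the transformation applies.
        if (matcher.find() && matcher.groupCount() == 1) {
            System.out.println(matcher.group(1)); // prints "SchemaNameToTopic1"
        }
    }
}
```
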
## License

This project is licensed under the [Apache License, Version 2.0](LICENSE).

## Trademarks

Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

IntegrationTest.java
@@ -56,6 +56,11 @@ final class IntegrationTest {
private final TopicPartition newTopicPartition0 =
new TopicPartition(TestSourceConnector.NEW_TOPIC, 0);

+    private final TopicPartition originalTopicValueFromSchema =
+            new TopicPartition(TopicFromValueSchemaConnector.TOPIC, 0);
+
+    private final TopicPartition newTopicValueFromSchema =
+            new TopicPartition(TopicFromValueSchemaConnector.NAME, 0);
private static File pluginsDir;

@Container
@@ -92,7 +97,9 @@ static void setUpAll() throws IOException, InterruptedException {
assert integrationTestClassesPath.exists();

final Class<?>[] testConnectorClasses = new Class[]{
-            TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
+            TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class,
+            TopicFromValueSchemaConnector.class,
+            TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask.class
};
for (final Class<?> clazz : testConnectorClasses) {
final String packageName = clazz.getPackage().getName();
@@ -127,7 +134,12 @@ void setUp() throws ExecutionException, InterruptedException {

final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
-        adminClient.createTopics(Arrays.asList(originalTopic, newTopic)).all().get();
+        final NewTopic originalTopicForExtractTopicFromValue =
+                new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
+        final NewTopic newTopicForExtractTopicFromValue =
+                new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
+        adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
+                newTopicForExtractTopicFromValue)).all().get();

connectRunner = new ConnectRunner(pluginsDir, kafka.getBootstrapServers());
connectRunner.start();
@@ -159,19 +171,42 @@ final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
connectorConfig.put("tasks.max", "1");
connectRunner.createConnector(connectorConfig);

+        checkMessageTopics(originalTopicPartition0, newTopicPartition0);
+    }
+
+    @Test
+    @Timeout(10)
+    final void testExtractTopicFromValueSchemaName() throws ExecutionException, InterruptedException, IOException {
+        final Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put("name", "test-source-connector");
+        connectorConfig.put("connector.class", TopicFromValueSchemaConnector.class.getName());
+        connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter.value.subject.name.strategy",
+                "io.confluent.kafka.serializers.subject.RecordNameStrategy");
+        connectorConfig.put("transforms",
+                "ExtractTopicFromValueSchema");
+        connectorConfig.put("transforms.ExtractTopicFromValueSchema.type",
+                "io.aiven.kafka.connect.transforms.ExtractTopicFromValueSchema$Name");
+        connectorConfig.put("tasks.max", "1");
+        connectRunner.createConnector(connectorConfig);
+        checkMessageTopics(originalTopicValueFromSchema, newTopicValueFromSchema);
+
+    }
+
+    final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
+            throws InterruptedException {
waitForCondition(
-                () -> consumer
-                        .endOffsets(Arrays.asList(originalTopicPartition0, newTopicPartition0))
-                        .values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
-                        .orElse(false),
-                5000, "Messages appear in any topic"
+                () -> consumer.endOffsets(Arrays.asList(originalTopicPartition, newTopicPartition))
+                        .values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
+                        .orElse(false), 5000, "Messages appear in any topic"
);
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
-                Arrays.asList(originalTopicPartition0, newTopicPartition0));
+                Arrays.asList(originalTopicPartition, newTopicPartition));
// The original topic should be empty.
-        assertEquals(0, endOffsets.get(originalTopicPartition0));
+        assertEquals(0, endOffsets.get(originalTopicPartition));
// The new topic should be non-empty.
-        assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition0));
+        assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition));
}

private void waitForCondition(final Supplier<Boolean> conditionChecker,
121 changes: 121 additions & 0 deletions TopicFromValueSchemaConnector.java
@@ -0,0 +1,121 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A connector needed for testing of ExtractTopicFromValueSchema.
*
* <p>It just produces a fixed number of struct records with value schema name set.
*/
public class TopicFromValueSchemaConnector extends SourceConnector {
static final int MESSAGES_TO_PRODUCE = 10;

private static final Logger log = LoggerFactory.getLogger(TopicFromValueSchemaConnector.class);
static final String TOPIC = "topic-for-value-schema-connector-test";
static final String FIELD = "field-0";

static final String NAME = "com.acme.schema.SchemaNameToTopic";

@Override
public void start(final Map<String, String> props) {
}

@Override
public Class<? extends Task> taskClass() {
return TopicFromValueSchemaConnectorTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
return Collections.singletonList(Collections.emptyMap());
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public String version() {
return null;
}

public static class TopicFromValueSchemaConnectorTask extends SourceTask {
private int counter = 0;

private final Schema valueSchema = SchemaBuilder.struct()
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
.name(NAME)
.schema();
private final Struct value = new Struct(valueSchema).put(FIELD, "Data");

@Override
public void start(final Map<String, String> props) {
log.info("Started TopicFromValueSchemaConnector!!!");
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
if (counter >= MESSAGES_TO_PRODUCE) {
return null; // indicate pause
}

final Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("partition", "0");
final Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put("offset", Integer.toString(counter));

counter += 1;

return Collections.singletonList(
new SourceRecord(sourcePartition, sourceOffset,
TOPIC,
valueSchema, value)
);
}

@Override
public void stop() {
}

@Override
public String version() {
return null;
}
}
}
107 changes: 107 additions & 0 deletions ExtractTopicFromValueSchema.java
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ExtractTopicFromValueSchema<R extends ConnectRecord<R>> implements Transformation<R> {

private static final Logger log = LoggerFactory.getLogger(ExtractTopicFromValueSchema.class);

private ExtractTopicFromValueSchemaConfig config;
private Optional<String> regex;

private Optional<Map<String, String>> schemaNameToTopicMap;
private Pattern pattern;

@Override
public ConfigDef config() {
return ExtractTopicFromValueSchemaConfig.config();
}

@Override
public void configure(final Map<String, ?> configs) {
this.config = new ExtractTopicFromValueSchemaConfig(configs);
schemaNameToTopicMap = config.schemaNameToTopicMap();
regex = config.regEx();
if (regex.isPresent()) {
pattern = Pattern.compile(regex.get());
}
}

@Override
public R apply(final R record) {

if (null == record.valueSchema() || null == record.valueSchema().name()) {
throw new DataException("value schema or schema name can't be null: " + record);
}
// If configure() was never called, use record.valueSchema().name() as the new topic name.
if (null == config) {
return createConnectRecord(record, Optional.ofNullable(record.valueSchema().name()).orElse(record.topic()));
}
// First check schema value name -> desired topic name mapping and use that if it is set.
if (schemaNameToTopicMap.isPresent() && schemaNameToTopicMap.get().containsKey(record.valueSchema().name())) {
return createConnectRecord(record, schemaNameToTopicMap.get().get(record.valueSchema().name()));
}
// Secondly, check whether a regex for parsing the schema name is set and use that.
final Optional<String> regex = config.regEx();
if (regex.isPresent()) {
final Matcher matcher = pattern.matcher(record.valueSchema().name());
if (matcher.find() && matcher.groupCount() == 1) {
return createConnectRecord(record, matcher.group(1));
}
log.trace("No match with pattern {} from schema name {}", pattern.pattern(), record.valueSchema().name());
}
// If no other configurations are set use value schema name as new topic name.
return createConnectRecord(record, record.valueSchema().name());
}

private R createConnectRecord(final R record, final String newTopicName) {
return record.newRecord(
newTopicName,
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
record.value(),
record.timestamp(),
record.headers()
);
}

@Override
public void close() {
}

public static class Name<R extends ConnectRecord<R>> extends ExtractTopicFromValueSchema<R> {

@Override
public void close() {
}
}
}