Commit

Compatible kafka connect json apache#4137

sunxiaojian committed Apr 7, 2023
1 parent cf55a72 commit 9995a38

Showing 11 changed files with 399 additions and 105 deletions.
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -61,6 +61,11 @@
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-compatible-connect-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
@@ -21,5 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON
}
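With the new enum value in place, a job can opt into the format through the Kafka source's `format` option. A minimal sketch; option names follow the existing Kafka source options, and the exact casing accepted for the format value depends on the option parser:

source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "connect_topic"
    format = COMPATIBLE_KAFKA_CONNECT_JSON
    schema = {
      fields {
        id = "int"
        name = "string"
      }
    }
  }
}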
@@ -45,6 +45,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -265,6 +266,10 @@ private void setDeserialization(Config config) {
CanalJsonDeserializationSchema.builder(typeInfo)
.setIgnoreParseErrors(true)
.build();
break;
case COMPATIBLE_KAFKA_CONNECT_JSON:
deserializationSchema =
new CompatibleKafkaConnectDeserializationSchema(
typeInfo, config, false, false);
break;
default:
throw new SeaTunnelJsonFormatException(
@@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -150,8 +151,17 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
recordList) {

try {
if (deserializationSchema
        instanceof CompatibleKafkaConnectDeserializationSchema) {
    ((CompatibleKafkaConnectDeserializationSchema) deserializationSchema)
            .deserialize(record, output);
} else {
    deserializationSchema.deserialize(record.value(), output);
}
} catch (IOException e) {
if (this.messageFormatErrorHandleWay
== MessageFormatErrorHandleWay
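Note the dispatch: the Connect-compatible schema consumes the whole ConsumerRecord rather than just the value bytes, because it later rebuilds a Connect SinkRecord from the record's topic, headers, key, value, timestamp, and offset (see convertToSinkRecord in the new format module below).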
1 change: 1 addition & 0 deletions seatunnel-formats/pom.xml
@@ -30,6 +30,7 @@
<module>seatunnel-format-json</module>
<module>seatunnel-format-text</module>
<module>seatunnel-format-compatible-debezium-json</module>
<module>seatunnel-format-compatible-connect-json</module>
</modules>

</project>
53 changes: 53 additions & 0 deletions seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-formats</artifactId>
<version>${revision}</version>
</parent>

<artifactId>seatunnel-format-compatible-connect-json</artifactId>
<name>SeaTunnel : Formats : Compatible Kafka Connect Json</name>
<properties>
<kafka.connect.version>3.4.0</kafka.connect.version>
<debezium.version>2.1.2.Final</debezium.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.connect.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.connect.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>

</project>
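For context: connect-json and connect-runtime supply (directly or via the transitive connect-api) the JsonConverter, SinkRecord, and ConnectUtils classes, while debezium-core supplies the Envelope and ExtractNewRecordState types used by the deserialization schema below.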
262 changes: 262 additions & 0 deletions seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.format.compatible.kafka.connect.json;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.util.ConnectUtils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordState;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

/** Deserialization schema compatible with the Kafka Connect JSON format. */
@RequiredArgsConstructor
public class CompatibleKafkaConnectDeserializationSchema
implements DeserializationSchema<SeaTunnelRow> {

private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
private static final String OP_FIELD = "__op";
private transient JsonConverter keyConverter;
private transient JsonConverter valueConverter;
private transient ExtractNewRecordState<SinkRecord> extractNewRecordState;
private transient Method keyConverterMethod;
private transient Method valueConverterMethod;
private final SeaTunnelRowType seaTunnelRowType;
private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
private final Boolean fromDebeziumRecord;
private final boolean keySchemaEnable;
private final boolean valueSchemaEnable;
/** Object mapper for parsing the JSON. */
private final ObjectMapper objectMapper = new ObjectMapper();

public CompatibleKafkaConnectDeserializationSchema(
@NonNull SeaTunnelRowType seaTunnelRowType,
@NonNull Config config,
boolean failOnMissingField,
boolean ignoreParseErrors) {

Map<String, String> configMap = ReadonlyConfig.fromConfig(config).toMap();
this.seaTunnelRowType = seaTunnelRowType;
this.fromDebeziumRecord = KafkaConnectJsonFormatOptions.getDebeziumRecordEnabled(configMap);
this.keySchemaEnable =
KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap);
this.valueSchemaEnable =
KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap);

// Runtime converter
this.runtimeConverter =
new JsonToRowConverters(failOnMissingField, ignoreParseErrors)
.createConverter(checkNotNull(seaTunnelRowType));
}

@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
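// Byte-array deserialization is intentionally unsupported: rebuilding a Connect
// SinkRecord requires the full ConsumerRecord, so callers use the overload below.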
throw new UnsupportedEncodingException();
}

/**
* Deserialize a Kafka consumer record into one or more SeaTunnelRows.
*
* @param msg the consumer record, including topic, headers, key, and value
* @param out the collector that receives the deserialized rows
*/
public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out)
throws InvocationTargetException, IllegalAccessException {
tryInit();
SinkRecord record = convertToSinkRecord(msg);
RowKind rowKind = RowKind.INSERT;
if (fromDebeziumRecord) {
// Rewrite capture data
record = convertByTransforms(record);
rowKind = getDebeziumRecordRowKind(record);
}
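// With schemas enabled, convertToJsonWithEnvelope produces the standard Kafka
// Connect JSON envelope, e.g. {"schema": {...}, "payload": {...}}, so the row
// data is read from the "payload" field below (example values are illustrative).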
JsonNode jsonNode =
(JsonNode)
valueConverterMethod.invoke(
valueConverter, record.valueSchema(), record.value());
JsonNode payload = jsonNode.get(KAFKA_CONNECT_SINK_RECORD_PAYLOAD);
if (payload.isArray()) {
ArrayNode arrayNode = (ArrayNode) payload;
for (int i = 0; i < arrayNode.size(); i++) {
SeaTunnelRow row = convertJsonNode(arrayNode.get(i));
row.setRowKind(rowKind);
out.collect(row);
}
} else {
SeaTunnelRow row = convertJsonNode(payload);
row.setRowKind(rowKind);
out.collect(row);
}
}

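// The "__op" header is attached by the ExtractNewRecordState transform configured
// in tryInit() (its ADD_HEADERS setting includes "op"); records without a
// recognized operation fall back to INSERT.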
private RowKind getDebeziumRecordRowKind(SinkRecord record) {
Header header = record.headers().lastWithName(OP_FIELD);
if (header != null) {
String op = header.value().toString();
Envelope.Operation operation = Envelope.Operation.valueOf(op);
switch (operation) {
case CREATE:
return RowKind.INSERT;
case UPDATE:
return RowKind.UPDATE_AFTER;
case DELETE:
return RowKind.DELETE;
}
}
return RowKind.INSERT;
}

private SeaTunnelRow convertJsonNode(JsonNode jsonNode) {
if (jsonNode.isNull()) {
return null;
}
try {
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode jsonData =
objectMapper.readTree(jsonNode.toString());
return (SeaTunnelRow) runtimeConverter.convert(jsonData);
} catch (Throwable t) {
throw new SeaTunnelJsonFormatException(
CommonErrorCode.JSON_OPERATION_FAILED,
String.format("Failed to deserialize JSON '%s'.", jsonNode),
t);
}
}

private SinkRecord convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
SchemaAndValue keyAndSchema =
keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
SchemaAndValue valueAndSchema =
valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
return new SinkRecord(
msg.topic(),
msg.partition(),
keyAndSchema.schema(),
keyAndSchema.value(),
valueAndSchema.schema(),
valueAndSchema.value(),
msg.offset(),
timestamp,
msg.timestampType(),
null);
}

private SinkRecord convertByTransforms(SinkRecord record) {
return this.extractNewRecordState.apply(record);
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
}

private void tryInit() {
if (keyConverter == null) {
synchronized (this) {
if (keyConverter == null) {
keyConverter = new JsonConverter();
keyConverter.configure(
Collections.singletonMap(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable),
true);
keyConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
keySchemaEnable
? INCLUDE_SCHEMA_METHOD
: EXCLUDE_SCHEMA_METHOD,
Schema.class,
Object.class)
.get();
}
}
}
if (valueConverter == null) {
synchronized (this) {
if (valueConverter == null) {
valueConverter = new JsonConverter();
valueConverter.configure(
Collections.singletonMap(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable),
false);
valueConverterMethod =
ReflectionUtils.getDeclaredMethod(
JsonConverter.class,
valueSchemaEnable
? INCLUDE_SCHEMA_METHOD
: EXCLUDE_SCHEMA_METHOD,
Schema.class,
Object.class)
.get();
}
}
}

if (extractNewRecordState == null) {
synchronized (this) {
this.extractNewRecordState = new ExtractNewRecordState<>();
Map<String, Object> transformsConfiguration = new HashMap<>();
transformsConfiguration.put(
ExtractNewRecordStateConfigDefinition.ADD_HEADERS.name(),
"op,source.db,source.table,source.server_id,source.gtid");
transformsConfiguration.put(
ExtractNewRecordStateConfigDefinition.HANDLE_DELETES.name(),
ExtractNewRecordStateConfigDefinition.DeleteHandling.REWRITE);
this.extractNewRecordState.configure(transformsConfiguration);
}
}
}
}
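End to end, the reader-side wiring from the diffs above composes roughly as follows (a condensed sketch; typeInfo, config, record, and output stand in for state held by KafkaSource and KafkaSourceReader):

DeserializationSchema<SeaTunnelRow> schema =
        new CompatibleKafkaConnectDeserializationSchema(typeInfo, config, false, false);

if (schema instanceof CompatibleKafkaConnectDeserializationSchema) {
    // the Connect-compatible path needs the whole ConsumerRecord, not just value bytes
    ((CompatibleKafkaConnectDeserializationSchema) schema).deserialize(record, output);
} else {
    schema.deserialize(record.value(), output);
}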