Skip to content

Commit

Permalink
Compatible kafka connect json apache#4137
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 7, 2023
1 parent f19ed9f commit d7e9b61
Show file tree
Hide file tree
Showing 15 changed files with 857 additions and 106 deletions.
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-compatible-connect-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ public enum MessageFormat {
TEXT,
CANAL_JSON,
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
Expand Down Expand Up @@ -267,6 +268,10 @@ private void setDeserialization(Config config) {
CanalJsonDeserializationSchema.builder(typeInfo)
.setIgnoreParseErrors(true)
.build();
case COMPATIBLE_KAFKA_CONNECT_JSON:
deserializationSchema =
new CompatibleKafkaConnectDeserializationSchema(
typeInfo, config, false, false);
break;
case DEBEZIUM_JSON:
boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -150,9 +151,18 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
recordList) {

try {
deserializationSchema.deserialize(
record.value(), output);
} catch (Exception e) {
if (deserializationSchema
instanceof
CompatibleKafkaConnectDeserializationSchema) {
((CompatibleKafkaConnectDeserializationSchema)
deserializationSchema)
.deserialize(
record, output);
} else {
deserializationSchema.deserialize(
record.value(), output);
}
} catch (IOException e) {
if (this.messageFormatErrorHandleWay
== MessageFormatErrorHandleWay
.SKIP) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: seatunnel
-- ----------------------------------------------------------------------------------------------------------------

-- Create debezium source record sink table
CREATE TABLE debezium_sink (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
description VARCHAR(512),
weight VARCHAR(512)
);

-- Create jdbc source record sink table
CREATE TABLE jdbc_sink (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
description VARCHAR(512),
weight VARCHAR(512)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
execution.parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "debezium_source_record"
result_table_name = "kafka_table"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = COMPATIBLE_KAFKA_CONNECT_JSON
from_debezium_record = true
}
}


sink {
Jdbc {
driver = com.mysql.cj.jdbc.Driver
url = "jdbc:mysql://mysql_e2e:3306/seatunnel?useSSL=false"
user = st_user
password = seatunnel
generate_sink_sql = true
database = seatunnel
table = debezium_sink
primary_keys = ["id"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
execution.parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "jdbc_source_record"
result_table_name = "kafka_table"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = COMPATIBLE_KAFKA_CONNECT_JSON
from_debezium_record = false
}
}


sink {
Jdbc {
driver = com.mysql.cj.jdbc.Driver
url = "jdbc:mysql://mysql_e2e:3306/seatunnel?useSSL=false"
user = st_user
password = seatunnel
generate_sink_sql = true
database = seatunnel
table = jdbc_sink
primary_keys = ["id"]
}
}
1 change: 1 addition & 0 deletions seatunnel-formats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<module>seatunnel-format-json</module>
<module>seatunnel-format-text</module>
<module>seatunnel-format-compatible-debezium-json</module>
<module>seatunnel-format-compatible-connect-json</module>
</modules>

</project>
59 changes: 59 additions & 0 deletions seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-formats</artifactId>
<version>${revision}</version>
</parent>

<artifactId>seatunnel-format-compatible-connect-json</artifactId>
<name>SeaTunnel : Formats : Compatible Kafka Connect Json</name>
<properties>
<kafka.connect.version>3.4.0</kafka.connect.version>
<debezium.version>2.1.2.Final</debezium.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.connect.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.connect.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${debezium.version}</version>
</dependency>

</dependencies>

</project>
Loading

0 comments on commit d7e9b61

Please sign in to comment.