Skip to content

Commit

Permalink
[Improve][CDC] Clean unused code (#5785)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Nov 3, 2023
1 parent 6c8de62 commit b5a66d3
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
Expand Down Expand Up @@ -112,16 +114,16 @@ public SourceConfig.Factory<MongodbSourceConfig> createSourceConfigFactory(
@Override
public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
ReadonlyConfig config) {
SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
config.get(JdbcSourceOptions.FORMAT))) {
return (DebeziumDeserializationSchema<T>)
new DebeziumJsonDeserializeSchema(
config.get(MongodbSourceOptions.DEBEZIUM_PROPERTIES));
} else {
physicalRowType = dataType;
return (DebeziumDeserializationSchema<T>)
new MongoDBConnectorDeserializationSchema(physicalRowType, physicalRowType);
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SeaTunnelDataType<SeaTunnelRow> physicalRowType = dataType;
return (DebeziumDeserializationSchema<T>)
new MongoDBConnectorDeserializationSchema(physicalRowType, physicalRowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
Expand All @@ -44,8 +41,6 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -107,20 +102,7 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
// TODO: support metadata keys
try (Catalog catalog =
new MySqlCatalogFactory().createCatalog(DatabaseIdentifier.MYSQL, config)) {
catalog.open();
CatalogTable table =
catalog.getTable(
TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
physicalRowType = table.getTableSchema().toPhysicalRowDataType();
}
} else {
physicalRowType = dataType;
}
SeaTunnelDataType<SeaTunnelRow> physicalRowType = dataType;
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
Expand All @@ -38,24 +37,17 @@
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

import com.google.auto.service.AutoService;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;

import java.time.ZoneId;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
Expand Down Expand Up @@ -110,26 +102,7 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);
Table table;
try (SqlServerConnection sqlServerConnection =
createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration())) {
table =
((SqlServerDialect) dataSourceDialect)
.queryTableSchema(sqlServerConnection, tableId)
.getTable();
} catch (Exception e) {
throw new SeaTunnelException(e);
}
physicalRowType = convertFromTable(table);
} else {
physicalRowType = dataType;
}
SeaTunnelDataType<SeaTunnelRow> physicalRowType = dataType;
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import io.debezium.relational.Column;
import io.debezium.relational.Table;

import java.util.List;

/** Utilities for converting from SqlServer types to SeaTunnel types. */
public class SqlServerTypeUtils {
Expand Down Expand Up @@ -130,17 +126,4 @@ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
column.typeName(), column.jdbcType()));
}
}

public static SeaTunnelRowType convertFromTable(Table table) {

List<Column> columns = table.columns();
String[] fieldNames = columns.stream().map(Column::name).toArray(String[]::new);

SeaTunnelDataType<?>[] fieldTypes =
columns.stream()
.map(SqlServerTypeUtils::convertFromColumn)
.toArray(SeaTunnelDataType[]::new);

return new SeaTunnelRowType(fieldNames, fieldTypes);
}
}

0 comments on commit b5a66d3

Please sign in to comment.