From 97387bfb031b8581f846782962b94e1c0f2aa3e9 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Mon, 15 Jan 2024 19:54:28 +0800 Subject: [PATCH] [Improve][Oracle-CDC] Clean unused code --- .../source/OracleIncrementalSource.java | 28 +------------------ .../cdc/oracle/utils/OracleTypeUtils.java | 16 ----------- 2 files changed, 1 insertion(+), 43 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java index 5c942f8b50c..933c8cdc37d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java @@ -37,7 +37,6 @@ import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat; import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema; import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema; -import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffsetFactory; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; @@ -45,24 +44,18 @@ import org.apache.kafka.connect.data.Struct; import com.google.auto.service.AutoService; -import io.debezium.connector.oracle.OracleConnection; import io.debezium.jdbc.JdbcConnection; -import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.history.ConnectTableChangeSerializer; import io.debezium.relational.history.TableChanges; import lombok.NoArgsConstructor; -import java.sql.SQLException; import java.time.ZoneId; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection; -import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleTypeUtils.convertFromTable; - @NoArgsConstructor @AutoService(SeaTunnelSource.class) public class OracleIncrementalSource extends IncrementalSource @@ -116,26 +109,7 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES)); } - SeaTunnelDataType physicalRowType; - if (dataType == null) { - OracleSourceConfig oracleSourceConfig = - (OracleSourceConfig) this.configFactory.create(0); - TableId tableId = - this.dataSourceDialect.discoverDataCollections(oracleSourceConfig).get(0); - Table table; - try (OracleConnection oracleConnection = - createOracleConnection(oracleSourceConfig.getDbzConfiguration())) { - table = - ((OracleDialect) dataSourceDialect) - .queryTableSchema(oracleConnection, tableId) - .getTable(); - } catch (SQLException e) { - throw new SeaTunnelException(e); - } - physicalRowType = convertFromTable(table); - } else { - physicalRowType = dataType; - } + SeaTunnelDataType physicalRowType = dataType; String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE); return (DebeziumDeserializationSchema) SeaTunnelRowDebeziumDeserializeSchema.builder() diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java index 8a7ddc91d8e..7e23fbf3a19 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java @@ -22,14 +22,11 @@ import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import io.debezium.relational.Column; -import io.debezium.relational.Table; import oracle.jdbc.OracleTypes; import java.sql.Types; -import java.util.List; /** Utilities for converting from oracle types to SeaTunnel types. */ public class OracleTypeUtils { @@ -78,17 +75,4 @@ public static SeaTunnelDataType convertFromColumn(Column column) { column.typeName(), column.jdbcType())); } } - - public static SeaTunnelRowType convertFromTable(Table table) { - - List columns = table.columns(); - String[] fieldNames = columns.stream().map(Column::name).toArray(String[]::new); - - SeaTunnelDataType[] fieldTypes = - columns.stream() - .map(OracleTypeUtils::convertFromColumn) - .toArray(SeaTunnelDataType[]::new); - - return new SeaTunnelRowType(fieldNames, fieldTypes); - } }