From 27588aa84824ded5a7294a8ba72004a38f6365ab Mon Sep 17 00:00:00 2001 From: hailin0 Date: Fri, 3 Nov 2023 18:22:18 +0800 Subject: [PATCH] [Bug][CDC] Fix state recovery error when switching a single table to multiple tables --- .../api/table/catalog/CatalogTableUtil.java | 16 +++-- ...SeaTunnelRowDebeziumDeserializeSchema.java | 15 ++-- .../MongodbIncrementalSourceFactory.java | 2 +- ...MongoDBConnectorDeserializationSchema.java | 70 +++++++++++++------ .../source/MySqlIncrementalSourceFactory.java | 2 +- .../SqlServerIncrementalSourceFactory.java | 2 +- .../seatunnel/cdc/mysql/MysqlCDCIT.java | 6 ++ 7 files changed, 78 insertions(+), 35 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index c6a22e96c5c0..043975491484 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -151,13 +151,17 @@ public static SeaTunnelDataType convertToDataType( if (catalogTables.size() == 1) { return catalogTables.get(0).getTableSchema().toPhysicalRowDataType(); } else { - Map rowTypeMap = new HashMap<>(); - for (CatalogTable catalogTable : catalogTables) { - String tableId = catalogTable.getTableId().toTablePath().toString(); - rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); - } - return new MultipleRowType(rowTypeMap); + return convertToMultipleRowType(catalogTables); + } + } + + public static MultipleRowType convertToMultipleRowType(List catalogTables) { + Map rowTypeMap = new HashMap<>(); + for (CatalogTable catalogTable : catalogTables) { + String tableId = catalogTable.getTableId().toTablePath().toString(); + rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); } + return new MultipleRowType(rowTypeMap); } // We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index ea0a3fc13e74..3e86a6603db4 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory; @@ -233,12 +234,14 @@ public SchemaChangeResolver getSchemaChangeResolver() { @Override public void restoreCheckpointProducedType(SeaTunnelDataType checkpointDataType) { - if (!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) { - throw new IllegalStateException( - String.format( - "The produced type %s of the SeaTunnel deserialization schema " - + "doesn't match the type %s of the restored snapshot.", - resultTypeInfo.getSqlType(), checkpointDataType.getSqlType())); + if (SqlType.ROW.equals(checkpointDataType.getSqlType()) + && SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) { + // TODO: Older versions may have this issue + log.warn( + "Skip incompatible restore type. produced type: {}, checkpoint type: {}", + resultTypeInfo, + checkpointDataType); + return; } if (checkpointDataType instanceof MultipleRowType) { MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java index 761d36f8a25e..07f85fa2a68b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java @@ -77,7 +77,7 @@ TableSource createSource(TableSourceFactoryContext context) { CatalogTableUtil.getCatalogTables( context.getOptions(), context.getClassLoader()); SeaTunnelDataType dataType = - CatalogTableUtil.convertToDataType(catalogTables); + CatalogTableUtil.convertToMultipleRowType(catalogTables); return (SeaTunnelSource) new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables); }; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java index 4df666d2add6..80ba3052fc26 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -41,6 +42,7 @@ import org.bson.types.Decimal128; import com.mongodb.client.model.changestream.OperationType; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; @@ -67,17 +69,17 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; +@Slf4j public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializationSchema { - private final SeaTunnelDataType resultTypeInfo; - private final DeserializationRuntimeConverter physicalConverter; + private final Map tableRowConverters; public MongoDBConnectorDeserializationSchema( SeaTunnelDataType physicalDataType, SeaTunnelDataType resultTypeInfo) { - this.physicalConverter = createConverter(physicalDataType); + this.tableRowConverters = createConverter(physicalDataType); this.resultTypeInfo = resultTypeInfo; } @@ -92,29 +94,44 @@ public void deserialize(@Nonnull SourceRecord record, Collector ou Objects.requireNonNull( extractBsonDocument(value, valueSchema, DOCUMENT_KEY))); BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT); + String tableId = extractTableId(record); + DeserializationRuntimeConverter tableRowConverter; + if (tableId == null && tableRowConverters.size() == 1) { + tableRowConverter = tableRowConverters.get(0); + } else { + tableRowConverter = tableRowConverters.get(tableId); + } + if (tableRowConverter == null) { + log.debug("Ignore newly added table {}", tableId); + return; + } switch (op) { case INSERT: - SeaTunnelRow insert = extractRowData(fullDocument); + SeaTunnelRow insert = extractRowData(tableRowConverter, fullDocument); insert.setRowKind(RowKind.INSERT); + insert.setTableId(tableId); emit(record, insert, out); break; case DELETE: - SeaTunnelRow delete = extractRowData(documentKey); + SeaTunnelRow delete = extractRowData(tableRowConverter, documentKey); delete.setRowKind(RowKind.DELETE); + delete.setTableId(tableId); emit(record, delete, out); break; case UPDATE: if (fullDocument == null) { break; } - SeaTunnelRow updateAfter = extractRowData(fullDocument); + SeaTunnelRow updateAfter = extractRowData(tableRowConverter, fullDocument); updateAfter.setRowKind(RowKind.UPDATE_AFTER); + updateAfter.setTableId(tableId); emit(record, updateAfter, out); break; case REPLACE: - SeaTunnelRow replaceAfter = extractRowData(fullDocument); + SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, fullDocument); replaceAfter.setRowKind(RowKind.UPDATE_AFTER); + replaceAfter.setTableId(tableId); emit(record, replaceAfter, out); break; case INVALIDATE: @@ -145,9 +162,15 @@ private void emit( collector.collect(physicalRow); } - private SeaTunnelRow extractRowData(BsonDocument document) { + private SeaTunnelRow extractRowData( + DeserializationRuntimeConverter tableRowConverter, BsonDocument document) { checkNotNull(document); - return (SeaTunnelRow) physicalConverter.convert(document); + return (SeaTunnelRow) tableRowConverter.convert(document); + } + + private String extractTableId(SourceRecord record) { + // TODO extract table id from record + return null; } // ------------------------------------------------------------------------------------- @@ -159,17 +182,24 @@ public interface DeserializationRuntimeConverter extends Serializable { Object convert(BsonValue bsonValue); } - public DeserializationRuntimeConverter createConverter(SeaTunnelDataType type) { - SerializableFunction internalRowConverter = - createNullSafeInternalConverter(type); - return new DeserializationRuntimeConverter() { - private static final long serialVersionUID = 1L; - - @Override - public Object convert(BsonValue bsonValue) { - return internalRowConverter.apply(bsonValue); - } - }; + public Map createConverter( + SeaTunnelDataType inputDataType) { + Map tableRowConverters = new HashMap<>(); + for (Map.Entry item : (MultipleRowType) inputDataType) { + SerializableFunction internalRowConverter = + createNullSafeInternalConverter(item.getValue()); + DeserializationRuntimeConverter itemRowConverter = + new DeserializationRuntimeConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(BsonValue bsonValue) { + return internalRowConverter.apply(bsonValue); + } + }; + tableRowConverters.put(item.getKey(), itemRowConverter); + } + return tableRowConverters; } private static SerializableFunction createNullSafeInternalConverter( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 90e76e835c62..1ec94c3cfc2d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -97,7 +97,7 @@ TableSource createSource(TableSourceFactoryContext context) { CatalogTableUtil.getCatalogTables( context.getOptions(), context.getClassLoader()); SeaTunnelDataType dataType = - CatalogTableUtil.convertToDataType(catalogTables); + CatalogTableUtil.convertToMultipleRowType(catalogTables); return (SeaTunnelSource) new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables); }; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java index 41623937af00..6338d85aa2f5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java @@ -102,7 +102,7 @@ TableSource createSource(TableSourceFactoryContext context) { CatalogTableUtil.getCatalogTables( context.getOptions(), context.getClassLoader()); SeaTunnelDataType dataType = - CatalogTableUtil.convertToDataType(catalogTables); + CatalogTableUtil.convertToMultipleRowType(catalogTables); return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables); }; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index a035ec4caacc..6b3519f53606 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -324,6 +324,12 @@ public void testMultiTableWithRestore(TestContainer container) getSourceQuerySQL( MYSQL_DATABASE2, SOURCE_TABLE_2))))); + + log.info("****************** container logs start ******************"); + String containerLogs = container.getServerLogs(); + log.info(containerLogs); + Assertions.assertFalse(containerLogs.contains("ERROR")); + log.info("****************** container logs end ******************"); } private Connection getJdbcConnection() throws SQLException {