Skip to content

Commit

Permalink
[Bug][CDC] Fix state recovery error when switching a single table to …
Browse files Browse the repository at this point in the history
…multiple tables
  • Loading branch information
hailin0 committed Nov 3, 2023
1 parent 70cca95 commit 9bf9a2d
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,17 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
if (catalogTables.size() == 1) {
return catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
} else {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : catalogTables) {
String tableId = catalogTable.getTableId().toTablePath().toString();
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
}
return new MultipleRowType(rowTypeMap);
return convertToMultipleRowType(catalogTables);
}
}

public static MultipleRowType convertToMultipleRowType(List<CatalogTable> catalogTables) {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : catalogTables) {
String tableId = catalogTable.getTableId().toTablePath().toString();
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
}
return new MultipleRowType(rowTypeMap);
}

// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
Expand Down Expand Up @@ -233,12 +234,14 @@ public SchemaChangeResolver getSchemaChangeResolver() {

@Override
public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> checkpointDataType) {
if (!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) {
throw new IllegalStateException(
String.format(
"The produced type %s of the SeaTunnel deserialization schema "
+ "doesn't match the type %s of the restored snapshot.",
resultTypeInfo.getSqlType(), checkpointDataType.getSqlType()));
if (SqlType.ROW.equals(checkpointDataType.getSqlType())
&& SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
// TODO: Older versions may have this issue
log.warn(
"Skip incompatible restore type. produced type: {}, checkpoint type: {}",
resultTypeInfo,
checkpointDataType);
return;
}
if (checkpointDataType instanceof MultipleRowType) {
MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ public void testMultiTableWithRestore(TestContainer container)
getSourceQuerySQL(
MYSQL_DATABASE2,
SOURCE_TABLE_2)))));

log.info("******************container logs start******************");
String containerLogs = container.getServerLogs();
log.info(containerLogs);
Assertions.assertFalse(containerLogs.contains("ERROR"));
log.info("******************container logs end******************");
}

private Connection getJdbcConnection() throws SQLException {
Expand Down

0 comments on commit 9bf9a2d

Please sign in to comment.