Skip to content

Commit

Permalink
[Bug][CDC] Fix state recovery error when switching a single table to …
Browse files Browse the repository at this point in the history
…multiple tables (#5784)
  • Loading branch information
hailin0 authored Nov 7, 2023
1 parent d908f0a commit 37fcff3
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,17 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
if (catalogTables.size() == 1) {
return catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
} else {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : catalogTables) {
String tableId = catalogTable.getTableId().toTablePath().toString();
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
}
return new MultipleRowType(rowTypeMap);
return convertToMultipleRowType(catalogTables);
}
}

/**
 * Builds a {@link MultipleRowType} from the given catalog tables.
 *
 * <p>Each table contributes one entry: the key is the string form of the table path obtained
 * from its table id, and the value is the physical row type of its schema. If two tables
 * resolve to the same key, the later one in the list replaces the earlier one.
 *
 * @param catalogTables the tables to describe
 * @return a multiple-row type holding one row type per table path
 */
public static MultipleRowType convertToMultipleRowType(List<CatalogTable> catalogTables) {
    Map<String, SeaTunnelRowType> rowTypesByTablePath = new HashMap<>();
    catalogTables.forEach(
            table ->
                    rowTypesByTablePath.put(
                            table.getTableId().toTablePath().toString(),
                            table.getTableSchema().toPhysicalRowDataType()));
    return new MultipleRowType(rowTypesByTablePath);
}

// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
Expand Down Expand Up @@ -233,12 +234,14 @@ public SchemaChangeResolver getSchemaChangeResolver() {

@Override
public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> checkpointDataType) {
if (!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) {
throw new IllegalStateException(
String.format(
"The produced type %s of the SeaTunnel deserialization schema "
+ "doesn't match the type %s of the restored snapshot.",
resultTypeInfo.getSqlType(), checkpointDataType.getSqlType()));
if (SqlType.ROW.equals(checkpointDataType.getSqlType())
&& SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
// TODO: Older versions may have this issue
log.warn(
"Skip incompatible restore type. produced type: {}, checkpoint type: {}",
resultTypeInfo,
checkpointDataType);
return;
}
if (checkpointDataType instanceof MultipleRowType) {
MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -41,6 +42,7 @@
import org.bson.types.Decimal128;

import com.mongodb.client.model.changestream.OperationType;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;

Expand All @@ -67,17 +69,17 @@
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

@Slf4j
public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializationSchema<SeaTunnelRow> {

private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;

private final DeserializationRuntimeConverter physicalConverter;
private final Map<String, DeserializationRuntimeConverter> tableRowConverters;

public MongoDBConnectorDeserializationSchema(
SeaTunnelDataType<SeaTunnelRow> physicalDataType,
SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
this.physicalConverter = createConverter(physicalDataType);
this.tableRowConverters = createConverter(physicalDataType);
this.resultTypeInfo = resultTypeInfo;
}

Expand All @@ -92,29 +94,44 @@ public void deserialize(@Nonnull SourceRecord record, Collector<SeaTunnelRow> ou
Objects.requireNonNull(
extractBsonDocument(value, valueSchema, DOCUMENT_KEY)));
BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT);
String tableId = extractTableId(record);
DeserializationRuntimeConverter tableRowConverter;
if (tableId == null && tableRowConverters.size() == 1) {
tableRowConverter = tableRowConverters.values().iterator().next();
} else {
tableRowConverter = tableRowConverters.get(tableId);
}
if (tableRowConverter == null) {
log.debug("Ignore newly added table {}", tableId);
return;
}

switch (op) {
case INSERT:
SeaTunnelRow insert = extractRowData(fullDocument);
SeaTunnelRow insert = extractRowData(tableRowConverter, fullDocument);
insert.setRowKind(RowKind.INSERT);
insert.setTableId(tableId);
emit(record, insert, out);
break;
case DELETE:
SeaTunnelRow delete = extractRowData(documentKey);
SeaTunnelRow delete = extractRowData(tableRowConverter, documentKey);
delete.setRowKind(RowKind.DELETE);
delete.setTableId(tableId);
emit(record, delete, out);
break;
case UPDATE:
if (fullDocument == null) {
break;
}
SeaTunnelRow updateAfter = extractRowData(fullDocument);
SeaTunnelRow updateAfter = extractRowData(tableRowConverter, fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
updateAfter.setTableId(tableId);
emit(record, updateAfter, out);
break;
case REPLACE:
SeaTunnelRow replaceAfter = extractRowData(fullDocument);
SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
replaceAfter.setTableId(tableId);
emit(record, replaceAfter, out);
break;
case INVALIDATE:
Expand Down Expand Up @@ -145,9 +162,15 @@ private void emit(
collector.collect(physicalRow);
}

private SeaTunnelRow extractRowData(BsonDocument document) {
private SeaTunnelRow extractRowData(
DeserializationRuntimeConverter tableRowConverter, BsonDocument document) {
checkNotNull(document);
return (SeaTunnelRow) physicalConverter.convert(document);
return (SeaTunnelRow) tableRowConverter.convert(document);
}

/**
 * Resolves the identifier of the table that produced {@code record}.
 *
 * <p>Not implemented yet: the record is not inspected and {@code null} is always returned.
 *
 * @param record the change record (currently unused)
 * @return always {@code null} until table-id extraction is implemented
 */
private String extractTableId(SourceRecord record) {
// TODO extract table id from record
return null;
}

// -------------------------------------------------------------------------------------
Expand All @@ -159,17 +182,24 @@ public interface DeserializationRuntimeConverter extends Serializable {
Object convert(BsonValue bsonValue);
}

public DeserializationRuntimeConverter createConverter(SeaTunnelDataType<?> type) {
SerializableFunction<BsonValue, Object> internalRowConverter =
createNullSafeInternalConverter(type);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(BsonValue bsonValue) {
return internalRowConverter.apply(bsonValue);
}
};
/**
 * Creates one runtime converter per table described by {@code inputDataType}.
 *
 * <p>The argument is cast to {@link MultipleRowType}; passing any other type fails with a
 * {@link ClassCastException}. For every entry of the multiple-row type, a null-safe converter
 * is built for the entry's row type and registered under the entry's key.
 *
 * @param inputDataType the multiple-row type listing the row type of each table
 * @return a map from entry key to the converter for that entry's row type
 */
public Map<String, DeserializationRuntimeConverter> createConverter(
        SeaTunnelDataType<?> inputDataType) {
    MultipleRowType multipleRowType = (MultipleRowType) inputDataType;
    Map<String, DeserializationRuntimeConverter> convertersByTable = new HashMap<>();
    for (Map.Entry<String, SeaTunnelRowType> tableEntry : multipleRowType) {
        SerializableFunction<BsonValue, Object> rowConverter =
                createNullSafeInternalConverter(tableEntry.getValue());
        // Wrap the function in the serializable converter interface used by this schema.
        convertersByTable.put(
                tableEntry.getKey(),
                new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(BsonValue bsonValue) {
                        return rowConverter.apply(bsonValue);
                    }
                });
    }
    return convertersByTable;
}

private static SerializableFunction<BsonValue, Object> createNullSafeInternalConverter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ public void testMultiTableWithRestore(TestContainer container)
getSourceQuerySQL(
MYSQL_DATABASE2,
SOURCE_TABLE_2)))));

log.info("****************** container logs start ******************");
String containerLogs = container.getServerLogs();
log.info(containerLogs);
Assertions.assertFalse(containerLogs.contains("ERROR"));
log.info("****************** container logs end ******************");
}

private Connection getJdbcConnection() throws SQLException {
Expand Down

0 comments on commit 37fcff3

Please sign in to comment.