Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement logic to trigger snapshot of new tables via debezium #13994

Merged
merged 12 commits into from
Jun 29, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
Expand All @@ -21,9 +20,7 @@
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -46,32 +43,58 @@ public class AirbyteDebeziumHandler {
*/
private static final int QUEUE_CAPACITY = 10000;

// Raw Debezium engine configuration passed through to the record publisher.
private final Properties connectorProperties;
// Source connector configuration (connection details etc.) as supplied by the platform.
private final JsonNode config;
// Strategy object that decides when the CDC sync has caught up to the target position.
private final CdcTargetPosition targetPosition;
// The configured streams this handler will sync.
private final ConfiguredAirbyteCatalog catalog;
// Whether the connector needs Debezium schema-history storage.
// NOTE(review): presumably true for MySQL/MSSQL-style connectors and false for Postgres — confirm with callers.
private final boolean trackSchemaHistory;

// Bounded queue between the Debezium engine (producer) and the iterator (consumer);
// capacity is QUEUE_CAPACITY to bound memory usage.
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;

/**
 * @param config source connector configuration
 * @param targetPosition determines when the CDC stream has reached the sync target
 * @param connectorProperties Debezium engine properties for this connector
 * @param catalog streams to sync
 * @param trackSchemaHistory whether Debezium schema history must be persisted
 */
public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition targetPosition,
final Properties connectorProperties,
final ConfiguredAirbyteCatalog catalog,
final boolean trackSchemaHistory) {
this.config = config;
this.targetPosition = targetPosition;
this.connectorProperties = connectorProperties;
this.catalog = catalog;
this.trackSchemaHistory = trackSchemaHistory;
this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
}

public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt) {
/**
 * Builds an iterator that runs a Debezium snapshot over the given streams (used to snapshot tables
 * that are new to an existing CDC sync) and emits their rows as Airbyte messages.
 *
 * @param catalog the (new) streams to snapshot
 * @param cdcMetadataInjector adds connector-specific CDC metadata to each emitted record
 * @param connectorProperties Debezium properties configured for a snapshot-only run
 * @param emittedAt timestamp stamped onto every emitted record
 * @return an iterator over the snapshot records; closing it shuts down the Debezium engine
 */
public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(final ConfiguredAirbyteCatalog catalog,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Instant emittedAt) {
LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");
// Local queue (not the instance field): this snapshot run uses its own engine and buffer.
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

// A snapshot has no prior offset to resume from, so start the engine with throwaway state.
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
/*
 * TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this
 * for MySQL and MSSQL
 */
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = Optional.empty();
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties,
config,
catalog,
offsetManager,
schemaHistoryManager);
// Start the Debezium engine; it pushes change events into the queue from here on.
publisher.start(queue);

// Iterator drains the queue until targetPosition reports completion; closing it closes the publisher.
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close);

// Convert raw Debezium change events into Airbyte RECORD messages.
return AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt));
}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final ConfiguredAirbyteCatalog catalog,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Instant emittedAt) {
LOGGER.info("using CDC: {}", true);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager,
Expand Down Expand Up @@ -107,10 +130,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
// this structure guarantees that the debezium engine will be closed, before we attempt to emit the
// state file. we want this so that we have a guarantee that the debezium offset file (which we use
// to produce the state file) is up-to-date.
final CompositeIterator<AirbyteMessage> messageIteratorWithStateDecorator =
AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));

return Collections.singletonList(messageIteratorWithStateDecorator);
return AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,16 @@ public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcSt
return offsetManager;
}

/**
 * Creates an offset backing store pointing at an empty, throwaway offset file. Used for
 * snapshot-only Debezium runs, which have no prior offset to resume from.
 *
 * @return a backing store whose offset file lives in a fresh temp directory
 * @throws RuntimeException if the temp directory cannot be created
 */
public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpose() {
final Path cdcWorkingDir;
try {
// Use the JVM's default temp location (java.io.tmpdir, /tmp on Linux) instead of a
// hard-coded /tmp so this also works on platforms without that directory.
cdcWorkingDir = Files.createTempDirectory("cdc-dummy-state-offset");
} catch (final IOException e) {
throw new RuntimeException("Unable to create temp directory for dummy CDC offset state", e);
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");

return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -61,6 +62,29 @@ public abstract class CdcSourceTest {
protected static final String COL_MAKE_ID = "make_id";
protected static final String COL_MODEL = "model";

// Fixture rows for the "<stream>_random" table (see createAndPopulateRandomTable):
// six records keyed by COL_ID+"_random", used to verify snapshots of tables added
// after the initial sync.
protected final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Fiesta-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Focus-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Ranger-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"GLA-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"A 220-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"E 350-random")));

protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
Expand Down Expand Up @@ -157,28 +181,6 @@ private void createAndPopulateRandomTable() {
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Fiesta-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Focus-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Ranger-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"GLA-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"A 220-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"E 350-random")));
for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
Expand All @@ -189,13 +191,13 @@ protected void writeModelRecord(final JsonNode recordJson) {
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL);
}

private void writeRecords(
final JsonNode recordJson,
final String dbName,
final String streamName,
final String idCol,
final String makeIdCol,
final String modelCol) {
protected void writeRecords(
final JsonNode recordJson,
final String dbName,
final String streamName,
final String idCol,
final String makeIdCol,
final String modelCol) {
executeQuery(
String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName,
idCol, makeIdCol, modelCol,
Expand Down Expand Up @@ -223,45 +225,62 @@ protected static Set<AirbyteRecordMessage> removeDuplicates(final Set<AirbyteRec
}

/**
 * Flattens every RECORD message across all streams into a single set. Per-stream extraction
 * (including the duplicate-record assertion) is delegated to extractRecordMessagesStreamWise.
 */
protected Set<AirbyteRecordMessage> extractRecordMessages(final List<AirbyteMessage> messages) {
final Set<AirbyteRecordMessage> allRecords = new HashSet<>();
for (final Set<AirbyteRecordMessage> streamRecords : extractRecordMessagesStreamWise(messages).values()) {
allRecords.addAll(streamRecords);
}
return allRecords;
}

/**
 * Groups RECORD messages by stream name and asserts that no stream contains duplicate records
 * within a single sync.
 *
 * (Review: the diff rendering left stale removed-side lines in this block — references to the
 * undefined {@code recordMessageList}/{@code recordMessageSet} and a duplicate return — which
 * made it uncompilable; they are dropped here.)
 *
 * @param messages raw sync output
 * @return stream name -> de-duplicated set of its record messages
 */
protected Map<String, Set<AirbyteRecordMessage>> extractRecordMessagesStreamWise(final List<AirbyteMessage> messages) {
final Map<String, List<AirbyteRecordMessage>> recordsPerStream = new HashMap<>();
for (final AirbyteMessage message : messages) {
if (message.getType() == Type.RECORD) {
final AirbyteRecordMessage recordMessage = message.getRecord();
recordsPerStream.computeIfAbsent(recordMessage.getStream(), (c) -> new ArrayList<>()).add(recordMessage);
}
}

final Map<String, Set<AirbyteRecordMessage>> recordsPerStreamWithNoDuplicates = new HashMap<>();
for (final Map.Entry<String, List<AirbyteRecordMessage>> element : recordsPerStream.entrySet()) {
final String streamName = element.getKey();
final List<AirbyteRecordMessage> records = element.getValue();
final Set<AirbyteRecordMessage> recordMessageSet = new HashSet<>(records);
// If the set is smaller than the list, the same record was emitted twice in one sync.
assertEquals(records.size(), recordMessageSet.size(),
"Expected no duplicates in airbyte record message output for a single sync.");
recordsPerStreamWithNoDuplicates.put(streamName, recordMessageSet);
}

return recordsPerStreamWithNoDuplicates;
}

/** Pulls out the STATE messages from a sync's output, preserving their order of appearance. */
protected List<AirbyteStateMessage> extractStateMessages(final List<AirbyteMessage> messages) {
final List<AirbyteStateMessage> stateMessages = new ArrayList<>();
for (final AirbyteMessage message : messages) {
if (message.getType() == Type.STATE) {
stateMessages.add(message.getState());
}
}
return stateMessages;
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords, final Set<AirbyteRecordMessage> actualRecords) {
/**
 * Asserts the actual records match the expected ones, treating every stream present in
 * {@code actualRecords} as a CDC stream (delegates to the cdcStreams overload).
 */
protected void assertExpectedRecords(final Set<JsonNode> expectedRecords, final Set<AirbyteRecordMessage> actualRecords) {
// assume all streams are cdc.
assertExpectedRecords(expectedRecords, actualRecords, actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet()));
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams) {
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES);
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES, MODELS_SCHEMA);
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams,
final Set<String> streamNames) {
protected void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams,
final Set<String> streamNames,
final String namespace) {
final Set<JsonNode> actualData = actualRecords
.stream()
.map(recordMessage -> {
assertTrue(streamNames.contains(recordMessage.getStream()));
assertNotNull(recordMessage.getEmittedAt());

assertEquals(MODELS_SCHEMA, recordMessage.getNamespace());
assertEquals(namespace, recordMessage.getNamespace());

final JsonNode data = recordMessage.getData();

Expand Down Expand Up @@ -482,7 +501,8 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.collect(Collectors.toSet()),
recordMessages1,
Collections.singleton(MODELS_STREAM_NAME),
names);
names,
MODELS_SCHEMA);

final JsonNode puntoRecord = Jsons
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
Expand All @@ -503,7 +523,8 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.collect(Collectors.toSet()),
recordMessages2,
Collections.singleton(MODELS_STREAM_NAME),
names);
names,
MODELS_SCHEMA);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,31 @@

public class PostgresCdcProperties {

static Properties getDebeziumProperties(final JsonNode config) {
final Properties props = new Properties();
static Properties getDebeziumDefaultProperties(final JsonNode config) {
final Properties props = commonProperties();
props.setProperty("plugin.name", PostgresUtils.getPluginValue(config.get("replication_method")));
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.setProperty("snapshot.mode", "initial");

props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText());
props.setProperty("publication.name", config.get("replication_method").get("publication").asText());

props.setProperty("publication.autocreate.mode", "disabled");

return props;
}

/**
 * Debezium settings shared by every Postgres sync mode: the Postgres connector class and the
 * custom datetime converter.
 */
private static Properties commonProperties() {
final Properties properties = new Properties();
properties.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
properties.setProperty("converters", "datetime");
properties.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter");
return properties;
}

/**
 * Debezium settings for a snapshot-only run: the shared base configuration with snapshot.mode
 * set to "initial_only", so Debezium snapshots the tables and then stops (no streaming).
 */
static Properties getSnapshotProperties() {
final Properties snapshotProperties = commonProperties();
snapshotProperties.setProperty("snapshot.mode", "initial_only");
return snapshotProperties;
}

Expand Down
Loading