Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement logic to trigger snapshot of new tables via Debezium #13994

Merged
merged 12 commits into from
Jun 29, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
Expand All @@ -19,9 +18,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -44,32 +41,58 @@ public class AirbyteDebeziumHandler {
*/
private static final int QUEUE_CAPACITY = 10000;

private final Properties connectorProperties;
private final JsonNode config;
private final CdcTargetPosition targetPosition;
private final ConfiguredAirbyteCatalog catalog;
private final boolean trackSchemaHistory;

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;

  /**
   * @param config source connector configuration, forwarded to the Debezium engine.
   * @param targetPosition position in the CDC log at which the incremental sync should stop.
   * @param connectorProperties Debezium engine properties for this connector.
   * @param catalog configured streams to sync.
   * @param trackSchemaHistory whether the connector needs Debezium schema history storage
   *        (Postgres does not; MySQL/MSSQL do — see TODO in getSnapshotIterators).
   */
  public AirbyteDebeziumHandler(final JsonNode config,
                                final CdcTargetPosition targetPosition,
                                final Properties connectorProperties,
                                final ConfiguredAirbyteCatalog catalog,
                                final boolean trackSchemaHistory) {
    this.config = config;
    this.targetPosition = targetPosition;
    this.connectorProperties = connectorProperties;
    this.catalog = catalog;
    this.trackSchemaHistory = trackSchemaHistory;
    // Bounded queue between the Debezium engine (producer) and the record iterator (consumer).
    this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
  }

public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt) {
public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(final ConfiguredAirbyteCatalog catalog,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Instant emittedAt) {
LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
/*
* TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this
* for MySQL and MSSQL
*/
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = Optional.empty();
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties,
config,
catalog,
offsetManager,
schemaHistoryManager);
publisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close);

return AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt));
}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final ConfiguredAirbyteCatalog catalog,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Instant emittedAt) {
LOGGER.info("using CDC: {}", true);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager,
Expand Down Expand Up @@ -105,10 +128,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
// this structure guarantees that the debezium engine will be closed, before we attempt to emit the
// state file. we want this so that we have a guarantee that the debezium offset file (which we use
// to produce the state file) is up-to-date.
final CompositeIterator<AirbyteMessage> messageIteratorWithStateDecorator =
AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));

return Collections.singletonList(messageIteratorWithStateDecorator);
return AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,16 @@ public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcSt
return offsetManager;
}

public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpose() {
final Path cdcWorkingDir;
try {
cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-dummy-state-offset");
} catch (final IOException e) {
throw new RuntimeException(e);
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");

return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,29 @@ protected List<AirbyteStateMessage> extractStateMessages(final List<AirbyteMessa
.collect(Collectors.toList());
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords, final Set<AirbyteRecordMessage> actualRecords) {
protected void assertExpectedRecords(final Set<JsonNode> expectedRecords, final Set<AirbyteRecordMessage> actualRecords) {
// assume all streams are cdc.
assertExpectedRecords(expectedRecords, actualRecords, actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet()));
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams) {
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES);
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES, MODELS_SCHEMA);
}

private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams,
final Set<String> streamNames) {
protected void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams,
final Set<String> streamNames,
final String namespace) {
final Set<JsonNode> actualData = actualRecords
.stream()
.map(recordMessage -> {
assertTrue(streamNames.contains(recordMessage.getStream()));
assertNotNull(recordMessage.getEmittedAt());

assertEquals(MODELS_SCHEMA, recordMessage.getNamespace());
assertEquals(namespace, recordMessage.getNamespace());

final JsonNode data = recordMessage.getData();

Expand Down Expand Up @@ -482,7 +483,8 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.collect(Collectors.toSet()),
recordMessages1,
Collections.singleton(MODELS_STREAM_NAME),
names);
names,
MODELS_SCHEMA);

final JsonNode puntoRecord = Jsons
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
Expand All @@ -503,7 +505,8 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.collect(Collectors.toSet()),
recordMessages2,
Collections.singleton(MODELS_STREAM_NAME),
names);
names,
MODELS_SCHEMA);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,31 @@

public class PostgresCdcProperties {

static Properties getDebeziumProperties(final JsonNode config) {
final Properties props = new Properties();
static Properties getDebeziumDefaultProperties(final JsonNode config) {
final Properties props = commonProperties();
props.setProperty("plugin.name", PostgresUtils.getPluginValue(config.get("replication_method")));
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.setProperty("snapshot.mode", "initial");

props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText());
props.setProperty("publication.name", config.get("replication_method").get("publication").asText());

props.setProperty("publication.autocreate.mode", "disabled");

return props;
}

private static Properties commonProperties() {
final Properties props = new Properties();
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");

props.setProperty("converters", "datetime");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter");
return props;
}

static Properties getSnapshotProperties() {
final Properties props = commonProperties();
props.setProperty("snapshot.mode", "initial_only");
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
Expand All @@ -32,6 +34,7 @@
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
Expand All @@ -47,10 +50,12 @@
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -109,7 +114,7 @@ public JsonNode toDatabaseConfigStatic(final JsonNode config) {

additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
final Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", jdbcUrl.toString());

Expand Down Expand Up @@ -220,7 +225,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
// create it.
final AirbyteConnectionStatus check = check(config);

if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
if (check.getStatus().equals(Status.FAILED)) {
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
}

Expand All @@ -237,12 +242,22 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database),
PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false);
return handler.getIncrementalIterators(
PostgresCdcTargetPosition.targetPosition(database), false);
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(),
new PostgresCdcStateHandler(stateManager),
new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getDebeziumDefaultProperties(sourceConfig),
emittedAt);
if (streamsToSnapshot.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
}

final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getSnapshotProperties(), emittedAt);
return Collections.singletonList(AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier)));

} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
Expand All @@ -255,6 +270,11 @@ private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
return any.isPresent();
}

  // TODO(Subodh): add logic to identify new streams once PR https://github.com/airbytehq/airbyte/pull/13609 is merged
  protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) {
    // Stub: until new-stream detection lands, no streams are ever selected for snapshotting,
    // so the CDC path always takes the incremental-only branch.
    return Collections.emptyList();
  }

@VisibleForTesting
static boolean isCdc(final JsonNode config) {
final boolean isCdc = config.hasNonNull("replication_method")
Expand Down
Loading