diff --git a/airbyte-integrations/bases/debezium/build.gradle b/airbyte-integrations/bases/debezium-v1-4-2/build.gradle
similarity index 100%
rename from airbyte-integrations/bases/debezium/build.gradle
rename to airbyte-integrations/bases/debezium-v1-4-2/build.gradle
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java
diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java
diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java
diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java
diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/delete_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_change_event.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/delete_change_event.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_change_event.json
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/delete_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_message.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/delete_message.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_message.json
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/insert_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_change_event.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/insert_change_event.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_change_event.json
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/insert_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_message.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/insert_message.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_message.json
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/test_debezium_offset.dat b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/test_debezium_offset.dat
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/test_debezium_offset.dat
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/test_debezium_offset.dat
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/update_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_change_event.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/update_change_event.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_change_event.json
diff --git a/airbyte-integrations/bases/debezium/src/test/resources/update_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_message.json
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/test/resources/update_message.json
rename to airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_message.json
diff --git a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java
similarity index 100%
rename from airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java
rename to airbyte-integrations/bases/debezium-v1-4-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/build.gradle b/airbyte-integrations/bases/debezium-v1-9-2/build.gradle
new file mode 100644
index 000000000000..6359e00d400a
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/build.gradle
@@ -0,0 +1,27 @@
+plugins {
+    id "java-test-fixtures"
+}
+
+project.configurations {
+    testFixturesImplementation.extendsFrom implementation
+}
+dependencies {
+    implementation project(':airbyte-protocol:protocol-models')
+    implementation project(':airbyte-db:db-lib')
+
+    implementation 'io.debezium:debezium-api:1.9.2.Final'
+    implementation 'io.debezium:debezium-embedded:1.9.2.Final'
+// commented out because source mysql and sqlserver do not yet support the new cdc implementation
+//    implementation 'io.debezium:debezium-connector-sqlserver:1.9.2.Final'
+//    implementation 'io.debezium:debezium-connector-mysql:1.9.2.Final'
+    implementation 'io.debezium:debezium-connector-postgres:1.9.2.Final'
+    implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
+
+    testFixturesImplementation project(':airbyte-db:db-lib')
+    testFixturesImplementation project(':airbyte-integrations:bases:base-java')
+
+    testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
+    testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
+    testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2'
+
+}
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java
new file mode 100644
index 000000000000..9d89d5a5a781
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.util.AutoCloseableIterator;
+import io.airbyte.commons.util.AutoCloseableIterators;
+import io.airbyte.commons.util.CompositeIterator;
+import io.airbyte.commons.util.MoreIterators;
+import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
+import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
+import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
+import io.airbyte.integrations.debezium.internals.DebeziumRecordIterator;
+import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
+import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import io.debezium.engine.ChangeEvent;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
+ * to use debezium for CDC, it should use this class.
+ */
+public class AirbyteDebeziumHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);
+  /**
+   * We use 10000 as capacity because the default queue size and batch size of debezium are:
+   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048
+   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
+   */
+  private static final int QUEUE_CAPACITY = 10000;
+
+  private final Properties connectorProperties;
+  private final JsonNode config;
+  private final CdcTargetPosition targetPosition;
+  private final ConfiguredAirbyteCatalog catalog;
+  private final boolean trackSchemaHistory;
+
+  private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
+
+  public AirbyteDebeziumHandler(final JsonNode config,
+                                final CdcTargetPosition targetPosition,
+                                final Properties connectorProperties,
+                                final ConfiguredAirbyteCatalog catalog,
+                                final boolean trackSchemaHistory) {
+    this.config = config;
+    this.targetPosition = targetPosition;
+    this.connectorProperties = connectorProperties;
+    this.catalog = catalog;
+    this.trackSchemaHistory = trackSchemaHistory;
+    this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+  }
+
+  public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final CdcSavedInfoFetcher cdcSavedInfoFetcher,
+                                                                             final CdcStateHandler cdcStateHandler,
+                                                                             final CdcMetadataInjector cdcMetadataInjector,
+                                                                             final Instant emittedAt) {
+    LOGGER.info("using CDC: {}", true);
+    final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
+    final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
+    final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager,
+        schemaHistoryManager);
+    publisher.start(queue);
+
+    // handle state machine around pub/sub logic.
+    final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
+        queue,
+        targetPosition,
+        publisher::hasClosed,
+        publisher::close);
+
+    // convert to airbyte message.
+    final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators
+        .transform(
+            eventIterator,
+            (event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt));
+
+    // our goal is to get the state at the time this supplier is called (i.e. after all message records
+    // have been produced)
+    final Supplier<AirbyteMessage> stateMessageSupplier = () -> {
+      final Map<String, String> offset = offsetManager.read();
+      final String dbHistory = trackSchemaHistory ? schemaHistoryManager
+          .orElseThrow(() -> new RuntimeException("Schema History Tracking is true but manager is not initialised")).read() : null;
+
+      return cdcStateHandler.saveState(offset, dbHistory);
+    };
+
+    // wrap the supplier in an iterator so that we can concat it to the message iterator.
+    final Iterator<AirbyteMessage> stateMessageIterator = MoreIterators.singletonIteratorFromSupplier(stateMessageSupplier);
+
+    // this structure guarantees that the debezium engine will be closed, before we attempt to emit the
+    // state file. we want this so that we have a guarantee that the debezium offset file (which we use
+    // to produce the state file) is up-to-date.
+    final CompositeIterator<AirbyteMessage> messageIteratorWithStateDecorator =
+        AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));
+
+    return Collections.singletonList(messageIteratorWithStateDecorator);
+  }
+
+  private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) {
+    if (trackSchemaHistory) {
+      FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText());
+      return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(cdcSavedInfoFetcher.getSavedSchemaHistory()));
+    }
+
+    return Optional.empty();
+  }
+
+}
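For orientation, a source connector would wire the handler into its incremental read path roughly as follows. This is a hedged sketch rather than code from this changeset: the method name and every argument are stand-ins for whatever the concrete connector supplies.

```java
// Hypothetical wiring inside a source connector (names are illustrative, not from this diff).
public List<AutoCloseableIterator<AirbyteMessage>> cdcIterators(final JsonNode config,
                                                                final ConfiguredAirbyteCatalog catalog,
                                                                final CdcTargetPosition targetPosition,
                                                                final Properties debeziumProperties,
                                                                final CdcSavedInfoFetcher savedInfoFetcher,
                                                                final CdcStateHandler stateHandler,
                                                                final CdcMetadataInjector metadataInjector) {
  // trackSchemaHistory = false here: only MySQL-style connectors need the database history file.
  final AirbyteDebeziumHandler handler =
      new AirbyteDebeziumHandler(config, targetPosition, debeziumProperties, catalog, false);
  return handler.getIncrementalIterators(savedInfoFetcher, stateHandler, metadataInjector, Instant.now());
}
```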
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java
new file mode 100644
index 000000000000..cd99773d99f1
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * This interface is used to add metadata to the records fetched from the database. For instance, in
+ * Postgres we add the lsn to the records. In MySql we add the file name and position to the
+ * records.
+ */
+public interface CdcMetadataInjector {
+
+  /**
+   * A debezium record contains multiple pieces. Ref :
+   * https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
+   *
+   * @param event is the actual record which contains data and would be written to the destination
+   * @param source contains the metadata about the record; we need to extract that metadata and add
+   *        it to the event before writing it to the destination
+   */
+  void addMetaData(ObjectNode event, JsonNode source);
+
+  /**
+   * As part of the Airbyte record we need to add the namespace (schema name).
+   *
+   * @param source part of the debezium record which contains the metadata about the record. We need
+   *        to extract the namespace out of this metadata and return it. Ref :
+   *        https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
+   */
+  String namespace(JsonNode source);
+
+}
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java
new file mode 100644
index 000000000000..a0efa36f05a8
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Optional;
+
+/**
+ * This interface is used to fetch the saved info required for debezium to run incrementally. Each
+ * connector saves the offset and schema history in a different manner.
+ */
+public interface CdcSavedInfoFetcher {
+
+  JsonNode getSavedOffset();
+
+  Optional<JsonNode> getSavedSchemaHistory();
+
+}
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java
new file mode 100644
index 000000000000..7b76186fc9c1
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium;
+
+import io.airbyte.protocol.models.AirbyteMessage;
+import java.util.Map;
+
+/**
+ * This interface is used to allow connectors to save the offset and schema history in the manner
+ * which suits them.
+ */
+@FunctionalInterface
+public interface CdcStateHandler {
+
+  AirbyteMessage saveState(Map<String, String> offset, String dbHistory);
+
+}
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java
new file mode 100644
index 000000000000..47209ada28f7
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * This interface is used to define the target position at the beginning of the sync so that once we
+ * reach the desired target, we can shut down the sync. This is needed because it might happen that
+ * while we are syncing the data, new changes are being made in the source database, and as a result
+ * we might end up syncing forever. In order to tackle that, we need to define an end point at the
+ * beginning of the sync.
+ */
+public interface CdcTargetPosition {
+
+  boolean reachedTargetPosition(JsonNode valueAsJson);
+
+}
+ */ + +package io.airbyte.integrations.debezium.internals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.json.Jsons; +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles reading and writing a debezium offset file. In many cases it is duplicating + * logic in debezium because that logic is not exposed in the public API. We mostly treat the + * contents of this state file like a black box. We know it is a Map<ByteBuffer, Bytebuffer>. + * We deserialize it to a Map<String, String> so that the state file can be human readable. If + * we ever discover that any of the contents of these offset files is not string serializable we + * will likely have to drop the human readability support and just base64 encode it. + */ +public class AirbyteFileOffsetBackingStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class); + + private final Path offsetFilePath; + + public AirbyteFileOffsetBackingStore(final Path offsetFilePath) { + this.offsetFilePath = offsetFilePath; + } + + public Path getOffsetFilePath() { + return offsetFilePath; + } + + public Map read() { + final Map raw = load(); + + return raw.entrySet().stream().collect(Collectors.toMap( + e -> byteBufferToString(e.getKey()), + e -> byteBufferToString(e.getValue()))); + } + + @SuppressWarnings("unchecked") + public void persist(final JsonNode cdcState) { + final Map mapAsString = + cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap(); + final Map mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap( + e -> stringToByteBuffer(e.getKey()), + e -> stringToByteBuffer(e.getValue()))); + + FileUtils.deleteQuietly(offsetFilePath.toFile()); + save(mappedAsStrings); + } + + private static String byteBufferToString(final ByteBuffer byteBuffer) { + Preconditions.checkNotNull(byteBuffer); + return new String(byteBuffer.array(), StandardCharsets.UTF_8); + } + + private static ByteBuffer stringToByteBuffer(final String s) { + Preconditions.checkNotNull(s); + return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); + } + + /** + * See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this + * method is not public. + */ + @SuppressWarnings("unchecked") + private Map load() { + try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) { + // todo (cgardens) - we currently suppress a security warning for this line. use of readObject from + // untrusted sources is considered unsafe. Since the source is controlled by us in this case it + // should be safe. That said, changing this implementation to not use readObject would remove some + // headache. 
+ final Object obj = is.readObject(); + if (!(obj instanceof HashMap)) + throw new ConnectException("Expected HashMap but found " + obj.getClass()); + final Map raw = (Map) obj; + final Map data = new HashMap<>(); + for (final Map.Entry mapEntry : raw.entrySet()) { + final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; + final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null; + data.put(key, value); + } + + return data; + } catch (final NoSuchFileException | EOFException e) { + // NoSuchFileException: Ignore, may be new. + // EOFException: Ignore, this means the file was missing or corrupt + return Collections.emptyMap(); + } catch (final IOException | ClassNotFoundException e) { + throw new ConnectException(e); + } + } + + /** + * See FileOffsetBackingStore#save - logic is mostly borrowed from here. duplicated because this + * method is not public. + */ + private void save(final Map data) { + try (final ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) { + final Map raw = new HashMap<>(); + for (final Map.Entry mapEntry : data.entrySet()) { + final byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; + final byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; + raw.put(key, value); + } + os.writeObject(raw); + } catch (final IOException e) { + throw new ConnectException(e); + } + } + + public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcState) { + final Path cdcWorkingDir; + try { + cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset"); + } catch (final IOException e) { + throw new RuntimeException(e); + } + final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat"); + + final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath); + offsetManager.persist(cdcState); + return offsetManager; + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java new file mode 100644 index 000000000000..0f4e37fd20ef --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.debezium.document.Document; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.relational.history.HistoryRecord; +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.commons.io.FileUtils; + +/** + * The purpose of this class is : to , 1. Read the contents of the file {@link #path} which contains + * the schema history at the end of the sync so that it can be saved in state for future syncs. + * Check {@link #read()} 2. 
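The offset store is easiest to read as a JSON-to-file-and-back round trip driven by the handler. A sketch under that assumption, given a CdcSavedInfoFetcher built from the previous sync's state (the state shape in the comment mirrors Debezium's key/value offset map and is illustrative only):

```java
// savedOffset might look like {"[\"engine\",{\"server\":\"db\"}]": "{\"lsn\":23896935}"} (illustrative).
final JsonNode savedOffset = cdcSavedInfoFetcher.getSavedOffset();

// Start of sync: materialize the state as the file Debezium's FileOffsetBackingStore reads.
final AirbyteFileOffsetBackingStore store = AirbyteFileOffsetBackingStore.initializeState(savedOffset);

// ... Debezium runs and flushes offsets to the same file (see offset.flush.interval.ms) ...

// End of sync: read the flushed offsets back as strings, ready for CdcStateHandler#saveState.
final Map<String, String> offset = store.read();
```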
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java
new file mode 100644
index 000000000000..0f4e37fd20ef
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.json.Jsons;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.HistoryRecord;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.commons.io.FileUtils;
+
+/**
+ * The purpose of this class is to: 1. Read the contents of the file {@link #path}, which contains
+ * the schema history, at the end of the sync so that it can be saved in state for future syncs.
+ * Check {@link #read()}. 2. Write the saved content back to the file {@link #path} at the beginning
+ * of the sync so that debezium can function smoothly. Check persist(Optional<JsonNode>). To
+ * understand more about the file, please refer to {@link FilteredFileDatabaseHistory}.
+ */
+public class AirbyteSchemaHistoryStorage {
+
+  private final Path path;
+  private static final Charset UTF8 = StandardCharsets.UTF_8;
+  private final DocumentReader reader = DocumentReader.defaultReader();
+  private final DocumentWriter writer = DocumentWriter.defaultWriter();
+
+  public AirbyteSchemaHistoryStorage(final Path path) {
+    this.path = path;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * This implementation is kind of similar to
+   * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}
+   */
+  public String read() {
+    final StringBuilder fileAsString = new StringBuilder();
+    try {
+      for (final String line : Files.readAllLines(path, UTF8)) {
+        if (line != null && !line.isEmpty()) {
+          final Document record = reader.read(line);
+          final String recordAsString = writer.write(record);
+          fileAsString.append(recordAsString);
+          fileAsString.append(System.lineSeparator());
+        }
+      }
+      return fileAsString.toString();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This implementation is kind of similar to
+   * {@link io.debezium.relational.history.FileDatabaseHistory#start()}
+   */
+  private void makeSureFileExists() {
+    try {
+      // Make sure the file exists ...
+      if (!Files.exists(path)) {
+        // Create parent directories if we have them ...
+        if (path.getParent() != null) {
+          Files.createDirectories(path.getParent());
+        }
+        try {
+          Files.createFile(path);
+        } catch (final FileAlreadyExistsException e) {
+          // do nothing
+        }
+      }
+    } catch (final IOException e) {
+      throw new IllegalStateException(
+          "Unable to check or create history file at " + path + ": " + e.getMessage(), e);
+    }
+  }
+
+  public void persist(final Optional<JsonNode> schemaHistory) {
+    if (schemaHistory.isEmpty()) {
+      return;
+    }
+    final String fileAsString = Jsons.object(schemaHistory.get(), String.class);
+
+    if (fileAsString == null || fileAsString.isEmpty()) {
+      return;
+    }
+
+    FileUtils.deleteQuietly(path.toFile());
+    makeSureFileExists();
+    writeToFile(fileAsString);
+  }
+
+  /**
+   * This implementation is kind of similar to
+   * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)}
+   *
+   * @param fileAsString Represents the contents of the file saved in state from previous syncs
+   */
+  private void writeToFile(final String fileAsString) {
+    try {
+      final String[] split = fileAsString.split(System.lineSeparator());
+      for (final String element : split) {
+        final Document read = reader.read(element);
+        final String line = writer.write(read);
+
+        try (final BufferedWriter historyWriter = Files
+            .newBufferedWriter(path, StandardOpenOption.APPEND)) {
+          try {
+            historyWriter.append(line);
+            historyWriter.newLine();
+          } catch (final IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static AirbyteSchemaHistoryStorage initializeDBHistory(final Optional<JsonNode> schemaHistory) {
+    final Path dbHistoryWorkingDir;
+    try {
+      dbHistoryWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history");
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+    final Path dbHistoryFilePath = dbHistoryWorkingDir.resolve("dbhistory.dat");
+
+    final AirbyteSchemaHistoryStorage schemaHistoryManager = new AirbyteSchemaHistoryStorage(dbHistoryFilePath);
+    schemaHistoryManager.persist(schemaHistory);
+    return schemaHistoryManager;
+  }
+
+}
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
new file mode 100644
index 000000000000..ab0a9e6cde16
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import io.airbyte.db.DataTypeUtils;
+import io.debezium.spi.converter.RelationalColumn;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class DebeziumConverterUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class);
+
+  private DebeziumConverterUtils() {
+    throw new UnsupportedOperationException();
+  }
+
+  public static String convertDate(final Object input) {
+    /**
+     * While building this custom converter we were not sure what type debezium could return because
+     * there is no mention of it in the documentation. Secondly, if you take a look at
+     * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)},
+     * even though it handles multiple data types, it's not clear under what circumstances each data
+     * type would be returned, so we just went ahead and handled the data types that made sense.
+     * Finally, we use LocalDateTime to handle this because it represents the DATETIME datatype in Java.
+     */
+    if (input instanceof LocalDateTime) {
+      return DataTypeUtils.toISO8601String((LocalDateTime) input);
+    } else if (input instanceof LocalDate) {
+      return DataTypeUtils.toISO8601String((LocalDate) input);
+    } else if (input instanceof Duration) {
+      return DataTypeUtils.toISO8601String((Duration) input);
+    } else if (input instanceof Timestamp) {
+      return DataTypeUtils.toISO8601StringWithMicroseconds((((Timestamp) input).toInstant()));
+    } else if (input instanceof Number) {
+      return DataTypeUtils.toISO8601String(
+          new Timestamp(((Number) input).longValue()).toLocalDateTime());
+    } else if (input instanceof Date) {
+      return DataTypeUtils.toISO8601String((Date) input);
+    } else if (input instanceof String) {
+      try {
+        return LocalDateTime.parse((String) input).toString();
+      } catch (final DateTimeParseException e) {
+        LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", input);
+        return input.toString();
+      }
+    }
+    LOGGER.warn("Uncovered date class type '{}'. Use default converter", input.getClass().getName());
+    return input.toString();
+  }
+
+  public static Object convertDefaultValue(RelationalColumn field) {
+    if (field.isOptional()) {
+      return null;
+    } else if (field.hasDefaultValue()) {
+      return field.defaultValue();
+    }
+    return null;
+  }
+
+}
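A few concrete calls make the branches above easier to follow. The commented results describe the branch taken rather than guaranteed output strings, since exact formatting is delegated to DataTypeUtils:

```java
// Illustrative inputs for convertDate (outputs depend on DataTypeUtils' ISO-8601 formatting).
DebeziumConverterUtils.convertDate(LocalDate.of(2021, 1, 1));   // LocalDate branch
DebeziumConverterUtils.convertDate("2021-01-01T05:00:00");      // String branch, parses as LocalDateTime
DebeziumConverterUtils.convertDate("not-a-date");               // String branch, logs a warning, returned as-is
DebeziumConverterUtils.convertDate(1609459200000L);             // Number branch, treated as epoch millis
```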
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java
new file mode 100644
index 000000000000..da31b6143210
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.debezium.CdcMetadataInjector;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.debezium.engine.ChangeEvent;
+import java.sql.Timestamp;
+import java.time.Instant;
+
+public class DebeziumEventUtils {
+
+  public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at";
+  public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";
+
+  public static AirbyteMessage toAirbyteMessage(final ChangeEvent<String, String> event,
+                                                final CdcMetadataInjector cdcMetadataInjector,
+                                                final Instant emittedAt) {
+    final JsonNode debeziumRecord = Jsons.deserialize(event.value());
+    final JsonNode before = debeziumRecord.get("before");
+    final JsonNode after = debeziumRecord.get("after");
+    final JsonNode source = debeziumRecord.get("source");
+
+    final JsonNode data = formatDebeziumData(before, after, source, cdcMetadataInjector);
+    final String schemaName = cdcMetadataInjector.namespace(source);
+    final String streamName = source.get("table").asText();
+
+    final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
+        .withStream(streamName)
+        .withNamespace(schemaName)
+        .withEmittedAt(emittedAt.toEpochMilli())
+        .withData(data);
+
+    return new AirbyteMessage()
+        .withType(AirbyteMessage.Type.RECORD)
+        .withRecord(airbyteRecordMessage);
+  }
+
+  // warning: mutates input args.
+  private static JsonNode formatDebeziumData(final JsonNode before,
+                                             final JsonNode after,
+                                             final JsonNode source,
+                                             final CdcMetadataInjector cdcMetadataInjector) {
+    final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);
+
+    final long transactionMillis = source.get("ts_ms").asLong();
+    final String transactionTimestamp = new Timestamp(transactionMillis).toInstant().toString();
+
+    base.put(CDC_UPDATED_AT, transactionTimestamp);
+    cdcMetadataInjector.addMetaData(base, source);
+
+    if (after.isNull()) {
+      base.put(CDC_DELETED_AT, transactionTimestamp);
+    } else {
+      base.put(CDC_DELETED_AT, (String) null);
+    }
+
+    return base;
+  }
+
+}
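The transformation is easiest to see on a trimmed change event. Below is a sketch with a minimal fake event; the payload fields are illustrative, and `metadataInjector` stands in for any CdcMetadataInjector such as the LSN sketch earlier:

```java
// A minimal fake event showing the shape of the transformation (payload is trimmed).
final ChangeEvent<String, String> event = new ChangeEvent<String, String>() {

  @Override
  public String key() {
    return null;
  }

  @Override
  public String value() {
    return "{\"before\": {\"id\": 1}, \"after\": null,"
        + " \"source\": {\"table\": \"users\", \"schema\": \"public\", \"ts_ms\": 1650000000000}}";
  }

  @Override
  public String destination() {
    return null;
  }

};

// For this delete event ("after" is null) the record data keeps the "before" fields and gets
// _ab_cdc_updated_at and _ab_cdc_deleted_at both set to the source timestamp.
final AirbyteMessage message = DebeziumEventUtils.toAirbyteMessage(event, metadataInjector, Instant.now());
```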
diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java
new file mode 100644
index 000000000000..76305dabf259
--- /dev/null
+++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.AbstractIterator;
+import io.airbyte.commons.concurrency.VoidCallable;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.lang.MoreBooleans;
+import io.airbyte.commons.util.AutoCloseableIterator;
+import io.airbyte.integrations.debezium.CdcTargetPosition;
+import io.debezium.engine.ChangeEvent;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The record iterator is the consumer (in the producer / consumer relationship with debezium)
+ * responsible for: 1. making sure every record produced by the record publisher is processed and 2.
+ * signalling to the record publisher when it is time for it to stop producing records. It emits
+ * this signal either when the publisher has not produced a new record for a long time or when it
+ * has processed at least all of the records that were present in the database when the source was
+ * started. Because the publisher might publish more records between the consumer sending this
+ * signal and the publisher actually shutting down, the consumer must stay alive as long as the
+ * publisher is not closed. Even after the publisher is closed, the consumer will finish processing
+ * any produced records before closing.
+ */
+public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String, String>>
+    implements AutoCloseableIterator<ChangeEvent<String, String>> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);
+
+  private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
+  private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES);
+
+  private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
+  private final CdcTargetPosition targetPosition;
+  private final Supplier<Boolean> publisherStatusSupplier;
+  private final VoidCallable requestClose;
+  private boolean receivedFirstRecord;
+  private boolean hasSnapshotFinished;
+  private boolean signalledClose;
+
+  public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, String>> queue,
+                                final CdcTargetPosition targetPosition,
+                                final Supplier<Boolean> publisherStatusSupplier,
+                                final VoidCallable requestClose) {
+    this.queue = queue;
+    this.targetPosition = targetPosition;
+    this.publisherStatusSupplier = publisherStatusSupplier;
+    this.requestClose = requestClose;
+    this.receivedFirstRecord = false;
+    this.hasSnapshotFinished = true;
+    this.signalledClose = false;
+  }
+
+  @Override
+  protected ChangeEvent<String, String> computeNext() {
+    // keep trying until the publisher is closed or until the queue is empty. the latter case is
+    // possible when the publisher has shut down but the consumer has not yet processed all messages it
+    // emitted.
+    while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
+      final ChangeEvent<String, String> next;
+      try {
+        final WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES;
+        next = queue.poll(waitTime.period, waitTime.timeUnit);
+      } catch (final InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+      // if within the timeout, the consumer could not get a record, it is time to tell the producer to
+      // shutdown.
+      if (next == null) {
+        LOGGER.info("Closing because next is returned as null");
+        requestClose();
+        LOGGER.info("no record found. polling again.");
+        continue;
+      }
+
+      final JsonNode eventAsJson = Jsons.deserialize(next.value());
+      hasSnapshotFinished = hasSnapshotFinished(eventAsJson);
+
+      // if the last record matches the target file position, it is time to tell the producer to shutdown.
+      if (!signalledClose && shouldSignalClose(eventAsJson)) {
+        requestClose();
+      }
+      receivedFirstRecord = true;
+      return next;
+    }
+    return endOfData();
+  }
+
+  private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
+    final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
+    return SnapshotMetadata.TRUE != snapshot;
+  }
+
+  /**
+   * Debezium was built as an ever-running process which keeps on listening for new changes on the DB
+   * and immediately processing them. Airbyte needs debezium to work as a start-stop mechanism. In
+   * order to determine when to stop the debezium engine we rely on a few factors: 1. TargetPosition
+   * logic. At the beginning of the sync we define a target position in the logs of the DB. This can
+   * be an LSN or anything specific to the DB which can help us identify that we have reached a
+   * specific position in the log-based replication. When we start processing records from debezium,
+   * we extract the log position from the metadata of the record and compare it with the target that
+   * we defined at the beginning of the sync. If we have reached the target position, we shut down the
+   * debezium engine. 2. The TargetPosition logic might not always work, and in order to tackle that
+   * we have another mechanism where, if we do not receive records from debezium for a given duration,
+   * we ask the debezium engine to shut down. 3. We also take the snapshot into consideration: when a
+   * connector is running for the first time, we let it complete the snapshot, and only after the
+   * completion of the snapshot should we shut down the engine. If we are closing the engine before
+   * completion of the snapshot, we throw an exception.
+   */
+  @Override
+  public void close() throws Exception {
+    requestClose();
+  }
+
+  private boolean shouldSignalClose(final JsonNode eventAsJson) {
+    return targetPosition.reachedTargetPosition(eventAsJson);
+  }
+
+  private void requestClose() {
+    try {
+      requestClose.call();
+      signalledClose = true;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+    throwExceptionIfSnapshotNotFinished();
+  }
+
+  private void throwExceptionIfSnapshotNotFinished() {
+    if (!hasSnapshotFinished) {
+      throw new RuntimeException("Closing down debezium engine but snapshot has not finished");
+    }
+  }
+
+  private static class WaitTime {
+
+    public final int period;
+    public final TimeUnit timeUnit;
+
+    public WaitTime(final int period, final TimeUnit timeUnit) {
+      this.period = period;
+      this.timeUnit = timeUnit;
+    }
+
+  }
+
+}
+ */ + +package io.airbyte.integrations.debezium.internals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import java.util.Optional; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.codehaus.plexus.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The purpose of this class is to intiliaze and spawn the debezium engine with the right properties + * to fetch records + */ +public class DebeziumRecordPublisher implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class); + private final ExecutorService executor; + private DebeziumEngine> engine; + + private final JsonNode config; + private final AirbyteFileOffsetBackingStore offsetManager; + private final Optional schemaHistoryManager; + + private final AtomicBoolean hasClosed; + private final AtomicBoolean isClosing; + private final AtomicReference thrownError; + private final CountDownLatch engineLatch; + private final Properties properties; + private final ConfiguredAirbyteCatalog catalog; + + public DebeziumRecordPublisher(final Properties properties, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final AirbyteFileOffsetBackingStore offsetManager, + final Optional schemaHistoryManager) { + this.properties = properties; + this.config = config; + this.catalog = catalog; + this.offsetManager = offsetManager; + this.schemaHistoryManager = schemaHistoryManager; + this.hasClosed = new AtomicBoolean(false); + this.isClosing = new AtomicBoolean(false); + this.thrownError = new AtomicReference<>(); + this.executor = Executors.newSingleThreadExecutor(); + this.engineLatch = new CountDownLatch(1); + } + + public void start(final Queue> queue) { + engine = DebeziumEngine.create(Json.class) + .using(getDebeziumProperties()) + .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) + .notifying(e -> { + // debezium outputs a tombstone event that has a value of null. this is an artifact of how it + // interacts with kafka. we want to ignore it. + // more on the tombstone: + // https://debezium.io/documentation/reference/configuration/event-flattening.html + if (e.value() != null) { + boolean inserted = false; + while (!inserted) { + inserted = queue.offer(e); + } + } + }) + .using((success, message, error) -> { + LOGGER.info("Debezium engine shutdown."); + thrownError.set(error); + engineLatch.countDown(); + }) + .build(); + + // Run the engine asynchronously ... + executor.execute(engine); + } + + public boolean hasClosed() { + return hasClosed.get(); + } + + public void close() throws Exception { + if (isClosing.compareAndSet(false, true)) { + // consumers should assume records can be produced until engine has closed. 
+ if (engine != null) { + engine.close(); + } + + // wait for closure before shutting down executor service + engineLatch.await(5, TimeUnit.MINUTES); + + // shut down and await for thread to actually go down + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + + // after the engine is completely off, we can mark this as closed + hasClosed.set(true); + + if (thrownError.get() != null) { + throw new RuntimeException(thrownError.get()); + } + } + } + + protected Properties getDebeziumProperties() { + final Properties props = new Properties(); + props.putAll(properties); + + // debezium engine configuration + props.setProperty("name", "engine"); + props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); + props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); + props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer + // default values from debezium CommonConnectorConfig + props.setProperty("max.batch.size", "2048"); + props.setProperty("max.queue.size", "8192"); + + if (schemaHistoryManager.isPresent()) { + // https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class + // https://debezium.io/documentation/reference/development/engine.html#_in_the_code + // As mentioned in the documents above, debezium connector for MySQL needs to track the schema + // changes. If we don't do this, we can't fetch records for the table + // We have implemented our own implementation to filter out the schema information from other + // databases that the connector is not syncing + props.setProperty("database.history", "io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory"); + props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString()); + } + + // https://debezium.io/documentation/reference/configuration/avro.html + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + + // debezium names + props.setProperty("name", config.get("database").asText()); + props.setProperty("database.server.name", config.get("database").asText()); + + // db connection configuration + props.setProperty("database.hostname", config.get("host").asText()); + props.setProperty("database.port", config.get("port").asText()); + props.setProperty("database.user", config.get("username").asText()); + props.setProperty("database.dbname", config.get("database").asText()); + + if (config.has("password")) { + props.setProperty("database.password", config.get("password").asText()); + } + + // By default "decimal.handing.mode=precise" which's caused returning this value as a binary. 
+ // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String + // explicitly in its Kafka messages for more details see: + // https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types + // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation + props.setProperty("decimal.handling.mode", "string"); + + // table selection + final String tableWhitelist = getTableWhitelist(catalog); + props.setProperty("table.include.list", tableWhitelist); + + return props; + } + + @VisibleForTesting + public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) { + return catalog.getStreams().stream() + .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) + .map(ConfiguredAirbyteStream::getStream) + .map(stream -> stream.getNamespace() + "." + stream.getName()) + // debezium needs commas escaped to split properly + .map(x -> StringUtils.escape(x, new char[] {','}, "\\,")) + .collect(Collectors.joining(",")); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java new file mode 100644 index 000000000000..f04690767537 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import io.debezium.config.Configuration; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecord.Fields; +import io.debezium.relational.history.HistoryRecordComparator; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.function.Consumer; + +/** + * MySQL Debezium connector monitors the database schema evolution over the time and stores the data + * in a database history file. Without this file we can't fetch the records from binlog. We need to + * save the contents of the file. Debezium by default uses + * {@link io.debezium.relational.history.FileDatabaseHistory} class to write the schema information + * in the file. The problem is that the Debezium tracks the schema evolution of all the tables in + * all the databases, because of that the file content can grow. In order to make sure that debezium + * tracks only the schema of the tables that are present in the database that Airbyte is syncing, we + * created this class. In the method {@link #storeRecord(HistoryRecord)}, we introduced a check to + * make sure only those records are being saved whose database name matches the database Airbyte is + * syncing. We tell debezium to use this class by passing it as property in debezium engine. 
 diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java new file mode 100644 index 000000000000..f04690767537 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import io.debezium.config.Configuration; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecord.Fields; +import io.debezium.relational.history.HistoryRecordComparator; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.function.Consumer; + +/** + * The MySQL Debezium connector monitors the database schema evolution over time and stores the + * data in a database history file. Without this file we can't fetch records from the binlog, so we + * need to save the contents of the file. By default Debezium uses the + * {@link io.debezium.relational.history.FileDatabaseHistory} class to write the schema information + * to the file. The problem is that Debezium tracks the schema evolution of all the tables in all + * the databases, so the file can grow very large. To make sure Debezium tracks only the schema of + * the tables in the database that Airbyte is syncing, we created this class. In the method + * {@link #storeRecord(HistoryRecord)} we introduced a check so that only those records are saved + * whose database name matches the database Airbyte is syncing. We tell Debezium to use this class + * by passing it as a property to the Debezium engine; look for the "database.history" property in + * {@link DebeziumRecordPublisher#getDebeziumProperties()}. Ideally + * {@link FilteredFileDatabaseHistory} would have extended + * {@link io.debezium.relational.history.FileDatabaseHistory} and overridden the + * {@link #storeRecord(HistoryRecord)} method, but {@link FileDatabaseHistory} is a final class and + * cannot be extended. + */ +public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory { + + private final FileDatabaseHistory fileDatabaseHistory; + private static String databaseName; + + public FilteredFileDatabaseHistory() { + this.fileDatabaseHistory = new FileDatabaseHistory(); + } + + /** + * Ideally the databaseName would have been initialized in the constructor of the class. But since + * we supply the class name to Debezium and it uses reflection to construct the object of the + * class, we can't pass the databaseName as a constructor parameter. That's why we had to take the + * static approach. + * + * @param databaseName Name of the database that the connector is syncing + */ + public static void setDatabaseName(final String databaseName) { + if (FilteredFileDatabaseHistory.databaseName == null) { + FilteredFileDatabaseHistory.databaseName = databaseName; + } else if (!FilteredFileDatabaseHistory.databaseName.equals(databaseName)) { + throw new RuntimeException( + "Database name has already been set: " + FilteredFileDatabaseHistory.databaseName + + "; can't set it to: " + databaseName); + } + } + + @Override + public void configure(final Configuration config, + final HistoryRecordComparator comparator, + final DatabaseHistoryListener listener, + final boolean useCatalogBeforeSchema) { + fileDatabaseHistory.configure(config, comparator, listener, useCatalogBeforeSchema); + } + + @Override + public void start() { + fileDatabaseHistory.start(); + } + + @Override + public void storeRecord(final HistoryRecord record) throws DatabaseHistoryException { + if (record == null) { + return; + } + try { + final String dbNameInRecord = record.document().getString(Fields.DATABASE_NAME); + if (databaseName != null && dbNameInRecord != null && !dbNameInRecord.equals(databaseName)) { + return; + } + + /** + * We use reflection because the method + * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} is + * protected and cannot be accessed from here + */ + final Method storeRecordMethod = fileDatabaseHistory.getClass() + .getDeclaredMethod("storeRecord", record.getClass()); + storeRecordMethod.setAccessible(true); + storeRecordMethod.invoke(fileDatabaseHistory, record); + } catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + fileDatabaseHistory.stop(); + // this is just for tests + resetDbName(); + } + + public static void resetDbName() { + databaseName = null; + } + + @Override + protected void recoverRecords(final Consumer<HistoryRecord> records) { + try { + /** + * We use reflection because the method + * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} is protected + * and cannot be accessed from here + */ + final Method recoverRecords = fileDatabaseHistory.getClass() + .getDeclaredMethod("recoverRecords", Consumer.class); + recoverRecords.setAccessible(true); + recoverRecords.invoke(fileDatabaseHistory, records); + } catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean storageExists() { + return fileDatabaseHistory.storageExists(); + } + + @Override + public void initializeStorage() { + fileDatabaseHistory.initializeStorage(); + } + + @Override + public boolean exists() { + return fileDatabaseHistory.exists(); + } + + @Override + public String toString() { + return fileDatabaseHistory.toString(); + } + +}
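A sketch of the wiring implied above (the call site is an assumption; it lives in the connector code, not in this diff):

// must run before the engine starts; the engine then instantiates FilteredFileDatabaseHistory
// reflectively via the "database.history" property, and storeRecord() drops records belonging
// to other databases
FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText());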
 diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java new file mode 100644 index 000000000000..9420b8ba9758 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import io.airbyte.db.DataTypeUtils; +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MSSQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { + + private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class); + + private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME"); + private final String TIME_TYPE = "TIME"; + private final String SMALLMONEY_TYPE = "SMALLMONEY"; + + @Override + public void configure(Properties props) {} + + @Override + public void converterFor(final RelationalColumn field, + final ConverterRegistration<SchemaBuilder> registration) { + if (DATE_TYPES.contains(field.typeName().toUpperCase())) { + registerDate(field, registration); + } else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) { + registerMoney(field, registration); + } else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) { + registerTime(field, registration); + } + } + + private void registerDate(final RelationalColumn field, + final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string(), input -> { + if (Objects.isNull(input)) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + + return DebeziumConverterUtils.convertDate(input); + }); + } + + private void registerTime(final RelationalColumn field, + final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string(), input -> { + if (Objects.isNull(input)) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + + if (input instanceof Timestamp) { + return DataTypeUtils.toISOTimeString(((Timestamp) input).toLocalDateTime()); + } + + LOGGER.warn("Uncovered time class type '{}'. Use default converter", + input.getClass().getName()); + return input.toString(); + }); + } + + private void registerMoney(final RelationalColumn field, + final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.float64(), input -> { + if (Objects.isNull(input)) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + + if (input instanceof BigDecimal) { + return ((BigDecimal) input).doubleValue(); + } + + LOGGER.warn("Uncovered money class type '{}'. Use default converter", + input.getClass().getName()); + return input.toString(); + }); + } + +}
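Two quick worked examples of the converter above (input values assumed): a SMALLMONEY value arriving as new BigDecimal("12.34") is emitted as the Kafka double 12.34, while a TIME value arriving as a java.sql.Timestamp is emitted as an ISO-8601 time string; any other runtime type falls through to toString() with a warning.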
 diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java new file mode 100644 index 000000000000..70f68704d4d9 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import io.airbyte.db.DataTypeUtils; +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Properties; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a custom Debezium converter used in MySQL to handle the DATETIME data type. We need a + * custom converter because by default Debezium returns DATETIME values as numbers, and we need to + * convert them to a proper format. Ref: + * https://debezium.io/documentation/reference/1.4/development/converters.html This is built from + * the reference of {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter}. If + * you rename this class, remember to rename the datetime.type property value in + * io.airbyte-integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties() (if you don't, + * a test will fail, but it might be tricky to figure out where to change the property name). + */ +public class MySQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { + + private static final Logger LOGGER = LoggerFactory.getLogger(MySQLConverter.class); + + private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"}; + private final String[] TEXT_TYPES = {"VARCHAR", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT"}; + + @Override + public void configure(final Properties props) {} + + @Override + public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerDate(field, registration); + } else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerText(field, registration); + } + } + + private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + if (x instanceof byte[]) { + return new String((byte[]) x, StandardCharsets.UTF_8); + } else { + return x.toString(); + } + }); + } + + /** + * Debezium replaces a zero-value date with null even when the column is mandatory. According to + * the docs this should be handled by the driver, but it is not, so for non-optional columns we + * substitute the epoch date here. + */ + private Object convertDefaultValueNullDate(final RelationalColumn field) { + final var defaultValue = DebeziumConverterUtils.convertDefaultValue(field); + return (defaultValue == null && !field.isOptional() ? DataTypeUtils.toISO8601String(LocalDate.EPOCH) : defaultValue); + } + + private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string(), + x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x)); + } + +}
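Per the javadoc above, the converter is wired up through Debezium's converter properties; a sketch of the assumed registration in MySqlCdcProperties#getDebeziumProperties() (the symbolic name "datetime" is inferred from the javadoc's "datetime.type" reference):

props.setProperty("converters", "datetime");
props.setProperty("datetime.type", MySQLConverter.class.getName());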
 diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java new file mode 100644 index 000000000000..aee741b6aaca --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Properties; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.postgresql.util.PGInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class); + + private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"}; + private final String[] BIT_TYPES = {"BIT", "VARBIT"}; + private final String[] MONEY_ITEM_TYPE = {"MONEY"}; + private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"}; + private final String[] TEXT_TYPES = + {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"}; + + @Override + public void configure(final Properties props) {} + + @Override + public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerDate(field, registration); + } else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) + || Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) + || Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerText(field, registration); + } else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerMoney(field, registration); + } + } + + private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + + if (x instanceof byte[]) { + return new String((byte[]) x, StandardCharsets.UTF_8); + } else { + return x.toString(); + } + }); + } + + private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } else if (x instanceof PGInterval) { + return convertInterval((PGInterval) x); + } else { + return DebeziumConverterUtils.convertDate(x); + } + }); + }
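 + + // e.g. a PGInterval of 1 year, 2 days, 3:04:05 is rendered by the helpers below as + // "1 year 2 days 03:04:05"; negative hour/minute/second components are collapsed into a + // single leading "-".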
+ private String convertInterval(final PGInterval pgInterval) { + final StringBuilder resultInterval = new StringBuilder(); + formatDateUnit(resultInterval, pgInterval.getYears(), " year "); + formatDateUnit(resultInterval, pgInterval.getMonths(), " mons "); + formatDateUnit(resultInterval, pgInterval.getDays(), " days "); + + formatTimeValues(resultInterval, pgInterval); + return resultInterval.toString(); + } + + private void registerMoney(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } else if (x instanceof Double) { + final BigDecimal result = BigDecimal.valueOf((Double) x); + if (result.compareTo(new BigDecimal("999999999999999")) == 1 + || result.compareTo(new BigDecimal("-999999999999999")) == -1) { + return null; + } + return result.toString(); + } else { + return x.toString(); + } + }); + } + + private void formatDateUnit(final StringBuilder resultInterval, final int dateUnit, final String s) { + if (dateUnit != 0) { + resultInterval + .append(dateUnit) + .append(s); + } + } + + private void formatTimeValues(final StringBuilder resultInterval, final PGInterval pgInterval) { + if (isNegativeTime(pgInterval)) { + resultInterval.append("-"); + } + // TODO: handle time components outside the Integer.MIN_VALUE..Integer.MAX_VALUE range + final int hours = Math.abs(pgInterval.getHours()); + final int minutes = Math.abs(pgInterval.getMinutes()); + final int seconds = Math.abs(pgInterval.getWholeSeconds()); + resultInterval.append(addFirstDigit(hours)); + resultInterval.append(hours); + resultInterval.append(":"); + resultInterval.append(addFirstDigit(minutes)); + resultInterval.append(minutes); + resultInterval.append(":"); + resultInterval.append(addFirstDigit(seconds)); + resultInterval.append(seconds); + } + + // zero-pads single-digit time components + private String addFirstDigit(final int value) { + return value <= 9 ? "0" : ""; + } + + private boolean isNegativeTime(final PGInterval pgInterval) { + return pgInterval.getHours() < 0 + || pgInterval.getMinutes() < 0 + || pgInterval.getWholeSeconds() < 0; + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java new file mode 100644 index 000000000000..b24cdf71fbe6 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +public enum SnapshotMetadata { + TRUE, + FALSE, + LAST +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java new file mode 100644 index 000000000000..9f1e6d0ea052 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.debezium; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class AirbyteFileOffsetBackingStoreTest { + + @SuppressWarnings("UnstableApiUsage") + @Test + void test() throws IOException { + final Path testRoot = Files.createTempDirectory(Path.of("/tmp"), "offset-store-test"); + + final byte[] bytes = MoreResources.readBytes("test_debezium_offset.dat"); + final Path templateFilePath = testRoot.resolve("template_offset.dat"); + IOs.writeFile(templateFilePath, bytes); + + final Path writeFilePath = testRoot.resolve("offset.dat"); + + final AirbyteFileOffsetBackingStore offsetStore = new AirbyteFileOffsetBackingStore(templateFilePath); + final Map offset = offsetStore.read(); + + final JsonNode asJson = Jsons.jsonNode(offset); + + final AirbyteFileOffsetBackingStore offsetStore2 = new AirbyteFileOffsetBackingStore(writeFilePath); + offsetStore2.persist(asJson); + + final Map stateFromOffsetStoreRoundTrip = offsetStore2.read(); + + // verify that, after a round trip through the offset store, we get back the same data. + assertEquals(offset, stateFromOffsetStoreRoundTrip); + // verify that the file written by the offset store is identical to the template file. + assertTrue(com.google.common.io.Files.equal(templateFilePath.toFile(), writeFilePath.toFile())); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java new file mode 100644 index 000000000000..4de1b36524dd --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.debezium; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.debezium.engine.ChangeEvent; +import java.io.IOException; +import java.time.Instant; +import org.junit.jupiter.api.Test; + +class DebeziumEventUtilsTest { + + @Test + public void testConvertChangeEvent() throws IOException { + final String stream = "names"; + final Instant emittedAt = Instant.now(); + final CdcMetadataInjector cdcMetadataInjector = new DummyMetadataInjector(); + final ChangeEvent insertChangeEvent = mockChangeEvent("insert_change_event.json"); + final ChangeEvent updateChangeEvent = mockChangeEvent("update_change_event.json"); + final ChangeEvent deleteChangeEvent = mockChangeEvent("delete_change_event.json"); + + final AirbyteMessage actualInsert = DebeziumEventUtils.toAirbyteMessage(insertChangeEvent, cdcMetadataInjector, emittedAt); + final AirbyteMessage actualUpdate = DebeziumEventUtils.toAirbyteMessage(updateChangeEvent, cdcMetadataInjector, emittedAt); + final AirbyteMessage actualDelete = DebeziumEventUtils.toAirbyteMessage(deleteChangeEvent, cdcMetadataInjector, emittedAt); + + final AirbyteMessage expectedInsert = createAirbyteMessage(stream, emittedAt, "insert_message.json"); + final AirbyteMessage expectedUpdate = createAirbyteMessage(stream, emittedAt, "update_message.json"); + final AirbyteMessage expectedDelete = createAirbyteMessage(stream, emittedAt, "delete_message.json"); + + deepCompare(expectedInsert, actualInsert); + deepCompare(expectedUpdate, actualUpdate); + deepCompare(expectedDelete, actualDelete); + } + + private static ChangeEvent mockChangeEvent(final String resourceName) throws IOException { + final ChangeEvent mocked = mock(ChangeEvent.class); + final String resource = MoreResources.readResource(resourceName); + when(mocked.value()).thenReturn(resource); + + return mocked; + } + + private static AirbyteMessage createAirbyteMessage(final String stream, final Instant emittedAt, final String resourceName) throws IOException { + final String data = MoreResources.readResource(resourceName); + + final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage() + .withStream(stream) + .withNamespace("public") + .withData(Jsons.deserialize(data)) + .withEmittedAt(emittedAt.toEpochMilli()); + + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(recordMessage); + } + + private static void deepCompare(final Object expected, final Object actual) { + assertEquals(Jsons.deserialize(Jsons.serialize(expected)), Jsons.deserialize(Jsons.serialize(actual))); + } + + public static class DummyMetadataInjector implements CdcMetadataInjector { + + @Override + public void addMetaData(final ObjectNode event, final JsonNode source) { + final long lsn = source.get("lsn").asLong(); + event.put("_ab_cdc_lsn", lsn); + } + + @Override + public String namespace(final JsonNode source) { + return source.get("schema").asText(); + } + + } + +} diff --git 
a/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java new file mode 100644 index 000000000000..31dacbc563ae --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium; + +import static org.junit.jupiter.api.Assertions.*; + +import com.google.common.collect.ImmutableList; +import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.SyncMode; +import org.junit.jupiter.api.Test; + +class DebeziumRecordPublisherTest { + + @Test + public void testWhitelistCreation() { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( + CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public").withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public").withSyncMode(SyncMode.INCREMENTAL))); + + final String expectedWhitelist = "public.id_and_name,public.id_\\,something,public.n\"aMéS"; + final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); + + assertEquals(expectedWhitelist, actualWhitelist); + } + + @Test + public void testWhitelistFiltersFullRefresh() { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( + CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public").withSyncMode(SyncMode.FULL_REFRESH))); + + final String expectedWhitelist = "public.id_and_name"; + final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); + + assertEquals(expectedWhitelist, actualWhitelist); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java new file mode 100644 index 000000000000..facb86d0bc54 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.debezium.internals; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.debezium.spi.converter.RelationalColumn; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class DebeziumConverterUtilsTest { + + @Test + public void convertDefaultValueTest() { + + final RelationalColumn relationalColumn = mock(RelationalColumn.class); + + when(relationalColumn.isOptional()).thenReturn(true); + Object actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); + Assertions.assertNull(actualColumnDefaultValue, "Default value for optional relational column should be null"); + + when(relationalColumn.isOptional()).thenReturn(false); + when(relationalColumn.hasDefaultValue()).thenReturn(false); + actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); + Assertions.assertNull(actualColumnDefaultValue); + + when(relationalColumn.isOptional()).thenReturn(false); + when(relationalColumn.hasDefaultValue()).thenReturn(true); + final String expectedColumnDefaultValue = "default value"; + when(relationalColumn.defaultValue()).thenReturn(expectedColumnDefaultValue); + actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); + Assertions.assertEquals(actualColumnDefaultValue, expectedColumnDefaultValue); + } + + @Test + public void convertLocalDate() { + final LocalDate localDate = LocalDate.of(2021, 1, 1); + + final String actual = DebeziumConverterUtils.convertDate(localDate); + Assertions.assertEquals("2021-01-01T00:00:00Z", actual); + } + + @Test + public void convertTLocalTime() { + final LocalTime localTime = LocalTime.of(8, 1, 1); + final String actual = DebeziumConverterUtils.convertDate(localTime); + Assertions.assertEquals("08:01:01", actual); + } + + @Test + public void convertLocalDateTime() { + final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1); + + final String actual = DebeziumConverterUtils.convertDate(localDateTime); + Assertions.assertEquals("2021-01-01T08:01:01Z", actual); + } + + @Test + @Disabled + public void convertDuration() { + final Duration duration = Duration.ofHours(100_000); + + final String actual = DebeziumConverterUtils.convertDate(duration); + Assertions.assertEquals("1981-05-29T20:00:00Z", actual); + } + + @Test + public void convertTimestamp() { + final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1); + final Timestamp timestamp = Timestamp.valueOf(localDateTime); + + final String actual = DebeziumConverterUtils.convertDate(timestamp); + Assertions.assertEquals("2021-01-01T08:01:01.000000Z", actual); + } + + @Test + @Disabled + public void convertNumber() { + final Number number = 100_000; + + final String actual = DebeziumConverterUtils.convertDate(number); + Assertions.assertEquals("1970-01-01T03:01:40Z", actual); + } + + @Test + public void convertStringDateFormat() { + final String stringValue = "2021-01-01T00:00:00Z"; + + final String actual = DebeziumConverterUtils.convertDate(stringValue); + Assertions.assertEquals("2021-01-01T00:00:00Z", actual); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_change_event.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_change_event.json 
new file mode 100644 index 000000000000..07b575bf7e2c --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_change_event.json @@ -0,0 +1,25 @@ +{ + "before": { + "first_name": "san", + "last_name": "goku", + "power": null + }, + "after": null, + "source": { + "version": "1.4.2.Final", + "connector": "postgresql", + "name": "orders", + "ts_ms": 1616775646886, + "snapshot": false, + "db": "db_lwfoyffqvx", + "schema": "public", + "table": "names", + "txId": 498, + "lsn": 23012360, + "xmin": null + }, + "op": "d", + "ts_ms": 1616775646931, + "transaction": null, + "destination": "orders.public.names" +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_message.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_message.json new file mode 100644 index 000000000000..676ee5b74ffe --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/delete_message.json @@ -0,0 +1,8 @@ +{ + "first_name": "san", + "last_name": "goku", + "power": null, + "_ab_cdc_updated_at": "2021-03-26T16:20:46.886Z", + "_ab_cdc_lsn": 23012360, + "_ab_cdc_deleted_at": "2021-03-26T16:20:46.886Z" +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_change_event.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_change_event.json new file mode 100644 index 000000000000..4b2c2fb6e2cf --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_change_event.json @@ -0,0 +1,25 @@ +{ + "before": null, + "after": { + "first_name": "san", + "last_name": "goku", + "power": "Infinity" + }, + "source": { + "version": "1.4.2.Final", + "connector": "postgresql", + "name": "orders", + "ts_ms": 1616775642623, + "snapshot": true, + "db": "db_lwfoyffqvx", + "schema": "public", + "table": "names", + "txId": 495, + "lsn": 23011544, + "xmin": null + }, + "op": "r", + "ts_ms": 1616775642624, + "transaction": null, + "destination": "orders.public.names" +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_message.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_message.json new file mode 100644 index 000000000000..d971d32c1766 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/insert_message.json @@ -0,0 +1,8 @@ +{ + "first_name": "san", + "last_name": "goku", + "power": "Infinity", + "_ab_cdc_updated_at": "2021-03-26T16:20:42.623Z", + "_ab_cdc_lsn": 23011544, + "_ab_cdc_deleted_at": null +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/test_debezium_offset.dat b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/test_debezium_offset.dat new file mode 100644 index 000000000000..c7e7054916ed Binary files /dev/null and b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/test_debezium_offset.dat differ diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_change_event.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_change_event.json new file mode 100644 index 000000000000..da5dcd9c2b06 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_change_event.json @@ -0,0 +1,25 @@ +{ + "before": null, + "after": { + "first_name": "san", + "last_name": "goku", + "power": 10000.2 + }, + "source": { + "version": "1.4.2.Final", + "connector": "postgresql", + "name": "orders", + "ts_ms": 1616775646881, + "snapshot": false, + "db": 
"db_lwfoyffqvx", + "schema": "public", + "table": "names", + "txId": 497, + "lsn": 23012216, + "xmin": null + }, + "op": "u", + "ts_ms": 1616775646929, + "transaction": null, + "destination": "orders.public.names" +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_message.json b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_message.json new file mode 100644 index 000000000000..89b9a08038aa --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/test/resources/update_message.json @@ -0,0 +1,8 @@ +{ + "first_name": "san", + "last_name": "goku", + "power": 10000.2, + "_ab_cdc_updated_at": "2021-03-26T16:20:46.881Z", + "_ab_cdc_lsn": 23012216, + "_ab_cdc_deleted_at": null +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java new file mode 100644 index 000000000000..79d6dbbd5b31 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -0,0 +1,625 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.Streams; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.db.Database; +import io.airbyte.integrations.base.Source; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class CdcSourceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class); + + protected static final String MODELS_SCHEMA = "models_schema"; + protected static final String MODELS_STREAM_NAME = "models"; + private static final Set STREAM_NAMES = Sets + .newHashSet(MODELS_STREAM_NAME); + 
protected static final String COL_ID = "id"; + protected static final String COL_MAKE_ID = "make_id"; + protected static final String COL_MODEL = "model"; + + protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME, + MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER), + Field.of(COL_MODEL, JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))))); + protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers + .toDefaultConfiguredCatalog(CATALOG); + + // set all streams to incremental. + static { + CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); + } + + protected static final List MODEL_RECORDS = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350"))); + + protected void setup() throws SQLException { + createAndPopulateTables(); + } + + private void createAndPopulateTables() { + createAndPopulateActualTable(); + createAndPopulateRandomTable(); + } + + protected void executeQuery(final String query) { + try { + getDatabase().query( + ctx -> ctx + .execute(query)); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + public String columnClause(final Map columnsWithDataType, final Optional primaryKey) { + final StringBuilder columnClause = new StringBuilder(); + int i = 0; + for (final Map.Entry column : columnsWithDataType.entrySet()) { + columnClause.append(column.getKey()); + columnClause.append(" "); + columnClause.append(column.getValue()); + if (i < (columnsWithDataType.size() - 1)) { + columnClause.append(","); + columnClause.append(" "); + } + i++; + } + primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")")); + + return columnClause.toString(); + } + + public void createTable(final String schemaName, final String tableName, final String columnClause) { + executeQuery(createTableQuery(schemaName, tableName, columnClause)); + } + + public String createTableQuery(final String schemaName, final String tableName, final String columnClause) { + return String.format("CREATE TABLE %s.%s(%s);", schemaName, tableName, columnClause); + } + + public void createSchema(final String schemaName) { + executeQuery(createSchemaQuery(schemaName)); + } + + public String createSchemaQuery(final String schemaName) { + return "CREATE DATABASE " + schemaName + ";"; + } + + private void createAndPopulateActualTable() { + createSchema(MODELS_SCHEMA); + createTable(MODELS_SCHEMA, MODELS_STREAM_NAME, + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); + for (final JsonNode recordJson : MODEL_RECORDS) { + writeModelRecord(recordJson); + } + } + + /** + * This database and table is not part of Airbyte sync. 
It is being created just to make sure the + * databases not being synced by Airbyte are not causing issues with our debezium logic + */ + private void createAndPopulateRandomTable() { + createSchema(MODELS_SCHEMA + "_random"); + createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", + columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"), + Optional.of(COL_ID + "_random"))); + final List MODEL_RECORDS_RANDOM = ImmutableList.of( + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Fiesta-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Focus-random")), + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Ranger-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "GLA-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "A 220-random")), + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "E 350-random"))); + for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) { + writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", + COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random"); + } + } + + protected void writeModelRecord(final JsonNode recordJson) { + writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL); + } + + private void writeRecords( + final JsonNode recordJson, + final String dbName, + final String streamName, + final String idCol, + final String makeIdCol, + final String modelCol) { + executeQuery( + String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName, + idCol, makeIdCol, modelCol, + recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(), + recordJson.get(modelCol).asText())); + } + + protected static Set removeDuplicates(final Set messages) { + final Set existingDataRecordsWithoutUpdated = new HashSet<>(); + final Set output = new HashSet<>(); + + for (final AirbyteRecordMessage message : messages) { + final ObjectNode node = message.getData().deepCopy(); + node.remove("_ab_cdc_updated_at"); + + if (existingDataRecordsWithoutUpdated.contains(node)) { + LOGGER.info("Removing duplicate node: " + node); + } else { + output.add(message); + existingDataRecordsWithoutUpdated.add(node); + } + } + + return output; + } + + protected Set extractRecordMessages(final List messages) { + final List recordMessageList = messages + .stream() + .filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord) + .collect(Collectors.toList()); + final Set recordMessageSet = new HashSet<>(recordMessageList); + + assertEquals(recordMessageList.size(), recordMessageSet.size(), + "Expected no duplicates in airbyte record message output for a single sync."); + + return recordMessageSet; + } + + protected List extractStateMessages(final List messages) { + return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) + .collect(Collectors.toList()); + } + + private void assertExpectedRecords(final Set expectedRecords, final Set actualRecords) { + // assume all streams are cdc. 
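 + // (every stream name that appears in the actual records is passed along as a cdc stream)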
+ assertExpectedRecords(expectedRecords, actualRecords, actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet())); + } + + private void assertExpectedRecords(final Set expectedRecords, + final Set actualRecords, + final Set cdcStreams) { + assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES); + } + + private void assertExpectedRecords(final Set expectedRecords, + final Set actualRecords, + final Set cdcStreams, + final Set streamNames) { + final Set actualData = actualRecords + .stream() + .map(recordMessage -> { + assertTrue(streamNames.contains(recordMessage.getStream())); + assertNotNull(recordMessage.getEmittedAt()); + + assertEquals(MODELS_SCHEMA, recordMessage.getNamespace()); + + final JsonNode data = recordMessage.getData(); + + if (cdcStreams.contains(recordMessage.getStream())) { + assertCdcMetaData(data, true); + } else { + assertNullCdcMetaData(data); + } + + removeCDCColumns((ObjectNode) data); + + return data; + }) + .collect(Collectors.toSet()); + + assertEquals(expectedRecords, actualData); + } + + @Test + @DisplayName("On the first sync, produce returns records that exist in the database.") + void testExistingData() throws Exception { + final CdcTargetPosition targetPosition = cdcLatestTargetPosition(); + final AutoCloseableIterator read = getSource().read(getConfig(), CONFIGURED_CATALOG, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read); + + final Set recordMessages = extractRecordMessages(actualRecords); + final List stateMessages = extractStateMessages(actualRecords); + + assertNotNull(targetPosition); + recordMessages.forEach(record -> { + assertEquals(extractPosition(record.getData()), targetPosition); + }); + + assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages); + assertEquals(1, stateMessages.size()); + assertNotNull(stateMessages.get(0).getData()); + assertExpectedStateMessages(stateMessages); + } + + @Test + @DisplayName("When a record is deleted, produces a deletion record.") + void testDelete() throws Exception { + final AutoCloseableIterator read1 = getSource() + .read(getConfig(), CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final List stateMessages1 = extractStateMessages(actualRecords1); + assertEquals(1, stateMessages1.size()); + assertNotNull(stateMessages1.get(0).getData()); + assertExpectedStateMessages(stateMessages1); + + executeQuery(String + .format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, + 11)); + + final JsonNode state = stateMessages1.get(0).getData(); + final AutoCloseableIterator read2 = getSource() + .read(getConfig(), CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final List recordMessages2 = new ArrayList<>( + extractRecordMessages(actualRecords2)); + final List stateMessages2 = extractStateMessages(actualRecords2); + assertEquals(1, stateMessages2.size()); + assertNotNull(stateMessages2.get(0).getData()); + assertExpectedStateMessages(stateMessages2); + assertEquals(1, recordMessages2.size()); + assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); + assertCdcMetaData(recordMessages2.get(0).getData(), false); + } + + @Test + @DisplayName("When a record is updated, produces an update record.") + void testUpdate() throws Exception { + final String updatedModel = "Explorer"; + final AutoCloseableIterator read1 = getSource() + .read(getConfig(), CONFIGURED_CATALOG, 
null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final List stateMessages1 = extractStateMessages(actualRecords1); + assertEquals(1, stateMessages1.size()); + assertNotNull(stateMessages1.get(0).getData()); + assertExpectedStateMessages(stateMessages1); + + executeQuery(String + .format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, + COL_MODEL, updatedModel, COL_ID, 11)); + + final JsonNode state = stateMessages1.get(0).getData(); + final AutoCloseableIterator read2 = getSource() + .read(getConfig(), CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final List recordMessages2 = new ArrayList<>( + extractRecordMessages(actualRecords2)); + final List stateMessages2 = extractStateMessages(actualRecords2); + assertEquals(1, stateMessages2.size()); + assertNotNull(stateMessages2.get(0).getData()); + assertExpectedStateMessages(stateMessages2); + assertEquals(1, recordMessages2.size()); + assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); + assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText()); + assertCdcMetaData(recordMessages2.get(0).getData(), true); + } + + @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) + @Test + @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") + protected void testRecordsProducedDuringAndAfterSync() throws Exception { + + final int recordsToCreate = 20; + // first batch of records. 20 created here and 6 created in setup method. + for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + writeModelRecord(record); + } + + final AutoCloseableIterator firstBatchIterator = getSource() + .read(getConfig(), CONFIGURED_CATALOG, null); + final List dataFromFirstBatch = AutoCloseableIterators + .toListAndClose(firstBatchIterator); + final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); + assertEquals(1, stateAfterFirstBatch.size()); + assertNotNull(stateAfterFirstBatch.get(0).getData()); + assertExpectedStateMessages(stateAfterFirstBatch); + final Set recordsFromFirstBatch = extractRecordMessages( + dataFromFirstBatch); + assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size()); + + // second batch of records again 20 being created + for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + writeModelRecord(record); + } + + final JsonNode state = stateAfterFirstBatch.get(0).getData(); + final AutoCloseableIterator secondBatchIterator = getSource() + .read(getConfig(), CONFIGURED_CATALOG, state); + final List dataFromSecondBatch = AutoCloseableIterators + .toListAndClose(secondBatchIterator); + + final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); + assertEquals(1, stateAfterSecondBatch.size()); + assertNotNull(stateAfterSecondBatch.get(0).getData()); + assertExpectedStateMessages(stateAfterSecondBatch); + + final Set recordsFromSecondBatch = extractRecordMessages( + dataFromSecondBatch); + assertEquals(recordsToCreate, recordsFromSecondBatch.size(), + "Expected 20 records to be replicated in the 
second sync."); + + // sometimes there can be more than one of these at the end of the snapshot and just before the + // first incremental. + final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( + recordsFromFirstBatch); + final Set recordsFromSecondBatchWithoutDuplicates = removeDuplicates( + recordsFromSecondBatch); + + final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); + assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), + "Expected first sync to include records created while the test was running."); + assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, + recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates + .size()); + } + + @Test + @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") + void testCdcAndFullRefreshInSameSync() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG); + + final List MODEL_RECORDS_2 = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2"))); + + createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); + + for (final JsonNode recordJson : MODEL_RECORDS_2) { + writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, + COL_MAKE_ID, COL_MODEL); + } + + final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream() + .withStream(CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_2", + MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER), + Field.of(COL_MODEL, JsonSchemaType.STRING)) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))); + airbyteStream.setSyncMode(SyncMode.FULL_REFRESH); + + final List streams = configuredCatalog.getStreams(); + streams.add(airbyteStream); + configuredCatalog.withStreams(streams); + + final AutoCloseableIterator read1 = getSource() + .read(getConfig(), configuredCatalog, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + + final Set recordMessages1 = extractRecordMessages(actualRecords1); + final List stateMessages1 = extractStateMessages(actualRecords1); + final HashSet names = new HashSet<>(STREAM_NAMES); + names.add(MODELS_STREAM_NAME + "_2"); + assertEquals(1, stateMessages1.size()); + assertNotNull(stateMessages1.get(0).getData()); + assertExpectedStateMessages(stateMessages1); + assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) + .collect(Collectors.toSet()), + recordMessages1, + Collections.singleton(MODELS_STREAM_NAME), + names); + + final JsonNode puntoRecord = Jsons + .jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto")); + writeModelRecord(puntoRecord); + + final JsonNode state = 
+
+  @Test
+  @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
+  void testCdcAndFullRefreshInSameSync() throws Exception {
+    final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
+
+    final List<JsonNode> MODEL_RECORDS_2 = ImmutableList.of(
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")),
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")),
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")),
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")),
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")),
+        Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));
+
+    createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
+        columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
+
+    for (final JsonNode recordJson : MODEL_RECORDS_2) {
+      writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
+          COL_MAKE_ID, COL_MODEL);
+    }
+
+    final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
+        .withStream(CatalogHelpers.createAirbyteStream(
+            MODELS_STREAM_NAME + "_2",
+            MODELS_SCHEMA,
+            Field.of(COL_ID, JsonSchemaType.NUMBER),
+            Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
+            Field.of(COL_MODEL, JsonSchemaType.STRING))
+            .withSupportedSyncModes(
+                Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
+            .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
+    airbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
+
+    final List<ConfiguredAirbyteStream> streams = configuredCatalog.getStreams();
+    streams.add(airbyteStream);
+    configuredCatalog.withStreams(streams);
+
+    final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
+        .read(getConfig(), configuredCatalog, null);
+    final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
+
+    final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
+    final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
+    final HashSet<String> names = new HashSet<>(STREAM_NAMES);
+    names.add(MODELS_STREAM_NAME + "_2");
+    assertEquals(1, stateMessages1.size());
+    assertNotNull(stateMessages1.get(0).getData());
+    assertExpectedStateMessages(stateMessages1);
+    assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
+        .collect(Collectors.toSet()),
+        recordMessages1,
+        Collections.singleton(MODELS_STREAM_NAME),
+        names);
+
+    final JsonNode puntoRecord = Jsons
+        .jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
+    writeModelRecord(puntoRecord);
+
+    final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
+    final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
+        .read(getConfig(), configuredCatalog, state);
+    final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
+
+    final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
+    final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
+    assertEquals(1, stateMessages2.size());
+    assertNotNull(stateMessages2.get(0).getData());
+    assertExpectedStateMessages(stateMessages2);
+    assertExpectedRecords(
+        Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
+            .collect(Collectors.toSet()),
+        recordMessages2,
+        Collections.singleton(MODELS_STREAM_NAME),
+        names);
+  }
+
+  @Test
+  @DisplayName("When no records exist, no records are returned.")
+  void testNoData() throws Exception {
+
+    executeQuery(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME));
+
+    final AutoCloseableIterator<AirbyteMessage> read = getSource()
+        .read(getConfig(), CONFIGURED_CATALOG, null);
+    final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
+
+    final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
+    final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
+
+    assertExpectedRecords(Collections.emptySet(), recordMessages);
+    assertEquals(1, stateMessages.size());
+    assertNotNull(stateMessages.get(0).getData());
+    assertExpectedStateMessages(stateMessages);
+  }
+
+  @Test
+  @DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
+  void testNoDataOnSecondSync() throws Exception {
+    final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
+        .read(getConfig(), CONFIGURED_CATALOG, null);
+    final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
+    final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
+
+    final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
+        .read(getConfig(), CONFIGURED_CATALOG, state);
+    final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
+
+    final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
+    final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
+
+    assertExpectedRecords(Collections.emptySet(), recordMessages2);
+    assertEquals(1, stateMessages2.size());
+    assertNotNull(stateMessages2.get(0).getData());
+    assertExpectedStateMessages(stateMessages2);
+  }
+
+  @Test
+  void testCheck() throws Exception {
+    final AirbyteConnectionStatus status = getSource().check(getConfig());
+    assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, status.getStatus());
+  }
+
+  @Test
+  void testDiscover() throws Exception {
+    final AirbyteCatalog expectedCatalog = expectedCatalogForDiscover();
+    final AirbyteCatalog actualCatalog = getSource().discover(getConfig());
+
+    assertEquals(
+        expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
+            .collect(Collectors.toList()),
+        actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
+            .collect(Collectors.toList()));
+  }
+
+  protected AirbyteCatalog expectedCatalogForDiscover() {
+    final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);
+
+    createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
+        columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty()));
+
+    final List<AirbyteStream> streams = expectedCatalog.getStreams();
+    // stream with PK
+    streams.get(0).setSourceDefinedCursor(true);
+    addCdcMetadataColumns(streams.get(0));
+
+    final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
+        MODELS_STREAM_NAME + "_2",
+        MODELS_SCHEMA,
+        Field.of(COL_ID, JsonSchemaType.NUMBER),
+        Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
+        Field.of(COL_MODEL, JsonSchemaType.STRING));
+    streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
+    streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
+    addCdcMetadataColumns(streamWithoutPK);
+
+    final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
+        MODELS_STREAM_NAME + "_random",
+        MODELS_SCHEMA + "_random",
+        Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
+        Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
+        Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
+        .withSourceDefinedCursor(true)
+        .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
+        .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")));
+    addCdcMetadataColumns(randomStream);
+
+    streams.add(streamWithoutPK);
+    streams.add(randomStream);
+    expectedCatalog.withStreams(streams);
+    return expectedCatalog;
+  }
+
+  protected abstract CdcTargetPosition cdcLatestTargetPosition();
+
+  protected abstract CdcTargetPosition extractPosition(JsonNode record);
+
+  protected abstract void assertNullCdcMetaData(JsonNode data);
+
+  protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull);
+
+  protected abstract void removeCDCColumns(ObjectNode data);
+
+  protected abstract void addCdcMetadataColumns(AirbyteStream stream);
+
+  protected abstract Source getSource();
+
+  protected abstract JsonNode getConfig();
+
+  protected abstract Database getDatabase();
+
+  protected abstract void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages);
+
+}
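Connector test classes supply these abstract hooks. A condensed, hypothetical sketch of the Postgres side (the real subclass, CdcPostgresSourceTest, appears later in this patch; the _ab_cdc_* column names follow Airbyte's CDC metadata convention but are assumptions here, and JUnit assertions are assumed statically imported):

    abstract class ExamplePostgresCdcTest extends CdcSourceTest {

      @Override
      protected Source getSource() {
        return new PostgresSource();
      }

      @Override
      protected void removeCDCColumns(final ObjectNode data) {
        // drop the per-sync metadata so payloads can be compared across syncs
        data.remove("_ab_cdc_lsn");
        data.remove("_ab_cdc_updated_at");
        data.remove("_ab_cdc_deleted_at");
      }

      @Override
      protected void assertNullCdcMetaData(final JsonNode data) {
        assertNull(data.get("_ab_cdc_lsn"));
        assertNull(data.get("_ab_cdc_updated_at"));
        assertNull(data.get("_ab_cdc_deleted_at"));
      }

      // cdcLatestTargetPosition(), extractPosition(...), getConfig(), getDatabase(),
      // addCdcMetadataColumns(...) and assertExpectedStateMessages(...) are connector
      // specific and stay abstract in this sketch.
    }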
diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle
index 3ddb54390e49..3fec401df68a 100644
--- a/airbyte-integrations/connectors/source-mssql/build.gradle
+++ b/airbyte-integrations/connectors/source-mssql/build.gradle
@@ -15,7 +15,7 @@ dependencies {
     implementation project(':airbyte-db:db-lib')
     implementation project(':airbyte-integrations:bases:base-java')
-    implementation project(':airbyte-integrations:bases:debezium')
+    implementation project(':airbyte-integrations:bases:debezium-v1-4-2')
     implementation project(':airbyte-protocol:protocol-models')
     implementation project(':airbyte-integrations:connectors:source-jdbc')
     implementation project(':airbyte-integrations:connectors:source-relational-db')
@@ -23,7 +23,7 @@ dependencies {
     implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
     implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'
-    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
+    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-4-2'))
     testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
     testImplementation 'org.apache.commons:commons-lang3:3.11'
diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle
index 128a46a2afa9..e99e34ca0d60 100644
--- a/airbyte-integrations/connectors/source-mysql/build.gradle
+++ b/airbyte-integrations/connectors/source-mysql/build.gradle
@@ -13,7 +13,7 @@ application {
 dependencies {
     implementation project(':airbyte-db:db-lib')
     implementation project(':airbyte-integrations:bases:base-java')
-    implementation project(':airbyte-integrations:bases:debezium')
+    implementation project(':airbyte-integrations:bases:debezium-v1-4-2')
     implementation project(':airbyte-integrations:connectors:source-jdbc')
     implementation project(':airbyte-protocol:protocol-models')
     implementation project(':airbyte-integrations:connectors:source-relational-db')
@@ -21,7 +21,7 @@ dependencies {
     implementation 'mysql:mysql-connector-java:8.0.22'
     implementation 'org.apache.commons:commons-lang3:3.11'
-    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
+    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-4-2'))
     testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
     testImplementation 'org.apache.commons:commons-lang3:3.11'
     testImplementation libs.connectors.testcontainers.mysql
diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle
index 59e1fa31ccf3..82816c246cfe 100644
--- a/airbyte-integrations/connectors/source-postgres/build.gradle
+++ b/airbyte-integrations/connectors/source-postgres/build.gradle
@@ -13,7 +13,7 @@ application {
 dependencies {
     implementation project(':airbyte-db:db-lib')
     implementation project(':airbyte-integrations:bases:base-java')
-    implementation project(':airbyte-integrations:bases:debezium')
+    implementation project(':airbyte-integrations:bases:debezium-v1-9-2')
     implementation project(':airbyte-protocol:protocol-models')
     implementation project(':airbyte-integrations:connectors:source-jdbc')
     implementation project(':airbyte-integrations:connectors:source-relational-db')
@@ -21,7 +21,7 @@ dependencies {
     implementation 'org.apache.commons:commons-lang3:3.11'
     implementation libs.postgresql
-    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
+    testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
     testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
     testImplementation project(":airbyte-json-validation")
     testImplementation project(':airbyte-test-utils')
diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java
index 0e8ac93958c9..a09044fccd46 100644
--- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java
+++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java
@@ -13,7 +13,7 @@ static Properties getDebeziumProperties(final JsonNode config) {
     final Properties props = new Properties();
     props.setProperty("plugin.name", PostgresUtils.getPluginValue(config.get("replication_method")));
     props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
-    props.setProperty("snapshot.mode", "exported");
+    props.setProperty("snapshot.mode", "initial");
     props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText());
     props.setProperty("publication.name", config.get("replication_method").get("publication").asText());
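The snapshot.mode change tracks the Debezium upgrade for Postgres: later Debezium releases deprecate the exported snapshot mode and fold its consistent-snapshot behaviour into initial, hence the switch here. A hypothetical regression test pinning the new value (the replication_method keys mirror the hunk above; the plugin key consumed by PostgresUtils.getPluginValue is an assumption):

    @Test
    void usesInitialSnapshotMode() {
      final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
          "replication_method", ImmutableMap.of(
              "plugin", "pgoutput",
              "replication_slot", "debezium_slot",
              "publication", "publication")));
      final Properties props = PostgresCdcProperties.getDebeziumProperties(config);
      assertEquals("initial", props.getProperty("snapshot.mode"));
    }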
diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java
index 19df7527dddf..8deeccb53c64 100644
--- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java
+++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java
@@ -91,12 +91,12 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
    * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)}
    */
   database.query(ctx -> {
-    ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');");
-    ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
     ctx.execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
     ctx.execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
     ctx.execute("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
     ctx.execute("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
+    ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');");
+    ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
     return null;
   });
 }
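Note the reordering here and in the unit-test setup below: tables and seed rows are now created before the replication slot and publication. With snapshot.mode set to initial, the snapshot is taken when the connector starts rather than from the snapshot exported at slot creation, so rows inserted while the slot already exists are visible to the snapshot and also sit in the WAL; creating the slot last keeps the seed data snapshot-only and avoids double delivery.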
diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java
index 477784eda98e..2a5b46975c8b 100644
--- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java
+++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java
@@ -20,6 +20,8 @@
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.string.Strings;
+import io.airbyte.commons.util.AutoCloseableIterator;
+import io.airbyte.commons.util.AutoCloseableIterators;
 import io.airbyte.db.Database;
 import io.airbyte.db.PgLsn;
 import io.airbyte.db.factory.DSLContextFactory;
@@ -31,11 +33,14 @@
 import io.airbyte.integrations.debezium.CdcSourceTest;
 import io.airbyte.integrations.debezium.CdcTargetPosition;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.AirbyteStateMessage;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.test.utils.PostgreSQLContainerHelper;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Set;
 import org.jooq.DSLContext;
 import org.jooq.SQLDialect;
 import org.junit.jupiter.api.AfterEach;
@@ -47,12 +52,12 @@
 abstract class CdcPostgresSourceTest extends CdcSourceTest {
 
-  private static final String SLOT_NAME_BASE = "debezium_slot";
-  private static final String PUBLICATION = "publication";
+  protected static final String SLOT_NAME_BASE = "debezium_slot";
+  protected static final String PUBLICATION = "publication";
   private PostgreSQLContainer<?> container;
 
-  private String dbName;
-  private Database database;
+  protected String dbName;
+  protected Database database;
   private DSLContext dslContext;
   private PostgresSource source;
   private JsonNode config;
@@ -83,6 +88,7 @@ protected void setup() throws SQLException {
     final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName;
     dslContext = getDslContext(config);
     database = getDatabase(dslContext);
+    super.setup();
     database.query(ctx -> {
       ctx.execute("SELECT pg_create_logical_replication_slot('" + fullReplicationSlot + "', '" + getPluginName() + "');");
       ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
@@ -90,7 +96,6 @@ protected void setup() throws SQLException {
       return null;
     });
-    super.setup();
   }
 
   private JsonNode getConfig(final String dbName) {
@@ -247,4 +252,69 @@ public String createSchemaQuery(final String schemaName) {
     return "CREATE SCHEMA " + schemaName + ";";
   }
 
+  @Override
+  @Test
+  public void testRecordsProducedDuringAndAfterSync() throws Exception {
+
+    final int recordsToCreate = 20;
+    // first batch of records. 20 created here and 6 created in setup method.
+    for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
+      final JsonNode record =
+          Jsons.jsonNode(ImmutableMap
+              .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
+                  "F-" + recordsCreated));
+      writeModelRecord(record);
+    }
+
+    final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
+        .read(getConfig(), CONFIGURED_CATALOG, null);
+    final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
+        .toListAndClose(firstBatchIterator);
+    final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
+    assertEquals(1, stateAfterFirstBatch.size());
+    assertNotNull(stateAfterFirstBatch.get(0).getData());
+    assertExpectedStateMessages(stateAfterFirstBatch);
+    final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
+        dataFromFirstBatch);
+    assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size());
+
+    // second batch of records again 20 being created
+    for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
+      final JsonNode record =
+          Jsons.jsonNode(ImmutableMap
+              .of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
+                  "F-" + recordsCreated));
+      writeModelRecord(record);
+    }
+
+    final JsonNode state = stateAfterFirstBatch.get(0).getData();
+    final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
+        .read(getConfig(), CONFIGURED_CATALOG, state);
+    final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
+        .toListAndClose(secondBatchIterator);
+
+    final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
+    assertEquals(1, stateAfterSecondBatch.size());
+    assertNotNull(stateAfterSecondBatch.get(0).getData());
+    assertExpectedStateMessages(stateAfterSecondBatch);
+
+    final Set<AirbyteRecordMessage> recordsFromSecondBatch = extractRecordMessages(
+        dataFromSecondBatch);
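+    // Unlike the base-class expectation of recordsToCreate, the second sync here yields
+    // recordsToCreate * 2: the 20 rows written before the first sync were inserted while the
+    // replication slot already existed, so they are captured by the initial snapshot and then
+    // delivered again from the WAL, together with the 20 rows written after the first sync.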
+    assertEquals(recordsToCreate * 2, recordsFromSecondBatch.size(),
+        "Expected 40 records to be replicated in the second sync.");
+
+    // sometimes there can be more than one of these at the end of the snapshot and just before the
+    // first incremental.
+    final Set<AirbyteRecordMessage> recordsFromFirstBatchWithoutDuplicates = removeDuplicates(
+        recordsFromFirstBatch);
+    final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
+        recordsFromSecondBatch);
+
+    final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
+    assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
+        "Expected first sync to include records created while the test was running.");
+    assertEquals((recordsToCreate * 3) + recordsCreatedBeforeTestCount,
+        recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates.size());
+  }
+
 }
diff --git a/settings.gradle b/settings.gradle
index 37920ada90b9..639b31d81880 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -90,7 +90,8 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD"
     include ':airbyte-integrations:bases:standard-destination-test'
     include ':airbyte-integrations:bases:standard-source-test'
     include ':airbyte-integrations:connector-templates:generator'
-    include ':airbyte-integrations:bases:debezium'
+    include ':airbyte-integrations:bases:debezium-v1-4-2'
+    include ':airbyte-integrations:bases:debezium-v1-9-2'
     // Needed by normalization integration tests
     include ':airbyte-integrations:connectors:destination-bigquery'
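With this, the build registers two version-pinned Debezium base modules side by side: MySQL and MSSQL stay on the 1.4.2 line via the renamed debezium-v1-4-2 module, while Postgres moves to debezium-v1-9-2 together with the snapshot.mode change above.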