diff --git a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index d46b843cb811..383c2e63f65f 100644 --- a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -71,13 +71,13 @@ public abstract class CdcSourceTest { private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class); - private static final String MODELS_SCHEMA = "models_schema"; - private static final String MODELS_STREAM_NAME = "models"; + protected static final String MODELS_SCHEMA = "models_schema"; + protected static final String MODELS_STREAM_NAME = "models"; private static final Set STREAM_NAMES = Sets .newHashSet(MODELS_STREAM_NAME); - private static final String COL_ID = "id"; - private static final String COL_MAKE_ID = "make_id"; - private static final String COL_MODEL = "model"; + protected static final String COL_ID = "id"; + protected static final String COL_MAKE_ID = "make_id"; + protected static final String COL_MODEL = "model"; protected static final String DB_NAME = MODELS_SCHEMA; private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( @@ -89,7 +89,7 @@ public abstract class CdcSourceTest { Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))))); - private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers + protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers .toDefaultConfiguredCatalog(CATALOG); // set all streams to incremental. @@ -105,7 +105,7 @@ public abstract class CdcSourceTest { Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")), Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350"))); - protected void setup() { + protected void setup() throws SQLException { createAndPopulateTables(); } @@ -155,8 +155,7 @@ private void createAndPopulateActualTable() { */ private void createAndPopulateRandomTable() { createSchema(MODELS_SCHEMA + "_random"); - createTable(MODELS_SCHEMA + "_random", - MODELS_STREAM_NAME + "_random", + createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random", COL_ID + "_random")); @@ -284,13 +283,13 @@ private void assertExpectedRecords(Set expectedRecords, @Test @DisplayName("On the first sync, produce returns records that exist in the database.") void testExistingData() throws Exception { + CdcTargetPosition targetPosition = cdcLatestTargetPosition(); final AutoCloseableIterator read = getSource().read(getConfig(), CONFIGURED_CATALOG, null); final List actualRecords = AutoCloseableIterators.toListAndClose(read); final Set recordMessages = extractRecordMessages(actualRecords); final List stateMessages = extractStateMessages(actualRecords); - CdcTargetPosition targetPosition = cdcLatestTargetPosition(); assertNotNull(targetPosition); recordMessages.forEach(record -> { assertEquals(extractPosition(record.getData()), targetPosition); @@ -559,6 +558,17 @@ void testCheck() throws Exception { @Test void testDiscover() throws Exception { + final AirbyteCatalog expectedCatalog = expectedCatalogForDiscover(); + final AirbyteCatalog actualCatalog = getSource().discover(getConfig()); + + assertEquals( + expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) + .collect(Collectors.toList()), + actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) + .collect(Collectors.toList())); + } + + protected AirbyteCatalog expectedCatalogForDiscover() { final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200)", COL_ID, COL_MAKE_ID, COL_MODEL)); @@ -580,14 +590,7 @@ void testDiscover() throws Exception { streams.add(streamWithoutPK); expectedCatalog.withStreams(streams); - - final AirbyteCatalog actualCatalog = getSource().discover(getConfig()); - - assertEquals( - expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) - .collect(Collectors.toList()), - actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) - .collect(Collectors.toList())); + return expectedCatalog; } protected abstract CdcTargetPosition cdcLatestTargetPosition(); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 96c9e605ae03..18e179e441f9 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -62,7 +62,6 @@ public abstract class AbstractJdbcSource extends AbstractRelationalDbSource. We - * deserialize it to a Map so that the state file can be human readable. If we ever - * discover that any of the contents of these offset files is not string serializable we will likely - * have to drop the human readability support and just base64 encode it. - */ -public class AirbyteFileOffsetBackingStore { - - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class); - - private final Path offsetFilePath; - - public AirbyteFileOffsetBackingStore(final Path offsetFilePath) { - this.offsetFilePath = offsetFilePath; - } - - public Path getOffsetFilePath() { - return offsetFilePath; - } - - public CdcState read() { - final Map raw = load(); - - final Map mappedAsStrings = raw.entrySet().stream().collect(Collectors.toMap( - e -> byteBufferToString(e.getKey()), - e -> byteBufferToString(e.getValue()))); - final JsonNode asJson = Jsons.jsonNode(mappedAsStrings); - - LOGGER.info("debezium state: {}", asJson); - - return new CdcState().withState(asJson); - } - - @SuppressWarnings("unchecked") - public void persist(CdcState cdcState) { - final Map mapAsString = - cdcState != null && cdcState.getState() != null ? Jsons.object(cdcState.getState(), Map.class) : Collections.emptyMap(); - final Map mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap( - e -> stringToByteBuffer(e.getKey()), - e -> stringToByteBuffer(e.getValue()))); - - FileUtils.deleteQuietly(offsetFilePath.toFile()); - save(mappedAsStrings); - } - - private static String byteBufferToString(ByteBuffer byteBuffer) { - Preconditions.checkNotNull(byteBuffer); - return new String(byteBuffer.array(), StandardCharsets.UTF_8); - } - - private static ByteBuffer stringToByteBuffer(String s) { - Preconditions.checkNotNull(s); - return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); - } - - /** - * See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this - * method is not public. - */ - @SuppressWarnings("unchecked") - private Map load() { - try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) { - final Object obj = is.readObject(); - if (!(obj instanceof HashMap)) - throw new ConnectException("Expected HashMap but found " + obj.getClass()); - final Map raw = (Map) obj; - final Map data = new HashMap<>(); - for (Map.Entry mapEntry : raw.entrySet()) { - final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; - final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null; - data.put(key, value); - } - - return data; - } catch (NoSuchFileException | EOFException e) { - // NoSuchFileException: Ignore, may be new. - // EOFException: Ignore, this means the file was missing or corrupt - return Collections.emptyMap(); - } catch (IOException | ClassNotFoundException e) { - throw new ConnectException(e); - } - } - - /** - * See FileOffsetBackingStore#save - logic is mostly borrowed from here. duplicated because this - * method is not public. - */ - private void save(Map data) { - try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) { - Map raw = new HashMap<>(); - for (Map.Entry mapEntry : data.entrySet()) { - byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; - byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; - raw.put(key, value); - } - os.writeObject(raw); - } catch (IOException e) { - throw new ConnectException(e); - } - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumEventUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumEventUtils.java deleted file mode 100644 index 145d69aa8678..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumEventUtils.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.debezium.engine.ChangeEvent; -import java.time.Instant; - -public class DebeziumEventUtils { - - public static AirbyteMessage toAirbyteMessage(ChangeEvent event, Instant emittedAt) { - final JsonNode debeziumRecord = Jsons.deserialize(event.value()); - final JsonNode before = debeziumRecord.get("before"); - final JsonNode after = debeziumRecord.get("after"); - final JsonNode source = debeziumRecord.get("source"); - - final JsonNode data = formatDebeziumData(before, after, source); - final String schemaName = source.get("schema").asText(); - final String streamName = source.get("table").asText(); - - final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage() - .withStream(streamName) - .withNamespace(schemaName) - .withEmittedAt(emittedAt.toEpochMilli()) - .withData(data); - - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(airbyteRecordMessage); - } - - // warning mutates input args. - private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) { - final ObjectNode base = (ObjectNode) (after.isNull() ? before : after); - - long transactionMillis = source.get("ts_ms").asLong(); - long lsn = source.get("lsn").asLong(); - - base.put("_ab_cdc_updated_at", transactionMillis); - base.put("_ab_cdc_lsn", lsn); - - if (after.isNull()) { - base.put("_ab_cdc_deleted_at", transactionMillis); - } else { - base.put("_ab_cdc_deleted_at", (Long) null); - } - - return base; - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java deleted file mode 100644 index 2507e2faec63..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordIterator.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import com.google.common.collect.AbstractIterator; -import io.airbyte.commons.concurrency.VoidCallable; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.MoreBooleans; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.db.PgLsn; -import io.debezium.engine.ChangeEvent; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Optional; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.source.SourceRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The record iterator is the consumer (in the producer / consumer relationship with debezium) is - * responsible for 1. making sure every record produced by the record publisher is processed 2. - * signalling to the record publisher when it is time for it to stop producing records. It emits - * this signal either when the publisher had not produced a new record for a long time or when it - * has processed at least all of the records that were present in the database when the source was - * started. Because the publisher might publish more records between the consumer sending this - * signal and the publisher actually shutting down, the consumer must stay alive as long as the - * publisher is not closed or if there are any new records for it to process (even if the publisher - * is closed). - */ -public class DebeziumRecordIterator extends AbstractIterator> - implements AutoCloseableIterator> { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class); - - private static final TimeUnit SLEEP_TIME_UNIT = TimeUnit.SECONDS; - private static final int SLEEP_TIME_AMOUNT = 5; - - private final LinkedBlockingQueue> queue; - private final PgLsn targetLsn; - private final Supplier publisherStatusSupplier; - private final VoidCallable requestClose; - - public DebeziumRecordIterator(LinkedBlockingQueue> queue, - PgLsn targetLsn, - Supplier publisherStatusSupplier, - VoidCallable requestClose) { - this.queue = queue; - this.targetLsn = targetLsn; - this.publisherStatusSupplier = publisherStatusSupplier; - this.requestClose = requestClose; - } - - @Override - protected ChangeEvent computeNext() { - /* - * keep trying until the publisher is closed or until the queue is empty. the latter case is - * possible when the publisher has shutdown but the consumer has not yet processed all messages it - * emitted. - */ - while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { - final ChangeEvent next; - try { - next = queue.poll(SLEEP_TIME_AMOUNT, SLEEP_TIME_UNIT); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - // if within the allotted time the consumer could not get a record, tell the producer to shutdown. - if (next == null) { - requestClose(); - LOGGER.info("no record found. polling again."); - continue; - } - - /* - * if the last record matches the target LSN, it is time to tell the producer to shutdown. note: - * that it is possible for the producer to emit more events after the shutdown is signaled. we - * guarantee we get up to a certain LSN but we don't necessarily stop exactly at it. we can go past - * it a little bit. - */ - if (shouldSignalClose(next)) { - requestClose(); - } - - return next; - } - return endOfData(); - } - - @Override - public void close() throws Exception { - requestClose.call(); - } - - /** - * Determine whether the given event is at or above the LSN we are looking to stop at. The logic - * here is a little nuanced. When running in "snapshot" mode, the LSN in all of the events is the - * LSN at the time that Debezium ran the query to get the records (not the LSN of when the record - * was last updated). So we need to handle records emitted from a snapshot record specially. - * Therefore the logic is, if the LSN is below the target LSN then we should keep going (this is - * easy; same for snapshot and non-snapshot). If the LSN is greater than or equal to the target we - * check to see if the record is a snapshot record. If it is not a snapshot record we should stop. - * If it is a snapshot record (and it is not the last snapshot record) then we should keep going. If - * it is the last snapshot record, then we should stop. - * - * @param event - event with LSN to check. - * @return whether or not the event is at or above the LSN we are looking for. - */ - private boolean shouldSignalClose(ChangeEvent event) { - final PgLsn eventLsn = extractLsn(event); - - if (targetLsn.compareTo(eventLsn) > 0) { - return false; - } else { - final SnapshotMetadata snapshotMetadata = getSnapshotMetadata(event); - // if not snapshot or is snapshot but last record in snapshot. - return SnapshotMetadata.TRUE != snapshotMetadata; - } - } - - private SnapshotMetadata getSnapshotMetadata(ChangeEvent event) { - try { - /* - * Debezium emits EmbeddedEngineChangeEvent, but that class is not public and it is hidden behind - * the ChangeEvent iface. The EmbeddedEngineChangeEvent contains the information about whether the - * record was emitted in snapshot mode or not, which we need to determine whether to stop producing - * records or not. Thus we use reflection to access that hidden information. - */ - final Method sourceRecordMethod = event.getClass().getMethod("sourceRecord"); - sourceRecordMethod.setAccessible(true); - final SourceRecord sourceRecord = (SourceRecord) sourceRecordMethod.invoke(event); - final String snapshot = ((Struct) sourceRecord.value()).getStruct("source").getString("snapshot"); - - if (snapshot == null) { - return null; - } - - // the snapshot field is an enum of true, false, and last. - return SnapshotMetadata.valueOf(snapshot.toUpperCase()); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - private PgLsn extractLsn(ChangeEvent event) { - return Optional.ofNullable(event.value()) - .flatMap(value -> Optional.ofNullable(Jsons.deserialize(value).get("source"))) - .flatMap(source -> Optional.ofNullable(source.get("lsn").asText())) - .map(Long::parseLong) - .map(PgLsn::fromLong) - .orElseThrow(() -> new IllegalStateException("Could not find LSN")); - } - - private void requestClose() { - try { - requestClose.call(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - enum SnapshotMetadata { - TRUE, - FALSE, - LAST - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java deleted file mode 100644 index 5843e7547c08..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.SyncMode; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; -import io.debezium.engine.spi.OffsetCommitPolicy; -import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import org.codehaus.plexus.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DebeziumRecordPublisher implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class); - private final ExecutorService executor; - private DebeziumEngine> engine; - - private final JsonNode config; - private final ConfiguredAirbyteCatalog catalog; - private final AirbyteFileOffsetBackingStore offsetManager; - - private final AtomicBoolean hasClosed; - private final AtomicBoolean isClosing; - private final AtomicReference thrownError; - private final CountDownLatch engineLatch; - - public DebeziumRecordPublisher(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) { - this.config = config; - this.catalog = catalog; - this.offsetManager = offsetManager; - this.hasClosed = new AtomicBoolean(false); - this.isClosing = new AtomicBoolean(false); - this.thrownError = new AtomicReference<>(); - this.executor = Executors.newSingleThreadExecutor(); - this.engineLatch = new CountDownLatch(1); - } - - public void start(Queue> queue) { - engine = DebeziumEngine.create(Json.class) - .using(getDebeziumProperties(config, catalog, offsetManager)) - .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) - .notifying(e -> { - // debezium outputs a tombstone event that has a value of null. this is an artifact of how it - // interacts with kafka. we want to ignore it. - // more on the tombstone: - // https://debezium.io/documentation/reference/configuration/event-flattening.html - if (e.value() != null) { - queue.add(e); - } - }) - .using((success, message, error) -> { - LOGGER.info("Debezium engine shutdown."); - thrownError.set(error); - engineLatch.countDown(); - }) - .build(); - - // Run the engine asynchronously ... - executor.execute(engine); - } - - public boolean hasClosed() { - return hasClosed.get(); - } - - public void close() throws Exception { - if (isClosing.compareAndSet(false, true)) { - // consumers should assume records can be produced until engine has closed. - if (engine != null) { - engine.close(); - } - - // wait for closure before shutting down executor service - engineLatch.await(5, TimeUnit.MINUTES); - - // shut down and await for thread to actually go down - executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); - - // after the engine is completely off, we can mark this as closed - hasClosed.set(true); - - if (thrownError.get() != null) { - throw new RuntimeException(thrownError.get()); - } - } - } - - protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) { - final Properties props = new Properties(); - - // debezium engine configuration - props.setProperty("name", "engine"); - props.setProperty("plugin.name", "pgoutput"); - props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); - props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); - props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer - props.setProperty("snapshot.mode", "exported"); - - // https://debezium.io/documentation/reference/configuration/avro.html - props.setProperty("key.converter.schemas.enable", "false"); - props.setProperty("value.converter.schemas.enable", "false"); - - // debezium names - props.setProperty("name", config.get("database").asText()); - props.setProperty("database.server.name", config.get("database").asText()); - - // db connection configuration - props.setProperty("database.hostname", config.get("host").asText()); - props.setProperty("database.port", config.get("port").asText()); - props.setProperty("database.user", config.get("username").asText()); - props.setProperty("database.dbname", config.get("database").asText()); - - if (config.has("password")) { - props.setProperty("database.password", config.get("password").asText()); - } - - props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText()); - props.setProperty("publication.name", config.get("replication_method").get("publication").asText()); - - // By default "decimal.handing.mode=precise" which's caused returning this value as a binary. - // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String - // explicitly in its Kafka messages for more details see: - // https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-decimal-types - // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation - props.setProperty("decimal.handling.mode", "string"); - - // table selection - final String tableWhitelist = getTableWhitelist(catalog); - props.setProperty("table.include.list", tableWhitelist); - props.setProperty("database.include.list", config.get("database").asText()); - - // recommended when using pgoutput - props.setProperty("publication.autocreate.mode", "disabled"); - - return props; - } - - @VisibleForTesting - protected static String getTableWhitelist(ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams().stream() - .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) - .map(ConfiguredAirbyteStream::getStream) - .map(stream -> stream.getNamespace() + "." + stream.getName()) - // debezium needs commas escaped to split properly - .map(x -> StringUtils.escape(x, new char[] {','}, "\\,")) - .collect(Collectors.joining(",")); - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcConnectorMetadataInjector.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcConnectorMetadataInjector.java new file mode 100644 index 000000000000..1d143fe6f933 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcConnectorMetadataInjector.java @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import static io.airbyte.integrations.source.postgres.PostgresSource.CDC_LSN; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.integrations.debezium.CdcMetadataInjector; + +public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector { + + @Override + public void addMetaData(ObjectNode event, JsonNode source) { + long lsn = source.get("lsn").asLong(); + event.put(CDC_LSN, lsn); + } + + @Override + public String namespace(JsonNode source) { + return source.get("schema").asText(); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java new file mode 100644 index 000000000000..3223e829c9e8 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Properties; + +public class PostgresCdcProperties { + + static Properties getDebeziumProperties(JsonNode config) { + final Properties props = new Properties(); + props.setProperty("plugin.name", "pgoutput"); + props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + props.setProperty("snapshot.mode", "exported"); + + props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText()); + props.setProperty("publication.name", config.get("replication_method").get("publication").asText()); + + // recommended when using pgoutput + props.setProperty("publication.autocreate.mode", "disabled"); + + return props; + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcSavedInfoFetcher.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcSavedInfoFetcher.java new file mode 100644 index 000000000000..de712f9a4be2 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcSavedInfoFetcher.java @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.debezium.CdcSavedInfoFetcher; +import io.airbyte.integrations.source.relationaldb.models.CdcState; +import java.util.Optional; + +public class PostgresCdcSavedInfoFetcher implements CdcSavedInfoFetcher { + + private final JsonNode savedOffset; + + public PostgresCdcSavedInfoFetcher(CdcState savedState) { + final boolean savedStatePresent = savedState != null && savedState.getState() != null; + this.savedOffset = savedStatePresent ? savedState.getState() : null; + } + + @Override + public JsonNode getSavedOffset() { + return savedOffset; + } + + @Override + public Optional getSavedSchemaHistory() { + return Optional.empty(); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcStateHandler.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcStateHandler.java new file mode 100644 index 000000000000..331baba5dadf --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcStateHandler.java @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.debezium.CdcStateHandler; +import io.airbyte.integrations.source.relationaldb.StateManager; +import io.airbyte.integrations.source.relationaldb.models.CdcState; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresCdcStateHandler implements CdcStateHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcStateHandler.class); + private final StateManager stateManager; + + public PostgresCdcStateHandler(StateManager stateManager) { + this.stateManager = stateManager; + } + + @Override + public AirbyteMessage saveState(Map offset, String dbHistory) { + final JsonNode asJson = Jsons.jsonNode(offset); + LOGGER.info("debezium state: {}", asJson); + CdcState cdcState = new CdcState().withState(asJson); + stateManager.getCdcStateManager().setCdcState(cdcState); + final AirbyteStateMessage stateMessage = stateManager.emit(); + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java new file mode 100644 index 000000000000..6f5fe9440ce1 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java @@ -0,0 +1,93 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.PgLsn; +import io.airbyte.db.PostgresUtils; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.debezium.CdcTargetPosition; +import io.airbyte.integrations.debezium.internals.SnapshotMetadata; +import java.sql.SQLException; +import java.util.Objects; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresCdcTargetPosition implements CdcTargetPosition { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcTargetPosition.class); + private final PgLsn targetLsn; + + public PostgresCdcTargetPosition(PgLsn targetLsn) { + this.targetLsn = targetLsn; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PostgresCdcTargetPosition) { + PostgresCdcTargetPosition cdcTargetPosition = (PostgresCdcTargetPosition) obj; + return cdcTargetPosition.targetLsn.compareTo(targetLsn) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(targetLsn.asLong()); + } + + static PostgresCdcTargetPosition targetPosition(JdbcDatabase database) { + try { + PgLsn lsn = PostgresUtils.getLsn(database); + LOGGER.info("identified target lsn: " + lsn); + return new PostgresCdcTargetPosition(lsn); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean reachedTargetPosition(JsonNode valueAsJson) { + final PgLsn eventLsn = extractLsn(valueAsJson); + + if (targetLsn.compareTo(eventLsn) > 0) { + return false; + } else { + SnapshotMetadata snapshotMetadata = SnapshotMetadata.valueOf(valueAsJson.get("source").get("snapshot").asText().toUpperCase()); + // if not snapshot or is snapshot but last record in snapshot. + return SnapshotMetadata.TRUE != snapshotMetadata; + } + } + + private PgLsn extractLsn(JsonNode valueAsJson) { + return Optional.ofNullable(valueAsJson.get("source")) + .flatMap(source -> Optional.ofNullable(source.get("lsn").asText())) + .map(Long::parseLong) + .map(PgLsn::fromLong) + .orElseThrow(() -> new IllegalStateException("Could not find LSN")); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index d16e2a470b15..cc1a08513d91 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -24,6 +24,8 @@ package io.airbyte.integrations.source.postgres; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; import static java.util.stream.Collectors.toList; import com.fasterxml.jackson.databind.JsonNode; @@ -33,50 +35,36 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.commons.util.CompositeIterator; -import io.airbyte.commons.util.MoreIterators; -import io.airbyte.db.PgLsn; -import io.airbyte.db.PostgresUtils; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.relationaldb.StateManager; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.SyncMode; -import io.debezium.engine.ChangeEvent; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.sql.JDBCType; import java.sql.PreparedStatement; -import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PostgresSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class); + public static final String CDC_LSN = "_ab_cdc_lsn"; static final String DRIVER_CLASS = "org.postgresql.Driver"; @@ -193,28 +181,6 @@ public AutoCloseableIterator read(JsonNode config, ConfiguredAir return super.read(config, catalog, state); } - private static PgLsn getLsn(JdbcDatabase database) { - try { - return PostgresUtils.getLsn(database); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private AirbyteFileOffsetBackingStore initializeState(StateManager stateManager) { - final Path cdcWorkingDir; - try { - cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc"); - } catch (IOException e) { - throw new RuntimeException(e); - } - final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat"); - - final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath); - offsetManager.persist(stateManager.getCdcStateManager().getCdcState()); - return offsetManager; - } - @Override public List> getIncrementalIterators(JdbcDatabase database, ConfiguredAirbyteCatalog catalog, @@ -229,51 +195,13 @@ public List> getIncrementalIterators(JdbcD * have a check here as well to make sure that if no table is in INCREMENTAL mode then skip this * part */ - if (isCdc(database.getSourceConfig())) { - // State works differently in CDC than it does in convention incremental. The state is written to an - // offset file that debezium reads from. Then once all records are replicated, we read back that - // offset file (which will have been updated by debezium) and set it in the state. There is no - // incremental updating of the state structs in the CDC impl. - final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager); - - final PgLsn targetLsn = getLsn(database); - LOGGER.info("identified target lsn: " + targetLsn); - - final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(); - - final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(database.getSourceConfig(), catalog, offsetManager); - publisher.start(queue); - - // handle state machine around pub/sub logic. - final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( - queue, - targetLsn, - publisher::hasClosed, - publisher::close); - - // convert to airbyte message. - final AutoCloseableIterator messageIterator = AutoCloseableIterators.transform( - eventIterator, - (event) -> DebeziumEventUtils.toAirbyteMessage(event, emittedAt)); - - // our goal is to get the state at the time this supplier is called (i.e. after all message records - // have been produced) - final Supplier stateMessageSupplier = () -> { - stateManager.getCdcStateManager().setCdcState(offsetManager.read()); - final AirbyteStateMessage stateMessage = stateManager.emit(); - return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); - }; - - // wrap the supplier in an iterator so that we can concat it to the message iterator. - final Iterator stateMessageIterator = MoreIterators.singletonIteratorFromSupplier(stateMessageSupplier); - - // this structure guarantees that the debezium engine will be closed, before we attempt to emit the - // state file. we want this so that we have a guarantee that the debezium offset file (which we use - // to produce the state file) is up-to-date. - final CompositeIterator messageIteratorWithStateDecorator = AutoCloseableIterators - .concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator)); - - return Collections.singletonList(messageIteratorWithStateDecorator); + JsonNode sourceConfig = database.getSourceConfig(); + if (isCdc(sourceConfig)) { + final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, PostgresCdcTargetPosition.targetPosition(database), + PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false); + return handler.getIncrementalIterators(new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), + new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(), emittedAt); + } else { return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/AirbyteFileOffsetBackingStoreTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/AirbyteFileOffsetBackingStoreTest.java deleted file mode 100644 index 66bfa02fab43..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/AirbyteFileOffsetBackingStoreTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.source.relationaldb.models.CdcState; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import org.junit.jupiter.api.Test; - -class AirbyteFileOffsetBackingStoreTest { - - @SuppressWarnings("UnstableApiUsage") - @Test - void test() throws IOException { - final Path testRoot = Files.createTempDirectory(Path.of("/tmp"), "offset-store-test"); - - final byte[] bytes = MoreResources.readBytes("test_debezium_offset.dat"); - final Path templateFilePath = testRoot.resolve("template_offset.dat"); - IOs.writeFile(templateFilePath, bytes); - - final Path writeFilePath = testRoot.resolve("offset.dat"); - - final AirbyteFileOffsetBackingStore offsetStore = new AirbyteFileOffsetBackingStore(templateFilePath); - final CdcState stateFromTemplateFile = offsetStore.read(); - - final AirbyteFileOffsetBackingStore offsetStore2 = new AirbyteFileOffsetBackingStore(writeFilePath); - offsetStore2.persist(stateFromTemplateFile); - - final CdcState stateFromOffsetStoreRoundTrip = offsetStore2.read(); - - // verify that, after a round trip through the offset store, we get back the same data. - assertEquals(stateFromTemplateFile, stateFromOffsetStoreRoundTrip); - // verify that the file written by the offset store is identical to the template file. - assertTrue(com.google.common.io.Files.equal(templateFilePath.toFile(), writeFilePath.toFile())); - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index eb8277754387..4a45f6b6ef11 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -24,8 +24,11 @@ package io.airbyte.integrations.source.postgres; -import static java.lang.Thread.sleep; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +import static io.airbyte.integrations.source.postgres.PostgresSource.CDC_LSN; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -33,174 +36,91 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.collect.Streams; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.string.Strings; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.Database; import io.airbyte.db.Databases; -import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.db.PgLsn; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.debezium.CdcSourceTest; +import io.airbyte.integrations.debezium.CdcTargetPosition; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.jooq.DSLContext; import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -class CdcPostgresSourceTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(CdcPostgresSourceTest.class); +class CdcPostgresSourceTest extends CdcSourceTest { private static final String SLOT_NAME_BASE = "debezium_slot"; - private static final String MAKES_SCHEMA = "public"; - private static final String MAKES_STREAM_NAME = "makes"; - private static final String MODELS_SCHEMA = "staging"; - private static final String MODELS_STREAM_NAME = "models"; - private static final Set STREAM_NAMES = Sets.newHashSet(MAKES_STREAM_NAME, MODELS_STREAM_NAME); - private static final String COL_ID = "id"; - private static final String COL_MAKE = "make"; - private static final String COL_MAKE_ID = "make_id"; - private static final String COL_MODEL = "model"; private static final String PUBLICATION = "publication"; - - private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - MAKES_STREAM_NAME, - MAKES_SCHEMA, - Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), - Field.of(COL_MAKE, JsonSchemaPrimitive.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), - CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME, - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), - Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), - Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))))); - private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); - - // set all streams to incremental. - static { - CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); - } - - private static final List MAKE_RECORDS = ImmutableList.of( - Jsons.jsonNode(ImmutableMap.of(COL_ID, 1, COL_MAKE, "Ford")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 2, COL_MAKE, "Mercedes"))); - - private static final List MODEL_RECORDS = ImmutableList.of( - Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350"))); - - private static PostgreSQLContainer PSQL_DB; + private PostgreSQLContainer container; private String dbName; private Database database; private PostgresSource source; + private JsonNode config; - @BeforeAll - static void init() { - PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine") - .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") - .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); - PSQL_DB.start(); - } - - @AfterAll - static void tearDown() { - PSQL_DB.close(); + @AfterEach + void tearDown() throws Exception { + database.close(); + container.close(); } @BeforeEach - void setup() throws Exception { + protected void setup() throws SQLException { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") + .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); + container.start(); source = new PostgresSource(); - dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final String initScriptName = "init_" + dbName.concat(".sql"); final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); - PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); + PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), container); - final JsonNode config = getConfig(PSQL_DB, dbName); + config = getConfig(dbName); final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName; database = getDatabaseFromConfig(config); database.query(ctx -> { ctx.execute("SELECT pg_create_logical_replication_slot('" + fullReplicationSlot + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); - ctx.execute("CREATE SCHEMA " + MODELS_SCHEMA + ";"); - ctx.execute(String.format("CREATE TABLE %s.%s(%s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MAKES_SCHEMA, MAKES_STREAM_NAME, COL_ID, - COL_MAKE, COL_ID)); - ctx.execute(String.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", - MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); - - for (JsonNode recordJson : MAKE_RECORDS) { - writeMakeRecord(ctx, recordJson); - } - - for (JsonNode recordJson : MODEL_RECORDS) { - writeModelRecord(ctx, recordJson); - } return null; }); + + super.setup(); } - private JsonNode getConfig(PostgreSQLContainer psqlDb, String dbName) { + private JsonNode getConfig(String dbName) { final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("replication_slot", SLOT_NAME_BASE + "_" + dbName) .put("publication", PUBLICATION) .build()); return Jsons.jsonNode(ImmutableMap.builder() - .put("host", psqlDb.getHost()) - .put("port", psqlDb.getFirstMappedPort()) + .put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) .put("database", dbName) - .put("username", psqlDb.getUsername()) - .put("password", psqlDb.getPassword()) + .put("username", container.getUsername()) + .put("password", container.getPassword()) .put("ssl", false) .put("replication_method", replicationMethod) .build()); @@ -218,239 +138,10 @@ private Database getDatabaseFromConfig(JsonNode config) { SQLDialect.POSTGRES); } - @Test - @DisplayName("On the first sync, produce returns records that exist in the database.") - void testExistingData() throws Exception { - final AutoCloseableIterator read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords = AutoCloseableIterators.toListAndClose(read); - - final Set recordMessages = extractRecordMessages(actualRecords); - final List stateMessages = extractStateMessages(actualRecords); - - assertExpectedRecords(Stream.concat(MAKE_RECORDS.stream(), MODEL_RECORDS.stream()).collect(Collectors.toSet()), recordMessages); - assertExpectedStateMessages(stateMessages); - } - - @Test - @DisplayName("When a record is deleted, produces a deletion record.") - void testDelete() throws Exception { - final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final List stateMessages1 = extractStateMessages(actualRecords1); - - assertExpectedStateMessages(stateMessages1); - - database.query(ctx -> { - ctx.execute(String.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, 11)); - return null; - }); - - final JsonNode state = stateMessages1.get(0).getData(); - final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - final List recordMessages2 = new ArrayList<>(extractRecordMessages(actualRecords2)); - final List stateMessages2 = extractStateMessages(actualRecords2); - - assertExpectedStateMessages(stateMessages2); - assertEquals(1, recordMessages2.size()); - assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); - assertNotNull(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_LSN)); - assertNotNull(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_UPDATED_AT)); - assertNotNull(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_DELETED_AT)); - } - - @Test - @DisplayName("When a record is updated, produces an update record.") - void testUpdate() throws Exception { - final String updatedModel = "Explorer"; - final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final List stateMessages1 = extractStateMessages(actualRecords1); - - assertExpectedStateMessages(stateMessages1); - - database.query(ctx -> { - ctx.execute(String.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11)); - return null; - }); - - final JsonNode state = stateMessages1.get(0).getData(); - final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - final List recordMessages2 = new ArrayList<>(extractRecordMessages(actualRecords2)); - final List stateMessages2 = extractStateMessages(actualRecords2); - - assertExpectedStateMessages(stateMessages2); - assertEquals(1, recordMessages2.size()); - assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); - assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText()); - assertNotNull(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_LSN)); - assertNotNull(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_UPDATED_AT)); - assertTrue(recordMessages2.get(0).getData().get(AbstractJdbcSource.CDC_DELETED_AT).isNull()); - } - - @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) - @Test - @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") - void testRecordsProducedDuringAndAfterSync() throws Exception { - final int recordsToCreate = 20; - final AtomicInteger recordsCreated = new AtomicInteger(); - final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); - executorService.scheduleAtFixedRate(() -> { - Exceptions.toRuntime(() -> database.query(ctx -> { - if (recordsCreated.get() < recordsToCreate) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap.of(COL_ID, 100 + recordsCreated.get(), COL_MAKE_ID, 1, COL_MODEL, "F-" + recordsCreated.get())); - writeModelRecord(ctx, record); - - recordsCreated.incrementAndGet(); - } - return null; - })); - }, 0, 500, TimeUnit.MILLISECONDS); - - final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - assertExpectedStateMessages(extractStateMessages(actualRecords1)); - - while (recordsCreated.get() != recordsToCreate) { - LOGGER.info("waiting for records to be created."); - sleep(500); - } - executorService.shutdown(); - - final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); - final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - - assertExpectedStateMessages(extractStateMessages(actualRecords2)); - - // sometimes there can be more than one of these at the end of the snapshot and just before the - // first incremental. - final Set recordMessages1 = removeDuplicates(extractRecordMessages(actualRecords1)); - final Set recordMessages2 = removeDuplicates(extractRecordMessages(actualRecords2)); - - final int recordsCreatedBeforeTestCount = MAKE_RECORDS.size() + MODEL_RECORDS.size(); - assertTrue(recordsCreatedBeforeTestCount < recordMessages1.size(), "Expected first sync to include records created while the test was running."); - assertTrue(0 < recordMessages2.size(), "Expected records to be replicated in the second sync."); - LOGGER.info("recordsToCreate = " + recordsToCreate); - LOGGER.info("recordsCreatedBeforeTestCount = " + recordsCreatedBeforeTestCount); - LOGGER.info("recordMessages1.size() = " + recordMessages1.size()); - LOGGER.info("recordMessages2.size() = " + recordMessages2.size()); - assertEquals(recordsToCreate + recordsCreatedBeforeTestCount, recordMessages1.size() + recordMessages2.size()); - } - - private static Set removeDuplicates(Set messages) { - final Set existingDataRecordsWithoutUpdated = new HashSet<>(); - final Set output = new HashSet<>(); - - for (AirbyteRecordMessage message : messages) { - ObjectNode node = message.getData().deepCopy(); - node.remove("_ab_cdc_updated_at"); - - if (existingDataRecordsWithoutUpdated.contains(node)) { - LOGGER.info("Removing duplicate node: " + node); - } else { - output.add(message); - existingDataRecordsWithoutUpdated.add(node); - } - } - - return output; - } - - @Test - @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") - void testCdcAndFullRefreshInSameSync() throws Exception { - final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG); - // set make stream to full refresh. - configuredCatalog.getStreams().get(0).setSyncMode(SyncMode.FULL_REFRESH); - - final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), configuredCatalog, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - - final Set recordMessages1 = extractRecordMessages(actualRecords1); - final List stateMessages1 = extractStateMessages(actualRecords1); - - assertExpectedStateMessages(stateMessages1); - assertExpectedRecords( - Stream.concat(MAKE_RECORDS.stream(), MODEL_RECORDS.stream()).collect(Collectors.toSet()), - recordMessages1, - Collections.singleton(MODELS_STREAM_NAME)); - - final JsonNode fiatRecord = Jsons.jsonNode(ImmutableMap.of(COL_ID, 3, COL_MAKE, "Fiat")); - final JsonNode puntoRecord = Jsons.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto")); - database.query(ctx -> { - writeMakeRecord(ctx, fiatRecord); - writeModelRecord(ctx, puntoRecord); - return null; - }); - - final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); - final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), configuredCatalog, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - - final Set recordMessages2 = extractRecordMessages(actualRecords2); - final List stateMessages2 = extractStateMessages(actualRecords2); - - assertExpectedStateMessages(stateMessages2); - // only make stream should full refresh. - assertExpectedRecords( - Streams.concat(MAKE_RECORDS.stream(), Stream.of(fiatRecord, puntoRecord)).collect(Collectors.toSet()), - recordMessages2, - Collections.singleton(MODELS_STREAM_NAME)); - } - - @Test - @DisplayName("When no records exist, no records are returned.") - void testNoData() throws Exception { - database.query(ctx -> { - ctx.execute(String.format("DELETE FROM %s.%s", MAKES_SCHEMA, MAKES_STREAM_NAME)); - return null; - }); - - database.query(ctx -> { - ctx.execute(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME)); - return null; - }); - - final AutoCloseableIterator read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords = AutoCloseableIterators.toListAndClose(read); - - final Set recordMessages = extractRecordMessages(actualRecords); - final List stateMessages = extractStateMessages(actualRecords); - - assertExpectedRecords(Collections.emptySet(), recordMessages); - assertExpectedStateMessages(stateMessages); - } - - @Test - @DisplayName("When no changes have been made to the database since the previous sync, no records are returned.") - void testNoDataOnSecondSync() throws Exception { - final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); - - final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - - final Set recordMessages2 = extractRecordMessages(actualRecords2); - final List stateMessages2 = extractStateMessages(actualRecords2); - - assertExpectedRecords(Collections.emptySet(), recordMessages2); - assertExpectedStateMessages(stateMessages2); - } - - @Test - void testCheck() { - final AirbyteConnectionStatus status = source.check(getConfig(PSQL_DB, dbName)); - assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.SUCCEEDED); - } - @Test void testCheckWithoutPublication() throws SQLException { database.query(ctx -> ctx.execute("DROP PUBLICATION " + PUBLICATION + ";")); - final AirbyteConnectionStatus status = source.check(getConfig(PSQL_DB, dbName)); + final AirbyteConnectionStatus status = source.check(config); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); } @@ -459,7 +150,7 @@ void testCheckWithoutReplicationSlot() throws SQLException { final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName; database.query(ctx -> ctx.execute("SELECT pg_drop_replication_slot('" + fullReplicationSlot + "');")); - final AirbyteConnectionStatus status = source.check(getConfig(PSQL_DB, dbName)); + final AirbyteConnectionStatus status = source.check(config); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); } @@ -468,7 +159,7 @@ void testReadWithoutPublication() throws SQLException { database.query(ctx -> ctx.execute("DROP PUBLICATION " + PUBLICATION + ";")); assertThrows(Exception.class, () -> { - source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); + source.read(config, CONFIGURED_CATALOG, null); }); } @@ -478,116 +169,109 @@ void testReadWithoutReplicationSlot() throws SQLException { database.query(ctx -> ctx.execute("SELECT pg_drop_replication_slot('" + fullReplicationSlot + "');")); assertThrows(Exception.class, () -> { - source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); + source.read(config, CONFIGURED_CATALOG, null); }); } - @Test - void testDiscover() throws Exception { - final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); + @Override + protected void assertExpectedStateMessages(List stateMessages) { + assertEquals(1, stateMessages.size()); + assertNotNull(stateMessages.get(0).getData()); + } - // stream with PK - expectedCatalog.getStreams().get(0).setSourceDefinedCursor(true); - addCdcMetadataColumns(expectedCatalog.getStreams().get(0)); + @Override + protected CdcTargetPosition cdcLatestTargetPosition() { + JdbcDatabase database = Databases.createJdbcDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:postgresql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText()), + "org.postgresql.Driver"); + return PostgresCdcTargetPosition.targetPosition(database); + } - // stream with no PK. - expectedCatalog.getStreams().get(1).setSourceDefinedPrimaryKey(Collections.emptyList()); - expectedCatalog.getStreams().get(1).setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); - addCdcMetadataColumns(expectedCatalog.getStreams().get(1)); + @Override + protected CdcTargetPosition extractPosition(JsonNode record) { + return new PostgresCdcTargetPosition(PgLsn.fromLong(record.get(CDC_LSN).asLong())); + } - database.query(ctx -> ctx.execute(String.format("ALTER TABLE %s.%s DROP CONSTRAINT models_pkey", MODELS_SCHEMA, MODELS_STREAM_NAME))); + @Override + protected void assertNullCdcMetaData(JsonNode data) { + assertNull(data.get(CDC_LSN)); + assertNull(data.get(CDC_UPDATED_AT)); + assertNull(data.get(CDC_DELETED_AT)); + } - final AirbyteCatalog actualCatalog = source.discover(getConfig(PSQL_DB, dbName)); + @Override + protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { + assertNotNull(data.get(CDC_LSN)); + assertNotNull(data.get(CDC_UPDATED_AT)); + if (deletedAtNull) { + assertTrue(data.get(CDC_DELETED_AT).isNull()); + } else { + assertFalse(data.get(CDC_DELETED_AT).isNull()); + } + } - assertEquals( - expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)).collect(Collectors.toList()), - actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)).collect(Collectors.toList())); + @Override + protected void removeCDCColumns(ObjectNode data) { + data.remove(CDC_LSN); + data.remove(CDC_UPDATED_AT); + data.remove(CDC_DELETED_AT); } - private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { + @Override + protected void addCdcMetadataColumns(AirbyteStream stream) { ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); - properties.set(AbstractJdbcSource.CDC_LSN, numberType); - properties.set(AbstractJdbcSource.CDC_UPDATED_AT, numberType); - properties.set(AbstractJdbcSource.CDC_DELETED_AT, numberType); - - return stream; - } + properties.set(CDC_LSN, numberType); + properties.set(CDC_UPDATED_AT, numberType); + properties.set(CDC_DELETED_AT, numberType); - private void writeMakeRecord(DSLContext ctx, JsonNode recordJson) { - ctx.execute(String.format("INSERT INTO %s.%s (%s, %s) VALUES (%s, '%s');", MAKES_SCHEMA, MAKES_STREAM_NAME, COL_ID, COL_MAKE, - recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE).asText())); } - private void writeModelRecord(DSLContext ctx, JsonNode recordJson) { - ctx.execute( - String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, - recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE_ID).asInt(), recordJson.get(COL_MODEL).asText())); + @Override + protected Source getSource() { + return source; } - private Set extractRecordMessages(List messages) { - final List recordMessageList = messages - .stream() - .filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord) - .collect(Collectors.toList()); - final Set recordMessageSet = new HashSet<>(recordMessageList); - - assertEquals(recordMessageList.size(), recordMessageSet.size(), "Expected no duplicates in airbyte record message output for a single sync."); - - return recordMessageSet; - } - - private List extractStateMessages(List messages) { - return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState).collect(Collectors.toList()); + @Override + protected JsonNode getConfig() { + return config; } - private static void assertExpectedStateMessages(List stateMessages) { - assertEquals(1, stateMessages.size()); - assertNotNull(stateMessages.get(0).getData()); + @Override + protected Database getDatabase() { + return database; } - private static void assertExpectedRecords(Set expectedRecords, Set actualRecords) { - // assume all streams are cdc. - assertExpectedRecords( - expectedRecords, - actualRecords, - actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet())); + @Override + public String createSchemaQuery(String schemaName) { + return "CREATE SCHEMA " + schemaName + ";"; } - private static void assertExpectedRecords(Set expectedRecords, Set actualRecords, Set cdcStreams) { - final Set actualData = actualRecords - .stream() - .map(recordMessage -> { - assertTrue(STREAM_NAMES.contains(recordMessage.getStream())); - assertNotNull(recordMessage.getEmittedAt()); - if (recordMessage.getStream().equals(MAKES_STREAM_NAME)) { - assertEquals(MAKES_SCHEMA, recordMessage.getNamespace()); - } else { - assertEquals(MODELS_SCHEMA, recordMessage.getNamespace()); - } - - final JsonNode data = recordMessage.getData(); - - if (cdcStreams.contains(recordMessage.getStream())) { - assertNotNull(data.get(AbstractJdbcSource.CDC_LSN)); - assertNotNull(data.get(AbstractJdbcSource.CDC_UPDATED_AT)); - } else { - assertNull(data.get(AbstractJdbcSource.CDC_LSN)); - assertNull(data.get(AbstractJdbcSource.CDC_UPDATED_AT)); - assertNull(data.get(AbstractJdbcSource.CDC_DELETED_AT)); - } - - ((ObjectNode) data).remove(AbstractJdbcSource.CDC_LSN); - ((ObjectNode) data).remove(AbstractJdbcSource.CDC_UPDATED_AT); - ((ObjectNode) data).remove(AbstractJdbcSource.CDC_DELETED_AT); - - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(expectedRecords, actualData); + @Override + protected AirbyteCatalog expectedCatalogForDiscover() { + AirbyteCatalog catalog = super.expectedCatalogForDiscover(); + List streams = catalog.getStreams(); + + AirbyteStream randomStream = CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_random", + MODELS_SCHEMA + "_random", + Field.of(COL_ID + "_random", JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID + "_random", JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL + "_random", JsonSchemaPrimitive.STRING)) + .withSourceDefinedCursor(true) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))); + addCdcMetadataColumns(randomStream); + streams.add(randomStream); + catalog.withStreams(streams); + return catalog; } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumEventUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumEventUtilsTest.java deleted file mode 100644 index e1090ec51898..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumEventUtilsTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.debezium.engine.ChangeEvent; -import java.io.IOException; -import java.time.Instant; -import org.junit.jupiter.api.Test; - -class DebeziumEventUtilsTest { - - @Test - public void testConvertChangeEvent() throws IOException { - final String stream = "names"; - final Instant emittedAt = Instant.now(); - ChangeEvent insertChangeEvent = mockChangeEvent("insert_change_event.json"); - ChangeEvent updateChangeEvent = mockChangeEvent("update_change_event.json"); - ChangeEvent deleteChangeEvent = mockChangeEvent("delete_change_event.json"); - - final AirbyteMessage actualInsert = DebeziumEventUtils.toAirbyteMessage(insertChangeEvent, emittedAt); - final AirbyteMessage actualUpdate = DebeziumEventUtils.toAirbyteMessage(updateChangeEvent, emittedAt); - final AirbyteMessage actualDelete = DebeziumEventUtils.toAirbyteMessage(deleteChangeEvent, emittedAt); - - final AirbyteMessage expectedInsert = createAirbyteMessage(stream, emittedAt, "insert_message.json"); - final AirbyteMessage expectedUpdate = createAirbyteMessage(stream, emittedAt, "update_message.json"); - final AirbyteMessage expectedDelete = createAirbyteMessage(stream, emittedAt, "delete_message.json"); - - deepCompare(expectedInsert, actualInsert); - deepCompare(expectedUpdate, actualUpdate); - deepCompare(expectedDelete, actualDelete); - } - - private static ChangeEvent mockChangeEvent(String resourceName) throws IOException { - final ChangeEvent mocked = mock(ChangeEvent.class); - final String resource = MoreResources.readResource(resourceName); - when(mocked.value()).thenReturn(resource); - - return mocked; - } - - private static AirbyteMessage createAirbyteMessage(String stream, Instant emittedAt, String resourceName) throws IOException { - final String data = MoreResources.readResource(resourceName); - - final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage() - .withStream(stream) - .withNamespace("public") - .withData(Jsons.deserialize(data)) - .withEmittedAt(emittedAt.toEpochMilli()); - - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(recordMessage); - } - - private static void deepCompare(Object expected, Object actual) { - assertEquals(Jsons.deserialize(Jsons.serialize(expected)), Jsons.deserialize(Jsons.serialize(actual))); - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisherTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisherTest.java deleted file mode 100644 index 22eae9bb0102..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisherTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.source.postgres; - -import static org.junit.jupiter.api.Assertions.*; - -import com.google.common.collect.ImmutableList; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.SyncMode; -import org.junit.jupiter.api.Test; - -class DebeziumRecordPublisherTest { - - @Test - public void testWhitelistCreation() { - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( - CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public").withSyncMode(SyncMode.INCREMENTAL))); - - final String expectedWhitelist = "public.id_and_name,public.id_\\,something,public.n\"aMéS"; - final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); - - assertEquals(expectedWhitelist, actualWhitelist); - } - - @Test - public void testWhitelistFiltersFullRefresh() { - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( - CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public").withSyncMode(SyncMode.FULL_REFRESH))); - - final String expectedWhitelist = "public.id_and_name"; - final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); - - assertEquals(expectedWhitelist, actualWhitelist); - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_change_event.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_change_event.json deleted file mode 100644 index 07b575bf7e2c..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": { - "first_name": "san", - "last_name": "goku", - "power": null - }, - "after": null, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775646886, - "snapshot": false, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 498, - "lsn": 23012360, - "xmin": null - }, - "op": "d", - "ts_ms": 1616775646931, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_message.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_message.json deleted file mode 100644 index a14eab66fe17..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/delete_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": null, - "_ab_cdc_updated_at": 1616775646886, - "_ab_cdc_lsn": 23012360, - "_ab_cdc_deleted_at": 1616775646886 -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_change_event.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_change_event.json deleted file mode 100644 index 4b2c2fb6e2cf..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": null, - "after": { - "first_name": "san", - "last_name": "goku", - "power": "Infinity" - }, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775642623, - "snapshot": true, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 495, - "lsn": 23011544, - "xmin": null - }, - "op": "r", - "ts_ms": 1616775642624, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_message.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_message.json deleted file mode 100644 index 46abad6a267a..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/insert_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": "Infinity", - "_ab_cdc_updated_at": 1616775642623, - "_ab_cdc_lsn": 23011544, - "_ab_cdc_deleted_at": null -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/test_debezium_offset.dat b/airbyte-integrations/connectors/source-postgres/src/test/resources/test_debezium_offset.dat deleted file mode 100644 index c7e7054916ed..000000000000 Binary files a/airbyte-integrations/connectors/source-postgres/src/test/resources/test_debezium_offset.dat and /dev/null differ diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/update_change_event.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/update_change_event.json deleted file mode 100644 index da5dcd9c2b06..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/update_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": null, - "after": { - "first_name": "san", - "last_name": "goku", - "power": 10000.2 - }, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775646881, - "snapshot": false, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 497, - "lsn": 23012216, - "xmin": null - }, - "op": "u", - "ts_ms": 1616775646929, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/resources/update_message.json b/airbyte-integrations/connectors/source-postgres/src/test/resources/update_message.json deleted file mode 100644 index 757c5833d253..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/resources/update_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": 10000.2, - "_ab_cdc_updated_at": 1616775646881, - "_ab_cdc_lsn": 23012216, - "_ab_cdc_deleted_at": null -}