From a5bc4a9896132cfbe51e9c7bfa88c4f91d3c38db Mon Sep 17 00:00:00 2001 From: Subodh Kant Chaturvedi Date: Wed, 14 Jul 2021 01:19:38 +0530 Subject: [PATCH] throw exception if we close engine before snapshot is complete + increase timeout for subsequent records (#4730) * throw exception if we close engine before snapshot is complete + increase timeout for subsequent records * add comment + bump postgres version to use new changes --- .../decd338e-5647-4c0b-adf4-da0e75f5a750.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../internals/DebeziumRecordIterator.java | 45 ++++++++++++++++--- .../connectors/source-postgres/Dockerfile | 2 +- 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index 0de2f166e995..7e0d517aa88d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.3.6", + "dockerImageTag": "0.3.7", "documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres", "icon": "postgresql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 08cca095e0c1..be08178f40ac 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -46,7 +46,7 @@ - sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 name: Postgres dockerRepository: airbyte/source-postgres - dockerImageTag: 0.3.6 + dockerImageTag: 0.3.7 documentationUrl: https://hub.docker.com/r/airbyte/source-postgres icon: postgresql.svg - sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003 diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java index 2c3c4d6c8950..38e9e8298f08 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java @@ -24,6 +24,7 @@ package io.airbyte.integrations.debezium.internals; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.json.Jsons; @@ -54,13 +55,14 @@ public class DebeziumRecordIterator extends AbstractIterator> queue; private final CdcTargetPosition targetPosition; private final Supplier publisherStatusSupplier; private final VoidCallable requestClose; private boolean receivedFirstRecord; + private boolean hasSnapshotFinished; public DebeziumRecordIterator(LinkedBlockingQueue> queue, CdcTargetPosition targetPosition, @@ -71,6 +73,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue> q this.publisherStatusSupplier = publisherStatusSupplier; this.requestClose = requestClose; this.receivedFirstRecord = false; + this.hasSnapshotFinished = true; } @Override @@ -90,13 +93,17 @@ protected ChangeEvent computeNext() { // if within the timeout, the consumer could not get a record, it is time to tell the producer to // shutdown. if (next == null) { + LOGGER.info("Closing cause next is returned as null"); requestClose(); LOGGER.info("no record found. polling again."); continue; } + JsonNode eventAsJson = Jsons.deserialize(next.value()); + hasSnapshotFinished = hasSnapshotFinished(eventAsJson); + // if the last record matches the target file position, it is time to tell the producer to shutdown. - if (shouldSignalClose(next)) { + if (shouldSignalClose(eventAsJson)) { requestClose(); } receivedFirstRecord = true; @@ -105,14 +112,35 @@ protected ChangeEvent computeNext() { return endOfData(); } + private boolean hasSnapshotFinished(JsonNode eventAsJson) { + SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase()); + return SnapshotMetadata.TRUE != snapshot; + } + + /** + * Debezium was built as an ever running process which keeps on listening for new changes on DB and + * immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order + * to determine when to stop debezium engine we rely on few factors 1. TargetPosition logic. At the + * beginning of the sync we define a target position in the logs of the DB. This can be an LSN or + * anything specific to the DB which can help us identify that we have reached a specific position + * in the log based replication When we start processing records from debezium, we extract the the + * log position from the metadata of the record and compare it with our target that we defined at + * the beginning of the sync. If we have reached the target position, we shutdown the debezium + * engine 2. The TargetPosition logic might not always work and in order to tackle that we have + * another logic where if we do not receive records from debezium for a given duration, we ask + * debezium engine to shutdown 3. We also take the Snapshot into consideration, when a connector is + * running for the first time, we let it complete the snapshot and only after the completion of + * snapshot we should shutdown the engine. If we are closing the engine before completion of + * snapshot, we throw an exception + */ @Override public void close() throws Exception { requestClose.call(); + throwExceptionIfSnapshotNotFinished(); } - private boolean shouldSignalClose(ChangeEvent event) { - - return targetPosition.reachedTargetPosition(Jsons.deserialize(event.value())); + private boolean shouldSignalClose(JsonNode eventAsJson) { + return targetPosition.reachedTargetPosition(eventAsJson); } private void requestClose() { @@ -121,6 +149,13 @@ private void requestClose() { } catch (Exception e) { throw new RuntimeException(e); } + throwExceptionIfSnapshotNotFinished(); + } + + private void throwExceptionIfSnapshotNotFinished() { + if (!hasSnapshotFinished) { + throw new RuntimeException("Closing down debezium engine but snapshot has not finished"); + } } private static class WaitTime { diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 0412f847823f..460e8529390f 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.6 +LABEL io.airbyte.version=0.3.7 LABEL io.airbyte.name=airbyte/source-postgres