Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception if we close engine before snapshot is complete + increase timeout for subsequent records #4730

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.3.6",
"dockerImageTag": "0.3.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.6
dockerImageTag: 0.3.7
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
icon: postgresql.svg
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -54,13 +55,14 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(5, TimeUnit.SECONDS);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES);

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
CdcTargetPosition targetPosition,
Expand All @@ -71,6 +73,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
}

@Override
Expand All @@ -90,13 +93,17 @@ protected ChangeEvent<String, String> computeNext() {
// If the consumer could not get a record within the timeout, it is time to tell the producer to
// shut down.
if (next == null) {
LOGGER.info("Closing cause next is returned as null");
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}

JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);

// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (shouldSignalClose(next)) {
if (shouldSignalClose(eventAsJson)) {
requestClose();
}
receivedFirstRecord = true;
Expand All @@ -105,14 +112,35 @@ protected ChangeEvent<String, String> computeNext() {
return endOfData();
}

/**
 * Decides from a single change event whether the snapshot phase is over.
 *
 * <p>
 * Reads {@code source.snapshot} from the event and maps it onto {@link SnapshotMetadata}. Only the
 * value {@code TRUE} (any casing) means the snapshot is still in progress; every other recognised
 * value means it has finished.
 *
 * <p>
 * A missing or JSON-null {@code source} / {@code snapshot} field is treated as "not a snapshot
 * record" and therefore reports the snapshot as finished, instead of failing with a
 * {@link NullPointerException}. Per the Debezium connector documentation the field is optional and
 * defaults to {@code "false"}.
 *
 * @param eventAsJson the deserialized value of the change event
 * @return {@code false} while the event is flagged as part of an ongoing snapshot, {@code true}
 *         otherwise
 * @throws IllegalArgumentException if the field holds a value that {@link SnapshotMetadata} does
 *         not declare
 */
private boolean hasSnapshotFinished(JsonNode eventAsJson) {
  // path() never returns null: absent fields come back as a "missing" node.
  final JsonNode snapshotNode = eventAsJson.path("source").path("snapshot");
  if (snapshotNode.isMissingNode() || snapshotNode.isNull()) {
    return true;
  }
  // Locale.ROOT keeps the enum lookup independent of the JVM's default locale.
  final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(snapshotNode.asText().toUpperCase(java.util.Locale.ROOT));
  return SnapshotMetadata.TRUE != snapshot;
}

/**
 * Asks the Debezium engine to shut down and verifies that the snapshot was not cut short.
 *
 * <p>
 * Debezium was built as an ever-running process which keeps listening for new changes on the DB
 * and processes them immediately. Airbyte needs Debezium to work as a start/stop mechanism. To
 * decide when to stop the Debezium engine we rely on a few factors:
 * <ol>
 * <li>TargetPosition logic. At the beginning of the sync we define a target position in the logs
 * of the DB. This can be an LSN or anything specific to the DB which helps us identify that we
 * have reached a specific position in the log-based replication. When we start processing records
 * from Debezium, we extract the log position from the metadata of the record and compare it with
 * the target that we defined at the beginning of the sync. If we have reached the target position,
 * we shut down the Debezium engine.</li>
 * <li>The TargetPosition logic might not always work. To tackle that, if we do not receive records
 * from Debezium for a given duration, we ask the Debezium engine to shut down.</li>
 * <li>We also take the snapshot into consideration. When a connector is running for the first
 * time, we let it complete the snapshot, and only after the completion of the snapshot should we
 * shut down the engine. If we are closing the engine before completion of the snapshot, we throw
 * an exception.</li>
 * </ol>
 *
 * @throws Exception whatever the {@code requestClose} callback throws
 * @throws RuntimeException if the most recent record seen still belonged to an unfinished snapshot
 */
@Override
public void close() throws Exception {
requestClose.call();
throwExceptionIfSnapshotNotFinished();
}

private boolean shouldSignalClose(ChangeEvent<String, String> event) {

return targetPosition.reachedTargetPosition(Jsons.deserialize(event.value()));
private boolean shouldSignalClose(JsonNode eventAsJson) {
return targetPosition.reachedTargetPosition(eventAsJson);
}

private void requestClose() {
Expand All @@ -121,6 +149,13 @@ private void requestClose() {
} catch (Exception e) {
throw new RuntimeException(e);
}
throwExceptionIfSnapshotNotFinished();
}

/**
 * Guards a shutdown request: completes silently when the snapshot is done and fails otherwise.
 *
 * @throws RuntimeException if {@code hasSnapshotFinished} is {@code false}
 */
private void throwExceptionIfSnapshotNotFinished() {
  if (hasSnapshotFinished) {
    // Nothing to report: the snapshot is complete.
    return;
  }
  throw new RuntimeException("Closing down debezium engine but snapshot has not finished");
}

private static class WaitTime {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.version=0.3.7
LABEL io.airbyte.name=airbyte/source-postgres