Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception if we close engine before snapshot is complete + increase timeout for subsequent records #4730

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -54,13 +55,14 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(5, TimeUnit.SECONDS);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES);

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
CdcTargetPosition targetPosition,
Expand All @@ -71,6 +73,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
}

@Override
Expand All @@ -90,13 +93,17 @@ protected ChangeEvent<String, String> computeNext() {
// if within the timeout, the consumer could not get a record, it is time to tell the producer to
// shutdown.
if (next == null) {
LOGGER.info("Closing cause next is returned as null");
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}

JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);

// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (shouldSignalClose(next)) {
if (shouldSignalClose(eventAsJson)) {
requestClose();
}
receivedFirstRecord = true;
Expand All @@ -105,14 +112,19 @@ protected ChangeEvent<String, String> computeNext() {
return endOfData();
}

private boolean hasSnapshotFinished(JsonNode eventAsJson) {
SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
return SnapshotMetadata.TRUE != snapshot;
}

@Override
public void close() throws Exception {
requestClose.call();
throwExceptionIfSnapshotNotFinished();
}

private boolean shouldSignalClose(ChangeEvent<String, String> event) {

return targetPosition.reachedTargetPosition(Jsons.deserialize(event.value()));
private boolean shouldSignalClose(JsonNode eventAsJson) {
return targetPosition.reachedTargetPosition(eventAsJson);
}

private void requestClose() {
Expand All @@ -121,6 +133,13 @@ private void requestClose() {
} catch (Exception e) {
throw new RuntimeException(e);
}
throwExceptionIfSnapshotNotFinished();
}

private void throwExceptionIfSnapshotNotFinished() {
if (!hasSnapshotFinished) {
throw new RuntimeException("Closing down debezium engine but snapshot has not finished");
}
}

private static class WaitTime {
Expand Down