Skip to content

Commit

Permalink
Update destinations to handle new state messages (#13670)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Jun 14, 2022
1 parent 50f2a34 commit dd3178e
Show file tree
Hide file tree
Showing 8 changed files with 751 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DefaultDestStateLifecycleManager;
import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DestStateLifecycleManager;
import io.airbyte.integrations.destination.record_buffer.BufferingStrategy;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -80,17 +82,11 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private final Map<AirbyteStreamNameNamespacePair, Long> streamToIgnoredRecordCount;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final BufferingStrategy bufferingStrategy;
private final DestStateLifecycleManager stateManager;

private boolean hasStarted;
private boolean hasClosed;

// represents the last state message for which all of its records have been flushed to tmp storage in
// the destination.
private AirbyteMessage lastFlushedToTmpDstState;
// represents the last state message whose state is waiting to be flushed to tmp storage in the
// destination.
private AirbyteMessage pendingState;

public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final VoidCallable onStart,
final BufferingStrategy bufferingStrategy,
Expand All @@ -107,6 +103,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.isValidRecord = isValidRecord;
this.streamToIgnoredRecordCount = new HashMap<>();
this.bufferingStrategy = bufferingStrategy;
this.stateManager = new DefaultDestStateLifecycleManager();
}

@Override
Expand Down Expand Up @@ -143,21 +140,18 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
}

} else if (message.getType() == Type.STATE) {
pendingState = message;
stateManager.addState(message);
} else {
LOGGER.warn("Unexpected message: " + message.getType());
}

}

private void markStatesAsFlushedToTmpDestination() {
if (pendingState != null) {
lastFlushedToTmpDstState = pendingState;
pendingState = null;
}
stateManager.markPendingAsFlushed();
}

private void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
Expand All @@ -181,20 +175,26 @@ protected void close(final boolean hasFailed) throws Exception {
bufferingStrategy.close();

try {
// if no state was emitted (i.e. full refresh), if there were still no failures, then we can
// still succeed.
if (lastFlushedToTmpDstState == null) {
// flushed is empty in 2 cases:
// 1. either it is full refresh (no state is emitted necessarily).
// 2. it is stream but no states were flushed.
// in both of these cases, if there was a failure, we should not bother committing. otherwise,
// attempt to commit.
if (stateManager.listFlushed().isEmpty()) {
onClose.accept(hasFailed);
} else {
// if any state message flushed that means we can still go for at least a partial success.
/*
* if any state message was flushed that means we should try to commit what we have. if
* hasFailed=false, then it could be full success. if hasFailed=true, then going for partial
* success.
*/
onClose.accept(false);
}

// if onClose succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
if (lastFlushedToTmpDstState != null) {
outputRecordCollector.accept(lastFlushedToTmpDstState);
}
stateManager.markFlushedAsCommitted();
stateManager.listCommitted().forEach(outputRecordCollector);
} catch (final Exception e) {
LOGGER.error("Close failed.", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.dest_state_lifecycle_manager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import java.util.Queue;
import java.util.function.Supplier;

/**
 * Routes state messages to the {@link DestStateLifecycleManager} that matches their state type.
 *
 * <p>
 * The manager anchors on the type of the first state message it receives and rejects any later
 * message whose type is incompatible with it, so each instance supports state messages of exactly
 * one type. The protocol specifies that a source should emit state messages of a single type during
 * a sync, so a single instance of this manager is sufficient for a destination to track state
 * during a sync.
 *
 * <p>
 * Delegation strategy:
 * <ul>
 * <li>GLOBAL and LEGACY states, as well as every call made before any state has been added, go to
 * the single-state manager.</li>
 * <li>STREAM states go to the stream-state manager.</li>
 * </ul>
 *
 * <p>
 * Per the protocol, if the state type is not set on a message, the LEGACY state type is assumed.
 */
public class DefaultDestStateLifecycleManager implements DestStateLifecycleManager {

  // Type of the first state message seen; null until addState has been called successfully.
  private AirbyteStateType stateType;
  // Resolves the delegate lazily, because the delegate depends on stateType, which is only known
  // after the first state message arrives.
  private final Supplier<DestStateLifecycleManager> internalStateManagerSupplier;

  /**
   * Creates a manager backed by a new {@link DestSingleStateLifecycleManager} and a new
   * {@link DestStreamStateLifecycleManager}.
   */
  public DefaultDestStateLifecycleManager() {
    this(new DestSingleStateLifecycleManager(), new DestStreamStateLifecycleManager());
  }

  /**
   * Creates a manager that delegates to the supplied managers.
   *
   * @param singleStateManager - delegate used for GLOBAL and LEGACY states, and while no state type
   *        has been detected yet
   * @param streamStateManager - delegate used for STREAM states
   */
  @VisibleForTesting
  DefaultDestStateLifecycleManager(final DestStateLifecycleManager singleStateManager, final DestStateLifecycleManager streamStateManager) {
    stateType = null;
    // allows us to delegate calls to the appropriate underlying state manager.
    internalStateManagerSupplier = () -> {
      if (stateType == AirbyteStateType.GLOBAL || stateType == AirbyteStateType.LEGACY || stateType == null) {
        return singleStateManager;
      } else if (stateType == AirbyteStateType.STREAM) {
        return streamStateManager;
      } else {
        throw new IllegalArgumentException("unrecognized state type");
      }
    };
  }

  /**
   * Accepts a state message and forwards it to the delegate for its state type. The first message
   * added fixes the state type of this manager.
   *
   * @param message - airbyte message of type state
   * @throws IllegalArgumentException if the message is not of type STATE, or if its state type is
   *         incompatible with the state type this manager is already tracking
   */
  @Override
  public void addState(final AirbyteMessage message) {
    Preconditions.checkArgument(message.getType() == Type.STATE, "Messages passed to State Manager must be of type STATE.");

    final AirbyteStateType incomingStateType = message.getState().getStateType();
    // Name both types in the failure so that a source mixing state types can be diagnosed from the
    // error alone.
    Preconditions.checkArgument(
        isStateTypeCompatible(stateType, incomingStateType),
        "Incompatible state type: manager is tracking %s state but received a state message of type %s.",
        stateType,
        effectiveStateType(incomingStateType));

    setManagerStateTypeIfNotSet(message);

    internalStateManagerSupplier.get().addState(message);
  }

  /**
   * Applies the protocol default for state type: a missing (null) state type means LEGACY.
   *
   * @param declaredStateType - state type as declared on a state message; may be null
   * @return LEGACY if the declared type is null, otherwise the declared type
   */
  private static AirbyteStateType effectiveStateType(final AirbyteStateType declaredStateType) {
    return declaredStateType == null ? AirbyteStateType.LEGACY : declaredStateType;
  }

  /**
   * Given the type of previously recorded state by the state manager, determines if a newly added
   * state message's type is compatible. If the previous state type is null, any new state type is
   * compatible. A null new state type is treated as LEGACY, so previousStateType == LEGACY and
   * newStateType == null IS compatible. Otherwise, state types are compatible only if they are
   * equal.
   *
   * @param previousStateType - state type previously recorded by the state manager
   * @param newStateType - state type of a newly added message
   * @return true if compatible, otherwise false
   */
  private static boolean isStateTypeCompatible(final AirbyteStateType previousStateType, final AirbyteStateType newStateType) {
    return previousStateType == null || previousStateType == effectiveStateType(newStateType);
  }

  /**
   * If the state type for the manager is not set, sets it using the state type from the message. If
   * the type on the message is null, it is assumed to be LEGACY. Once the first state message has
   * been added, the state type is never reassigned.
   *
   * @param message - state message whose state type will be used if internal state type is not set
   */
  private void setManagerStateTypeIfNotSet(final AirbyteMessage message) {
    // detect and set state type.
    if (stateType == null) {
      stateType = effectiveStateType(message.getState().getStateType());
    }
  }

  /**
   * Delegates to the manager selected for the current state type.
   */
  @Override
  public void markPendingAsFlushed() {
    internalStateManagerSupplier.get().markPendingAsFlushed();
  }

  /**
   * Delegates to the manager selected for the current state type.
   *
   * @return the flushed state messages reported by the delegate
   */
  @Override
  public Queue<AirbyteMessage> listFlushed() {
    return internalStateManagerSupplier.get().listFlushed();
  }

  /**
   * Delegates to the manager selected for the current state type.
   */
  @Override
  public void markFlushedAsCommitted() {
    internalStateManagerSupplier.get().markFlushedAsCommitted();
  }

  /**
   * Delegates to the manager selected for the current state type.
   *
   * @return the committed state messages reported by the delegate
   */
  @Override
  public Queue<AirbyteMessage> listCommitted() {
    return internalStateManagerSupplier.get().listCommitted();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.dest_state_lifecycle_manager;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
 * A {@link DestStateLifecycleManager} for state types where every individual state message is
 * guaranteed to describe the state of the ENTIRE connection. At the time of writing, GLOBAL and
 * LEGACY are the state types that fit this pattern.
 *
 * <p>
 * Because any one message fully supersedes the ones before it, this manager keeps at most one
 * message per phase (pending, flushed, committed): the LAST one to reach that phase. It does NOT
 * store duplicates or history.
 */
public class DestSingleStateLifecycleManager implements DestStateLifecycleManager {

  private AirbyteMessage lastPendingState;
  private AirbyteMessage lastFlushedState;
  private AirbyteMessage lastCommittedState;

  /**
   * Records the message as the pending state, replacing any previous pending state.
   *
   * @param message - airbyte message of type state
   */
  @Override
  public void addState(final AirbyteMessage message) {
    this.lastPendingState = message;
  }

  /**
   * @return a new queue holding the pending state, or an empty queue if there is none
   */
  @VisibleForTesting
  Queue<AirbyteMessage> listPending() {
    return asQueue(this.lastPendingState);
  }

  /**
   * Promotes the pending state, if any, to flushed and clears the pending slot. Does nothing when
   * no state is pending, so an existing flushed state is left in place.
   */
  @Override
  public void markPendingAsFlushed() {
    if (this.lastPendingState == null) {
      return;
    }
    this.lastFlushedState = this.lastPendingState;
    this.lastPendingState = null;
  }

  /**
   * @return a new queue holding the flushed state, or an empty queue if there is none
   */
  @Override
  public Queue<AirbyteMessage> listFlushed() {
    return asQueue(this.lastFlushedState);
  }

  /**
   * Promotes the flushed state, if any, to committed and clears the flushed slot. Does nothing when
   * no state is flushed, so an existing committed state is left in place.
   */
  @Override
  public void markFlushedAsCommitted() {
    if (this.lastFlushedState == null) {
      return;
    }
    this.lastCommittedState = this.lastFlushedState;
    this.lastFlushedState = null;
  }

  /**
   * @return a new queue holding the committed state, or an empty queue if there is none
   */
  @Override
  public Queue<AirbyteMessage> listCommitted() {
    return asQueue(this.lastCommittedState);
  }

  /**
   * Wraps an optional state message in a freshly allocated queue.
   *
   * @param stateMessage - message to wrap; may be null
   * @return a new queue containing the message, or an empty queue if the message is null
   */
  private static Queue<AirbyteMessage> asQueue(final AirbyteMessage stateMessage) {
    final List<AirbyteMessage> contents = stateMessage == null ? Collections.emptyList() : List.of(stateMessage);
    return new LinkedList<>(contents);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.dest_state_lifecycle_manager;

import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Queue;

/**
* Manages the lifecycle of state messages in a destination. Each tracked state message is in one
* of 3 phases:
* <ol>
* <li>pending - associated records have been accepted by the connector but have NOT been pushed to
* the destination</li>
* <li>flushed - associated records have been flushed to tmp storage in the destination but have NOT
* been committed</li>
* <li>committed - associated records have been committed</li>
* </ol>
* State messages enter as pending via {@link #addState(AirbyteMessage)} and are advanced one phase
* at a time by {@link #markPendingAsFlushed()} and {@link #markFlushedAsCommitted()}.
*/
public interface DestStateLifecycleManager {

/**
* Accepts a state message into the manager. The state starts in the pending phase.
*
* @param message - airbyte message of type state
*/
void addState(AirbyteMessage message);

/**
* Moves any tracked state messages that are currently pending to flushed.
*/
void markPendingAsFlushed();

/**
* Lists all tracked state messages that are flushed.
*
* NOTE(review): BufferedStreamConsumer calls isEmpty() directly on the result, so implementations
* are presumably expected to return an empty queue rather than null when nothing is flushed.
*
* @return queue of flushed state messages
*/
Queue<AirbyteMessage> listFlushed();

/**
* Moves any tracked state messages that are currently flushed to committed.
*/
void markFlushedAsCommitted();

/**
* Lists all tracked state messages that are committed.
*
* NOTE(review): BufferedStreamConsumer iterates the result with forEach, so implementations are
* presumably expected to return an empty queue rather than null when nothing is committed.
*
* @return queue of committed state messages
*/
Queue<AirbyteMessage> listCommitted();

}
Loading

0 comments on commit dd3178e

Please sign in to comment.