Skip to content

Commit

Permalink
attempt to commit if any records were flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 26, 2021
1 parent c9a6c1e commit d39b2a7
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@
* <p>
* All other message types are ignored.
* </p>
*
* <p>
* Throughout the lifecycle of the consumer, records get promoted from buffered to flushed to
* committed. A record when it is received is immediately buffered. When the buffer fills up, all
* buffered records are flushed out of memory using the user-provided recordWriter. When this flush
* happens, a state message is moved from pending to flushed. On close, if the user-provided onClose
* function is successful, then the flushed state record is considered committed and is then
* emitted. We expect this class to only ever emit either 1 state message (in the case of a full or
* partial success) or 0 state messages (in the case where the onClose step was never reached or did
* not complete without exception).
* </p>
*/
public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {

Expand All @@ -85,7 +96,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private boolean hasStarted;
private boolean hasClosed;

private AirbyteMessage lastCommittedState;
private AirbyteMessage lastFlushedState;
private AirbyteMessage pendingState;

public BufferedStreamConsumer(Consumer<AirbyteMessage> outputRecordCollector,
Expand Down Expand Up @@ -165,7 +176,7 @@ private void flushQueueToDestination() throws Exception {
}

if (pendingState != null) {
lastCommittedState = pendingState;
lastFlushedState = pendingState;
pendingState = null;
}
}
Expand All @@ -192,13 +203,12 @@ protected void close(boolean hasFailed) throws Exception {
}

try {
onClose.accept(hasFailed);
// todo (cgardens) - For now we are using this conditional to maintain existing behavior. When we
// enable checkpointing, we will need to get feedback from onClose on whether any data was persisted
// or not. If it was then, the state message will be emitted.
if (!hasFailed && lastCommittedState != null) {
outputRecordCollector.accept(lastCommittedState);
}
// means at least one state message worth of records were committed, so we will try complete the
// task.
onClose.accept(lastFlushedState != null);
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
outputRecordCollector.accept(lastFlushedState);
} catch (Exception e) {
LOGGER.error("on close failed.", e);
}
Expand Down

0 comments on commit d39b2a7

Please sign in to comment.