Skip to content

Commit

Permalink
Merge pull request #29102: [flink] Flush buffer during drain operatio…
Browse files Browse the repository at this point in the history
…n for requiresStableInput operator
  • Loading branch information
je-ik authored Oct 24, 2023
2 parents 3f05945 + 1a27d2c commit bf5ded4
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ public interface FlinkPipelineOptions

void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);

/**
 * Whether the drain operation is allowed for pipelines that contain a RequiresStableInput
 * operator. Defaults to {@code false}.
 */
@Description(
"Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, "
+ "the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.")
@Default.Boolean(false)
Boolean getEnableStableInputDrain();

/** Setter counterpart of {@link #getEnableStableInputDrain()}. */
void setEnableStableInputDrain(Boolean enableStableInputDrain);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public class DoFnOperator<InputT, OutputT>
/** If true, we must process elements only after a checkpoint is finished. */
final boolean requiresStableInput;

/**
* If both requiresStableInput and this parameter are true, we must flush the buffer during drain
* operation.
*/
final boolean enableStableInputDrain;

final int numConcurrentCheckpoints;

private final boolean usesOnWindowExpiration;
Expand Down Expand Up @@ -323,6 +329,8 @@ public DoFnOperator(
+ Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
}

this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();

this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();

this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
Expand Down Expand Up @@ -626,6 +634,12 @@ void flushData() throws Exception {
while (bundleStarted) {
invokeFinishBundle();
}
if (requiresStableInput && enableStableInputDrain) {
// Flush any buffered events here before draining the pipeline. Note that this is best-effort
// and requiresStableInput contract might be violated in cases where buffer processing fails.
bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
updateOutputWatermark();
}
if (currentOutputWatermark < Long.MAX_VALUE) {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,98 @@ public void finishBundle(FinishBundleContext context) {
WindowedValue.valueInGlobalWindow("finishBundle")));
}

/**
 * Checks that, with {@code enableStableInputDrain} set, calling {@code flushData()} on an
 * operator whose DoFn is annotated with {@code @RequiresStableInput} emits the elements that
 * were buffered so far, followed by the element produced in {@code finishBundle}.
 *
 * <p>The second half builds a fresh operator and harness, signals completion of checkpoint 0 and
 * asserts that nothing is emitted and that no further bundle is started.
 */
@Test
public void testExactlyOnceBufferingFlushDuringDrain() throws Exception {
FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setCheckpointingInterval(1L);
// The option under test: lets flushData() flush the stable-input buffer.
options.setEnableStableInputDrain(true);

TupleTag<String> outputTag = new TupleTag<>("main-output");
WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

// Reset the bundle counter (declared outside this method) so the assertions below start from 0.
numStartBundleCalled = 0;
DoFn<String, String> doFn =
new DoFn<String, String>() {
@StartBundle
public void startBundle(StartBundleContext context) {
numStartBundleCalled += 1;
}

@ProcessElement
// Use RequiresStableInput to force buffering elements
@RequiresStableInput
public void processElement(ProcessContext context) {
context.output(context.element());
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
// Marker element that makes the end of a bundle visible in the operator output.
context.output(
"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
}
};

DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory<>(
outputTag,
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
new SerializablePipelineOptions(options));

// A supplier is used because the test needs two independent operator instances.
Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
() ->
new DoFnOperator<>(
doFn,
"stepName",
windowedValueCoder,
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
outputManagerFactory,
WindowingStrategy.globalDefault(),
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
options,
null,
null,
DoFnSchemaInformation.create(),
Collections.emptyMap());

DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
new OneInputStreamOperatorTestHarness<>(doFnOperator);

testHarness.open();

testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));

// Nothing has been emitted and no bundle has been started yet: the elements are only buffered.
assertThat(Iterables.size(testHarness.getOutput()), is(0));
assertThat(numStartBundleCalled, is(0));

// Simulate pipeline drain scenario
// NOTE(review): `backup` is not referenced again in this method; the second harness below is
// opened without being initialized from it - confirm that this is intended.
OperatorSubtaskState backup = testHarness.snapshot(0, 0);
doFnOperator.flushData();

// The flush processed the buffered elements in a single bundle.
assertThat(numStartBundleCalled, is(1));
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle")));

// Fresh operator and harness; the snapshot taken above is not passed to them.
doFnOperator = doFnOperatorSupplier.get();
testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator);
testHarness.open();

doFnOperator.notifyCheckpointComplete(0L);

// No additional bundle was started and the new harness produced no output.
assertThat(numStartBundleCalled, is(1));
assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable());
}

@Test
public void testExactlyOnceBufferingKeyed() throws Exception {
FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enableStableInputDrain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>executionModeForBatch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enable_stable_input_drain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>execution_mode_for_batch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down

0 comments on commit bf5ded4

Please sign in to comment.