From fdc36c9294111c0bf6bbf068945492e0e2ddbce4 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 21 Mar 2017 10:25:30 +0100 Subject: [PATCH] [BEAM-1726] Obey watermark hold in DoFnOperato.processWatermark2() --- .../translation/wrappers/streaming/DoFnOperator.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 4ea52635235af..f063c8df69943 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -503,7 +503,14 @@ public void processWatermark2(Watermark mark) throws Exception { pushbackDoFnRunner.finishBundle(); // maybe output a new watermark - processWatermark1(new Watermark(currentInputWatermark)); + Instant watermarkHold = stateInternals.watermarkHold(); + + long potentialOutputWatermark = Math.min(currentInputWatermark, watermarkHold.getMillis()); + + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + output.emitWatermark(new Watermark(currentOutputWatermark)); + } } }