diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 4ea52635235af..f063c8df69943 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -503,7 +503,14 @@ public void processWatermark2(Watermark mark) throws Exception { pushbackDoFnRunner.finishBundle(); // maybe output a new watermark - processWatermark1(new Watermark(currentInputWatermark)); + Instant watermarkHold = stateInternals.watermarkHold(); + + long potentialOutputWatermark = Math.min(currentInputWatermark, watermarkHold.getMillis()); + + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + output.emitWatermark(new Watermark(currentOutputWatermark)); + } } }