Skip to content

Commit

Permalink
Merge pull request apache#2 from aljoscha/finish-pr-2263-flink-passer…
Browse files Browse the repository at this point in the history
…t-check

[BEAM-1726] Obey watermark hold in DoFnOperato.processWatermark2()
  • Loading branch information
aviemzur authored Mar 21, 2017
2 parents a974f3c + fdc36c9 commit 6aba015
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,14 @@ public void processWatermark2(Watermark mark) throws Exception {
pushbackDoFnRunner.finishBundle();

// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
Instant watermarkHold = stateInternals.watermarkHold();

long potentialOutputWatermark = Math.min(currentInputWatermark, watermarkHold.getMillis());

if (potentialOutputWatermark > currentOutputWatermark) {
currentOutputWatermark = potentialOutputWatermark;
output.emitWatermark(new Watermark(currentOutputWatermark));
}
}
}

Expand Down

0 comments on commit 6aba015

Please sign in to comment.