Skip to content

Commit

Permalink
[BEAM-1726] Obey watermark hold in DoFnOperato.processWatermark2()
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Mar 21, 2017
1 parent a974f3c commit fdc36c9
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,14 @@ public void processWatermark2(Watermark mark) throws Exception {
pushbackDoFnRunner.finishBundle();

// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
Instant watermarkHold = stateInternals.watermarkHold();

long potentialOutputWatermark = Math.min(currentInputWatermark, watermarkHold.getMillis());

if (potentialOutputWatermark > currentOutputWatermark) {
currentOutputWatermark = potentialOutputWatermark;
output.emitWatermark(new Watermark(currentOutputWatermark));
}
}
}

Expand Down

0 comments on commit fdc36c9

Please sign in to comment.