Skip to content

Commit

Permalink
Merge pull request #31391: [flink] #31390 emit watermark with empty s…
Browse files Browse the repository at this point in the history
…ource
  • Loading branch information
je-ik authored May 25, 2024
2 parents 0790d69 + 5a822ad commit 836e77e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,14 @@ public SourceOutput<OutputT> getAndMaybeCreateSplitOutput(ReaderOutput<OutputT>
return outputForSplit;
}

public boolean startOrAdvance() throws IOException {
public boolean startOrAdvance(ReaderOutput<OutputT> output) throws IOException {
if (started) {
// associate output with the split
getAndMaybeCreateSplitOutput(output);
return invocationUtil.invokeAdvance(reader);
} else {
started = true;
return invocationUtil.invokeStart(reader);
}
started = true;
return invocationUtil.invokeStart(reader);
}

public @Nullable SourceOutput<OutputT> sourceOutput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> ou
maybeEmitWatermark();
maybeCreateReaderForNewSplits();

ReaderAndOutput reader = nextReaderWithData();
ReaderAndOutput reader = nextReaderWithData(output);
if (reader != null) {
emitRecord(reader, output);
return InputStatus.MORE_AVAILABLE;
Expand Down Expand Up @@ -300,12 +300,14 @@ private void maybeCreateReaderForNewSplits() throws Exception {
}
}

private @Nullable ReaderAndOutput nextReaderWithData() throws IOException {
private @Nullable ReaderAndOutput nextReaderWithData(
ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output) throws IOException {

int numReaders = readers.size();
for (int i = 0; i < numReaders; i++) {
ReaderAndOutput readerAndOutput = readers.get(currentReaderIndex);
currentReaderIndex = (currentReaderIndex + 1) % numReaders;
if (readerAndOutput.startOrAdvance()) {
if (readerAndOutput.startOrAdvance(output)) {
return readerAndOutput;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public List<? extends EmptyUnboundedSource<T>> split(

@Override
public UnboundedReader<T> createReader(
PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) throws IOException {
PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) {
return new UnboundedReader<T>() {
@Override
public boolean start() throws IOException {
return advance();
}

@Override
public boolean advance() throws IOException {
public boolean advance() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Gauge;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.junit.Test;

/** Unite tests for {@link FlinkUnboundedSourceReader}. */
Expand Down Expand Up @@ -228,6 +232,38 @@ public void testWatermark() throws Exception {
public void testWatermarkOnEmptySource() throws Exception {
ManuallyTriggeredScheduledExecutorService executor =
new ManuallyTriggeredScheduledExecutorService();
AtomicReference<Instant> watermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
ReaderOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> output =
new ReaderOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>() {
@Override
public void collect(WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> unused) {}

@Override
public void collect(
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> unused, long l) {}

@Override
public void emitWatermark(Watermark w) {
watermark.compareAndSet(
BoundedWindow.TIMESTAMP_MIN_VALUE, Instant.ofEpochMilli(w.getTimestamp()));
}

@Override
public void markIdle() {}

@Override
public SourceOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
createOutputForSplit(String s) {
return this;
}

@Override
public void releaseOutputForSplit(String s) {}

@Override
public void markActive() {}
};
Instant now = Instant.now();
try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
(FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createEmptySplits(2);
Expand All @@ -236,22 +272,34 @@ public void testWatermarkOnEmptySource() throws Exception {
reader.notifyNoMoreSplits();

for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null));
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// move first reader to 'now'
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(0).getBeamSplitSource())
.setWatermark(now);
// force trigger timeout
executor.triggerScheduledTasks();
for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// check we have emitted watermark
assertEquals(now, watermark.get());

// move first reader to end of time
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(0).getBeamSplitSource())
.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null));
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// move the second reader to end of time
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(1).getBeamSplitSource())
.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null));
assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
}
}

Expand Down

0 comments on commit 836e77e

Please sign in to comment.