[BEAM-143] add test for UnboundedSourceWrapper #69

Closed · wants to merge 3 commits

Changes from 2 commits

UnboundedSourceWrapper.java

@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -31,26 +32,36 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;

+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;

/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
+ * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
*</p>
- * For now we support non-parallel, not checkpointed sources.
+ * For now we only support non-parallel sources.
* */
public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {

Contributor: It looks like this is a wrapper for Read.Unbounded, not UnboundedSource -- maybe it should be renamed to UnboundedReadWrapper? (Or ReadUnboundedWrapper?)

Contributor (Author): Good idea.

Contributor: I think it really is a wrapper for UnboundedSource. In my view, Read.Unbounded is just the API construct that itself contains an UnboundedSource. Said source is then handled by the UnboundedSourceWrapper at runtime.


private final String name;
-private final UnboundedSource.UnboundedReader<T> reader;
+private final UnboundedSource<T, ?> source;

private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;

private volatile boolean isRunning = false;

-public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+/** Serialized using custom Java serialization via Jackson */
+private transient PipelineOptions pipelineOptions;
+
+/** Instantiated during runtime **/
+private transient UnboundedSource.UnboundedReader<T> reader;
+
+public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
this.name = transform.getName();
-this.reader = transform.getSource().createReader(options, null);
+this.pipelineOptions = pipelineOptions;
+this.source = transform.getSource();
}

public String getName() {
@@ -67,40 +78,51 @@ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
@Override
public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {

Contributor: Is it possible to inject the PipelineOptions in the SourceContext? That way they would be provided in the args to the run method, and you would not need to manually serialize/deserialize this class.

Contributor (Author): I don't fully understand. SourceContext is part of Flink, and even if we inserted the PipelineOptions there, we would have to serialize it, right? We could provide a special SourceContext for Beam, but that would still require us to do some casting in the run method, or to provide a special SourceFunction interface for Beam. Not sure if that is easier.

if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
throw new RuntimeException(
"We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. " +
"Probably you should consider writing your own Wrapper for this source.");
}

context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();

-this.isRunning = true;
+isRunning = true;

+reader = source.createReader(pipelineOptions, null);

Contributor: It looks like there's no code here to checkpoint the reader and restore from checkpoints. Is that fault tolerance on the roadmap?

Contributor (Author): Actually, this is not hard to do with Flink. It is already captured here: https://issues.apache.org/jira/browse/BEAM-130

Contributor (Author): It seems like all we can do is call finalizeCheckpoint() where appropriate. How does one snapshot the actual state? I think the interface needs to be extended to be more general.

Contributor (Author): Oh I see, one can have a custom CheckpointMark with the state of the reader/source. Looks good.
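
As a sketch of what this thread describes (illustrative names, not code from this PR), a custom CheckpointMark carrying the reader position could look like:

private static class PositionCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
  // the last element emitted by the reader; enough to resume a simple range source
  private final int lastEmitted;

  PositionCheckpointMark(int lastEmitted) {
    this.lastEmitted = lastEmitted;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // nothing to finalize: the position alone lets a new reader resume
  }
}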


boolean inputAvailable = reader.start();

setNextWatermarkTimer(this.runtime);

-while (isRunning) {
-
-  while (!inputAvailable && isRunning) {
-    // wait a bit until we retry to pull more records
-    Thread.sleep(50);
-    inputAvailable = reader.advance();
-  }
-
-  if (inputAvailable) {
-
-    // get it and its timestamp from the source
-    T item = reader.getCurrent();
-    Instant timestamp = reader.getCurrentTimestamp();
-
-    // write it to the output collector
-    synchronized (ctx.getCheckpointLock()) {
-      context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
-    }
-
-    inputAvailable = reader.advance();
-  }
-}
+try {
+
+  while (isRunning) {
+
+    if (!inputAvailable && isRunning) {
+      // wait a bit until we retry to pull more records
+      Thread.sleep(50);
+      inputAvailable = reader.advance();
+    }
+
+    if (inputAvailable) {
+
+      // get it and its timestamp from the source
+      T item = reader.getCurrent();
+      Instant timestamp = reader.getCurrentTimestamp();
+
+      // write it to the output collector
+      synchronized (ctx.getCheckpointLock()) {
+        context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+      }
+
+      inputAvailable = reader.advance();
+    }
+  }
+
+} finally {
+  reader.close();
+}
}

@@ -131,4 +153,19 @@ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
private long getTimeToNextWaternark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}


+// Special serialization of the PipelineOptions necessary to instantiate the reader.
+private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
+  out.defaultWriteObject();
+  ObjectMapper mapper = new ObjectMapper();
+  mapper.writeValue(out, pipelineOptions);
+}
+
+// Special deserialization of the PipelineOptions necessary to instantiate the reader.
+private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+  in.defaultReadObject();
+  ObjectMapper mapper = new ObjectMapper();
+  pipelineOptions = mapper.readValue(in, PipelineOptions.class);
+}
}

UnboundedSourceITCase.java (new file)
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Joiner;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Instant;
import org.junit.internal.ArrayComparisonFailure;

import javax.annotation.Nullable;

Contributor: An easy unit test of serialization would be with SerializableUtils.ensureSerializable.

Contributor (Author): It is a valid test, but I don't think it covers all cases. Think about different classloaders being used in the (local/remote) cluster and the user program.
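
For reference, the suggested check might look like this sketch (the constructor arguments are placeholders):

UnboundedSourceWrapper<Integer> wrapper =
    new UnboundedSourceWrapper<>(options, transform);
SerializableUtils.ensureSerializable(wrapper);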

import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;


public class UnboundedSourceITCase extends StreamingProgramTestBase {

Contributor: (Learning opportunity Q:) What does ITCase mean in Flink? I'm going to guess "integration test"?

Contributor: Internally, we've often found it useful to put the @RunWith(JUnit4.class) annotation on test classes. This only helps occasionally, but there are pathological cases where the test system won't pick the right version of JUnit, for example.

Contributor: Okay, there's no @Test anywhere in this file, so I'm presumably just wrong. I guess the testing stuff is in the base class.

Contributor (Author): Yes, the ITCase suffix stands for integration test. The Surefire Maven plugin executes those based on the suffix. If you look in StreamingProgramTestBase, there is a testJob() method annotated with @Test. Not too straightforward, I must admit.
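
Roughly the pattern in play, as a simplified sketch rather than Flink's actual code (the base class owns the single JUnit entry point and drives the hooks overridden below):

@Test
public void testJob() throws Exception {
  preSubmit();   // e.g. create temp output paths
  testProgram(); // run the pipeline under test on a local cluster
  postSubmit();  // e.g. compare the produced results
}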


protected static String resultPath;

public UnboundedSourceITCase() {
}

static final String[] EXPECTED_RESULT = new String[]{
"1", "2", "3", "4", "5", "6", "7", "8", "9"};

@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
}

@Override
protected void testProgram() throws Exception {
runProgram(resultPath);
}

private static void runProgram(String resultPath) {

Pipeline p = FlinkTestPipeline.createForStreaming();

Contributor: @tgroh may be interested to see how these tests work with streaming pipelines that terminate.

Contributor (Author): Yes, I'd like to see how we implement that in Beam. This is not possible right now, right?


PCollection<String> result = p
.apply(Read.from(new RangeReadSource(1, 10)))
.apply(Window.<Integer>into(new GlobalWindows())
.triggering(AfterPane.elementCountAtLeast(10))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Integer, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
}
}));

result.apply(TextIO.Write.to(resultPath));

try {
p.run();
} catch(Exception e) {
assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());

Contributor: What's happening here? Is the exception expected, or not? If yes:

try {
  p.run();
  fail(); // expected exception
} catch (Exception e) {
  // assert
}

Member: org.junit.rules.ExpectedException?

Contributor (Author): @dhalperi It is expected, yes. This is due to missing support for stopping streaming pipelines in Beam. In Flink, sources may terminate and notify the following operators. For testing, we need to throw a specific exception, which we then check in the assert.

@tgroh Makes sense to use ExpectedException. Thanks.

Contributor (Author): Actually, org.junit.rules.ExpectedException is not an option here, because it's a nested exception that is thrown.

}
}
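
Combining the two suggestions above (a hedged sketch: fail() if no exception surfaces, then unwrap the nested cause manually, since ExpectedException cannot match a nested message):

try {
  p.run();
  fail("Expected the source to terminate the pipeline");
} catch (Exception e) {
  assertEquals("The source terminates as expected.",
      e.getCause().getCause().getMessage());
}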


private static class RangeReadSource extends UnboundedSource<Integer, UnboundedSource.CheckpointMark> {

Contributor: @tgroh Wondering out loud. At first I thought maybe you could use Read.from(CountingSource.unbounded()).withMaxNumRecords(numElements) instead of writing RangeReadSource. But I've decided that wouldn't work, because it would look like a BoundedSource to your execution engine.

Thomas, would it make sense to make a Read.Bounded.asFiniteUnboundedSource() or something like that for testing? Does that even make sense in the model?

Member: In the model, it's pretty reasonable, though we don't have a way in the model to assert from an UnboundedSource that we're "done" (and thus shut down the pipeline and stop getting called), so the pipeline would continue to run forever in the absence of an explicit shutdown or configuration enforcing "doneness" (example).

We could write an adapter for reading an Unbounded PCollection from a Bounded Source (like BoundedReadFromUnboundedSource); notably, since we can't checkpoint arbitrary Bounded Sources, this means we would always have to read the entire contents of the BoundedSource in a single execution. That makes the adapter less than universally effective -- we would need to read the entire contents of the source into memory in a single element read of the unbounded source -- but it might be a useful testing utility (especially for small inputs).

I'm also working on the design for an unbounded-like source for writing tests, located here, which would hopefully be portable into this kind of test.
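
The alternative under discussion would read roughly as follows (a hedged sketch; as noted above, it surfaces to the runner as a bounded read):

PCollection<Long> nums = p.apply(
    Read.from(CountingSource.unbounded()).withMaxNumRecords(10));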


final int from;
final int to;

RangeReadSource(int from, int to) {
this.from = from;
this.to = to;
}


@Override
public List<? extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {
return null;

Contributor: For sanity, this should probably return just ImmutableList.of(this) (or, you know, new ArrayList<>(this), or your preferred list-creation technique).

Contributor (Author): Much cleaner. Thanks.

}
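
With the suggested change (assuming Guava's ImmutableList), the method would read, e.g.:

@Override
public List<? extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits(
    int desiredNumSplits, PipelineOptions options) throws Exception {
  return ImmutableList.of(this); // a single split: this source is non-parallel
}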

@Override
public UnboundedReader<Integer> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
return new RangeReadReader(options);
}

@Nullable
@Override
public Coder<CheckpointMark> getCheckpointMarkCoder() {
return null;
}

@Override
public void validate() {
}

@Override
public Coder<Integer> getDefaultOutputCoder() {
return BigEndianIntegerCoder.of();
}

private class RangeReadReader extends UnboundedReader<Integer> {

private int current;

public RangeReadReader(PipelineOptions options) {
assertNotNull(options);
current = from;
}

@Override
public boolean start() throws IOException {
return true;
}

@Override
public boolean advance() throws IOException {
current++;
if (current >= to) {
try {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
throw new IOException("The source terminates as expected.");
} catch (IOException e) {
// pass on the exception to terminate the source
throw e;
} catch (Throwable t) {
// expected here from the file check
}
}
return current < to;
}

@Override
public Integer getCurrent() throws NoSuchElementException {
return current;
}

@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
return Instant.now();

Contributor: Fine for this test obviously, but a note for future reference: the UnboundedSource API requires getCurrentTimestamp() to return the same value if called repeatedly without an intervening call to advance(). Typically that means you set a timestamp as a private field in advance() and then simply retrieve it here.

Contributor (Author): Absolutely. I've corrected that.

}
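
The pattern the reviewer describes, as a small sketch (capture the timestamp once per advance() so repeated calls agree):

private Instant currentTimestamp;

@Override
public boolean advance() throws IOException {
  current++;
  currentTimestamp = Instant.now(); // fixed until the next advance()
  return current < to;
}

@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
  return currentTimestamp;
}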

@Override
public void close() throws IOException {
}

@Override
public Instant getWatermark() {
return Instant.now();
}

@Override
public CheckpointMark getCheckpointMark() {
return null;
}

@Override
public UnboundedSource<Integer, ?> getCurrentSource() {
return RangeReadSource.this;
}
}
}
}