Skip to content

Commit

Permalink
[BEAM-144] solve reader serialization issue
Browse files Browse the repository at this point in the history
Now, we initialize the UnboundedSourceReader at runtime which requires
us to keep a copy of the PipelineOptions. This should be fine here
because we are at the lowest point of the execution stack.
  • Loading branch information
mxm committed Mar 23, 2016
1 parent 63a2d54 commit 73bbb1c
Showing 1 changed file with 62 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
Expand All @@ -31,26 +32,36 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
* interface.
* A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
* {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
* <p>
* For now we support non-parallel, not checkpointed sources.
* For now we only support non-parallel sources.
* */
public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {

private final String name;
private final UnboundedSource.UnboundedReader<T> reader;
private final UnboundedSource<T, ?> source;

private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;

private volatile boolean isRunning = false;

public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
/** Serialized using custom Java serialization via Jackson */
private transient PipelineOptions pipelineOptions;

/** Instantiated during runtime **/
private transient UnboundedSource.UnboundedReader<T> reader;

public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
this.name = transform.getName();
this.reader = transform.getSource().createReader(options, null);
this.pipelineOptions = pipelineOptions;
this.source = transform.getSource();
}

public String getName() {
Expand All @@ -67,40 +78,51 @@ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
@Override
public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
throw new RuntimeException(
"We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. " +
"Probably you should consider writing your own Wrapper for this source.");
}

context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();

this.isRunning = true;
isRunning = true;

reader = source.createReader(pipelineOptions, null);

boolean inputAvailable = reader.start();

setNextWatermarkTimer(this.runtime);

while (isRunning) {

while (!inputAvailable && isRunning) {
// wait a bit until we retry to pull more records
Thread.sleep(50);
inputAvailable = reader.advance();
}

if (inputAvailable) {
try {

// get it and its timestamp from the source
T item = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();
while (isRunning) {

// write it to the output collector
synchronized (ctx.getCheckpointLock()) {
context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
if (!inputAvailable && isRunning) {
// wait a bit until we retry to pull more records
Thread.sleep(50);
inputAvailable = reader.advance();
}

inputAvailable = reader.advance();
if (inputAvailable) {

// get it and its timestamp from the source
T item = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();

// write it to the output collector
synchronized (ctx.getCheckpointLock()) {
context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
}

inputAvailable = reader.advance();
}
}

} finally {
reader.close();
}
}

Expand Down Expand Up @@ -131,4 +153,19 @@ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
/**
 * Computes the wall-clock time at which the next watermark timer should fire.
 *
 * @param watermarkInterval offset added to the current system time, in milliseconds
 * @return the current system time in milliseconds plus {@code watermarkInterval}
 */
private long getTimeToNextWaternark(long watermarkInterval) {
  final long now = System.currentTimeMillis();
  return now + watermarkInterval;
}


/**
 * Special serialization of the PipelineOptions necessary to instantiate the reader.
 *
 * <p>The options are first rendered to a JSON byte array and then written
 * length-prefixed. Letting Jackson write directly into {@code out} would close
 * the surrounding {@link ObjectOutputStream} once the value has been written
 * (Jackson auto-closes its target by default), breaking any serialization that
 * follows this object in the same stream.
 *
 * @param out the object stream this instance is being written to
 * @throws IOException if default serialization or the JSON encoding fails
 */
private void writeObject(ObjectOutputStream out) throws IOException {
  out.defaultWriteObject();
  byte[] serializedOptions = new ObjectMapper().writeValueAsBytes(pipelineOptions);
  out.writeInt(serializedOptions.length);
  out.write(serializedOptions);
}

/**
 * Special deserialization of the PipelineOptions necessary to instantiate the reader.
 *
 * <p>Reads exactly the length-prefixed JSON bytes produced by {@code writeObject}.
 * Handing {@code in} to Jackson directly would let the parser buffer past the end
 * of the JSON value and close the surrounding {@link ObjectInputStream}.
 *
 * @param in the object stream this instance is being read from
 * @throws IOException if the stream is malformed or the JSON decoding fails
 * @throws ClassNotFoundException if default deserialization cannot resolve a class
 */
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  in.defaultReadObject();
  int length = in.readInt();
  if (length < 0) {
    throw new java.io.InvalidObjectException(
        "Negative length for serialized PipelineOptions: " + length);
  }
  byte[] serializedOptions = new byte[length];
  // readFully blocks until all bytes are read, or fails with EOFException on a truncated stream.
  in.readFully(serializedOptions);
  pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
}
}

0 comments on commit 73bbb1c

Please sign in to comment.