Skip to content

Commit

Permalink
This closes #69
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Mar 23, 2016
2 parents 8d87ee0 + 1504ba7 commit 2f90258
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
Expand All @@ -31,26 +32,36 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
* interface.
* A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
* {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
*</p>
* For now we support non-parallel, not checkpointed sources.
* For now we support non-parallel sources, checkpointing is WIP.
* */
public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {

private final String name;
private final UnboundedSource.UnboundedReader<T> reader;
private final UnboundedSource<T, ?> source;

private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;

private volatile boolean isRunning = false;

public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
/** Serialized using custom Java serialization via Jackson */
private transient PipelineOptions pipelineOptions;

/** Instantiated during runtime **/
private transient UnboundedSource.UnboundedReader<T> reader;

public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
this.name = transform.getName();
this.reader = transform.getSource().createReader(options, null);
this.pipelineOptions = pipelineOptions;
this.source = transform.getSource();
}

public String getName() {
Expand All @@ -67,40 +78,51 @@ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
@Override
public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
throw new RuntimeException(
"We assume that all sources in Dataflow are EventTimeSourceFunction. " +
"Apparently " + this.name + " is not. " +
"Probably you should consider writing your own Wrapper for this source.");
}

context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();

this.isRunning = true;
isRunning = true;

reader = source.createReader(pipelineOptions, null);

boolean inputAvailable = reader.start();

setNextWatermarkTimer(this.runtime);

while (isRunning) {

while (!inputAvailable && isRunning) {
// wait a bit until we retry to pull more records
Thread.sleep(50);
inputAvailable = reader.advance();
}

if (inputAvailable) {
try {

// get it and its timestamp from the source
T item = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();
while (isRunning) {

// write it to the output collector
synchronized (ctx.getCheckpointLock()) {
context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
if (!inputAvailable && isRunning) {
// wait a bit until we retry to pull more records
Thread.sleep(50);
inputAvailable = reader.advance();
}

inputAvailable = reader.advance();
if (inputAvailable) {

// get it and its timestamp from the source
T item = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();

// write it to the output collector
synchronized (ctx.getCheckpointLock()) {
context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
}

inputAvailable = reader.advance();
}
}

} finally {
reader.close();
}
}

Expand Down Expand Up @@ -131,4 +153,19 @@ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
/**
 * Computes the absolute point in time at which the next watermark is due.
 *
 * @param watermarkInterval offset that is added to the current value of
 *     {@link System#currentTimeMillis()}
 * @return the current wall-clock time in milliseconds plus {@code watermarkInterval}
 */
private long getTimeToNextWaternark(long watermarkInterval) {
  final long now = System.currentTimeMillis();
  return now + watermarkInterval;
}


/**
 * Custom Java serialization hook. Writes the regular (non-transient) fields and then the
 * transient {@code pipelineOptions}, which are needed to instantiate the reader, as a
 * length-prefixed JSON byte array produced by Jackson.
 *
 * <p>The options are encoded into a byte array first instead of letting Jackson write to
 * {@code out} directly: by default Jackson closes the stream it is handed once the value
 * is written, which would close the enclosing object stream in the middle of serialization.
 *
 * @param out the object stream this instance is serialized to
 * @throws IOException if writing to the stream or the JSON encoding fails
 */
private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
  out.defaultWriteObject();
  byte[] serializedOptions = new ObjectMapper().writeValueAsBytes(pipelineOptions);
  // The length prefix lets readObject consume exactly the bytes that belong to the options.
  out.writeInt(serializedOptions.length);
  out.write(serializedOptions);
}

/**
 * Custom Java deserialization hook, the counterpart of {@code writeObject}. Restores the
 * regular fields and then rebuilds the transient {@code pipelineOptions} from the
 * length-prefixed JSON byte array.
 *
 * <p>Reading exactly the announced number of bytes keeps Jackson from buffering past the end
 * of the JSON document or closing {@code in}, both of which could break the remainder of the
 * object stream.
 *
 * @param in the object stream this instance is restored from
 * @throws IOException if reading from the stream or the JSON decoding fails
 * @throws ClassNotFoundException if a class of a default-serialized field cannot be found
 */
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  in.defaultReadObject();
  int length = in.readInt();
  byte[] serializedOptions = new byte[length];
  in.readFully(serializedOptions);
  pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Instant;
import org.junit.internal.ArrayComparisonFailure;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;


/**
 * Integration test that reads from a small unbounded test source, converts the elements to
 * strings and writes them to a text file.
 *
 * <p>An unbounded source never finishes on its own. The test source therefore polls the result
 * file once it has emitted all of its elements and, as soon as the file holds the expected
 * lines, throws an {@link IOException} carrying {@link #TERMINATION_MESSAGE} to end the job.
 * The test treats exactly that failure as success.
 */
public class UnboundedSourceITCase extends StreamingProgramTestBase {

  /** Message of the exception the test source throws to stop the pipeline on purpose. */
  private static final String TERMINATION_MESSAGE = "The source terminates as expected.";

  /** Location the pipeline writes to; static because the source's reader reads it as well. */
  protected static String resultPath;

  public UnboundedSourceITCase() {
  }

  /** Lines the result file must contain: the string form of every element the source emits. */
  static final String[] EXPECTED_RESULT = new String[]{
      "1", "2", "3", "4", "5", "6", "7", "8", "9"};

  /** Picks the temporary result location before the program is submitted. */
  @Override
  protected void preSubmit() throws Exception {
    resultPath = getTempDirPath("result");
  }

  /** Verifies the written result once more after the program has ended. */
  @Override
  protected void postSubmit() throws Exception {
    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
  }

  @Override
  protected void testProgram() throws Exception {
    runProgram(resultPath);
  }

  /**
   * Builds and runs the pipeline and checks that it ended through the source's deliberate
   * termination exception.
   *
   * @param resultPath location the pipeline writes its text output to
   */
  private static void runProgram(String resultPath) {

    Pipeline p = FlinkTestPipeline.createForStreaming();

    PCollection<String> result = p
        .apply(Read.from(new RangeReadSource(1, 10)))
        .apply(Window.<Integer>into(new GlobalWindows())
            .triggering(AfterPane.elementCountAtLeast(10))
            .discardingFiredPanes())
        .apply(ParDo.of(new DoFn<Integer, String>() {
          @Override
          public void processElement(ProcessContext c) throws Exception {
            c.output(c.element().toString());
          }
        }));

    result.apply(TextIO.Write.to(resultPath));

    try {
      p.run();
      fail("The pipeline was expected to end through the source's termination exception.");
    } catch (Exception e) {
      // Search the whole cause chain instead of assuming a fixed nesting depth, so that an
      // unrelated failure is reported as such rather than as a NullPointerException.
      if (!causedByExpectedTermination(e)) {
        AssertionError unexpected =
            new AssertionError("The pipeline failed for an unexpected reason: " + e);
        unexpected.initCause(e);
        throw unexpected;
      }
    }
  }

  /**
   * Tells whether the given failure, or any exception in its cause chain, carries
   * {@link #TERMINATION_MESSAGE}.
   *
   * @param failure the exception thrown by the pipeline run
   * @return {@code true} if the termination message is found in the cause chain
   */
  private static boolean causedByExpectedTermination(Throwable failure) {
    for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
      if (TERMINATION_MESSAGE.equals(cause.getMessage())) {
        return true;
      }
    }
    return false;
  }


  /**
   * Unbounded test source that emits the integers from {@code from} (inclusive) to {@code to}
   * (exclusive) and does not support splitting or checkpointing.
   */
  private static class RangeReadSource extends UnboundedSource<Integer, UnboundedSource.CheckpointMark> {

    /** First value emitted (inclusive). */
    final int from;
    /** End of the emitted range (exclusive). */
    final int to;

    RangeReadSource(int from, int to) {
      this.from = from;
      this.to = to;
    }


    /** Always returns this source as the only split, regardless of the desired number. */
    @Override
    public List<? extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits(
        int desiredNumSplits, PipelineOptions options) throws Exception {
      return ImmutableList.of(this);
    }

    /** Creates a fresh reader; the checkpoint mark is ignored. */
    @Override
    public UnboundedReader<Integer> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
      return new RangeReadReader(options);
    }

    /** Returns {@code null}: this source provides no checkpoint mark coder. */
    @Nullable
    @Override
    public Coder<CheckpointMark> getCheckpointMarkCoder() {
      return null;
    }

    @Override
    public void validate() {
    }

    @Override
    public Coder<Integer> getDefaultOutputCoder() {
      return BigEndianIntegerCoder.of();
    }

    /** Reader that counts upwards from {@code from} and ends the job once the output is complete. */
    private class RangeReadReader extends UnboundedReader<Integer> {

      /** Value returned by {@link #getCurrent()}; incremented on every call to advance. */
      private int current;

      /** Watermark in milliseconds; incremented on every call to advance. */
      private long watermark;

      /**
       * @param options pipeline options handed to the source; asserted to be non-null
       */
      public RangeReadReader(PipelineOptions options) {
        assertNotNull(options);
        current = from;
      }

      /** The first element ({@code from}) is available immediately. */
      @Override
      public boolean start() throws IOException {
        return true;
      }

      /**
       * Moves to the next value. Once the range is exhausted, each call checks the result file
       * and terminates the source as soon as the file holds the expected lines.
       *
       * @return {@code true} while the current value is still inside the range
       * @throws IOException carrying the termination message once all expected results are written
       */
      @Override
      public boolean advance() throws IOException {
        current++;
        watermark++;

        // The check is kept apart from the throw so that an I/O error raised while reading
        // the result file is never mistaken for the deliberate termination signal.
        if (current >= to && resultsAreComplete()) {
          // pass on the exception to terminate the source
          throw new IOException(TERMINATION_MESSAGE);
        }
        return current < to;
      }

      /**
       * Compares the result file with the expected lines.
       *
       * @return {@code true} if the comparison succeeds, {@code false} if it fails, which is
       *     expected for as long as the output has not been written completely
       */
      private boolean resultsAreComplete() {
        try {
          compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
          return true;
        } catch (AssertionError notYetComplete) {
          // expected here from the file check
          return false;
        } catch (Exception notYetReadable) {
          // the result may not be readable yet; check again on the next call
          return false;
        }
      }

      @Override
      public Integer getCurrent() throws NoSuchElementException {
        return current;
      }

      /** Uses the current value itself as the element's timestamp in milliseconds. */
      @Override
      public Instant getCurrentTimestamp() throws NoSuchElementException {
        return new Instant(current);
      }

      @Override
      public void close() throws IOException {
      }

      @Override
      public Instant getWatermark() {
        return new Instant(watermark);
      }

      /** Returns {@code null}: this reader does not create checkpoint marks. */
      @Override
      public CheckpointMark getCheckpointMark() {
        return null;
      }

      @Override
      public UnboundedSource<Integer, ?> getCurrentSource() {
        return RangeReadSource.this;
      }
    }
  }
}


0 comments on commit 2f90258

Please sign in to comment.