
Commit

Merge pull request apache#3 from apache/master
pull latest code to check WatermarkTest
mingmxu authored Feb 20, 2017
2 parents 6aca4d5 + 4e5a762 commit 93caf46
Showing 52 changed files with 1,085 additions and 187 deletions.
@@ -66,19 +66,17 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
@@ -107,32 +105,6 @@ public class PackageUtilTest {
// 128 bits, base64 encoded is 171 bits, rounds to 22 bytes
private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";

// Hamcrest matcher to assert a string matches a pattern
private static class RegexMatcher extends BaseMatcher<String> {
private final Pattern pattern;

public RegexMatcher(String regex) {
this.pattern = Pattern.compile(regex);
}

@Override
public boolean matches(Object o) {
if (!(o instanceof String)) {
return false;
}
return pattern.matcher((String) o).matches();
}

@Override
public void describeTo(Description description) {
description.appendText(String.format("matches regular expression %s", pattern));
}

public static RegexMatcher matches(String regex) {
return new RegexMatcher(regex);
}
}

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
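The inner Hamcrest matcher above is removed because the test now imports the shared org.apache.beam.sdk.testing.RegexMatcher (see the import block at the top of this file). A minimal sketch of an assertion using the shared matcher, assuming it exposes the same static matches(String) factory as the removed class; the asserted value is made up for illustration:

// Illustrative only: the matched jar name is not taken from this diff.
assertThat(
    "file-0123456789abcdefghijkl.jar",
    RegexMatcher.matches("file-" + HASH_PATTERN + "\\.jar"));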
@@ -39,6 +39,7 @@
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -191,12 +192,15 @@ public SparkPipelineResult run(final Pipeline pipeline) {
new JavaStreamingListenerWrapper(
new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));

// register listeners.
// register user-defined listeners.
for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) {
LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}

// register Watermarks listener to broadcast the advanced WMs.
jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));

startPipeline = executorService.submit(new Runnable() {

@Override
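For context on the loop above, a hedged sketch of the user-facing side: custom streaming listeners are supplied through SparkContextOptions and the runner wraps each one before registration, while the WatermarksListener added here is always registered by the runner itself. MyBatchLogger is a hypothetical listener class, and setListeners is assumed to be the setter paired with the getListeners() call shown above:

// Hedged sketch, not part of this diff; MyBatchLogger is assumed to extend JavaStreamingListener.
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setListeners(Collections.<JavaStreamingListener>singletonList(new MyBatchLogger()));
Pipeline pipeline = Pipeline.create(options);
// SparkRunner.run() then wraps each listener: new JavaStreamingListenerWrapper(listener).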
@@ -22,7 +22,9 @@
import static org.hamcrest.Matchers.is;

import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -106,14 +108,16 @@ public <OutputT extends POutput, InputT extends PInput> OutputT apply(

@Override
public SparkPipelineResult run(Pipeline pipeline) {
TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);

// clear metrics singleton
// clear state of Aggregators, Metrics and Watermarks, if any.
AggregatorsAccumulator.clear();
SparkMetricsContainer.clear();
GlobalWatermarkHolder.clear();

TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
SparkPipelineResult result = delegate.run(pipeline);
result.waitUntilFinish();


// make sure the test pipeline finished successfully.
State resultState = result.getState();
assertThat(
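The reset at the top of run() matters because the Spark context, and with it these JVM-wide singletons, can be reused across test pipelines (see the TEST_REUSE_SPARK_CONTEXT flag later in this commit); stale aggregators, metrics, or watermarks from a previous run would otherwise leak into the next test's assertions. A hedged, test-side sketch of the same reset (the method name is illustrative):

@Before
public void clearSharedState() {
  // Same reset TestSparkRunner.run() performs above; harmless if nothing was registered yet.
  AggregatorsAccumulator.clear();
  SparkMetricsContainer.clear();
  GlobalWatermarkHolder.clear();
}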
@@ -89,7 +89,7 @@ private static void checkpoint() throws IOException {
}

@VisibleForTesting
static void clear() {
public static void clear() {
synchronized (AggregatorsAccumulator.class) {
instance = null;
}
@@ -24,8 +24,13 @@
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
@@ -55,7 +60,7 @@
* <p>This read is a composite of the following steps:
* <ul>
* <li>Create a single-element (per-partition) stream, that contains the (partitioned)
* {@link Source} and an optional {@link UnboundedSource.CheckpointMark} to start from.</li>
* {@link Source} and an optional {@link CheckpointMark} to start from.</li>
* <li>Read from within a stateful operation {@link JavaPairInputDStream#mapWithState(StateSpec)}
* using the {@link StateSpecFunctions#mapSourceFunction(SparkRuntimeContext)} mapping function,
* which manages the state of the CheckpointMark per partition.</li>
@@ -65,10 +70,11 @@
*/
public class SparkUnboundedSource {

public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc,
SparkRuntimeContext rc,
UnboundedSource<T, CheckpointMarkT> source) {
public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
JavaStreamingContext jssc,
SparkRuntimeContext rc,
UnboundedSource<T, CheckpointMarkT> source) {

SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
SourceDStream<T, CheckpointMarkT> sourceDStream = new SourceDStream<>(jssc.ssc(), source, rc);
@@ -82,7 +88,7 @@ JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc,
JavaSparkContext$.MODULE$.<CheckpointMarkT>fakeClassTag());

// call mapWithState to read from a checkpointable source.
JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
JavaMapWithStateDStream<Source<T>, CheckpointMarkT, Tuple2<byte[], Instant>,
Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));

@@ -109,13 +115,14 @@ public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
WindowedValue.FullWindowedValueCoder.of(
source.getDefaultOutputCoder(),
GlobalWindow.Coder.INSTANCE);
return mapWithStateDStream.flatMap(
JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
@Override
public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
return t2._1();
}
}).map(CoderHelpers.fromByteFunction(coder));
return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
}

private static <T> String getSourceName(Source<T> source, int id) {
@@ -173,30 +180,46 @@ public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
// compute parent.
scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
long count = 0;
Instant globalWatermark = new Instant(Long.MIN_VALUE);
SparkWatermarks sparkWatermark = null;
Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
if (parentRDDOpt.isDefined()) {
JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
for (Metadata metadata: parentRDD.collect()) {
count += metadata.getNumRecords();
// a monotonically increasing watermark.
globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
? metadata.getWatermark() : globalWatermark;
// compute the global input watermark - advance to latest of all partitions.
Instant partitionLowWatermark = metadata.getLowWatermark();
globalLowWatermarkForBatch =
globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
? partitionLowWatermark : globalLowWatermarkForBatch;
Instant partitionHighWatermark = metadata.getHighWatermark();
globalHighWatermarkForBatch =
globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
? partitionHighWatermark : globalHighWatermarkForBatch;
}

sparkWatermark =
new SparkWatermarks(
globalLowWatermarkForBatch,
globalHighWatermarkForBatch,
new Instant(validTime.milliseconds()));
// add to watermark queue.
GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
}
// report - for RateEstimator and visibility.
report(validTime, count, globalWatermark);
report(validTime, count, sparkWatermark);
return scala.Option.empty();
}

private void report(Time batchTime, long count, Instant watermark) {
private void report(Time batchTime, long count, SparkWatermarks sparkWatermark) {
// metadata - #records read and a description.
scala.collection.immutable.Map<String, Object> metadata =
new scala.collection.immutable.Map.Map1<String, Object>(
StreamInputInfo.METADATA_KEY_DESCRIPTION(),
String.format(
"Read %d records with observed watermark %s, from %s for batch time: %s",
"Read %d records with observed watermarks %s, from %s for batch time: %s",
count,
watermark,
sparkWatermark == null ? "N/A" : sparkWatermark,
sourceName,
batchTime));
StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
@@ -209,19 +232,25 @@ private void report(Time batchTime, long count, Instant watermark) {
*/
public static class Metadata implements Serializable {
private final long numRecords;
private final Instant watermark;
private final Instant lowWatermark;
private final Instant highWatermark;

public Metadata(long numRecords, Instant watermark) {
public Metadata(long numRecords, Instant lowWatermark, Instant highWatermark) {
this.numRecords = numRecords;
this.watermark = watermark;
this.lowWatermark = lowWatermark;
this.highWatermark = highWatermark;
}

public long getNumRecords() {
return numRecords;
}

public Instant getWatermark() {
return watermark;
public Instant getLowWatermark() {
return lowWatermark;
}

public Instant getHighWatermark() {
return highWatermark;
}
}
}
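A worked sketch of the merge performed in compute() above, with made-up record counts and millisecond instants: each partition's Metadata carries a low and a high watermark, and the batch-level values advance to the latest of each before being queued in the GlobalWatermarkHolder.

// Illustrative values only; batchTimeMillis stands in for validTime.milliseconds().
Metadata p0 = new Metadata(100L, new Instant(5L), new Instant(9L));
Metadata p1 = new Metadata(250L, new Instant(7L), new Instant(12L));
// globalLowWatermarkForBatch  = max(5, 7)  = 7
// globalHighWatermarkForBatch = max(9, 12) = 12
SparkWatermarks batchWatermarks =
    new SparkWatermarks(new Instant(7L), new Instant(12L), new Instant(batchTimeMillis));
GlobalWatermarkHolder.add(inputDStreamId, batchWatermarks);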
@@ -35,6 +35,7 @@
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -94,29 +95,36 @@ private abstract static class SerializableFunction3<T1, T2, T3, T4>
* @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
*/
public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
final SparkRuntimeContext runtimeContext) {

return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
Tuple2<Iterable<byte[]>, Metadata>>() {
return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>,
State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() {

@Override
public Tuple2<Iterable<byte[]>, Metadata> apply(
Source<T> source,
scala.Option<CheckpointMarkT> startCheckpointMark,
State<byte[]> state) {
State<Tuple2<byte[], Instant>> state) {

// source as MicrobatchSource
MicrobatchSource<T, CheckpointMarkT> microbatchSource =
(MicrobatchSource<T, CheckpointMarkT>) source;

// Initial high/low watermarks.
Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant highWatermark;

// if state exists, use it, otherwise it's first time so use the startCheckpointMark.
// startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
// Optional(null)), which is handled by the UnboundedSource implementation.
Coder<CheckpointMarkT> checkpointCoder = microbatchSource.getCheckpointMarkCoder();
CheckpointMarkT checkpointMark;
if (state.exists()) {
checkpointMark = CoderHelpers.fromByteArray(state.get(), checkpointCoder);
// previous (output) watermark is now the low watermark.
lowWatermark = state.get()._2();
checkpointMark = CoderHelpers.fromByteArray(state.get()._1(), checkpointCoder);
LOG.info("Continue reading from an existing CheckpointMark.");
} else if (startCheckpointMark.isDefined()
&& !startCheckpointMark.get().equals(EmptyCheckpointMark.get())) {
@@ -154,7 +162,10 @@ public Tuple2<Iterable<byte[]>, Metadata> apply(
finished = !reader.advance();
}

watermark = ((MicrobatchSource.Reader) reader).getWatermark();
// end-of-read watermark is the high watermark, but don't allow decrease.
Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;

// close and checkpoint reader.
reader.close();
LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -164,11 +175,15 @@ public Tuple2<Iterable<byte[]>, Metadata> apply(
@SuppressWarnings("unchecked")
CheckpointMarkT finishedReadCheckpointMark =
(CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
byte[] codedCheckpoint = new byte[0];
if (finishedReadCheckpointMark != null) {
state.update(CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder));
codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
} else {
LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
}
// persist the end-of-read (high) watermark for the following read, where it will become
// the next low watermark.
state.update(new Tuple2<>(codedCheckpoint, highWatermark));
} catch (IOException e) {
throw new RuntimeException("Failed to read from reader.", e);
}
@@ -179,7 +194,7 @@ public Iterator<byte[]> iterator() {
return Iterators.unmodifiableIterator(readValues.iterator());
}
};
return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
return new Tuple2<>(iterable, new Metadata(readValues.size(), lowWatermark, highWatermark));
}
};
}
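A small standalone sketch of the monotonic hand-off the change above introduces: the high watermark stored in the mapWithState state next to the coded CheckpointMark becomes the low watermark of the following micro-batch, and the new high watermark never regresses below it. The class and method names below are illustrative, not Beam or Spark API.

import org.joda.time.Instant;

public class WatermarkHandoffSketch {
  // Mirrors: sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark
  static Instant nextHighWatermark(Instant previousHigh, Instant sourceWatermark) {
    return sourceWatermark.isAfter(previousHigh) ? sourceWatermark : previousHigh;
  }

  public static void main(String[] args) {
    Instant lowWatermark = new Instant(100L);  // previous batch's high watermark, read from state.get()._2()
    Instant fromSource = new Instant(90L);     // a source watermark that moved backwards
    // Prints 1970-01-01T00:00:00.100Z: the high watermark holds instead of regressing.
    System.out.println(nextHighWatermark(lowWatermark, fromSource));
  }
}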
@@ -20,6 +20,7 @@

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -32,6 +33,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

/**
@@ -104,7 +106,13 @@ public void cache(String storageLevel) {

@Override
public void action() {
rdd.count();
// Empty function to force computation of the RDD.
rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() {
@Override
public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception {
// Empty implementation.
}
});
}

@Override
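The switch above from rdd.count() to an empty foreachPartition is, as I read it, about forcing the RDD to be evaluated (so that a preceding cache() call actually persists its partitions) without iterating and counting every element the way count() does. A hedged sketch of the same idiom outside Beam; the context settings and input path are assumptions, and the imports are the same ones shown in the file above (java.util.Iterator, JavaRDD, JavaSparkContext, VoidFunction):

JavaSparkContext jsc = new JavaSparkContext("local[2]", "action-sketch");
JavaRDD<String> cached = jsc.textFile("/tmp/input.txt").cache();
// Scheduling a job over every partition is enough to materialize the cached RDD;
// the body deliberately does nothing with the elements.
cached.foreachPartition(new VoidFunction<Iterator<String>>() {
  @Override
  public void call(Iterator<String> ignored) throws Exception {
    // no-op
  }
});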
@@ -38,7 +38,7 @@ public final class SparkContextFactory {
* {@code true} then the Spark context will be reused for beam pipelines.
* This property should only be enabled for tests.
*/
static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";

// Spark allows only one context per JVM so this can be static.
private static JavaSparkContext sparkContext;
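With the constant now public, test code in other packages can reference it instead of hard-coding the property name. A hedged sketch, assuming the flag is read as a JVM system property (which its name and the surrounding javadoc suggest):

// Reuse a single Spark context across the test suite's Beam pipelines.
System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");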