From 62644dc95ca2f49ad33f69c6c5f688f424f03bc7 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 19 Feb 2017 19:52:22 +0200 Subject: [PATCH] [BEAM-1512] Optimize leaf transforms materialization --- .../beam/runners/spark/translation/BoundedDataset.java | 10 +++++++++- .../spark/translation/streaming/UnboundedDataset.java | 9 ++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 1cfb0e02ed1a8..5e198467abb21 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -20,6 +20,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; +import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -32,6 +33,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; /** @@ -104,7 +106,13 @@ public void cache(String storageLevel) { @Override public void action() { - rdd.count(); + // Empty function to force computation of RDD. + rdd.foreachPartition(new VoidFunction>>() { + @Override + public void call(Iterator> windowedValueIterator) throws Exception { + // Empty implementation. + } + }); } @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index d059c7e3dec31..49e558444d010 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.slf4j.Logger; @@ -99,12 +98,8 @@ public void cache(String storageLevel) { @Override public void action() { - dStream.foreachRDD(new VoidFunction>>() { - @Override - public void call(JavaRDD> rdd) throws Exception { - rdd.count(); - } - }); + // Force computation of DStream. + dStream.dstream().register(); } @Override