Skip to content

Commit

Permalink
This closes apache#2046
Browse files Browse the repository at this point in the history
  • Loading branch information
Sela committed Feb 20, 2017
2 parents aa45ccb + a2f0615 commit 4e5a762
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
Expand All @@ -32,6 +33,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

/**
Expand Down Expand Up @@ -104,7 +106,13 @@ public void cache(String storageLevel) {

@Override
public void action() {
rdd.count();
// Empty function to force computation of RDD.
rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() {
@Override
public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception {
// Empty implementation.
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -115,12 +114,8 @@ public void cache(String storageLevel) {

@Override
public void action() {
dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
@Override
public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
rdd.count();
}
});
// Force computation of DStream.
dStream.dstream().register();
}

@Override
Expand Down

0 comments on commit 4e5a762

Please sign in to comment.