diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 2f9661333827f..5284ba9343f06 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -17,15 +17,14 @@ package org.apache.spark.examples.streaming; +import static java.util.Arrays.asList; + import java.util.List; import java.util.regex.Pattern; -import scala.Tuple2; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - +import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; @@ -36,6 +35,11 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import scala.Tuple2; + /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. @@ -78,10 +82,11 @@ public static void main(String[] args) { JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); ssc.checkpoint("."); - // Create a JavaReceiverInputDStream on target ip:port and count the - // words in input stream of \n delimited text (eg. generated by 'nc') - // Note that no duplication in storage level only for running locally. - // Replication necessary in distributed scenario for fault tolerance. + // Initial RDD input to updateStateByKey + JavaPairRDD initialRDD = ssc.sc() + .parallelizePairs(asList(new Tuple2("hello", 1), new Tuple2 + ("world", 1))); + JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); @@ -101,7 +106,8 @@ public Tuple2 call(String s) { }); // This will give a Dstream made of state (which is the cumulative count of the words) - JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction); + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, new + HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); stateDstream.print(); ssc.start();