Skip to content

Commit

Permalink
Improve example to look like scala example
Browse files Browse the repository at this point in the history
  • Loading branch information
gasparms committed Feb 13, 2015
1 parent 4d8785c commit f527328
Showing 1 changed file with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.spark.examples.streaming;

import static java.util.Arrays.asList;

import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
Expand All @@ -36,6 +35,11 @@
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import scala.Tuple2;

/**
* Counts words cumulatively in UTF-8 encoded, '\n'-delimited text received from the network every
* second, starting with an initial value of the word count.
Expand Down Expand Up @@ -78,10 +82,11 @@ public static void main(String[] args) {
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint(".");

// Create a JavaReceiverInputDStream on the target ip:port and count the
// words in the input stream of \n-delimited text (e.g. generated by 'nc').
// Note that a storage level without replication is suitable only when running locally;
// replication is necessary in a distributed scenario for fault tolerance.
// Initial RDD input to updateStateByKey
JavaPairRDD<String, Integer> initialRDD = ssc.sc()
.parallelizePairs(asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>
("world", 1)));

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);

Expand All @@ -101,7 +106,8 @@ public Tuple2<String, Integer> call(String s) {
});

// This will give a DStream made of state (which is the cumulative count of the words)
JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction);
JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction, new
HashPartitioner(ssc.sc().defaultParallelism()), initialRDD);

stateDstream.print();
ssc.start();
Expand Down

0 comments on commit f527328

Please sign in to comment.