From b8d7d243ee750d314a3397c4c908aec5e7d87111 Mon Sep 17 00:00:00 2001
From: giwa
Date: Sun, 3 Aug 2014 19:25:13 -0700
Subject: [PATCH] implemented reduce and count functions in DStream

---
 .../python/streaming/network_wordcount.py |  2 ++
 python/pyspark/streaming/dstream.py        | 27 ++++++++++++-------
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index 2bbb36a6b787e..f6fba4488e238 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -19,5 +19,7 @@
 reduced_lines = mapped_lines.reduceByKey(add)
 reduced_lines.pyprint()
+count_lines = mapped_lines.count()
+count_lines.pyprint()

 ssc.start()
 ssc.awaitTermination()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0ba2b4b38a281..e6cd2eb9a49af 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -22,25 +22,23 @@ def count(self):
         """
         """
-        pass
-        #TODO: make sure count implementation, thiis different from what pyspark does
-        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        # TODO: make sure this count implementation is correct; it differs from what PySpark's RDD does
+        return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()

     def _sum(self):
         """
         """
-        pass
-        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

     def print_(self):
         """
-        Since print is reserved name for python, we cannot make a print method function.
+        Since print is a reserved name in Python, we cannot define a print method.
         This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() instead to print result.
+        deserialize pickled Python objects. Please use DStream.pyprint() instead to print results.
         Call DStream.print().
         """
-        #hack to call print function in DStream
+        # a hack to call the print function on the underlying JVM DStream
         getattr(self._jdstream, "print")()

     def filter(self, f):
@@ -79,17 +77,23 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         """
         return PipelinedDStream(self, f, preservesPartitioning)

+    def reduce(self, func):
+        """
+        Return a new DStream where each RDD is reduced to a single element using func.
+        """
+        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
+
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function. This will
         also perform the merging locally on each mapper before
-        sending resuls to reducer, similarly to a "combiner" in MapReduce.
+        sending results to a reducer, similarly to a "combiner" in MapReduce.
         Output will be hash-partitioned with C{numPartitions} partitions, or
         the default parallelism level if C{numPartitions} is not specified.
""" - return self.combineByKey(lambda x:x, func, func, numPartitions) + return self.combineByKey(lambda x: x, func, func, numPartitions) def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions = None): @@ -99,6 +103,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, """ if numPartitions is None: numPartitions = self._defaultReducePartitions() + def combineLocally(iterator): combiners = {} for x in iterator: @@ -116,6 +121,7 @@ def combineLocally(iterator): return combiners.iteritems() locally_combined = self._mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) + def _mergeCombiners(iterator): combiners = {} for (k, v) in iterator: @@ -124,6 +130,7 @@ def _mergeCombiners(iterator): else: combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() + return shuffled._mapPartitions(_mergeCombiners) def partitionBy(self, numPartitions, partitionFunc=None):