diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 5a56a3d958254..18415fcebe771 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -139,6 +139,12 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, def combineLocally(iterator): combiners = {} for x in iterator: + + #TODO for count operation make sure count implementation + # This is different from what pyspark does + if isinstance(x, int): + x = ("", x) + (k, v) = x if k not in combiners: combiners[k] = createCombiner(v)