diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 8ed50d3dd2531..86bacf432e0ac 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -431,6 +431,7 @@ def __init__(self, prev, func, preservesPartitioning=False): self._prev_jrdd_deserializer = prev._jrdd_deserializer else: prev_func = prev.func + def pipeline_func(split, iterator): return func(split, prev_func(split, iterator)) self.func = pipeline_func