From 1523b66c5aa4b098c1de3ec55ee44848973b7d00 Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 4 Aug 2014 16:07:48 -0700
Subject: [PATCH] WIP

---
 examples/src/main/python/streaming/test_oprations.py      | 5 +++--
 python/pyspark/streaming/context.py                       | 5 +++++
 python/pyspark/streaming/dstream.py                       | 4 +++-
 python/pyspark/streaming/utils.py                         | 1 -
 .../apache/spark/streaming/api/python/PythonDStream.scala | 5 +++--
 5 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py
index cb338ced5f228..084902b6a2f0d 100644
--- a/examples/src/main/python/streaming/test_oprations.py
+++ b/examples/src/main/python/streaming/test_oprations.py
@@ -15,10 +15,11 @@
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
     words = lines.flatMap(lambda line: line.split(" "))
+#    ssc.checkpoint("checkpoint")
     mapped_words = words.map(lambda word: (word, 1))
     count = mapped_words.reduceByKey(add)
 
     count.pyprint()
     ssc.start()
-#    ssc.awaitTermination()
-    ssc.stop()
+    ssc.awaitTermination()
+#    ssc.stop()
 
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 01201f66421f8..dfaa5cfbbae27 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -133,3 +133,8 @@ def stop(self, stopSparkContext=True):
         finally:
             # Stop Callback server
             SparkContext._gateway.shutdown()
+
+    def checkpoint(self, directory):
+        """
+        """
+        self._jssc.checkpoint(directory)
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index d6781a3e50e65..937bafc6262b1 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -172,7 +172,8 @@ def add_shuffle_key(split, iterator):
         with _JavaStackTrace(self.ctx) as st:
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                           id(partitionFunc))
-            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
+            jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
+                                                           partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
@@ -246,6 +247,7 @@ def takeAndPrint(rdd, time):
 #        jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
 #        return DStream(jdstream, self._ssc, ...)
 ## DO NOT KNOW HOW
+
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
index c60ecd1ed607a..aa5e19adbd927 100644
--- a/python/pyspark/streaming/utils.py
+++ b/python/pyspark/streaming/utils.py
@@ -37,7 +37,6 @@ class Java:
         implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']
 
 
-
 def msDurationToString(ms):
     """
     Returns a human-readable string representing a duration such as "35ms"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 86b067e1a2810..6f2e4bd0d90c8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -25,7 +25,7 @@
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.api.python._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.{StreamingContext, Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._
@@ -139,7 +139,7 @@ DStream[(Long, Array[Byte])](prev.ssc){
 }
 
 
-private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
+private class PythonPairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
 DStream[Array[Byte]](prev.ssc){
   override def dependencies = List(prev)
 
@@ -180,6 +180,7 @@ class PythonForeachDStream(
   this.register()
 }
 
+
 /*
 This does not work. Ignore this for now. -TD
 class PythonTransformedDStream(
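
Usage note (not part of the patch): the checkpoint() method added to the Python
StreamingContext simply delegates to the underlying JVM context. A minimal
sketch of the driver script after this change, assuming `ssc` is constructed as
in the existing example (its setup is outside these hunks); the checkpoint call
is still commented out in the patch itself and "checkpoint" is a placeholder
directory name:

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    words = lines.flatMap(lambda line: line.split(" "))
    ssc.checkpoint("checkpoint")  # placeholder directory; forwards to jssc.checkpoint
    counts = words.map(lambda word: (word, 1)).reduceByKey(add)
    counts.pyprint()
    ssc.start()
    ssc.awaitTermination()  # block the driver until the stream is stopped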