From 19797f9fc9b062ee30746c184ad432192ca5e19a Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 29 Sep 2014 13:41:44 -0700
Subject: [PATCH] clean up

---
 python/pyspark/streaming/context.py                          | 6 +++---
 python/pyspark/streaming/tests.py                            | 4 ++--
 .../scala/org/apache/spark/streaming/StreamingContext.scala  | 2 +-
 .../spark/streaming/api/java/JavaStreamingContext.scala      | 4 ----
 .../apache/spark/streaming/api/python/PythonDStream.scala    | 3 ++-
 5 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index ce8aec613d08b..425b0a96aa832 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -15,6 +15,9 @@
 # limitations under the License.
 #
 
+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import
+
 from pyspark import RDD
 from pyspark.serializers import UTF8Deserializer
 from pyspark.context import SparkContext
@@ -22,9 +25,6 @@
 from pyspark.streaming.dstream import DStream
 from pyspark.streaming.util import RDDFunction
 
-from py4j.java_collections import ListConverter
-from py4j.java_gateway import java_import
-
 __all__ = ["StreamingContext"]
 
 
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 54d4d9b1f7850..342afde3bffd2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -348,7 +348,7 @@ def test_count_by_value_and_window(self):
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
-            return dstream.countByValueAndWindow(6, 1)
+            return dstream.countByValueAndWindow(5, 1)
 
         expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
         self._test_func(input, func, expected)
@@ -357,7 +357,7 @@ def test_group_by_key_and_window(self):
         input = [[('a', i)] for i in range(5)]
 
         def func(dstream):
-            return dstream.groupByKeyAndWindow(4, 1).mapValues(list)
+            return dstream.groupByKeyAndWindow(3, 1).mapValues(list)
 
         expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                     [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ab6a6de074a80..ef7631788f26d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -413,7 +413,7 @@ class StreamingContext private[streaming] (
       dstreams: Seq[DStream[_]],
       transformFunc: (Seq[RDD[_]], Time) => RDD[T]
     ): DStream[T] = {
-    new TransformedDStream[T](dstreams, (transformFunc))
+    new TransformedDStream[T](dstreams, transformFunc)
   }
 
   /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 662cd8d22c6a5..9dc26dc6b32a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -549,10 +549,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
  * JavaStreamingContext object contains a number of utility functions.
  */
 object JavaStreamingContext {
-  implicit def fromStreamingContext(ssc: StreamingContext):
-    JavaStreamingContext = new JavaStreamingContext(ssc)
-
-  implicit def toStreamingContext(jssc: JavaStreamingContext): StreamingContext = jssc.ssc
 
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2f20b05991b8e..30c52c15e9e68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -41,7 +41,7 @@ trait PythonRDDFunction {
 /**
  * Wrapper for PythonRDDFunction
  */
-class RDDFunction(pfunc: PythonRDDFunction)
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
   extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
 
   def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
@@ -68,6 +68,7 @@ class RDDFunction(pfunc: PythonRDDFunction)
     some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava))
   }
 
+  // for JFunction2
   def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
     pfunc.call(time.milliseconds, rdds)
   }
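
Note on the JavaStreamingContext hunk: with the implicit
fromStreamingContext/toStreamingContext conversions removed, Scala callers
convert between the two context types explicitly. A minimal sketch of the
explicit form, using only what the removed implicits did (the helper names
wrapForJavaApi/unwrap are illustrative, not part of the patch):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    object ContextConversions {
      // was: implicit def fromStreamingContext(ssc: StreamingContext)
      def wrapForJavaApi(ssc: StreamingContext): JavaStreamingContext =
        new JavaStreamingContext(ssc)

      // was: implicit def toStreamingContext(jssc: JavaStreamingContext)
      def unwrap(jssc: JavaStreamingContext): StreamingContext = jssc.ssc
    }

Dropping the implicits trades a little caller-side verbosity for an API that
no longer converts between the Java and Scala contexts silently.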
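
Note on the PythonDStream hunk: private[python] is Scala's package-qualified
access modifier, so RDDFunction stays usable inside
org.apache.spark.streaming.api.python while disappearing from the public API
surface. A small illustrative sketch (InternalHelper is a hypothetical name,
not from the patch):

    package org.apache.spark.streaming.api.python

    // Accessible only from code in the `python` package (and code nested
    // within it); invisible to Spark's public Java/Scala API surface.
    private[python] class InternalHelper {
      def describe(): String = "visible only within ...api.python"
    }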