clean up
davies committed Sep 29, 2014
1 parent 6ebceca commit 19797f9
Showing 5 changed files with 8 additions and 11 deletions.
6 changes: 3 additions & 3 deletions python/pyspark/streaming/context.py
@@ -15,16 +15,16 @@
# limitations under the License.
#

+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import

from pyspark import RDD
from pyspark.serializers import UTF8Deserializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
from pyspark.streaming.util import RDDFunction

-from py4j.java_collections import ListConverter
-from py4j.java_gateway import java_import

__all__ = ["StreamingContext"]


4 changes: 2 additions & 2 deletions python/pyspark/streaming/tests.py
@@ -348,7 +348,7 @@ def test_count_by_value_and_window(self):
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
-            return dstream.countByValueAndWindow(6, 1)
+            return dstream.countByValueAndWindow(5, 1)

        expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
        self._test_func(input, func, expected)
@@ -357,7 +357,7 @@ def test_group_by_key_and_window(self):
        input = [[('a', i)] for i in range(5)]

        def func(dstream):
-            return dstream.groupByKeyAndWindow(4, 1).mapValues(list)
+            return dstream.groupByKeyAndWindow(3, 1).mapValues(list)

        expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                    [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
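In both tests above the first argument is the window length and the second is the slide interval, expressed in seconds on the Python side. For reference, a minimal self-contained sketch of the same two windowed operations against the Scala DStream API; the socket source, batch interval and checkpoint path are illustrative assumptions, not part of this commit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair DStream operations

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowSketch")
    val ssc = new StreamingContext(conf, Seconds(1))   // 1-second batches
    ssc.checkpoint("/tmp/window-sketch")               // countByValueAndWindow needs a checkpoint dir

    val words = ssc.socketTextStream("localhost", 9999)

    // distinct-value counts over the last 5 seconds, recomputed every second
    words.countByValueAndWindow(Seconds(5), Seconds(1)).print()

    // per-key groups over the last 3 seconds, recomputed every second
    words.map(w => (w, 1)).groupByKeyAndWindow(Seconds(3), Seconds(1)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}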
@@ -413,7 +413,7 @@ class StreamingContext private[streaming] (
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
    ): DStream[T] = {
-    new TransformedDStream[T](dstreams, (transformFunc))
+    new TransformedDStream[T](dstreams, transformFunc)
  }

/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
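The method touched in this hunk builds one DStream from several input DStreams by applying a user function to their per-batch RDDs. A minimal sketch of calling it, assuming the surrounding method is the public ssc.transform(dstreams, transformFunc) with the signature shown above, and that words and counts are two pre-existing DStreams (both names are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// Combine two DStreams batch by batch; the RDDs are handed to the function in
// the same order as the DStreams appear in the Seq.
def joinByBatch(ssc: StreamingContext,
                words: DStream[String],
                counts: DStream[(String, Long)]): DStream[(String, Long)] =
  ssc.transform(Seq(words, counts), (rdds: Seq[RDD[_]], time: Time) => {
    val w = rdds(0).asInstanceOf[RDD[String]]
    val c = rdds(1).asInstanceOf[RDD[(String, Long)]]
    w.map((_, 1L)).union(c)
  })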
@@ -549,10 +549,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
 * JavaStreamingContext object contains a number of utility functions.
 */
object JavaStreamingContext {
-  implicit def fromStreamingContext(ssc: StreamingContext):
-    JavaStreamingContext = new JavaStreamingContext(ssc)
-
-  implicit def toStreamingContext(jssc: JavaStreamingContext): StreamingContext = jssc.ssc

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
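With these implicit conversions removed, code that relied on silent conversion between StreamingContext and JavaStreamingContext has to wrap and unwrap explicitly. A small sketch; both the constructor and the ssc field it uses appear in the class header of this hunk:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext

def roundTrip(ssc: StreamingContext): StreamingContext = {
  val jssc = new JavaStreamingContext(ssc)  // replaces the removed fromStreamingContext
  jssc.ssc                                  // replaces the removed toStreamingContext
}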
@@ -41,7 +41,7 @@ trait PythonRDDFunction
/**
 * Wrapper for PythonRDDFunction
 */
-class RDDFunction(pfunc: PythonRDDFunction)
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
  extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {

  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
@@ -68,6 +68,7 @@ class RDDFunction(pfunc: PythonRDDFunction)
    some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava))
  }

+  // for JFunction2
  def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
    pfunc.call(time.milliseconds, rdds)
  }
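The new comment marks the overload that satisfies the Java Function2 interface the class extends, so an RDDFunction can be handed to any code expecting that interface. A small sketch of such a call site; the helper name is illustrative:

import java.util.{List => JList}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.streaming.Time

// Invoke any JFunction2-shaped batch transform, e.g. an RDDFunction instance.
def applyBatchFunction(
    f: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]],
    rdds: JList[JavaRDD[_]],
    time: Time): JavaRDD[Array[Byte]] = {
  f.call(rdds, time)
}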
