diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index dbb6fdf1694ad..12023374333a2 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -33,8 +33,8 @@ class StreamingContext(object):
     """
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
-        gateway=None, sparkContext=None, duration=None):
+                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 gateway=None, sparkContext=None, duration=None):
         """
         Create a new StreamingContext. At least the master and app name and duration
         should be set, either through the named parameters here or through C{conf}.
@@ -63,8 +63,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         if sparkContext is None:
             # Create the Python Sparkcontext
             self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
-                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
-                serializer=serializer, conf=conf, gateway=gateway)
+                                    pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+                                    serializer=serializer, conf=conf, gateway=gateway)
         else:
             self._sc = sparkContext
 
@@ -107,7 +107,7 @@ def awaitTermination(self, timeout=None):
         else:
             self._jssc.awaitTermination(timeout)
 
-    #TODO: add storageLevel
+    # TODO: add storageLevel
     def socketTextStream(self, hostname, port):
         """
         Create an input from TCP source hostname:port. Data is received using
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index f91a3b8a355d2..b1d1b0d8dc165 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -75,7 +75,8 @@ def filter(self, f):
         """
         Return a new DStream containing only the elements that satisfy predicate.
""" - def func(iterator): return ifilter(f, iterator) + def func(iterator): + return ifilter(f, iterator) return self.mapPartitions(func) def flatMap(self, f, preservesPartitioning=False): @@ -130,7 +131,7 @@ def reduceByKey(self, func, numPartitions=None): return self.combineByKey(lambda x: x, func, func, numPartitions) def combineByKey(self, createCombiner, mergeValue, mergeCombiners, - numPartitions = None): + numPartitions=None): """ Count the number of elements for each key, and return the result to the master as a dictionary @@ -153,7 +154,7 @@ def combineLocally(iterator): def _mergeCombiners(iterator): combiners = {} for (k, v) in iterator: - if not k in combiners: + if k not in combiners: combiners[k] = v else: combiners[k] = mergeCombiners(combiners[k], v) @@ -188,7 +189,7 @@ def add_shuffle_key(split, iterator): keyed._bypass_serializer = True with _JavaStackTrace(self.ctx) as st: partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, - id(partitionFunc)) + id(partitionFunc)) jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream() dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer)) diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py index fa03410f3f8e2..495ac2edff198 100644 --- a/python/pyspark/streaming/duration.py +++ b/python/pyspark/streaming/duration.py @@ -333,6 +333,7 @@ def _is_duration(self, instance): if not isinstance(instance, Duration): raise TypeError("This should be Duration") + def Milliseconds(milliseconds): """ Helper function that creates instance of [[pysparkstreaming.duration]] representing @@ -346,6 +347,7 @@ def Milliseconds(milliseconds): """ return Duration(milliseconds) + def Seconds(seconds): """ Helper function that creates instance of [[pysparkstreaming.duration]] representing @@ -359,6 +361,7 @@ def Seconds(seconds): """ return Duration(seconds * 1000) + def Minutes(minutes): """ Helper function that creates instance of [[pysparkstreaming.duration]] representing diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4af48ee8f86b4..2ed099b1004c3 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -32,9 +32,9 @@ import sys if sys.version_info[:2] <= (2, 6): - import unittest2 as unittest - else: - import unittest + import unittest2 as unittest +else: + import unittest from pyspark.context import SparkContext from pyspark.streaming.context import StreamingContext @@ -57,7 +57,7 @@ def tearDown(self): @classmethod def tearDownClass(cls): - # Make sure tp shutdown the callback server + # Make sure tp shutdown the callback server SparkContext._gateway._shutdown_callback_server() @@ -71,7 +71,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase): All tests input should have list of lists(3 lists are default). This list represents stream. Every batch interval, the first object of list are chosen to make DStream. - e.g The first list in the list is input of the first batch. + e.g The first list in the list is input of the first batch. Please see the BasicTestSuits in Scala which is close to this implementation. 
""" def setUp(self): @@ -112,7 +112,7 @@ def test_flatMap_batch(self): def test_func(dstream): return dstream.flatMap(lambda x: (x, x * 2)) - expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))), + expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))), test_input) output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) @@ -191,12 +191,12 @@ def test_func(dstream): def test_reduceByKey_batch(self): """Basic operation test for DStream.reduceByKey with batch deserializer.""" test_input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)], - [("", 1),("", 1), ("", 1), ("", 1)], + [("", 1), ("", 1), ("", 1), ("", 1)], [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]] def test_func(dstream): return dstream.reduceByKey(operator.add) - expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3 ,1)]] + expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3, 1)]] output = self._run_stream(test_input, test_func, expected_output) for result in (output, expected_output): self._sort_result_based_on_key(result) @@ -216,13 +216,13 @@ def test_func(dstream): def test_mapValues_batch(self): """Basic operation test for DStream.mapValues with batch deserializer.""" - test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], + test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [("", 4), (1, 1), (2, 2), (3, 3)], [(1, 1), (2, 1), (3, 1), (4, 1)]] def test_func(dstream): return dstream.mapValues(lambda x: x + 10) - expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)], + expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)], [("", 14), (1, 11), (2, 12), (3, 13)], [(1, 11), (2, 11), (3, 11), (4, 11)]] output = self._run_stream(test_input, test_func, expected_output) @@ -250,7 +250,8 @@ def test_flatMapValues_batch(self): def test_func(dstream): return dstream.flatMapValues(lambda x: (x, x + 10)) - expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12), ("c", 1), ("c", 11), ("d", 1), ("d", 11)], + expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12), + ("c", 1), ("c", 11), ("d", 1), ("d", 11)], [("", 4), ("", 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)], [(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]] output = self._run_stream(test_input, test_func, expected_output) @@ -344,7 +345,7 @@ def test_func(dstream): def test_groupByKey_batch(self): """Basic operation test for DStream.groupByKey with batch deserializer.""" - test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)], + test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)], [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] @@ -361,7 +362,7 @@ def test_func(dstream): def test_groupByKey_unbatch(self): """Basic operation test for DStream.groupByKey with unbatch deserializer.""" - test_input = [[(1, 1), (2, 1), (3, 1)], + test_input = [[(1, 1), (2, 1), (3, 1)], [(1, 1), (1, 1), ("", 1)], [("a", 1), ("a", 1), ("b", 1)]] @@ -378,12 +379,13 @@ def test_func(dstream): def test_combineByKey_batch(self): """Basic operation test for DStream.combineByKey with batch deserializer.""" - test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)], - [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], + test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)], + [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] def test_func(dstream): - def add(a, b): return a + str(b) + def add(a, b): + return a 
+ str(b) return dstream.combineByKey(str, add, add) expected_output = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")], [(1, "111"), (2, "11"), (3, "1")], @@ -395,10 +397,13 @@ def add(a, b): return a + str(b) def test_combineByKey_unbatch(self): """Basic operation test for DStream.combineByKey with unbatch deserializer.""" - test_input = [[(1, 1), (2, 1), (3, 1)], [(1, 1), (1, 1), ("", 1)], [("a", 1), ("a", 1), ("b", 1)]] + test_input = [[(1, 1), (2, 1), (3, 1)], + [(1, 1), (1, 1), ("", 1)], + [("a", 1), ("a", 1), ("b", 1)]] def test_func(dstream): - def add(a, b): return a + str(b) + def add(a, b): + return a + str(b) return dstream.combineByKey(str, add, add) expected_output = [[(1, "1"), (2, "1"), (3, "1")], [(1, "11"), ("", "1")], @@ -445,7 +450,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): # Check time out. if (current_time - start_time) > self.timeout: break - # StreamingContext.awaitTermination is not used to wait because + # StreamingContext.awaitTermination is not used to wait because # if py4j server is called every 50 milliseconds, it gets an error. time.sleep(0.05) # Check if the output is the same length of expected output. diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index 651ba363957ea..cf90952543fc0 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -54,7 +54,6 @@ def msDurationToString(ms): >>> msDurationToString(3600000) '1.00 h' """ - #TODO: add doctest second = 1000 minute = 60 * second hour = 60 * minute diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index ceb50b4f99acd..77a9c4a0e0677 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -58,7 +58,7 @@ def main(infile, outfile): SparkFiles._is_running_on_worker = True # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH - sys.path.append(spark_files_dir) # *.py files that were added will be copied here + sys.path.append(spark_files_dir) # *.py files that were added will be copied here num_python_includes = read_int(infile) for _ in range(num_python_includes): filename = utf8_deserializer.loads(infile) diff --git a/python/run-tests b/python/run-tests index ef4994d4e4b00..d5560dad69dc4 100755 --- a/python/run-tests +++ b/python/run-tests @@ -82,7 +82,9 @@ run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" run_test "pyspark/mllib/tests.py" run_test "pyspark/mllib/util.py" -run_test "pyspark/streaming/tests.py" +if [ -n "$_RUN_STREAMING_TESTS" ]; then + run_test "pyspark/streaming/tests.py" +fi if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green
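
Note on the combineByKey-related hunks above: the sketch below is illustrative only and not part of this patch. It shows, in plain Python, the per-partition combine step that DStream.combineByKey builds on; the helper name combine_locally is hypothetical, the logic mirrors the combineLocally/_mergeCombiners helpers touched in dstream.py, and the sample data and expected result match test_combineByKey_batch.

    # Illustrative sketch -- not code from the patch above.
    def combine_locally(iterator, createCombiner, mergeValue):
        """Combine (key, value) pairs within a single partition, combineByKey-style."""
        combiners = {}
        for (k, v) in iterator:
            if k not in combiners:
                combiners[k] = createCombiner(v)
            else:
                combiners[k] = mergeValue(combiners[k], v)
        return combiners.items()

    # As in test_combineByKey_batch: values are stringified and concatenated per key,
    # so the three (1, 1) pairs collapse to (1, "111").
    def add(a, b):
        return a + str(b)

    pairs = [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]
    print(sorted(combine_locally(pairs, str, add)))  # [(1, '111'), (2, '11'), (3, '1')]

Because add returns a new string instead of mutating its arguments, the same helper can serve as both mergeValue and mergeCombiners, which is exactly how the tests call dstream.combineByKey(str, add, add).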