fixed pep8 violation
giwa committed Aug 21, 2014
1 parent b7dab85 commit 0d30109
Showing 7 changed files with 41 additions and 31 deletions.
10 changes: 5 additions & 5 deletions python/pyspark/streaming/context.py
@@ -33,8 +33,8 @@ class StreamingContext(object):
"""

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
- gateway=None, sparkContext=None, duration=None):
+ environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+ gateway=None, sparkContext=None, duration=None):
"""
Create a new StreamingContext. At least the master and app name and duration
should be set, either through the named parameters here or through C{conf}.
@@ -63,8 +63,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
if sparkContext is None:
# Create the Python Sparkcontext
self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
- pyFiles=pyFiles, environment=environment, batchSize=batchSize,
- serializer=serializer, conf=conf, gateway=gateway)
+ pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+ serializer=serializer, conf=conf, gateway=gateway)
else:
self._sc = sparkContext

@@ -107,7 +107,7 @@ def awaitTermination(self, timeout=None):
else:
self._jssc.awaitTermination(timeout)

- #TODO: add storageLevel
+ # TODO: add storageLevel
def socketTextStream(self, hostname, port):
"""
Create an input from TCP source hostname:port. Data is received using
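
For orientation, here is a minimal usage sketch pieced together from the signatures visible in this file's hunks. The master URL, app name, host, and port are hypothetical, and ssc.start() is an assumption (it does not appear in this diff); only the constructor keywords, socketTextStream(hostname, port), and awaitTermination(timeout) are taken from the code above.

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    # The batch interval is passed as a Duration through the `duration` keyword.
    ssc = StreamingContext(master="local[2]", appName="NetworkWordCount",
                           duration=Seconds(1))

    # socketTextStream(hostname, port) builds a DStream from a TCP text source.
    lines = ssc.socketTextStream("localhost", 9999)

    ssc.start()                       # assumed to mirror the Scala API; not shown in this diff
    ssc.awaitTermination(timeout=60)  # awaitTermination(timeout) is shown above
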
9 changes: 5 additions & 4 deletions python/pyspark/streaming/dstream.py
@@ -75,7 +75,8 @@ def filter(self, f):
"""
Return a new DStream containing only the elements that satisfy predicate.
"""
- def func(iterator): return ifilter(f, iterator)
+ def func(iterator):
+     return ifilter(f, iterator)
return self.mapPartitions(func)

def flatMap(self, f, preservesPartitioning=False):
@@ -130,7 +131,7 @@ def reduceByKey(self, func, numPartitions=None):
return self.combineByKey(lambda x: x, func, func, numPartitions)

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
- numPartitions = None):
+ numPartitions=None):
"""
Count the number of elements for each key, and return the result to the
master as a dictionary
@@ -153,7 +154,7 @@ def combineLocally(iterator):
def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
- if not k in combiners:
+ if k not in combiners:
combiners[k] = v
else:
combiners[k] = mergeCombiners(combiners[k], v)
@@ -188,7 +189,7 @@ def add_shuffle_key(split, iterator):
keyed._bypass_serializer = True
with _JavaStackTrace(self.ctx) as st:
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
- id(partitionFunc))
+ id(partitionFunc))
jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
partitioner).asJavaDStream()
dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
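
The _mergeCombiners hunk above folds (key, value) pairs into a dict, merging values for keys already seen. A standalone, Spark-free sketch of that same merge step (function and variable names here are illustrative, not part of the module):

    import operator

    def merge_combiners(pairs, merge):
        # Same shape as the `if k not in combiners` loop shown above.
        combiners = {}
        for (k, v) in pairs:
            if k not in combiners:
                combiners[k] = v
            else:
                combiners[k] = merge(combiners[k], v)
        return combiners

    # reduceByKey-style usage: merge with operator.add.
    result = merge_combiners([("a", 1), ("a", 1), ("b", 1)], operator.add)
    print(sorted(result.items()))  # [('a', 2), ('b', 1)]
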
3 changes: 3 additions & 0 deletions python/pyspark/streaming/duration.py
@@ -333,6 +333,7 @@ def _is_duration(self, instance):
if not isinstance(instance, Duration):
raise TypeError("This should be Duration")


def Milliseconds(milliseconds):
"""
Helper function that creates instance of [[pysparkstreaming.duration]] representing
@@ -346,6 +347,7 @@ def Milliseconds(milliseconds):
"""
return Duration(milliseconds)


def Seconds(seconds):
"""
Helper function that creates instance of [[pysparkstreaming.duration]] representing
@@ -359,6 +361,7 @@ def Seconds(seconds):
"""
return Duration(seconds * 1000)


def Minutes(minutes):
"""
Helper function that creates instance of [[pysparkstreaming.duration]] representing
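
A self-contained sketch of the relationship these helpers encode, using a stand-in Duration class (the real class lives in this module but its body is not shown, and the Minutes conversion below is an assumption because its body is cut off above):

    class Duration(object):
        # Stand-in keyed on milliseconds, like pysparkstreaming.duration.
        def __init__(self, millis):
            self._millis = millis

    def Milliseconds(milliseconds):
        return Duration(milliseconds)          # as shown above

    def Seconds(seconds):
        return Duration(seconds * 1000)        # as shown above

    def Minutes(minutes):
        return Duration(minutes * 60 * 1000)   # assumed; not visible in this diff

    assert Seconds(2)._millis == Milliseconds(2000)._millis == 2000
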
43 changes: 24 additions & 19 deletions python/pyspark/streaming/tests.py
@@ -32,9 +32,9 @@
import sys

if sys.version_info[:2] <= (2, 6):
- import unittest2 as unittest
- else:
- import unittest
+ import unittest2 as unittest
+ else:
+ import unittest

from pyspark.context import SparkContext
from pyspark.streaming.context import StreamingContext
@@ -57,7 +57,7 @@ def tearDown(self):

@classmethod
def tearDownClass(cls):
- # Make sure tp shutdown the callback server
+ # Make sure tp shutdown the callback server
SparkContext._gateway._shutdown_callback_server()


@@ -71,7 +71,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
All tests input should have list of lists(3 lists are default). This list represents stream.
Every batch interval, the first object of list are chosen to make DStream.
- e.g The first list in the list is input of the first batch.
+ e.g The first list in the list is input of the first batch.
Please see the BasicTestSuits in Scala which is close to this implementation.
"""
def setUp(self):
@@ -112,7 +112,7 @@ def test_flatMap_batch(self):

def test_func(dstream):
return dstream.flatMap(lambda x: (x, x * 2))
- expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
+ expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)
@@ -191,12 +191,12 @@ def test_func(dstream):
def test_reduceByKey_batch(self):
"""Basic operation test for DStream.reduceByKey with batch deserializer."""
test_input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
[("", 1),("", 1), ("", 1), ("", 1)],
[("", 1), ("", 1), ("", 1), ("", 1)],
[(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]

def test_func(dstream):
return dstream.reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3 ,1)]]
expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3, 1)]]
output = self._run_stream(test_input, test_func, expected_output)
for result in (output, expected_output):
self._sort_result_based_on_key(result)
@@ -216,13 +216,13 @@ def test_func(dstream):

def test_mapValues_batch(self):
"""Basic operation test for DStream.mapValues with batch deserializer."""
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
[("", 4), (1, 1), (2, 2), (3, 3)],
[(1, 1), (2, 1), (3, 1), (4, 1)]]

def test_func(dstream):
return dstream.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
[("", 14), (1, 11), (2, 12), (3, 13)],
[(1, 11), (2, 11), (3, 11), (4, 11)]]
output = self._run_stream(test_input, test_func, expected_output)
@@ -250,7 +250,8 @@ def test_flatMapValues_batch(self):

def test_func(dstream):
return dstream.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12), ("c", 1), ("c", 11), ("d", 1), ("d", 11)],
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
("c", 1), ("c", 11), ("d", 1), ("d", 11)],
[("", 4), ("", 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
[(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]]
output = self._run_stream(test_input, test_func, expected_output)
@@ -344,7 +345,7 @@ def test_func(dstream):

def test_groupByKey_batch(self):
"""Basic operation test for DStream.groupByKey with batch deserializer."""
- test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
+ test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
[(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

@@ -361,7 +362,7 @@ def test_func(dstream):

def test_groupByKey_unbatch(self):
"""Basic operation test for DStream.groupByKey with unbatch deserializer."""
- test_input = [[(1, 1), (2, 1), (3, 1)],
+ test_input = [[(1, 1), (2, 1), (3, 1)],
[(1, 1), (1, 1), ("", 1)],
[("a", 1), ("a", 1), ("b", 1)]]

@@ -378,12 +379,13 @@ def test_func(dstream):

def test_combineByKey_batch(self):
"""Basic operation test for DStream.combineByKey with batch deserializer."""
- test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
- [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
+ test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
+ [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

def test_func(dstream):
- def add(a, b): return a + str(b)
+ def add(a, b):
+     return a + str(b)
return dstream.combineByKey(str, add, add)
expected_output = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")],
[(1, "111"), (2, "11"), (3, "1")],
@@ -395,10 +397,13 @@ def add(a, b): return a + str(b)

def test_combineByKey_unbatch(self):
"""Basic operation test for DStream.combineByKey with unbatch deserializer."""
- test_input = [[(1, 1), (2, 1), (3, 1)], [(1, 1), (1, 1), ("", 1)], [("a", 1), ("a", 1), ("b", 1)]]
+ test_input = [[(1, 1), (2, 1), (3, 1)],
+ [(1, 1), (1, 1), ("", 1)],
+ [("a", 1), ("a", 1), ("b", 1)]]

def test_func(dstream):
- def add(a, b): return a + str(b)
+ def add(a, b):
+     return a + str(b)
return dstream.combineByKey(str, add, add)
expected_output = [[(1, "1"), (2, "1"), (3, "1")],
[(1, "11"), ("", "1")],
@@ -445,7 +450,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
# Check time out.
if (current_time - start_time) > self.timeout:
break
- # StreamingContext.awaitTermination is not used to wait because
+ # StreamingContext.awaitTermination is not used to wait because
# if py4j server is called every 50 milliseconds, it gets an error.
time.sleep(0.05)
# Check if the output is the same length of expected output.
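
The end of _run_stream above waits for results by polling with a timeout instead of calling awaitTermination, sleeping 50 ms between checks so the py4j server is not called too often. A generic sketch of that poll-with-timeout pattern (names are illustrative):

    import time

    def poll_until(condition, timeout=10.0, interval=0.05):
        # Re-check `condition` every `interval` seconds until it holds or `timeout` elapses.
        start_time = time.time()
        while not condition():
            if (time.time() - start_time) > timeout:
                break  # timed out, like the break in _run_stream above
            time.sleep(interval)
        return condition()

    # e.g. poll_until(lambda: len(output) >= len(expected_output), timeout=10)
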
1 change: 0 additions & 1 deletion python/pyspark/streaming/util.py
@@ -54,7 +54,6 @@ def msDurationToString(ms):
>>> msDurationToString(3600000)
'1.00 h'
"""
- #TODO: add doctest
second = 1000
minute = 60 * second
hour = 60 * minute
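
Only msDurationToString's doctest and unit constants are visible in this hunk; the sketch below is a plausible reconstruction consistent with them (the thresholds and format strings are assumptions, not the file's verbatim contents):

    def msDurationToString(ms):
        """
        >>> msDurationToString(3600000)
        '1.00 h'
        """
        second = 1000
        minute = 60 * second
        hour = 60 * minute
        if ms < second:
            return "%d ms" % ms
        elif ms < minute:
            return "%.1f s" % (float(ms) / second)
        elif ms < hour:
            return "%.1f m" % (float(ms) / minute)
        else:
            return "%.2f h" % (float(ms) / hour)
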
2 changes: 1 addition & 1 deletion python/pyspark/worker.py
@@ -58,7 +58,7 @@ def main(infile, outfile):
SparkFiles._is_running_on_worker = True

# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
- sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
4 changes: 3 additions & 1 deletion python/run-tests
@@ -82,7 +82,9 @@ run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/tests.py"
run_test "pyspark/mllib/util.py"
run_test "pyspark/streaming/tests.py"
if [ -n "$_RUN_STREAMING_TESTS" ]; then
run_test "pyspark/streaming/tests.py"
fi

if [[ $FAILED == 0 ]]; then
echo -en "\033[32m" # Green
