Skip to content

Commit

Permalink
basic function test cases are passed
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 15, 2014
1 parent 2112638 commit 536def4
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 118 deletions.
209 changes: 159 additions & 50 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"""
from itertools import chain
import os
import time
import unittest
import operator
Expand All @@ -34,9 +33,6 @@
from pyspark.streaming.duration import *


SPARK_HOME = os.environ["SPARK_HOME"]


class PySparkStreamingTestCase(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
Expand All @@ -49,7 +45,7 @@ def tearDown(self):
self.ssc._sc.stop()
# Why does it long time to terminaete StremaingContext and SparkContext?
# Should we change the sleep time if this depends on machine spec?
time.sleep(5)
time.sleep(8)

@classmethod
def tearDownClass(cls):
Expand All @@ -59,8 +55,17 @@ def tearDownClass(cls):

class TestBasicOperationsSuite(PySparkStreamingTestCase):
"""
Input and output of this TestBasicOperationsSuite is the equivalent to
Scala TestBasicOperationsSuite.
2 tests for each function for batach deserializer and unbatch deserilizer because
we cannot change the deserializer after streaming process starts.
Default numInputPartitions is 2.
If the number of input element is over 3, that DStream use batach deserializer.
If not, that DStream use unbatch deserializer.
Most of the operation uses UTF8 deserializer to get value from Scala.
I am wondering if these test are enough or not.
All tests input should have list of lists. This represents stream.
Every batch interval, the first object of list are chosen to make DStream.
Please see the BasicTestSuits in Scala or QueStream which is close to this implementation.
"""
def setUp(self):
PySparkStreamingTestCase.setUp(self)
Expand All @@ -75,8 +80,8 @@ def tearDown(self):
def tearDownClass(cls):
PySparkStreamingTestCase.tearDownClass()

def test_map(self):
"""Basic operation test for DStream.map"""
def test_map_batch(self):
"""Basic operation test for DStream.map with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
Expand All @@ -85,8 +90,18 @@ def test_func(dstream):
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMap(self):
"""Basic operation test for DStream.faltMap"""
def test_map_unbatach(self):
"""Basic operation test for DStream.map with unbatch deserializer"""
test_input = [range(1, 4), range(4, 7), range(7, 10)]

def test_func(dstream):
return dstream.map(lambda x: str(x))
expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMap_batch(self):
"""Basic operation test for DStream.faltMap with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
Expand All @@ -96,8 +111,19 @@ def test_func(dstream):
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_filter(self):
"""Basic operation test for DStream.filter"""
def test_flatMap_unbatch(self):
"""Basic operation test for DStream.faltMap with unbatch deserializer"""
test_input = [range(1, 4), range(4, 7), range(7, 10)]

def test_func(dstream):
return dstream.flatMap(lambda x: (x, x * 2))
expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_filter_batch(self):
"""Basic operation test for DStream.filter with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
Expand All @@ -106,21 +132,38 @@ def test_func(dstream):
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_count(self):
"""Basic operation test for DStream.count"""
#test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
test_input = [range(1, 5), range(1,10), range(1,20)]
def test_filter_unbatch(self):
"""Basic operation test for DStream.filter with unbatch deserializer"""
test_input = [range(1, 4), range(4, 7), range(7, 10)]

def test_func(dstream):
return dstream.filter(lambda x: x % 2 == 0)
expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_count_batch(self):
"""Basic operation test for DStream.count with batch deserializer"""
test_input = [range(1, 5), range(1, 10), range(1, 20)]

def test_func(dstream):
print "count"
dstream.count().pyprint()
return dstream.count()
expected_output = map(lambda x: [len(x)], test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_reduce(self):
"""Basic operation test for DStream.reduce"""

def test_count_unbatch(self):
"""Basic operation test for DStream.count with unbatch deserializer"""
test_input = [[], [1], range(1, 3), range(1, 4)]

def test_func(dstream):
return dstream.count()
expected_output = map(lambda x: [len(x)], test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_reduce_batch(self):
"""Basic operation test for DStream.reduce with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
Expand All @@ -129,67 +172,132 @@ def test_func(dstream):
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_reduceByKey(self):
"""Basic operation test for DStream.reduceByKey"""
#test_input = [["a", "a", "b"], ["", ""], []]
test_input = [["a", "a", "b", "b"], ["", "", "", ""], []]
def test_reduce_unbatch(self):
"""Basic operation test for DStream.reduce with unbatch deserializer"""
test_input = [[1], range(1, 3), range(1, 4)]

def test_func(dstream):
return dstream.reduce(operator.add)
expected_output = map(lambda x: [reduce(operator.add, x)], test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_reduceByKey_batch(self):
"""Basic operation test for DStream.reduceByKey with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 2)], [("", 4)]]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_reduceByKey_unbatch(self):
"""Basic operation test for DStream.reduceByKey with unbatch deserilizer"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
print "reduceByKey"
dstream.map(lambda x: (x, 1)).pyprint()
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
#expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
expected_output = [[("a", 2), ("b", 2)], [("", 4)], []]
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_mapValues(self):
"""Basic operation test for DStream.mapValues"""
#test_input = [["a", "a", "b"], ["", ""], []]
test_input = [["a", "a", "b", "b"], ["", "", "", ""], []]
def test_mapValues_batch(self):
"""Basic operation test for DStream.mapValues with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10)
#expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
expected_output = [[("a", 12), ("b", 12)], [("", 14)], []]
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 12)], [("", 14)]]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMapValues(self):
"""Basic operation test for DStream.flatMapValues"""
#test_input = [["a", "a", "b"], ["", ""], []]
test_input = [["a", "a", "b", "b"], ["", "", "",""], []]
def test_mapValues_unbatch(self):
"""Basic operation test for DStream.mapValues with unbatch deserializer"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10))
#expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)], []]
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_glom(self):
"""Basic operation test for DStream.glom"""
def test_flatMapValues_batch(self):
"""Basic operation test for DStream.flatMapValues with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)]]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMapValues_unbatch(self):
"""Basic operation test for DStream.flatMapValues with unbatch deserializer"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_glom_batch(self):
"""Basic operation test for DStream.glom with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]
numSlices = 2

def test_func(dstream):
return dstream.glom()
expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]]
expected_output = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def test_glom_unbatach(self):
"""Basic operation test for DStream.glom with unbatch deserialiser"""
test_input = [range(1, 4), range(4, 7), range(7, 10)]
numSlices = 2

def test_func(dstream):
return dstream.glom()
expected_output = [[[1], [2, 3]], [[4], [5, 6]], [[7], [8, 9]]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def test_mapPartitions(self):
"""Basic operation test for DStream.mapPartitions"""
def test_mapPartitions_batch(self):
"""Basic operation test for DStream.mapPartitions with batch deserializer"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]
numSlices = 2

def test_func(dstream):
def f(iterator): yield sum(iterator)
def f(iterator):
yield sum(iterator)
return dstream.mapPartitions(f)
expected_output = [[3, 7], [11, 15], [19, 23]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def test_mapPartitions_unbatch(self):
"""Basic operation test for DStream.mapPartitions with unbatch deserializer"""
test_input = [range(1, 4), range(4, 7), range(7, 10)]
numSlices = 2

def test_func(dstream):
def f(iterator):
yield sum(iterator)
return dstream.mapPartitions(f)
expected_output = [[1, 5], [4, 11], [7, 17]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
"""Start stream and return the output"""
# Generate input stream with user-defined input
Expand All @@ -212,6 +320,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
# check if the output is the same length of expexted output
if len(expected_output) == len(self.result):
break

return self.result

if __name__ == "__main__":
Expand Down
11 changes: 0 additions & 11 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import time
import socket
import traceback
import itertools
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
from pyspark.accumulators import _accumulatorRegistry
Expand Down Expand Up @@ -75,16 +74,6 @@ def main(infile, outfile):
(func, deserializer, serializer) = command
init_time = time.time()
iterator = deserializer.load_stream(infile)
print "deserializer in worker: %s" % str(deserializer)
iterator, walk = itertools.tee(iterator)
if isinstance(walk, int):
print "this is int"
print walk
else:
try:
print list(walk)
except:
print list(walk)
serializer.dump_stream(func(split_index, iterator), outfile)
except Exception as e:
# Write the error to stderr in addition to trying to pass it back to
Expand Down
Loading

0 comments on commit 536def4

Please sign in to comment.