diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index fe893210ab089..8396c4f960e81 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -43,7 +43,7 @@ def setUp(self): def tearDown(self): # Do not call pyspark.streaming.context.StreamingContext.stop directly because - # we do not wait to shutdown call back server and py4j client + # we do not wait for the py4j client to shut down. self.ssc._jssc.stop() self.ssc._sc.stop() # Why does it long time to terminate StremaingContext and SparkContext? @@ -74,7 +74,6 @@ def setUp(self): PySparkStreamingTestCase.setUp(self) self.timeout = 10 # seconds self.numInputPartitions = 2 - self.result = list() def tearDown(self): PySparkStreamingTestCase.tearDown(self) @@ -426,7 +425,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): # Apply test function to stream. test_stream = test_func(test_input_stream) # Add job to get output from stream. - test_stream._test_output(self.result) + result = list() + test_stream._test_output(result) self.ssc.start() start_time = time.time() @@ -438,10 +438,10 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): break self.ssc.awaitTermination(50) # Check if the output is the same length of expexted output. - if len(expected_output) == len(self.result): + if len(expected_output) == len(result): break - return self.result + return result class TestSaveAsFilesSuite(PySparkStreamingTestCase): def setUp(self):