From 6e1c7e2798bb0b7b120e28a07c5d70fa162dd724 Mon Sep 17 00:00:00 2001
From: Scott Taylor
Date: Fri, 10 Jul 2015 19:29:32 -0700
Subject: [PATCH] [SPARK-7735] [PYSPARK] Raise Exception on non-zero exit from
 pipe commands

This will allow problems with piped commands to be detected.
This will also allow tasks to be retried where errors are rare (such as
network problems in piped commands).

Author: Scott Taylor

Closes #6262 from megatron-me-uk/patch-2 and squashes the following commits:

04ae1d5 [Scott Taylor] Remove spurious empty line
98fa101 [Scott Taylor] fix blank line style error
574b564 [Scott Taylor] Merge pull request #2 from megatron-me-uk/patch-4
0c1e762 [Scott Taylor] Update rdd pipe method for checkCode
ab9a2e1 [Scott Taylor] Update rdd pipe tests for checkCode
eb4801c [Scott Taylor] fix fail_condition
b0ac3a4 [Scott Taylor] Merge pull request #1 from megatron-me-uk/megatron-me-uk-patch-1
a307d13 [Scott Taylor] update rdd tests to test pipe modes
34fcdc3 [Scott Taylor] add optional argument 'mode' for rdd.pipe
a0c0161 [Scott Taylor] fix generator issue
8a9ef9c [Scott Taylor] make check_return_code an iterator
0486ae3 [Scott Taylor] style fixes
8ed89a6 [Scott Taylor] Chain generators to prevent potential deadlock
4153b02 [Scott Taylor] fix list.sort returns None
491d3fc [Scott Taylor] Pass a function handle to assertRaises
3344a21 [Scott Taylor] wrap assertRaises with QuietTest
3ab8c7a [Scott Taylor] remove whitespace for style
cc1a73d [Scott Taylor] fix style issues in pipe test
8db4073 [Scott Taylor] Add a test for rdd pipe functions
1b3dc4e [Scott Taylor] fix missing space around operator style
0974f98 [Scott Taylor] add space between words in multiline string
45f4977 [Scott Taylor] fix line too long style error
5745d85 [Scott Taylor] Remove space to fix style
f552d49 [Scott Taylor] Catch non-zero exit from pipe commands
---
 python/pyspark/rdd.py   | 16 ++++++++++++++--
 python/pyspark/tests.py | 12 ++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 79dafb0a4ef27..3218bed5c74fc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -700,12 +700,14 @@ def groupBy(self, f, numPartitions=None):
         return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
     @ignore_unicode_prefix
-    def pipe(self, command, env={}):
+    def pipe(self, command, env={}, checkCode=False):
         """
         Return an RDD created by piping elements to a forked external process.
 
         >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
         [u'1', u'2', u'', u'3']
+
+        :param checkCode: whether or not to check the return value of the shell command.
""" def func(iterator): pipe = Popen( @@ -717,7 +719,17 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) + + def check_return_code(): + pipe.wait() + if checkCode and pipe.returncode: + raise Exception("Pipe function `%s' exited " + "with error code %d" % (command, pipe.returncode)) + else: + for i in range(0): + yield i + return (x.rstrip(b'\n').decode('utf-8') for x in + chain(iter(pipe.stdout.readline, b''), check_return_code())) return self.mapPartitions(func) def foreach(self, f): diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 17256dfc95744..c5c0add49d02c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -885,6 +885,18 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): for size in sizes: self.assertGreater(size, 0) + def test_pipe_functions(self): + data = ['1', '2', '3'] + rdd = self.sc.parallelize(data) + with QuietTest(self.sc): + self.assertEqual([], rdd.pipe('cc').collect()) + self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect) + result = rdd.pipe('cat').collect() + result.sort() + [self.assertEqual(x, y) for x, y in zip(data, result)] + self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect) + self.assertEqual([], rdd.pipe('grep 4').collect()) + class ProfilerTests(PySparkTestCase):