From f552d49127d9e43799d5728f52682a1609fdedb8 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 19 May 2015 12:01:30 +0100 Subject: [PATCH] Catch non-zero exit from pipe commands This will allow problems with piped commands to be detected. This will also allow tasks to be retried where errors are rare (such as network problems in piped commands). --- python/pyspark/rdd.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 70db4bbe4cbc5..f83945a105efc 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,7 +704,11 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) + result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) + pipe.wait() + if pipe.returncode: + raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode) ) + return result return self.mapPartitions(func) def foreach(self, f):