diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index da9d11f689aef..45575f14fa798 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,12 +704,15 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) - pipe.wait() - if pipe.returncode: - raise Exception("Pipe function `%s' exited " - "with error code %d" % (command, pipe.returncode)) - return result + def check_return_code(): + pipe.wait() + if pipe.returncode: + raise Exception("Pipe function `%s' exited " + "with error code %d" % (command, pipe.returncode)) + else: + return None + return (x.rstrip(b'\n').decode('utf-8') for x in + chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) return self.mapPartitions(func) def foreach(self, f):