Skip to content

Commit

Permalink
[SPARK-1690] Tolerating empty elements when saving Python RDD to text…
Browse files Browse the repository at this point in the history
… files

Tolerate empty strings in PythonRDD

Author: Kan Zhang <kzhang@apache.org>

Closes #644 from kanzhang/SPARK-1690 and squashes the following commits:

c62ad33 [Kan Zhang] Adding Python doctest
473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python RDD to text files
(cherry picked from commit 6c2691d)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
  • Loading branch information
kanzhang authored and pwendell committed May 10, 2014
1 parent 2a669a7 commit ac86af8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
Expand Down Expand Up @@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
Array.empty[Byte]
null
}
} catch {

Expand All @@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](

var _nextObj = read()

def hasNext = _nextObj.length != 0
def hasNext = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
Expand Down
8 changes: 8 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,14 @@ def saveAsTextFile(self, path):
>>> from glob import glob
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
Empty lines are tolerated when saving to text files.
>>> tempFile2 = NamedTemporaryFile(delete=True)
>>> tempFile2.close()
>>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
>>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
'\\n\\n\\nbar\\nfoo\\n'
"""
def func(split, iterator):
for x in iterator:
Expand Down

0 comments on commit ac86af8

Please sign in to comment.