[SPARK-3948][Shuffle] Fix stream corruption bug in sort-based shuffle
A kernel 2.6.32 bug leads to unexpected behavior of transferTo in copyStream, which corrupts the shuffle output file in sort-based shuffle and surfaces as PARSING_ERROR(2), deserialization errors, or offset-out-of-range errors. This patch fixes the problem by opening the output file with the append flag and adding position-checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).

Author: jerryshao <saisai.shao@intel.com>

Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits:

be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo
jerryshao authored and JoshRosen committed Oct 20, 2014
1 parent d1966f3 commit c7aeecd
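
As a usage note, the NIO copy path touched here is gated behind the configuration key mentioned above, so an application hit by the kernel bug can opt out entirely. A minimal sketch, assuming a standard SparkConf setup (the app name is hypothetical):

import org.apache.spark.SparkConf

// Force copyStream to use buffered byte copying instead of transferTo,
// avoiding the kernel 2.6.32 transferTo bug described in SPARK-3948.
val conf = new SparkConf()
  .setAppName("ShuffleHeavyJob") // hypothetical app name
  .set("spark.file.transferTo", "false")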
Showing 2 changed files with 28 additions and 6 deletions.
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -269,23 +269,44 @@ private[spark] object Utils extends Logging {
     dir
   }
 
-  /** Copy all data from an InputStream to an OutputStream */
+  /** Copy all data from an InputStream to an OutputStream. The NIO way of copying (transferTo)
+   * between file streams is disabled by default unless transferToEnabled is explicitly set to
+   * true; transferToEnabled should be driven by the configuration spark.file.transferTo = [true|false].
+   */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false): Long =
+                 closeStreams: Boolean = false,
+                 transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
     try {
-      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
+        && transferToEnabled) {
         // When both streams are file streams, use transferTo to improve copy performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val initialPos = outChannel.position()
         val size = inChannel.size()
 
         // In case the transferTo method transferred less data than we required.
         while (count < size) {
           count += inChannel.transferTo(count, size - count, outChannel)
         }
+
+        // Check the position after the transferTo loop to see if it is at the expected offset,
+        // and inform the user if it is not.
+        // The position will not be advanced to the expected length after calling transferTo
+        // on kernel version 2.6.32; this issue can be seen in
+        // https://bugs.openjdk.java.net/browse/JDK-7052359
+        // It leads to stream corruption when using sort-based shuffle (SPARK-3948).
+        val finalPos = outChannel.position()
+        assert(finalPos == initialPos + size,
+          s"""
+             |Current position $finalPos does not equal the expected position ${initialPos + size}
+             |after transferTo. Please check your kernel version to see if it is 2.6.32; that
+             |kernel has a bug which leads to unexpected behavior when using transferTo.
+             |You can set spark.file.transferTo = false to disable this NIO feature.
+           """.stripMargin)
       } else {
         val buf = new Array[Byte](8192)
         var n = 0
@@ -727,7 +748,7 @@ private[spark] object Utils extends Logging {
 
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
-   * 
+   *
    * @param dir must be the path to a directory, or IllegalArgumentException is thrown
    * @param cutoff measured in seconds. Returns true if there are any files or directories in the
    *               given directory whose last modified time is later than this many seconds ago
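
For illustration, a minimal sketch of calling the patched copyStream. Utils is private[spark], so a real caller must live under the org.apache.spark package; the package name and file paths below are hypothetical:

package org.apache.spark.examples // hypothetical subpackage, needed for private[spark] access

import java.io.{FileInputStream, FileOutputStream}
import org.apache.spark.util.Utils

object CopyStreamSketch {
  def main(args: Array[String]): Unit = {
    val in = new FileInputStream("/tmp/part-00000")             // hypothetical input file
    val out = new FileOutputStream("/tmp/merged-output", true)  // append mode, as in the patch
    // closeStreams = true closes both streams after copying;
    // transferToEnabled = false keeps the buffered (non-NIO) copy path.
    val copied = Utils.copyStream(in, out, closeStreams = true, transferToEnabled = false)
    println(s"Copied $copied bytes")
  }
}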
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
 
   // Size of object batches when reading/writing from serializers.
   //
@@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C](
     var out: FileOutputStream = null
     var in: FileInputStream = null
     try {
-      out = new FileOutputStream(outputFile)
+      out = new FileOutputStream(outputFile, true)
       for (i <- 0 until numPartitions) {
         in = new FileInputStream(partitionWriters(i).fileSegment().file)
-        val size = org.apache.spark.util.Utils.copyStream(in, out, false)
+        val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
         in.close()
         in = null
         lengths(i) = size
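
To make the defensive pattern concrete outside of Spark, here is a self-contained sketch of the same idea adopted by this commit, with hypothetical object and file names: open the destination in append mode, loop transferTo until the whole source is copied, then verify the channel position actually advanced (it may not on kernel 2.6.32, per JDK-7052359):

import java.io.{FileInputStream, FileOutputStream}

object SafeAppendSketch {
  // Append src onto dst with transferTo, then verify the position advanced.
  def appendFile(src: String, dst: String): Long = {
    val inChannel = new FileInputStream(src).getChannel()
    val outStream = new FileOutputStream(dst, true) // append flag, as in the patch
    val outChannel = outStream.getChannel()
    try {
      val initialPos = outChannel.position()
      val size = inChannel.size()
      var count = 0L
      // transferTo may copy fewer bytes than requested, so loop until done.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
      assert(outChannel.position() == initialPos + size,
        "transferTo did not advance the channel position (SPARK-3948)")
      count
    } finally {
      inChannel.close()
      outChannel.close()
    }
  }
}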
