
[SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
JoshRosen committed May 6, 2015
1 parent 845d1d4 commit ad006a4
Showing 1 changed file with 70 additions and 29 deletions.
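
The heart of this change is the new `needToCopyObjectsBeforeShuffle` predicate in the diff below. For context: Spark SQL operators return the same mutable `Row` object from their iterators, so any shuffle path that buffers deserialized records ends up holding many aliases of one object unless each row is defensively copied first. A minimal sketch of that hazard, using a hypothetical `MutableRow` stand-in rather than Spark's actual classes:

object CopyHazardDemo {
  // Stand-in for Spark SQL's mutable Row; only the aliasing behavior matters here.
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
    override def toString: String = s"Row($value)"
  }

  def main(args: Array[String]): Unit = {
    val shared = new MutableRow(0)
    // Like a SQL operator's output iterator: mutate and return the same object.
    val noCopy = Iterator(1, 2, 3).map { v => shared.value = v; shared }
    // Buffering without a defensive copy keeps three aliases of one object:
    println(noCopy.toArray.mkString(", "))   // Row(3), Row(3), Row(3)

    val shared2 = new MutableRow(0)
    val withCopy = Iterator(1, 2, 3).map { v => shared2.value = v; shared2.copy() }
    // Copying before buffering preserves each record:
    println(withCopy.toArray.mkString(", ")) // Row(1), Row(2), Row(3)
  }
}

The copy is pure overhead whenever the downstream shuffle never buffers deserialized objects, which is exactly the case analysis the new predicate performs.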
@@ -59,12 +59,63 @@ case class Exchange(
 
   override def output: Seq[Attribute] = child.output
 
-  /** We must copy rows when sort based shuffle is on */
-  protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+  private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
   private val bypassMergeThreshold =
     child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
 
+  private val serializeMapOutputs =
+    child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
+
+  /**
+   * Determines whether records must be defensively copied before being sent to the shuffle.
+   * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
+   * shuffle code assumes that objects are immutable and hence does not perform its own defensive
+   * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
+   * order to properly shuffle the output of these operators, we need to perform our own copying
+   * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
+   * whenever possible. This method encapsulates the logic for choosing when to copy.
+   *
+   * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
+   * have to rely on knowledge of core internals here in SQL.
+   *
+   * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
+   *
+   * @param numPartitions the number of output partitions produced by the shuffle
+   * @param serializer the serializer that will be used to write rows
+   * @return true if rows should be copied before being shuffled, false otherwise
+   */
+  private def needToCopyObjectsBeforeShuffle(
+      numPartitions: Int,
+      serializer: Serializer): Boolean = {
+    if (newOrdering.nonEmpty) {
+      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
+      // which requires a defensive copy.
+      true
+    } else if (sortBasedShuffleOn) {
+      // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
+      // However, there are two special cases where we can avoid the copy, described below:
+      if (numPartitions <= bypassMergeThreshold) {
+        // If the number of output partitions is sufficiently small, then Spark will fall back to
+        // the old hash-based shuffle write path which doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
+        // them. This optimization is guarded by a feature-flag and is only applied in cases where
+        // the shuffle dependency does not specify an ordering and the record serializer has
+        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        false
+      } else {
+        // None of the special cases held, so we must copy.
+        true
+      }
+    } else {
+      // We're using hash-based shuffle, so we don't need to copy.
+      false
+    }
+  }
+
   private val keyOrdering = {
     if (newOrdering.nonEmpty) {
       val key = newPartitioning.keyExpressions
@@ -81,7 +132,7 @@ case class Exchange(
 
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
-  def serializer(
+  private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       numPartitions: Int): Serializer = {
@@ -123,17 +174,11 @@ case class Exchange(
   override def execute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
-        // TODO: Eliminate redundant expressions in grouping key and value.
-        // This is a workaround for SPARK-4479. When:
-        // 1. sort based shuffle is on, and
-        // 2. the partition number is under the merge threshold, and
-        // 3. no ordering is required
-        // we can avoid the defensive copies to improve performance. In the long run, we probably
-        // want to include information in shuffle dependencies to indicate whether elements in the
-        // source RDD should be copied.
-        val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
-
-        val rdd = if (willMergeSort || newOrdering.nonEmpty) {
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, valueSchema, numPartitions)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -152,14 +197,14 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Row, Row](rdd, part)
         }
-        val keySchema = expressions.map(_.dataType).toArray
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions))
-
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
+        val keySchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, null, numPartitions)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) {
           child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
         } else {
           child.execute().mapPartitions { iter =>
@@ -178,17 +223,14 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Null, Null](rdd, part)
         }
-        val keySchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(keySchema, null, numPartitions))
-
+        shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
-        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
-        // operators like `TakeOrdered` may require an ordering within the partition, and currently
-        // `SinglePartition` doesn't include ordering information.
-        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
-        val rdd = if (sortBasedShuffleOn) {
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(null, valueSchema, 1)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions = 1, serializer)) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
           child.execute().mapPartitions { iter =>
@@ -198,8 +240,7 @@ case class Exchange(
         }
         val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(null, valueSchema, 1))
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
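
A note on why the serialized-map-outputs path (SPARK-4550) can skip the copy: when a record is serialized the moment it enters the sort buffer, the buffer holds immutable byte arrays rather than references to a shared mutable row, and sorting merely relocates those opaque bytes — the property that `supportsRelocationOfSerializedObjects` advertises. A sketch of the idea, reusing the hypothetical `MutableRow` stand-in from above (not Spark's ExternalSorter):

object SerializeFirstDemo {
  final class MutableRow(var value: Int)

  // Stand-in serializer: snapshot the row into bytes immediately.
  def serialize(row: MutableRow): Array[Byte] = BigInt(row.value).toByteArray

  def main(args: Array[String]): Unit = {
    val shared = new MutableRow(0)
    val buffered = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
    for (v <- 1 to 3) {
      shared.value = v
      buffered += serialize(shared) // bytes captured now; later mutation is harmless
    }
    // Every record survived, with no defensive copy of the row itself:
    println(buffered.map(BigInt(_)).mkString(", ")) // 1, 2, 3
  }
}

The copy remains necessary whenever a new ordering is requested, since sorting by key compares deserialized objects and therefore must buffer live references.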
