[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on

This PR is a workaround for SPARK-4479. Two changes are introduced for the case where merge sort is bypassed in `ExternalSorter`:

1. also bypass the buffering of RDD elements, since buffering is the reason that `MutableRow`-backed row objects must be copied (see the sketch below), and
2. avoid defensive copies in the `Exchange` operator.
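To make the first point concrete, here is a minimal, self-contained sketch of the aliasing hazard; `MutableRecord` is a hypothetical stand-in for the `MutableRow`-backed rows produced by SQL operators and is not part of this commit:

```scala
// Sketch only: why buffering elements backed by a reused mutable object
// forces defensive copies, which is what ExternalSorter's buffering implies.
class MutableRecord(var value: Int) {
  def copy(): MutableRecord = new MutableRecord(value)
}

object DefensiveCopySketch {
  def main(args: Array[String]): Unit = {
    // An operator that reuses a single mutable object for every element it emits.
    val reused = new MutableRecord(0)
    val records = (1 to 3).iterator.map { i => reused.value = i; reused }

    // Buffering without copying: every buffered slot aliases the same object,
    // so all of them end up reflecting the last value written.
    val buffered = records.toArray
    println(buffered.map(_.value).mkString(", "))   // prints: 3, 3, 3

    // Defensive copies keep each element intact, at the cost of an extra
    // allocation per element, which is the overhead this commit avoids when
    // the buffering itself is bypassed.
    val reusedAgain = new MutableRecord(0)
    val copied = (1 to 3).iterator.map { i => reusedAgain.value = i; reusedAgain.copy() }.toArray
    println(copied.map(_.value).mkString(", "))     // prints: 1, 2, 3
  }
}
```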


Author: Cheng Lian <lian@databricks.com>

Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:

591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on
liancheng authored and marmbrus committed Nov 24, 2014
1 parent 29372b6 commit a6d7b61
Showing 3 changed files with 41 additions and 10 deletions.
@@ -205,6 +205,13 @@ private[spark] class ExternalSorter[K, V, C](
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
} else if (bypassMergeSort) {
// SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
if (records.hasNext) {
spillToPartitionFiles(records.map { kv =>
((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
})
}
} else {
// Stick values into our buffer
while (records.hasNext) {
@@ -336,6 +343,10 @@ private[spark] class ExternalSorter[K, V, C](
* @param collection whichever collection we're using (map or buffer)
*/
private def spillToPartitionFiles(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
spillToPartitionFiles(collection.iterator)
}

private def spillToPartitionFiles(iterator: Iterator[((Int, K), C)]): Unit = {
assert(bypassMergeSort)

// Create our file writers if we haven't done so yet
@@ -350,9 +361,9 @@
}
}

val it = collection.iterator // No need to sort stuff, just write each element out
while (it.hasNext) {
val elem = it.next()
// No need to sort stuff, just write each element out
while (iterator.hasNext) {
val elem = iterator.next()
val partitionId = elem._1._1
val key = elem._1._2
val value = elem._2
@@ -748,6 +759,12 @@ private[spark] class ExternalSorter[K, V, C](

context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
if (curWriteMetrics != null) {
m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
}
}

lengths
}
12 changes: 6 additions & 6 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -95,14 +95,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)

// 10 partitions from 4 keys
val NUM_BLOCKS = 10
// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
val a = sc.parallelize(1 to 4, NUM_BLOCKS)
val b = a.map(x => (x, x*2))

// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
.setSerializer(new KryoSerializer(conf))

val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -122,13 +122,13 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)

// 10 partitions from 4 keys
val NUM_BLOCKS = 10
// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
val a = sc.parallelize(1 to 4, NUM_BLOCKS)
val b = a.map(x => (x, x*2))

// NOTE: The default Java serializer should create zero-sized blocks
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))

val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

private val bypassMergeThreshold =
child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

override def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = if (sortBasedShuffleOn) {
// This is a workaround for SPARK-4479. When:
// 1. sort based shuffle is on, and
// 2. the partition number is under the merge threshold, and
// 3. no ordering is required
// we can avoid the defensive copies to improve performance. In the long run, we probably
// want to include information in shuffle dependencies to indicate whether elements in the
// source RDD should be copied.
val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
shuffled.map(_._1)

case SinglePartition =>
// SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
// operators like `TakeOrdered` may require an ordering within the partition, and currently
// `SinglePartition` doesn't include ordering information.
// TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
val rdd = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
} else {
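For reference, a minimal sketch (not part of this commit) of the copy decision that the `HashPartitioning` case above now encodes, assuming only the default value of `spark.shuffle.sort.bypassMergeThreshold` (200) read in the diff:

```scala
import org.apache.spark.SparkConf

// Sketch only: with sort-based shuffle enabled, rows are defensively copied
// only when the shuffle has more partitions than the bypass threshold,
// because ExternalSorter then buffers the (reused) row objects instead of
// writing them straight to per-partition files.
object CopyDecisionSketch {
  def needsCopy(conf: SparkConf, sortBasedShuffleOn: Boolean, numPartitions: Int): Boolean = {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    sortBasedShuffleOn && numPartitions > bypassMergeThreshold
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
    assert(!needsCopy(conf, sortBasedShuffleOn = true, numPartitions = 10))   // bypass path: no copies
    assert(needsCopy(conf, sortBasedShuffleOn = true, numPartitions = 201))   // buffered path: copies
    assert(!needsCopy(conf, sortBasedShuffleOn = false, numPartitions = 201)) // hash shuffle: no copies
  }
}
```

The updated `ShuffleSuite` tests use 201 partitions for the same reason noted in their comments: to exceed the default threshold of 200.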
