Skip to content

Commit

Permalink
[SPARK-4759] Fix driver hanging from coalescing partitions
Browse files Browse the repository at this point in the history
The driver sometimes hangs when we coalesce RDD partitions. See the JIRA for more details and a reproduction.

This is because our use of the empty string as the default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (the empty string). The intended semantics, however, are that the partition has no preferred location, and the TSM should schedule the corresponding task accordingly.

Author: Andrew Or <andrew@databricks.com>

Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits:

e520d6b [Andrew Or] Oops
3ebf8bd [Andrew Or] A few comments
f370a4e [Andrew Or] Fix tests
2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location

(cherry picked from commit 4f93d0c)
Signed-off-by: Andrew Or <andrew@databricks.com>
  • Loading branch information
Andrew Or committed Jan 21, 2015
1 parent fd6266f commit 0c13eed
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 16 deletions.
36 changes: 21 additions & 15 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
* @param preferredLocation the preferred location for this partition
*/
private[spark] case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@transient preferredLocation: String = ""
) extends Partition {
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@transient preferredLocation: Option[String] = None) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

@throws(classOf[IOException])
Expand All @@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
val loc = parents.count(p =>
rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))

val loc = parents.count { p =>
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
Expand All @@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies

override def getPartitions: Array[Partition] = {
Expand Down Expand Up @@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}

Expand Down Expand Up @@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
*
*/

private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {

def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
Expand Down Expand Up @@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
}
}

private[spark] case class PartitionGroup(prefLoc: String = "") {
private case class PartitionGroup(prefLoc: Option[String] = None) {
var arr = mutable.ArrayBuffer[Partition]()

def size = arr.size
}

private object PartitionGroup {

  /**
   * Creates a group whose preferred location is the given host name.
   *
   * The empty string is rejected: a group without a preferred location is
   * represented by `None` (the default of the case class constructor), not
   * by an empty host name.
   *
   * @param prefLoc host name to record as this group's preferred location
   * @return a group holding `Some(prefLoc)` as its preferred location
   * @throws IllegalArgumentException if `prefLoc` is the empty string
   */
  def apply(prefLoc: String): PartitionGroup = {
    require(prefLoc != "", "Preferred location must not be empty")
    val location: Option[String] = Some(prefLoc)
    new PartitionGroup(location)
  }
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")

// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
Expand Down

0 comments on commit 0c13eed

Please sign in to comment.