Avoid using empty string as default preferred location
This is causing the TaskSetManager to try to schedule certain
tasks on the host "" (empty string). The intended semantics,
however, are that the partition does not have a preferred location,
and the TSM should schedule the corresponding task accordingly.
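
For context, here is a minimal standalone sketch of the two encodings (hypothetical names; this is not code from the commit). With the empty-string default, downstream code that builds a location list ends up advertising "" as if it were a real hostname; with Option, the no-preference case collapses to an empty list:

// Hypothetical sketch; compiles and runs with plain scalac, no Spark required.
object PreferredLocationSemantics {
  // Old encoding: "" is a sentinel for "no preference", but callers
  // cannot tell it apart from an actual hostname.
  def oldLocations(preferredLocation: String = ""): Seq[String] =
    List(preferredLocation) // a one-element list whose host is ""

  // New encoding: absence of a preference is expressed in the type.
  def newLocations(preferredLocation: Option[String] = None): Seq[String] =
    preferredLocation.toSeq // empty: nothing to schedule against

  def main(args: Array[String]): Unit = {
    println(oldLocations().size)          // 1 -- one bogus "" host
    println(newLocations().size)          // 0 -- genuinely no preference
    println(newLocations(Some("host1")))  // List(host1)
  }
}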
Andrew Or committed Dec 8, 2014
1 parent 8817fc7 commit 2f7dfb6
Showing 1 changed file with 19 additions and 15 deletions.
core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
  * @param preferredLocation the preferred location for this partition
  */
 private[spark] case class CoalescedRDDPartition(
-  index: Int,
-  @transient rdd: RDD[_],
-  parentsIndices: Array[Int],
-  @transient preferredLocation: String = ""
-  ) extends Partition {
+    index: Int,
+    @transient rdd: RDD[_],
+    parentsIndices: Array[Int],
+    @transient preferredLocation: Option[String] = None) extends Partition {
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
@@ -55,9 +54,11 @@ private[spark] case class CoalescedRDDPartition(
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {
-    val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
+    val loc = parents.count { p =>
+      preferredLocation.exists { l =>
+        rdd.context.getPreferredLocs(rdd, p.index).map(_.host).contains(l)
+      }
+    }
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
 }
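
The rewritten localFraction counts a parent partition as local only when a preferred location is actually set: Option.exists is always false for None. A self-contained sketch of just that counting logic, with rdd.context.getPreferredLocs stubbed out as plain host lists (all names here are hypothetical):

object LocalFractionSketch {
  // parentHosts stands in for getPreferredLocs(rdd, p.index).map(_.host).
  def localFraction(parentHosts: Seq[Seq[String]], preferredLocation: Option[String]): Double = {
    val loc = parentHosts.count { hosts =>
      // false whenever preferredLocation is None
      preferredLocation.exists(l => hosts.contains(l))
    }
    if (parentHosts.isEmpty) 0.0 else loc.toDouble / parentHosts.size.toDouble
  }

  def main(args: Array[String]): Unit = {
    val hosts = Seq(Seq("host1", "host2"), Seq("host3"))
    println(localFraction(hosts, Some("host1")))  // 0.5
    println(localFraction(hosts, None))           // 0.0, no spurious match on ""
  }
}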
@@ -73,9 +74,9 @@ private[spark] case class CoalescedRDDPartition(
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
-  @transient var prev: RDD[T],
-  maxPartitions: Int,
-  balanceSlack: Double = 0.10)
+    @transient var prev: RDD[T],
+    maxPartitions: Int,
+    balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
@@ -113,7 +114,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
-    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
   }
 }
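
The new getPreferredLocations leans on Option#toSeq, which turns None into an empty collection instead of a one-element list holding a sentinel. A quick standalone illustration (plain Scala, no Spark required):

object ToSeqDemo extends App {
  val none: Option[String] = None
  val some: Option[String] = Some("host1")
  println(none.toSeq.size)  // 0 -- no preferred hosts reported
  println(some.toSeq)       // List(host1)
  println(List("").size)    // 1 -- the removed code advertised a host named ""
}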

@@ -147,7 +148,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
  *
  */
 
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
   def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +342,11 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
   }
 }
 
-private[spark] case class PartitionGroup(prefLoc: String = "") {
+private case class PartitionGroup(prefLoc: Option[String] = None) {
   var arr = mutable.ArrayBuffer[Partition]()
 
   def size = arr.size
 }
+
+private object PartitionGroup {
+  def apply(prefLoc: String): PartitionGroup = PartitionGroup(Some(prefLoc))
+}
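
The added companion object preserves source compatibility: call sites that pass a bare String keep compiling, with the overloaded apply wrapping the argument in Some. A hypothetical standalone mirror of the pattern (PartitionGroup is private to Spark, so a minimal stand-in is redeclared here):

case class Group(prefLoc: Option[String] = None)

object Group {
  // Overload keeps old call sites like Group("host1") working unchanged.
  def apply(prefLoc: String): Group = Group(Some(prefLoc))
}

object GroupDemo extends App {
  println(Group())         // Group(None)
  println(Group("host1"))  // Group(Some(host1))
}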
