Commit

review comment
liguoqiang committed Mar 7, 2014
1 parent e3e56aa commit 3dad595
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -851,8 +851,10 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty, "Partition index out of bounds: " + outIndex.mkString(","))
+    // TODO: All RDDs have continuous index space. How to ensure this?
+    partitions.foreach { p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -956,8 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty, "Partition index out of bounds: " + outIndex.mkString(","))
+    partitions.foreach { p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
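The diff itself is terse, so below is a minimal, self-contained Scala sketch (not Spark source) contrasting the removed set-difference check with the new per-index bounds check. The names PartitionCheckSketch, checkBySetDiff, checkByBounds, and numPartitions are all hypothetical stand-ins; numPartitions plays the role of rdd.partitions.size.

    object PartitionCheckSketch {

      // Old approach: set-difference against the actual index set. Works even
      // for a non-contiguous index space, but builds two sets per call.
      def checkBySetDiff(partitions: Seq[Int], validIndices: Set[Int]): Unit = {
        val outIndex = partitions.toSet.diff(validIndices)
        require(outIndex.isEmpty, "Partition index out of bounds: " + outIndex.mkString(","))
      }

      // New approach: a per-element bounds check, assuming indices form the
      // contiguous range 0 until numPartitions (hence the TODO in the diff).
      def checkByBounds(partitions: Seq[Int], numPartitions: Int): Unit = {
        partitions.foreach { p =>
          require(p >= 0 && p < numPartitions, s"Invalid partition requested: $p")
        }
      }

      def main(args: Array[String]): Unit = {
        checkBySetDiff(Seq(0, 1, 3), Set(0, 1, 2, 3))  // passes
        checkByBounds(Seq(0, 1, 3), numPartitions = 4) // passes
        // checkByBounds(Seq(4), numPartitions = 4)    // would throw IllegalArgumentException
      }
    }

Both checks reject the same requests when partition indices are contiguous from 0; the bounds check simply avoids materializing the sets and reports the first bad index it meets.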

0 comments on commit 3dad595
