From 3dad595e0509535d28a04ea21fd49488aec5b7af Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Fri, 7 Mar 2014 10:29:12 +0800
Subject: [PATCH] review comment

---
 .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b1b6491ab5678..eef469631141d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -851,8 +851,10 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
+    // TODO: All RDDs have continuous index space. How to ensure this?
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -956,8 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
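
For context (not part of the patch above): a minimal sketch of how the new per-index check behaves from a caller's point of view, replacing the earlier set-difference check with an eager bounds check. The local SparkContext, the RDD named `data`, and the demo object are illustrative assumptions; only `runJob`, its `allowLocal` parameter, and the "Invalid partition requested" message come from the patched code.

    import org.apache.spark.SparkContext

    object PartitionCheckDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "partition-check-demo")

        // 4 partitions, so the only valid partition indices are 0..3.
        val data = sc.parallelize(1 to 100, 4)

        // Valid request: runs only partitions 0 and 2.
        sc.runJob(data, (it: Iterator[Int]) => it.sum, Seq(0, 2), allowLocal = false)

        // Invalid request: partition 4 does not exist, so the new require(...)
        // fails fast with "Invalid partition requested: 4" (IllegalArgumentException)
        // before the job is handed to the DAGScheduler.
        sc.runJob(data, (it: Iterator[Int]) => it.sum, Seq(4), allowLocal = false)

        sc.stop()
      }
    }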