SPARK-1099: Spark's local mode should probably respect spark.cores.max by default

This is for JIRA: https://spark-project.atlassian.net/browse/SPARK-1099
This is what I do in this patch (also commented in the JIRA) @aarondav

This is really a behavioral change, so I do it with great caution and welcome any review advice:

1. I change how the "MASTER=local" pattern creates LocalBackend. In the past we passed 1 core to it; now it uses a default number of cores.
The reason is that when someone uses spark-shell to start local mode, the REPL uses this "MASTER=local" pattern by default.
So if one also specifies cores on the spark-shell command line, that request goes through here too, and hard-coding 1 core no longer fits the change we are making.
2. In LocalBackend, the "totalCores" value is computed with a different rule (in the past it simply took the user-passed cores: 1 for the "MASTER=local" pattern, 2 for the "MASTER=local[2]" pattern).
Rules:
a. The second constructor argument of LocalBackend, indicating cores, has a default value of Int.MaxValue. If the user didn't pass one, it stays at Int.MaxValue.
b. In getMaxCores, we first compare that value to Int.MaxValue. If it is not equal, we assume the user passed their desired value, so we just use it.
c. If b is not satisfied, we read cores from spark.cores.max and get the real number of logical cores from Runtime. If spark.cores.max is bigger than the logical core count, we use the logical cores; otherwise we use spark.cores.max (see the sketch after this list).
3. In SparkContextSchedulerCreationSuite's test("local") case, the assertion is changed from 1 to the logical core count, because the "MASTER=local" pattern now uses default values.
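
A minimal sketch of the capping rule in 2c, assuming a SparkConf named conf; the helper name localModeCores is illustrative and not part of the patch:

    import org.apache.spark.SparkConf

    // Cap the requested cores at the machine's logical core count;
    // default to all logical cores when spark.cores.max is not set.
    def localModeCores(conf: SparkConf): Int = {
      val logicalCores = Runtime.getRuntime.availableProcessors()
      math.min(conf.getInt("spark.cores.max", logicalCores), logicalCores)
    }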

Author: qqsun8819 <jin.oyj@alibaba-inc.com>

Closes apache#110 from qqsun8819/local-cores and squashes the following commits:

731aefa [qqsun8819] 1 LocalBackend not change 2 In SparkContext do some process to the cores and pass it to original LocalBackend constructor
78b9c60 [qqsun8819] 1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern
6ae1ee8 [qqsun8819] Add a static function in LocalBackEnd to let it use spark.cores.max specified cores when no cores are passed to it
qqsun8819 authored and aarondav committed Mar 19, 2014
1 parent 67fa71c commit 1678931
Showing 3 changed files with 22 additions and 6 deletions.
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1262,7 +1262,10 @@ object SparkContext extends Logging {
    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-       val backend = new LocalBackend(scheduler, 1)
+       // Use user specified in config, up to all available cores
+       val realCores = Runtime.getRuntime.availableProcessors()
+       val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
+       val backend = new LocalBackend(scheduler, toUseCores)
        scheduler.initialize(backend)
        scheduler

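As a usage sketch of the SparkContext change above, a local-mode application can now cap its cores through spark.cores.max; the app name and core count here are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical example: the plain "local" master now honours spark.cores.max,
    // capped at Runtime.getRuntime.availableProcessors().
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("cores-demo")
      .set("spark.cores.max", "2")
    val sc = new SparkContext(conf)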
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._
class FileSuite extends FunSuite with LocalSparkContext {

  test("text files") {
-   sc = new SparkContext("local", "test")
+   sc = new SparkContext("local[1]", "test")
    val tempDir = Files.createTempDir()
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 4)
@@ -176,7 +176,7 @@ class FileSuite extends FunSuite with LocalSparkContext {

test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
sc = new SparkContext("local", "test")
sc = new SparkContext("local[1]", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
19 changes: 16 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
  extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {

- def createTaskScheduler(master: String): TaskSchedulerImpl = {
+ def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
    // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
    // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-   sc = new SparkContext("local", "test")
+   sc = new SparkContext("local", "test", conf)
    val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
    sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
  }

  test("local") {
-   val sched = createTaskScheduler("local")
+   var conf = new SparkConf()
+   conf.set("spark.cores.max", "1")
+   val sched = createTaskScheduler("local", conf)
    sched.backend match {
      case s: LocalBackend => assert(s.totalCores === 1)
      case _ => fail()
    }
  }

+ test("local-cores-exceed") {
+   val cores = Runtime.getRuntime.availableProcessors() + 1
+   var conf = new SparkConf()
+   conf.set("spark.cores.max", cores.toString)
+   val sched = createTaskScheduler("local", conf)
+   sched.backend match {
+     case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+     case _ => fail()
+   }
+ }

  test("local-n") {
    val sched = createTaskScheduler("local[5]")
    assert(sched.maxTaskFailures === 1)
