
[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers #1106

Closed

Conversation

WangTaoTheTonic
Contributor

If the waiting-driver array is large, all of the drivers in it will be dispatched to the first worker we get (as long as it has enough resources), with or without the randomization.

We should randomize every time we dispatch a driver, in order to balance drivers better.
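
For reference, the driver-scheduling part of Master.schedule() currently looks roughly like the sketch below (simplified, not the exact source). The workers are shuffled once per schedule() call and the drivers are matched in the inner loop, so a single shuffled ordering decides placement for the whole batch:

// Simplified sketch of the current driver-scheduling loop (approximate, for illustration only)
val shuffledWorkers = Random.shuffle(workers) // shuffled only once per schedule() call
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
  for (driver <- waitingDrivers) {
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      // the first alive worker in the shuffled order absorbs every driver it can fit
      launchDriver(worker, driver)
      waitingDrivers -= driver
    }
  }
}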

@AmplabJenkins

Can one of the admins verify this patch?

@CodingCat
Contributor

Did you see any performance impact with the current strategy? "Randomization at the start of every schedule point" is used not only in Master but also in TaskSchedulerImpl...

So far, it has worked fine...

@WangTaoTheTonic
Contributor Author

You mean the increased shuffling may lead to bad performance?

@CodingCat
Contributor

yes, potentially...

And considering long-running clusters, the load eventually becomes well balanced with the current strategy anyway... i.e. this commit only helps the case where a user submits a lot of drivers in a single batch and then waits for the results...

@WangTaoTheTonic
Contributor Author

Another situation is that the worker list changes frequently, which causes drivers to be relaunched a lot.

Even when no worker has enough resources at a given moment, the waiting-driver array can keep growing as users keep submitting.

@CodingCat
Contributor

I'm not sure it's worth switching to another mode for the extreme case where some worker (exactly the one running a lot of drivers) joins and leaves constantly...

For the second case, when no worker has enough resources at the same time, the driver cannot be scheduled anyway...

@WangTaoTheTonic
Contributor Author

In short, the commit balances the load better when a lot of drivers arrive, while not hurting performance when there are only a few drivers, since the randomization frequency depends on the number of drivers.

@andrewor14
Contributor

@WangTaoTheTonic Can you up-merge this with master? Also, could you file a JIRA and associate it with this PR?

for (driver <- waitingDrivers) {
val aliveWorkers = workers.filter(_.state == WorkerState.ALIVE)
val shuffledWorkers = Random.shuffle(aliveWorkers) // Randomization helps balance drivers
Contributor


This creates multiple copies of all the workers and could be slow for large clusters. Maybe it would be better to keep track of a list of alive workers instead.

@WangTaoTheTonic
Contributor Author

The JIRA is: https://issues.apache.org/jira/browse/SPARK-3411.
Since the filter creates a copy of the workers, I changed the way the filtering is done.
The shuffle creates copies too; could we change that as well?
@andrewor14 Please take a look and offer some advice.

@WangTaoTheTonic changed the title from "Optimize the schedule procedure in Master" to "[SPARK-3411] Optimize the schedule procedure in Master" on Sep 5, 2014
@JoshRosen
Contributor

Can you give this PR a more descriptive title? "Optimize the schedule procedure in Master" sounds like it could describe many different changes, so it's kind of hard to figure out what this PR might do by reading the current title.

I'd go with something like "[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers."

@JoshRosen
Contributor

I agree that this seems like a bit of a rare corner-case.

@andrewor14
Contributor

@WangTaoTheTonic I looked at this more and I think it will actually be slower with the new changes. Before this patch we shuffled all the workers only once, but here we shuffle them for each waiting driver. If we have thousands of workers, shuffling can become fairly expensive.

As for the original problem you set out to solve, it seems to me that the existing code is already doing the randomization correctly. This is because we draw the first worker from a list of shuffled workers, so it will already be random by the time we draw from it.

@JoshRosen
Contributor

If the problem is all of the drivers landing on the same randomly-chosen worker, I suppose you could treat the randomized list as a circular buffer and go through it in a round-robin order when picking which worker to launch the next driver on.

@andrewor14
Contributor

Oh I see. Wouldn't it be sufficient to just pop the head of shuffledWorkers after allocating each driver then? Or just keep an iterator for shuffledWorkers to never reuse the same worker.

@JoshRosen
Contributor

Yeah, I suppose so, but there was one corner-case that I was concerned about (that is addressed by treating it as a circular buffer):

Let's say we have a scenario where we are trying to schedule three drivers but there are only two workers that they can run on, and let's also say that there's initially enough capacity to run all three drivers. In that case, I think we would pop the head of shuffledWorkers until it becomes empty and not schedule the third driver. That third driver would get scheduled on a subsequent call to Master.schedule(), but I guess that only happens when new apps join or when resource availability changes, so we might wait longer than necessary to launch it.

The circular buffer brings its own problems, though: let's say that there are no valid locations where we can schedule the driver. In this case, we should stop looping through the buffer so that we don't go into an infinite loop.
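
To sketch what I mean (a hypothetical helper, not code in this PR), the traversal can be bounded by the number of alive workers so that a driver that fits nowhere ends the loop after one full pass:

// Hypothetical helper illustrating the circular-buffer idea with a termination guard;
// returns the position of a worker that can host the driver, or None after one full pass.
def findWorkerFor(driver: DriverInfo, shuffledAliveWorkers: Seq[WorkerInfo], startPos: Int): Option[Int] = {
  val numAlive = shuffledAliveWorkers.size
  var visited = 0
  var pos = startPos
  while (visited < numAlive) {
    val worker = shuffledAliveWorkers(pos)
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      return Some(pos) // enough free memory and cores for this driver
    }
    pos = (pos + 1) % numAlive // wrap around: treat the shuffled list as a circular buffer
    visited += 1
  }
  None // no alive worker can host the driver right now, so stop instead of looping forever
}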

@andrewor14
Contributor

Hm, it looks like launchDriver is asynchronous, so there seems to be no easy way to identify workers that have already been scheduled to launch a driver. This means even with the outstanding changes we might schedule too many drivers on the same worker. Now, we could keep track of the worker's remaining memory and cores after scheduling them to launch drivers, but this adds some complexity:

(semi-pseudocode)

// ID -> scheduled resources as a 2-tuple (memory, cores)
val scheduledWorkerResources = new HashMap[Int, (Int, Int)]

val shuffledWorkers = Random.shuffle(workers).iterator.filter(_.state == ALIVE)
for (driver <- waitingDrivers) {
  if (shuffledWorkers.hasNext) {
    val candidateWorker = shuffledWorkers.next()
    val (scheduledMemory, scheduledCores) =
      scheduledWorkerResources.getOrElse(candidateWorker.id, (0, 0))
    val remainingMemory = candidateWorker.memory - scheduledMemory
    val remainingCores = candidateWorker.cores - scheduledCores
    // Compare remaining resources to account for workers that have already been scheduled
    if (remainingMemory >= driver.mem && remainingCores >= driver.cores) {
      launchDriver(candidateWorker)
      ...
      // update scheduledWorkerResources 
      // add back this used worker into the pool to iterate through
    }
  }
}

@WangTaoTheTonic changed the title from "[SPARK-3411] Optimize the schedule procedure in Master" to "[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers" on Sep 6, 2014
@WangTaoTheTonic
Contributor Author

To @JoshRosen: the PR title has been updated, and so has the JIRA.
@andrewor14 I think keeping track of the workers' resources is too complex, so I pick the worker to launch each driver in a round-robin way, which avoids shuffling every time.
If no worker has enough memory and cores for a driver, we just skip it and schedule the next driver, which is consistent with the previous behavior.
What do you guys think?
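
Roughly, the scheduling loop in this patch looks like the sketch below (reconstructed from the diff fragments quoted in the review comments further down, with the loop bounded by a visited-worker count so it always terminates; the merged code may differ in details):

// Rough reconstruction of the round-robin scheduling proposed here; not the exact merged code.
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy so launched drivers can be removed
  var launched = false
  var numWorkersVisited = 0
  while (numWorkersVisited < numWorkersAlive && !launched) {
    val worker = shuffledAliveWorkers(curPos)
    numWorkersVisited += 1
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      launchDriver(worker, driver)
      waitingDrivers -= driver
      launched = true
    }
    curPos = (curPos + 1) % numWorkersAlive // move on so consecutive drivers spread across workers
  }
}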

@SparkQA

SparkQA commented Sep 6, 2014

QA tests have started for PR 1106 at commit bc91bb1.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 6, 2014

QA tests have finished for PR 1106 at commit bc91bb1.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have started for PR 1106 at commit 2ca3091.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have finished for PR 1106 at commit 2ca3091.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 9, 2014

QA tests have started for PR 1106 at commit b6560cf.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 9, 2014

QA tests have finished for PR 1106 at commit b6560cf.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val startPos = curPos
curPos = (curPos + 1) % aliveWorkerNum
var launched = false
while (curPos - 1 != startPos && !launched) {
Contributor


Somewhat minor, but why don't we just declare val startPos = curPos after we increment it? Then we don't need the -1 here

@andrewor14
Contributor

@WangTaoTheTonic This mostly looks good. I left a few more minor comments. Anything else to add @markhamstra and @JoshRosen?

val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size
var curPos = 0
for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
Contributor


This could be just:

for (driver <- waitingDrivers.toList)

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have started for PR 1106 at commit d1a928b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have finished for PR 1106 at commit d1a928b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression]
    • case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction
    • case class Abs(child: Expression) extends UnaryExpression

@WangTaoTheTonic force-pushed the fixBalanceDrivers branch 2 times, most recently from 76346bc to d1a928b, on September 10, 2014
@SparkQA

SparkQA commented Sep 10, 2014

QA tests have started for PR 1106 at commit d1a928b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have finished for PR 1106 at commit d1a928b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression]
    • case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction
    • case class Abs(child: Expression) extends UnaryExpression

@andrewor14
Contributor

Thanks @WangTaoTheTonic, I'm merging this.

asfgit closed this in 558962a on Sep 10, 2014
asfgit pushed a commit that referenced this pull request Sep 17, 2014
I think this issue is caused by #1106

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2436 from sarutak/SPARK-3571 and squashes the following commits:

7a4deea [Kousuke Saruta] Modified Master.scala to use numWorkersVisited and numWorkersAlive instead of stopPos
4e51e35 [Kousuke Saruta] Modified Master to prevent from 0 divide
4817ecd [Kousuke Saruta] Brushed up previous change
71e84b6 [Kousuke Saruta] Modified Master to enable schedule normally
asfgit pushed a commit that referenced this pull request Mar 15, 2016
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers

This patch restores the changes in #1106, which were erased due to the merging of #731

## How was this patch tested?

test with existing test cases

Author: CodingCat <zhunansjtu@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
udaynpusa pushed a commit to mapr/spark that referenced this pull request Jan 30, 2024