
[SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned #28370

Closed

Conversation

@prakharjain09 (Contributor) commented Apr 27, 2020

What changes were proposed in this pull request?

After changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks on it. We should also decommission the corresponding block managers in the same way, i.e., move the cached RDD blocks from those executors to other active executors.

Why are the changes needed?

We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spot loss/preemptible VMs, etc.). It's good to save as much cache data as possible.

Also, in the future, once the decommissioning signal comes from the cluster manager (say YARN/Mesos etc.), dynamic allocation plus this change gives us the opportunity to downscale executors faster by making them free of cache data.

Note that this is a best-effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available for caching, then a decommissioning executor will keep the cache blocks it was not able to move and will still be able to serve them.

Current overall Flow:

  1. CoarseGrainedSchedulerBackend receives a signal to decommissionExecutor. On receiving the signal, it does two things: it stops assigning new tasks (SPARK-20628) and sends another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

  2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to a "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call which is used for replication. Each decommissioning BM is also sent a message from BlockManagerMasterEndpoint to start the decommissioning process on itself.

  3. The BlockManager on the worker (say BM-x) receives the "DecommissionBlockManager" message. It then starts a BlockManagerDecommissionManager thread to offload all the cached RDD blocks. This thread can make multiple attempts to decommission the existing cache blocks (multiple attempts might be needed as there might not be sufficient space in other active BMs initially); see the sketch below.
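
For illustration, here is a minimal, hypothetical sketch of the retry loop described in step 3. The failure cap of 20 and the daemon "block-replication-thread" come up in the review below; the class name, the sleep interval, and the placeholder decommissionRddCacheBlocks() body are assumptions, not the merged code.

```scala
import org.apache.spark.SparkConf
import scala.util.control.NonFatal

// Simplified sketch of a decommissioning retry thread (not the actual Spark implementation).
private class BlockManagerDecommissionSketch(conf: SparkConf) {
  @volatile private var stopped = false
  private val sleepIntervalMs = 30000L // assumed pause between offload attempts

  private val blockReplicationThread = new Thread {
    override def run(): Unit = {
      var failures = 0
      while (!stopped && !Thread.interrupted() && failures < 20) {
        try {
          // Try to move all cached RDD blocks to other active block managers.
          decommissionRddCacheBlocks()
          // Sleep and retry: peers may not have had space, and running tasks may persist new blocks.
          Thread.sleep(sleepIntervalMs)
        } catch {
          case _: InterruptedException =>
            stopped = true // asked to stop while sleeping or transferring
          case NonFatal(e) =>
            failures += 1 // count transient errors and retry
        }
      }
    }
  }
  blockReplicationThread.setDaemon(true)
  blockReplicationThread.setName("block-replication-thread")

  def start(): Unit = blockReplicationThread.start()

  def stop(): Unit = {
    stopped = true
    blockReplicationThread.interrupt()
    blockReplicationThread.join()
  }

  // Placeholder: the real method replicates each cached RDD block to a peer block manager.
  private def decommissionRddCacheBlocks(): Unit = ()
}
```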

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added UTs.

@prakharjain09 (Contributor, Author) commented Apr 27, 2020

Recreated PR for #27864 to identify and fix test failures. Please refer to #27864 for more information/context.

@prakharjain09 (Contributor, Author)

Can someone please enable Jenkins test runs on this?

cc - @holdenk @dongjoon-hyun

@dongjoon-hyun (Member)

ok to test

@holdenk (Contributor) commented Apr 27, 2020

Jenkins ok to test

@holdenk (Contributor) left a comment:

Thanks for re-opening the PR, and sorry I merged the other one incorrectly. Hopefully we can get this passing CI quickly and merged in this week :)

stopped = true
logInfo("Stopping block replication thread")
blockReplicationThread.interrupt()
blockReplicationThread.join()
Contributor:

My thought is joining the thread here might block if the interrupt didn't work as we want it to, what do you think?

Contributor Author:

@holdenk This entire BlockManagerDecommissionManager#stop() method is called by decommissionManager.foreach(_.stop()) inside BlockManager#stop().

decommissionManager is None by default (as the storage decommissioning feature is behind a config and disabled by default), so this code shouldn't even trigger.

Contributor:

Sure but when it is turned on (in your test case) this might keep the worker process from exiting when we ask it to stop?

Contributor Author (@prakharjain09, Apr 28, 2020):

@holdenk Yeah - but none of the tests failing in the Jenkins build are ones written in this PR. So that means all those tests must be running with the storage-decommissioning flag disabled?

  1. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121988/testReport/
  2. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121917/testReport/

Is the same Spark application going to be used across all these tests? I was assuming that a new Spark application would be created and destroyed for my specific tests (as BlockManagerDecommissionSuite creates a new SparkContext as part of the test).

Contributor:

So it's hanging with remaining alive workers. Try taking this out and see if it makes a difference.


My thought is joining the thread here might block if the interrupt didn't work as we want it to, what do you think?

I think the initial concern is regarding join waiting infinitely, as the "stop" is not percolated deep enough to stop early... Maybe it needs to be refactored?

@SparkQA commented Apr 27, 2020

Test build #121917 has finished for PR 28370 at commit bb324f9.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Apr 28, 2020

retest this please

@SparkQA commented Apr 28, 2020

Test build #121934 has finished for PR 28370 at commit bb324f9.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 28, 2020

Test build #121988 has finished for PR 28370 at commit 5847c1c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AggregateWithHaving(
  • abstract class CurrentTimestampLike() extends LeafExpression with CodegenFallback
  • case class CurrentTimestamp() extends CurrentTimestampLike
  • case class Now() extends CurrentTimestampLike
  • case class DateAddInterval(

@dongjoon-hyun (Member)

Retest this please

@SparkQA commented Apr 28, 2020

Test build #122005 has finished for PR 28370 at commit 5847c1c.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AggregateWithHaving(
  • abstract class CurrentTimestampLike() extends LeafExpression with CodegenFallback
  • case class CurrentTimestamp() extends CurrentTimestampLike
  • case class Now() extends CurrentTimestampLike
  • case class DateAddInterval(

@ScrapCodes (Member)

Hi folks, I am new to this, so please forgive me for a basic question. I am wondering how much time we get between when a SIGPWR is received and when the Worker is preempted? Is it viable (reliable) to migrate all the blocks?

@holdenk (Contributor) commented Apr 29, 2020

@ScrapCodes It depends on your cluster, but it could be anywhere from 1 second to several hours. Generally, though, I'd expect most situations to be in the minutes time frame. Eventually, we can start using this code path as part of voluntary scale-down, and there we have even more control.

@ScrapCodes (Member)

Alright, this makes sense, especially for a voluntary scale-down. Thank you @holdenk for explaining.

@holdenk (Contributor) commented Apr 30, 2020

@ScrapCodes: In the future (and I've filed a JIRA for this), for non-voluntary scale downs we can try and prioritize blocks, but I think this is a solid first step :)

@SparkQA commented May 14, 2020

Test build #122629 has started for PR 28370 at commit c343056.

@holdenk (Contributor) commented May 14, 2020

Seeing weird network issues on Jenkins.
Jenkins retest this please.

@Ngone51 (Member) commented May 15, 2020

retest this please

logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
blockInfoManager.lockForReading(blockId).forall { info =>
Member:

use map?

Contributor:

Using map would give us back an Option[Boolean], and we just want a Boolean.

Member:

I see.
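
To make the map vs. forall point concrete, here is a tiny stand-alone illustration of the return types; the Option below is just a stand-in for the result of lockForReading, not the real block info type.

```scala
val lock: Option[String] = Some("block info") // stand-in for blockInfoManager.lockForReading(blockId)

val viaMap: Option[Boolean] = lock.map(_ => true) // map wraps the result, so a getOrElse would still be needed
val viaForall: Boolean = lock.forall(_ => true)   // forall returns a plain Boolean (true when the Option is empty)
lock.foreach(_ => ())                             // foreach returns Unit, discarding the outcome entirely
```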

* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
private class BlockManagerDecommissionManager(conf: SparkConf) {
Member:

Do we really need a wrapped manager class? It seems overkill to me.

Contributor:

For the first part I'm ambivalent, but given that we also want to migrate shuffle blocks afterwards, I think having a manager is ok.

Member:

We should implement it step by step and could always refactor later. Or we should at least add a TODO ticket to explain why we need this and what we plan to do next. Otherwise, I am really -1 on this kind of change.

Contributor:

So there are two conversations I want to have about this with you @Ngone51 now to make sure I'm understanding what you're trying to express.

There already is a second follow-up PR that extends the BlockManagerDecommissionManager, so I'm not sure I agree with your reasoning. If it were only for some possible future implementation that didn't already exist, I'd be more inclined to simplify. Maybe you can take a look at https://issues.apache.org/jira/browse/SPARK-20624 , https://issues.apache.org/jira/browse/SPARK-20629 and #28331 for context.

I want to understand your -1 here because that has some pretty strong meaning in the context of a code change. A -1 is generally viewed as expressing a veto, which I don't believe you have in the project (of course, I was out for a month in the hospital last year, so if you do, please point me to the thread). Even if you don't have a veto in the project, is it your intention to say that if you did have a veto you would block this code change? A veto is generally a very strong expression, and I'm worried I'm not understanding your reasoning since this seems like a relatively minor issue.

Contributor:

Also, I understand text-only communication can have more misunderstandings, if you want to find a time this week when we're both free to jump on a call to clarify this (and we can write back our understanding here so it's recorded for people to understand what we talked about), I'd be more than happy to.

Member:

For a new reviewer (e.g. me) on a big topic, it's not always possible to know every detail (even worse when there's no design doc). So it's the author's responsibility to give more context, for example by leaving TODO JIRA tickets in the code comments or replying to give more information. But without sufficient context here, I really think "this change", wrapping a manager around a thread, doesn't make sense to me.

As for "-1", it really represents my personal opinion. I should say "I don't like this change" if "-1" means a lot to the community.

Contributor:

As a reviewer it’s expected that you would read the issue before asking for a follow up issue in a blocking manner.

Member:

Of course, I did. But I still don't get it, and I think it's not always possible for a reviewer to know that a sub-issue is meant to be a follow-up for some specific code without a design document/code comments around here.

Contributor:

So if you look at the parent issue you can see there is another sub-issue that says migrate shuffle blocks. It's ok to ask for a follow-up even if there is one (we all miss things in reading), but attempting to vote a -1 has a higher bar than just asking for something.

}
}
blockReplicationThread.setDaemon(true)
blockReplicationThread.setName("block-replication-thread")
Member:

Use Runnable for the decommissioning and ThreadUtils to execute the Runnable?

Contributor:

Looking at our code we seem to be roughly split on Thread versus Runnable usage. I think Runnable would make more sense if we were submitting this to an execution pool, but since we have a single thread and there is no reason to scale up the number of threads I don't see the need for that change.

Member:

We always use ThreadUtils.newDaemonSingleThreadExecutor for the single runnable.

Contributor:

grep -r "new Thread" ./core/src/main | wc -l returns 36
grep -r ThreadUtils.newDaemonSingleThreadExecutor ./core/src/main | wc -l returns 4

Member:

Ah.. that's a good point, but I'm just wondering how many of them were chosen after knowing about ThreadUtils.newDaemonSingleThreadExecutor.

BTW, you'd better grep "new Thread(" to exclude ThreadLocal declarations.

Contributor:

Even that returns 36 in core.
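
For comparison, here is a JDK-only sketch of the executor-based alternative being discussed. It uses a plain ThreadFactory instead of Spark's internal ThreadUtils helper, so treat it as an illustration of the pattern rather than the project convention.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

// A daemon single-thread executor, roughly what a newDaemonSingleThreadExecutor-style helper builds.
val replicationExecutor: ExecutorService = Executors.newSingleThreadExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "block-replication-thread")
    t.setDaemon(true)
    t
  }
})

// The decommissioning work is then submitted as a Runnable...
replicationExecutor.submit(new Runnable {
  override def run(): Unit = { /* offload cached RDD blocks in a retry loop */ }
})

// ...and stopping becomes shutdownNow() instead of interrupt()/join() on a raw Thread.
replicationExecutor.shutdownNow()
```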

&& failures < 20) {
try {
logDebug("Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
Member:

Don't you need to set stop=true here?

Or do you mean we need to call decommissionRddCacheBlocks multiple times? If so, why do we need to do it multiple times? There should be no RDD block changes after decommissioning?

Contributor:

We don't set stop=true here because we loop through this multiple times. It is possible that not all blocks will replicate in the first iteration, and also possible that more blocks are stored while we're decommissioning (e.g. any existing running tasks which have a persist).

@Ngone51 (Member) commented May 15, 2020

.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
Member:

I think we can just use spark.storage.maxReplicationFailures directly. Fewer configurations contribute to better UX.

Contributor:

So I'm not sure that's a great idea. Looking at maxReplicationFailures, the default is set to one, which certainly makes sense in the situation where we don't expect the host to be exiting. But this situation is different: we know the current block is going to disappear soon, so it makes sense to try more aggressively to copy the block.

Member:

I see, thanks for your explanation.
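
For anyone trying this out, a minimal configuration sketch: the per-block failure limit is the config shown in this diff, while the enable-flag name used here is an assumption based on the earlier comment that the feature is behind a config and disabled by default.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Assumed feature flag guarding block manager decommissioning (disabled by default).
  .set("spark.storage.decommission.enabled", "true")
  // Config introduced in this diff: replication failures tolerated per block before giving up.
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
```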

@SparkQA commented May 15, 2020

Test build #122677 has finished for PR 28370 at commit c343056.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented May 15, 2020

The Hive test failure is probably a flaky test.
Jenkins retest this please

@SparkQA commented May 15, 2020

Test build #122687: Deflake Build #122677 has started for PR 28370 at commit c343056.

@SparkQA commented May 15, 2020

Test build #122687: Deflake Build #122677 has finished for PR 28370 at commit c343056.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines +339 to +341
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
Member (@Ngone51, May 18, 2020):

IIUC, there's no need to do replication if remainingLocations is equal to currentBlockLocations after filtering out the blockManagerId, which means we've already successfully decommissioned the block in a previous decommission round.

Contributor:

So we currently remove the block on successful decommissioning. But it's possible it is somehow already sufficiently replicated that we don't need to do anything, so I've added this to https://issues.apache.org/jira/browse/SPARK-31555 for tracking.

&& !Thread.interrupted()
&& failures < 20) {
try {
logDebug("Attempting to replicate all cached RDD blocks")
Member:

Could we add attempt number to the log?


try {
logDebug("Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
logInfo("Attempt to replicate all cached blocks done")
Member:

attempt number?

Contributor:

I'd say it's fine to do in a follow-up, but if we want to add the attempt number here, go for it (I won't hold off on merging for that).

case NonFatal(e) =>
failures += 1
logError("Error occurred while trying to replicate cached RDD blocks" +
s" for block manager decommissioning (failure count: $failures)", e)
Member:

attempt number?

var failures = 0
while (blockManagerDecommissioning
&& !stopped
&& !Thread.interrupted()
Member:

Do we really need this?

  1. In stop(), stopped is set before interrupt().

  2. If the thread is doing decommissioning or sleeping when stop() is called, we'll catch InterruptedException later and set stopped to true.

Contributor:

Unless the interrupt exception is caught inside of the block transfer

Member (@Ngone51, May 18, 2020):

Unless the interrupt exception is caught inside of the block transfer

If there's an InterruptedException caught by the block transfer, then Thread.interrupted() inside blockReplicationThread would return false. And what do you expect for this case?

If you want the decommission thread to stop, then Thread.interrupted() won't work;

Or, if you want the decommission thread to keep working, then Thread.interrupted() is useless because the status has already been cleared (unless the block transfer sets it to interrupted again).

Contributor:

If an interrupt exception is caught the thread would still be marked as interrupted

Member (@Ngone51, May 18, 2020):

I believe in normal cases like wait and sleep, the status will be cleared, according to the JDK doc:

* @throws  InterruptedException
*          if any thread has interrupted the current thread. The
*          <i>interrupted status</i> of the current thread is
*          cleared when this exception is thrown.

And even if the status is not cleared in other cases, the following Thread.sleep(sleepInterval) will throw InterruptedException first and set stopped to true, so Thread.interrupted() still does not take effect.

Contributor:

So that block of text indicates that the interrupted status is cleared when the exception is thrown, which means it would be possible for the thread to be in an interrupted status without having the exception thrown. Worst-case scenario: this check is not needed but also causes no significant harm.
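
A small self-contained illustration of the JDK behaviour under debate: sleep clears the interrupted status when it throws, and a catcher has to re-interrupt explicitly for a later interrupted-status check to see the flag.

```scala
object InterruptStatusDemo {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          Thread.sleep(10000)
        } catch {
          case _: InterruptedException =>
            // The interrupted status was cleared when the exception was thrown.
            println(s"after catch, interrupted = ${Thread.currentThread().isInterrupted}")   // false
            // Restoring the flag is what makes a later Thread.interrupted() check see it.
            Thread.currentThread().interrupt()
            println(s"after restore, interrupted = ${Thread.currentThread().isInterrupted}") // true
        }
      }
    })
    t.start()
    t.interrupt()
    t.join()
  }
}
```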

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
Member:

Could you please add some comments to explain why we need "+1"?

Contributor:

I think the fact that we're decommissioning here makes this self-evident.

Member:

The method itself does not declare that it's used for decommissioning.

Contributor:

Reasonable then to add a comment


// Create input RDD with 10 partitions
val input = sc.parallelize(1 to 10, 10)
val accum = sc.longAccumulator("mapperRunAccumulator")
// Do a count to wait for the executors to be registered.
Member:

You could use TestUtils.waitUntilExecutorsUp.

Contributor:

Neat, I’d consider no harm either way.

Member:

rdd.count does not guarantee all the executors would be registered. The job could also succeed using one executor in this case.

Contributor:

That’s ok for this test. But no harm in changing to the utility function
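
A sketch of what the suggested change might look like in the test, assuming the usual TestUtils.waitUntilExecutorsUp(sc, numExecutors, timeoutMillis) signature; the executor count and timeout values here are placeholders.

```scala
import org.apache.spark.TestUtils

// Wait explicitly for executors instead of relying on an initial count() to register them.
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)

// Then proceed as the test already does.
val input = sc.parallelize(1 to 10, 10)
val accum = sc.longAccumulator("mapperRunAccumulator")
```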


@holdenk (Contributor) commented May 18, 2020

LGTM. Thanks for working on this @prakharjain09, I know how frustrating debugging test-only issues that only show up in Jenkins can be.

Thanks for taking the time to review @Ngone51, I've added your relevant suggestions for improvement to the follow on improvements JIRA https://issues.apache.org/jira/browse/SPARK-31555

@asfgit asfgit closed this in c560428 May 18, 2020
@holdenk (Contributor) commented May 18, 2020

Merged to master (target 3.1). Let me know if you're interested in doing any of the follow-ups @Ngone51 / @prakharjain09 otherwise I'll get that started after the shuffle block stuff :)

@Ngone51 (Member) commented May 19, 2020

It seems there's someone else who wants to take the JIRA. If there are no further updates from them before next week, I will take it over then.

@holdenk (Contributor) commented May 19, 2020

Sounds good, happy to help coordinate with any reviews needed. Would like us to be able to start using this in 3.1 :)

@agrawaldevesh (Contributor) commented Jun 11, 2020

Hi @holdenk and @prakharjain09

I am trying to follow along with the great improvements you have been making to Spark's decommissioning story.

I had two basic questions please:

otherwise I'll get that started after the shuffle block stuff :)

Would you say that we would be done with Parts 2 and 3, as mentioned in the design doc above, after PR #28708 is merged in?

Thanks for improving this key aspect of Spark in cloud environments!

*
* @param blockId blockId being replicate
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
* @param maxReplicationFailures number of replication failures to tolerate before
* giving up.
* @return whether block was successfully replicated or not
*/
def replicateBlock(
Contributor (@agrawaldevesh, Jun 11, 2020):

@prakharjain09 / @holdenk, Thank you for this improvement. I had a question please:

(I am still new to these code paths and I am not totally sure of what I am talking about, so if there is something I am missing please help me fill the gaps :-).)

I notice that replicateBlock is already called during the executor removal codepath, i.e., org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager does the replication to other peers if spark.storage.replication.proactive=true. This seems to have been implemented in SPARK-15355, and org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager is triggered when the executor is "eventually" lost.

I understand it is a bit late to do the replication when the executor is indeed lost: decommissioning as implemented in #26440 does not really trigger eager executor loss; we instead merely stop scheduling on the decom'd executor and let it be shot down out of band, which means that the replication triggered in SPARK-15355 would be too late.

I like the approach taken in this PR to eagerly tell the executor (block manager) to start replication when the decom is first initiated, to give it more time to be useful. But I wonder whether you could have implemented this somewhat differently by leveraging the existing eager replication loop?

Thanks!

Contributor:

So the existing block replication is for the case where blocks were stored on two machines and, due to executor loss, are now down to one machine, so they are replicated. It's not useless, but it doesn't solve the same core problem.

holdenk pushed a commit to holdenk/spark that referenced this pull request Jun 25, 2020
… an executor is decommissioned

Closes apache#28370 from prakharjain09/SPARK-20732-rddcache-1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
… an executor is decommissioned

Closes apache#28370 from prakharjain09/SPARK-20732-rddcache-1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>