
[SPARK-11063][Streaming]Change preferredLocations of Receiver's RDD to hosts rather than hostports #9075

Closed · zsxwing wants to merge 3 commits

Conversation

@zsxwing (Member) commented Oct 12, 2015

The format of an RDD's preferredLocations must be a hostname, but the format of the Streaming Receiver's scheduling executors is hostport, so the preferred locations never match and have no effect.

This PR converts `schedulerExecutors` to hosts before creating the Receiver's RDD.
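For illustration, a minimal sketch of the conversion under discussion; the helper name is hypothetical, not the code in this patch:

```scala
// Hypothetical helper: strip the port from each "host:port" executor string,
// leaving the bare hostnames that RDD preferredLocations can actually match.
def hostPortsToHosts(schedulerExecutors: Seq[String]): Seq[String] =
  schedulerExecutors.map(_.split(":")(0)).distinct

// hostPortsToHosts(Seq("node1:34512", "node2:34512")) == Seq("node1", "node2")
```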

@SparkQA commented Oct 12, 2015

Test build #43564 has finished for PR 9075 at commit 4706ec0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor):

The fix looks good to me.

@tdas (Contributor) commented Oct 16, 2015

What is the implication of this bug for scheduling? We are trying to distribute receivers evenly across executors, but if we set the preferredLocations only at the granularity of hosts, the following could happen: two executors can be on the same host (through YARN), and we set the preferred location of two receivers to that same host even though we want them on two different executors. Both receivers may then end up scheduled on the same executor, isn't that right?

@zsxwing (Member, Author) commented Oct 16, 2015

> What is the implication of this bug for scheduling? We are trying to distribute receivers evenly across executors, but if we set the preferredLocations only at the granularity of hosts, the following could happen: two executors can be on the same host (through YARN), and we set the preferred location of two receivers to that same host even though we want them on two different executors. Both receivers may then end up scheduled on the same executor, isn't that right?

Right. However, preferredLocations only supports hosts, so at least for branch 1.5 this is the best we can do. We can enhance preferredLocations to support host and port in the master branch.

@jerryshao (Contributor):

From my understanding, Spark's scheduler does not support scheduling by port (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala).
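For reference, a simplified sketch of the location kinds that file models; the class names match Spark's TaskLocation.scala, but the bodies here are illustrative rather than the actual source:

```scala
// Simplified, illustrative versions of the task-location kinds the scheduler
// understands (the real definitions live in TaskLocation.scala).
sealed trait TaskLocation {
  def host: String
}
// A bare hostname -- the only form that RDD preferredLocations can match.
case class HostTaskLocation(host: String) extends TaskLocation
// A specific executor on a host, e.g. for blocks cached in executor memory.
case class ExecutorCacheTaskLocation(host: String, executorId: String) extends TaskLocation
// A host whose data sits in the HDFS in-memory cache.
case class HDFSCacheTaskLocation(host: String) extends TaskLocation
// Note: there is no "host:port" variant, which is why hostport strings passed
// as preferredLocations silently match nothing.
```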

@tdas (Contributor) commented Oct 16, 2015

Yeah, we have to fix it for branch 1.5 with a small fix like this.
Is it possible to cover this with unit tests? Could you check whether there are DAGScheduler / TaskScheduler unit tests we can build on, so that something like this can never happen again?

@jerryshao (Contributor):

I think we could add another TaskLocation, such as ExecutorTaskLocation, and pass the preferredLocations with a special tag prefix (like the HDFS in-memory tag) to let TaskSetManager know we want the task scheduled process-local.
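A hedged sketch of that idea, mirroring the existing HDFS in-memory cache tag; the `executor_` tag and helper names below are hypothetical, not the code that was eventually merged:

```scala
// Hypothetical: encode "schedule on this exact executor" into the
// preferred-location string with a tag prefix, by analogy with the
// "hdfs_cache_" tag used for the HDFS in-memory cache.
object ExecutorTaskLocation {
  private val Tag = "executor_"

  def encode(host: String, executorId: String): String =
    s"$Tag${host}_$executorId"

  // Recover (host, executorId) from a tagged location string, if any.
  def decode(location: String): Option[(String, String)] =
    if (location.startsWith(Tag)) {
      location.stripPrefix(Tag).split('_') match {
        case Array(host, executorId) => Some((host, executorId))
        case _ => None
      }
    } else None
}
// A TaskSetManager aware of this tag could treat a task whose preferred
// location decodes to its own executor as PROCESS_LOCAL.
```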

```diff
@@ -388,7 +388,7 @@ private[spark] class TaskSetManager(
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
       // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
       for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
-        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+        return Some((index, TaskLocality.NO_PREF, false))
```
@zsxwing (Member, Author):

@CodingCat could you take a look at this change?

Contributor:

Sure, will do tomorrow morning.

Contributor:

This looks like a significant bug in the core. If so, it should probably go into a separate patch.

Contributor:

And how is this related to the main bug we are considering in this PR?

@zsxwing (Member, Author):

This is not related. I will move it to a separate PR.

Contributor:

Hmm, it's a typo, but it has no actual effect.

See the discussion in #3816 (the comments after Dec 30, 2014 are enough).

@zsxwing (Member, Author):

Didn't notice this PR. Thanks.

@zsxwing (Member, Author) commented Oct 16, 2015

Added a unit test and fixed a minor issue in TaskSetManager.

@zsxwing (Member, Author) commented Oct 16, 2015

Tested this patch on a 5-node cluster: each node runs one executor with one core, and there are 5 receivers.

Before this patch, the logs showed several receiver restarts. After applying this patch, all the restart messages disappeared.

@SparkQA commented Oct 16, 2015

Test build #43840 has finished for PR 9075 at commit 2d7c030.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 17, 2015

Test build #43877 has finished for PR 9075 at commit 49b7792.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
  // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
  assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
}
```
Contributor:

What will it be if it is not set correctly? That is, without the fix above, what would the locality be?

@zsxwing (Member, Author):

It's TaskLocality.ANY without this patch.
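To make the difference concrete, a simplified illustration; the TaskLocality level names are Spark's, but this matcher function is hypothetical, not the scheduler's actual code:

```scala
// Hypothetical, simplified matcher: the scheduler compares preferred
// locations against an executor's bare host, so "node1:34512" never equals
// "node1" and the task is left with no usable preference.
def resultingLocality(preferred: Seq[String], executorHost: String): String =
  if (preferred.isEmpty) "NO_PREF"
  else if (preferred.contains(executorHost)) "NODE_LOCAL"
  else "ANY"

resultingLocality(Seq("node1:34512"), "node1") // "ANY"        -- before this patch
resultingLocality(Seq("node1"), "node1")       // "NODE_LOCAL" -- after this patch
```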

@tdas (Contributor) commented Oct 17, 2015

LGTM. Just a clarifying question.

@tdas (Contributor) commented Oct 19, 2015

Merging this to master and branch 1.5

@asfgit closed this in 6758213 on Oct 19, 2015
asfgit pushed a commit that referenced this pull request on Oct 19, 2015

[SPARK-11063][Streaming]Change preferredLocations of Receiver's RDD to hosts rather than hostports

The format of an RDD's preferredLocations must be a hostname, but the format of the Streaming Receiver's scheduling executors is hostport, so the preferred locations never match and have no effect.

This PR converts `schedulerExecutors` to `hosts` before creating the Receiver's RDD.

Author: zsxwing <zsxwing@gmail.com>

Closes #9075 from zsxwing/SPARK-11063.

(cherry picked from commit 6758213)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@zsxwing deleted the SPARK-11063 branch on October 20, 2015 at 00:40