SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap #1541

Closed
wants to merge 1 commit into from

Conversation

zsxwing
Member

@zsxwing zsxwing commented Jul 23, 2014

MapOutputTrackerWorker.mapStatuses is accessed concurrently, so it should be thread-safe. This bug has already been fixed in #1328. However, since #1328 won't be merged soon, I'm sending this trivial fix in the hope that the issue can be resolved quickly.
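
For context, the essence of the change is to back mapStatuses with a java.util.concurrent.ConcurrentHashMap instead of a plain mutable HashMap. A minimal sketch of the idea, with MapStatus stubbed out; the actual patch may expose the Java map to Scala code differently:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical stand-in for org.apache.spark.scheduler.MapStatus.
case class MapStatus(location: String, size: Long)

class MapOutputTrackerWorkerSketch {
  // Before (unsafe under concurrent get/put):
  //   protected val mapStatuses = new mutable.HashMap[Int, Array[MapStatus]]
  // After: a ConcurrentHashMap exposed through Scala's concurrent.Map wrapper,
  // so existing callers keep using the mutable.Map interface.
  protected val mapStatuses: mutable.Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
}
```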

@SparkQA

SparkQA commented Jul 23, 2014

QA tests have started for PR 1541. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17011/consoleFull

@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1541:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information, see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17011/consoleFull

@SparkQA

SparkQA commented Jul 23, 2014

QA tests have started for PR 1541. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17012/consoleFull

@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1541:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information, see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17012/consoleFull

@mridulm
Contributor

mridulm commented Jul 23, 2014

Instead of a ConcurrentHashMap, we should actually move this to a disk-backed map. Cleanup of this data structure is painful, and it can become extremely large, particularly for iterative algorithms.
Fortunately, in most cases we only need the last few entries, so the LRU scheme used by most disk-backed maps works beautifully.

We have been using MapDB for this in MapOutputTrackerWorker, and it has worked beautifully.
@rxin might be particularly interested since he is looking into reducing the memory footprint of Spark.
CC @mateiz: this is what I had mentioned earlier.
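
The LRU idea can be illustrated without relying on MapDB's specific API. A minimal in-memory sketch, assuming only the standard library; a disk-backed store such as MapDB would spill evicted entries to disk rather than drop them:

```scala
import java.util.{Collections, LinkedHashMap => JLinkedHashMap, Map => JMap}

object LruSketch {
  // Access-ordered LRU map: once maxEntries is exceeded, the least recently
  // used shuffle's statuses are evicted on the next insert.
  def lruCache[K, V](maxEntries: Int): JMap[K, V] =
    Collections.synchronizedMap(
      new JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
        override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
          size() > maxEntries
      })
}
```

Entries beyond the most recently used shuffles age out automatically, which matches the access pattern described above.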

@zsxwing
Member Author

zsxwing commented Jul 23, 2014

Instead of a ConcurrentHashMap, we should actually move it to a disk backed Map

Agree. Is there a PR ready? I think this is a critical bug and hope it can be fixed soon.

@kayousterhout
Contributor

When is this accessed concurrently? I looked quickly and can only find updates from the (single-threaded) DAGScheduler event loop. Is the issue that it can be read/written concurrently?

@zsxwing
Member Author

zsxwing commented Jul 25, 2014

When is this accessed concurrently?

For example: HashShuffleReader.read -> BlockStoreShuffleFetcher.fetch (a singleton object) -> MapOutputTracker.getServerStatuses. Different HashShuffleReader instances can be used by different tasks, and all TaskRunners share the same Executor env. Therefore, all tasks use the same MapOutputTracker instance from the SparkEnv.
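
To make the sharing concrete, here is a rough structural sketch; Env, Tracker, and fetch are hypothetical stand-ins for SparkEnv, MapOutputTracker, and the shuffle read path, not Spark's actual code:

```scala
import scala.collection.mutable

// One Env per executor process; every task thread reaches the same Tracker
// (and thus the same unsynchronized mapStatuses) through it.
class Tracker {
  val mapStatuses = new mutable.HashMap[Int, Array[Long]]
}

class Env(val tracker: Tracker)

object SharingSketch {
  val env = new Env(new Tracker)

  // Each shuffle-reader-like task ends up here on its own thread, so its
  // reads race with writes performed by other tasks' fetches.
  def fetch(shuffleId: Int): Option[Array[Long]] =
    env.tracker.mapStatuses.get(shuffleId)
}
```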

@zsxwing
Member Author

zsxwing commented Sep 1, 2014

ping @JoshRosen, could you help take a look at this one?

@JoshRosen
Contributor

Thanks for the reminder.

@kayousterhout I looked over @zsxwing's example and I agree that there's a thread-safety issue here. We can definitely have multiple concurrent block fetches that could race when accessing mapStatuses.

There's a lot of other state in MapOutputTracker that's guarded with synchronized, which implies that instances of MapOutputTracker will be accessed from multiple threads. In fact, there's even a statuses.synchronized at the end of getServerStatuses that's guarding a MapOutputTracker.convertMapStatuses call, but for some reason the other branch of the if guards it using fetchedStatuses.synchronized (which doesn't even make sense, since fetchedStatuses is a local variable defined inside of getServerStatuses).

Since the synchronization logic here seems kind of messy / confusing and mapStatuses is only accessed from MapOutputTracker, maybe it would be better to just add proper synchronization around reads/writes to mapStatuses rather than converting it to a ConcurrentHashMap.
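
A sketch of the alternative being suggested here, guarding every read and write with the tracker's own monitor instead of changing the map type (illustrative names, not the actual Spark code):

```scala
import scala.collection.mutable

class SynchronizedTrackerSketch {
  private val mapStatuses = new mutable.HashMap[Int, Array[Long]]

  // All access funnels through methods that lock the same monitor,
  // so a get can never observe the map mid-rehash.
  def get(shuffleId: Int): Option[Array[Long]] = this.synchronized {
    mapStatuses.get(shuffleId)
  }

  def put(shuffleId: Int, statuses: Array[Long]): Unit = this.synchronized {
    mapStatuses(shuffleId) = statuses
  }

  def remove(shuffleId: Int): Unit = this.synchronized {
    mapStatuses -= shuffleId
  }
}
```

The trade-off versus a ConcurrentHashMap is that this single lock serializes all readers, whereas a ConcurrentHashMap lets reads proceed concurrently.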

@JoshRosen
Contributor

Actually, it looks like the fetchedStatuses vs statuses synchronization is correct, since it's guarding against modification to that statuses array while reading it in convertMapStatuses. This needs a closer look, but I'm not sure whether we need this synchronization, since the output status for a particular map task should be immutable once written.

@zsxwing
Member Author

zsxwing commented Sep 2, 2014

the output status for a particular map task should be immutable once written.

But mapStatuses itself is mutable. If some thread is putting an item into mapStatuses and the map does not have enough space for the new item, the map will grow its table, rehash the items, and update its internal state (https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/mutable/HashTable.scala#L249). If another thread calls get at the same time, it may get a wrong value or crash the thread.
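
This failure mode is easy to provoke with a small stress test. In the sketch below (illustrative loop counts and value types), one thread keeps inserting, which forces rehashes, while another reads; with mutable.HashMap this can return wrong values, throw, or hang, whereas swapping in the commented-out ConcurrentHashMap variant is safe.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

object RehashRaceSketch {
  def main(args: Array[String]): Unit = {
    val statuses = new mutable.HashMap[Int, Int]
    // val statuses = new ConcurrentHashMap[Int, Int]().asScala  // safe variant

    val writer = new Thread(new Runnable {
      def run(): Unit = (1 to 1000000).foreach(i => statuses(i) = i)  // triggers rehashes
    })
    val reader = new Thread(new Runnable {
      def run(): Unit = (1 to 1000000).foreach(i => statuses.get(i))  // races with the rehash
    })
    writer.start(); reader.start()
    writer.join(); reader.join()
  }
}
```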

@JoshRosen
Contributor

I agree that mapStatuses itself is mutable. I was just observing that the values stored in mapStatuses (the Array[MapStatus]es) aren't modified after they're stored in the HashMap.

@zsxwing
Member Author

zsxwing commented Sep 2, 2014

You remind me that even if the Array[MapStatus] won't be modified, according to the Java memory model, fetchedStatuses (https://github.com/zsxwing/spark/blob/SPARK-2634/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L168) and statuses (https://github.com/zsxwing/spark/blob/SPARK-2634/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L186) may still be read in an inconsistent state without proper protection.

@zsxwing
Member Author

zsxwing commented Sep 2, 2014

ConcurrentHashMap should fix all these issues I found.
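
One reason the ConcurrentHashMap route also covers the memory-model concern: java.util.concurrent collections guarantee that actions in a thread prior to placing an object into the map happen-before actions subsequent to retrieving it in another thread, so a reader that sees the Array[MapStatus] reference also sees its fully written contents. A small sketch with illustrative names and value type:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object PublicationSketch {
  private val statuses = new ConcurrentHashMap[Int, Array[Long]]().asScala

  def publish(shuffleId: Int): Unit = {
    val arr = Array.fill(8)(42L)   // fully constructed before publication
    statuses.put(shuffleId, arr)   // put() is the publication point
  }

  def read(shuffleId: Int): Option[Array[Long]] =
    statuses.get(shuffleId)        // if defined, the array's contents are visible
}
```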

@zsxwing
Member Author

zsxwing commented Sep 18, 2014

@JoshRosen do you think it's OK?

@JoshRosen
Contributor

This seems fine to me, actually, so I'm going to re-run the tests then merge it. We might want to do some other cleanup in this file, but that can wait for a separate PR.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have started for PR 1541 at commit d450053.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 1541 at commit d450053.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 86bce76 Sep 26, 2014
@zsxwing zsxwing deleted the SPARK-2634 branch September 26, 2014 01:29
@zsxwing
Member Author

zsxwing commented Sep 26, 2014

Thank you @JoshRosen
