make getProcessingDistributionsForWorkId threadsafe #29979

Merged
merged 2 commits into from
Jan 29, 2024

@clmccart clmccart commented Jan 10, 2024

make getProcessingDistributionsForWorkId threadsafe to avoid race conditions with remove tracker

The race condition happens in getProcessingDistributionsForWorkId because we check for the existence of the work id in the activeTrackersByWorkId map before retrieving the value from the map. If another thread removes the tracker from the map between the existence check and the retrieval, we get a null tracker, which then throws a NullPointerException when we try to access data on it.
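A minimal sketch of the check-then-act problem described above, not the actual Beam code. The `Tracker` class, the `racyLookup` method, and the `interleaved` hook are hypothetical names introduced for illustration; the hook stands in for another thread calling remove between the two map operations, so the failure can be reproduced deterministically on a single thread.

```java
import java.util.Collections;
import java.util.IntSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenActSketch {
  // Hypothetical stand-in for the real tracker type.
  static final class Tracker {
    final Map<String, IntSummaryStatistics> distributions = new ConcurrentHashMap<>();
  }

  static final Map<String, Tracker> activeTrackersByWorkId = new ConcurrentHashMap<>();

  // Racy pattern: containsKey and get are two separate map operations.
  // `interleaved` simulates what another thread might do between them.
  static Map<String, IntSummaryStatistics> racyLookup(String workId, Runnable interleaved) {
    if (!activeTrackersByWorkId.containsKey(workId)) {
      return Collections.emptyMap();
    }
    interleaved.run(); // e.g. another thread removing the tracker
    Tracker tracker = activeTrackersByWorkId.get(workId); // may be null by now
    return tracker.distributions; // NullPointerException if the entry was removed
  }

  public static void main(String[] args) {
    activeTrackersByWorkId.put("work-1", new Tracker());
    try {
      racyLookup("work-1", () -> activeTrackersByWorkId.remove("work-1"));
      System.out.println("no exception");
    } catch (NullPointerException e) {
      System.out.println("NullPointerException after interleaved removal");
    }
  }
}
```

Each call on the concurrent map is individually thread-safe; it is the gap between the two calls that leaves room for the removal.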

Was able to repro this with a chaos test locally and confirmed that synchronizing the getProcessingDistributionsForWorkId method solves the issue. Decided to not commit the chaos test since it's a bit messy and deviates from test patterns in this repo.


Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@clmccart

#29987

@Abacn

Abacn commented Jan 12, 2024

Thanks for the explanation. For future reference, would you mind adding a brief comment in the code to indicate why synchronized is needed?

@@ -117,7 +117,8 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadataForWorkId(String
     return Optional.ofNullable(null);
   }
 
-  public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
+  public synchronized Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(
+      String workId) {
     if (!activeTrackersByWorkId.containsKey(workId)) {

@Abacn Abacn Jan 12, 2024

Suggested change
-    if (!activeTrackersByWorkId.containsKey(workId)) {
+    // synchronized to avoid race condition of accessing activeTrackersByWorkId
+    if (!activeTrackersByWorkId.containsKey(workId)) {

@@ -117,7 +117,8 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadataForWorkId(String
     return Optional.ofNullable(null);
   }
 
-  public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
+  public synchronized Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(

since removeTracker isn't synchronized it still seems like the race is possible.

An alternative since the activeTrackersByWorkId is a concurrent map would be to just call get() first and replace the containsKey check with comparing if null is returned.

similarly the completedProcessingMetrics.containsKey check could be replaced by using getOrDefault
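A minimal sketch of the alternative suggested in this comment, under stated assumptions: the `Tracker` class, the method names `distributionsFor` and `completedValue`, and the `Integer` value type of `completedProcessingMetrics` are hypothetical and chosen only for illustration. The point is that each read touches the concurrent map exactly once, so a concurrent removal can no longer land between a check and a retrieval, and no `synchronized` is needed on the reader.

```java
import java.util.Collections;
import java.util.IntSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SingleLookupSketch {
  // Hypothetical stand-in for the real tracker type.
  static final class Tracker {
    final Map<String, IntSummaryStatistics> distributions = new ConcurrentHashMap<>();
  }

  static final Map<String, Tracker> activeTrackersByWorkId = new ConcurrentHashMap<>();
  // Value type is an assumption made for this sketch.
  static final Map<String, Integer> completedProcessingMetrics = new ConcurrentHashMap<>();

  // One get() replaces containsKey() followed by get(): the reference is
  // either null or a tracker that stays usable even if it is removed later.
  static Map<String, IntSummaryStatistics> distributionsFor(String workId) {
    Tracker tracker = activeTrackersByWorkId.get(workId);
    if (tracker == null) {
      return Collections.emptyMap();
    }
    return tracker.distributions;
  }

  // getOrDefault() replaces containsKey() followed by get() in the same way.
  static int completedValue(String key) {
    return completedProcessingMetrics.getOrDefault(key, 0);
  }

  public static void main(String[] args) {
    activeTrackersByWorkId.put("work-1", new Tracker());
    Map<String, IntSummaryStatistics> held = distributionsFor("work-1");
    activeTrackersByWorkId.remove("work-1"); // removal after the lookup is harmless
    System.out.println(held.isEmpty());                        // true
    System.out.println(distributionsFor("work-1").isEmpty());  // true
    System.out.println(completedValue("missing"));             // 0
  }
}
```

Marking only the reader `synchronized` would not give the same guarantee, because a writer that does not take the same lock can still remove the entry while the reader holds it.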

Contributor Author

yeah i like that better. done!

@Abacn

Abacn commented Jan 24, 2024

CC: @clmccart @scwhittle this issue is marked as a release blocker. Any update?

@clmccart clmccart force-pushed the dofn-patch branch 2 times, most recently from 3be409d to a7e5cb0 Compare January 29, 2024 18:31
@clmccart clmccart changed the title make getProcessingDistributionsForWorkId synchronized make getProcessingDistributionsForWorkId threadsafe Jan 29, 2024

@lostluck lostluck left a comment

LGTM

@lostluck lostluck merged commit b72eacf into apache:master Jan 29, 2024
17 checks passed
lostluck added a commit that referenced this pull request Jan 30, 2024
* make getProcessingDistributionsForWorkId synchronized to avoid race conditions with remove tracker

* correctly access the concurrent hashmap to avoid race conditions

---------

Co-authored-by: Claire McCarthy <clairemccarthy@google.com>