[core][adag] Support asyncio.gather on multiple CompiledDAGFutures (#47684) #47860

Merged

Conversation

@jeffreyjeffreywang (Contributor) commented on Sep 29, 2024

Why are these changes needed?

Currently, awaiting an individual CompiledDAGFuture works, but calling asyncio.gather on a list of CompiledDAGFutures does not, because there is a race condition between CompiledDAGFutures that originate from the same execution of an aDAG. One solution is to relax the assertion in _cache_execution_results, allowing multiple CompiledDAGFutures originating from the same execution to attempt to cache the result for that execution. The other is to protect the critical section with an asyncio.Lock. The first solution can result in awaiting the same future multiple times, but since that overhead is lower than the second solution's (locks introduce more overhead), we go with the first solution.
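For reference, a minimal sketch of the usage pattern in question, assuming the Ray compiled-graph API of this period (InputNode, MultiOutputNode, experimental_compile(enable_asyncio=True), execute_async); the EchoActor class and the input value are illustrative only, not code from this PR:

```python
# Hedged sketch: shows asyncio.gather over the multiple CompiledDAGFutures
# returned for a MultiOutputNode graph. API names are assumptions based on
# Ray's compiled-graph (aDAG) interface at the time of this PR.
import asyncio

import ray
from ray.dag import InputNode, MultiOutputNode


@ray.remote
class EchoActor:
    def echo(self, x):
        return x


async def main():
    a, b = EchoActor.remote(), EchoActor.remote()
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp), b.echo.bind(inp)])
    compiled_dag = dag.experimental_compile(enable_asyncio=True)

    # With MultiOutputNode, execute_async yields one CompiledDAGFuture per
    # output channel; gathering them concurrently is what used to trip the
    # assertion in _cache_execution_results.
    futs = await compiled_dag.execute_async(42)
    print(await asyncio.gather(*futs))  # expected: [42, 42]


asyncio.run(main())
```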

Related issue number

Resolves #47684.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

…multiple CompiledDAGFutures

Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
@jeffreyjeffreywang (Contributor, author) commented:

Hey @rkooo567 @stephanie-wang, could you please take a look once you have time? This addresses the issue of not being able to invoke asyncio.gather on a list of CompiledDAGFutures. Thank you!

@stephanie-wang stephanie-wang self-assigned this Oct 9, 2024
@stephanie-wang (Contributor) left a comment:

Thanks for the fix, @jeffreyjeffreywang!

Can you measure throughput before/after? A little worried about the lock overhead and dynamic lock allocation.

Also, can you explain what data/code the lock protects? Is there a lock-free design that can work?

@stephanie-wang added the @author-action-required label (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) on Oct 10, 2024
@jeffreyjeffreywang (Contributor, author) commented:

Thanks for the review, @stephanie-wang! The lock protects _result_buffer from being accessed and modified by multiple CompiledDAGFutures originating from the same execution index, and it prevents the same underlying asyncio.Future from being awaited multiple times. This is only a problem when the DAG has multiple output channels.

Imagine there are two output channels, and execute_async returns fut1 and fut2. While awaiting fut1 and fut2, there is a race condition when they both attempt to store the execution result in _result_buffer. Since the DAG has not yet stored the execution result, both await the same underlying future and write to _result_buffer. After either fut1 or fut2 writes to _result_buffer, the later write triggers an assertion error. We could remove the assertion, but that alone would not solve the issue of awaiting the same underlying future more than once.
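A self-contained illustration of this failure mode, using plain asyncio rather than Ray's actual classes (result_buffer and await_and_cache are made-up stand-ins): two awaiters share one underlying future, and the second one to wake trips the "not yet cached" assertion.

```python
import asyncio

result_buffer = {}


async def await_and_cache(execution_index, shared_fut):
    result = await shared_fut
    # Mirrors the spirit of the original assertion in _cache_execution_results:
    # it assumes only one future per execution ever caches the result.
    assert execution_index not in result_buffer, "second writer hits this"
    result_buffer[execution_index] = result
    return result


async def main():
    loop = asyncio.get_running_loop()
    shared_fut = loop.create_future()
    loop.call_soon(shared_fut.set_result, "output")
    # Both coroutines await the same future; the second to resume raises.
    await asyncio.gather(
        await_and_cache(0, shared_fut),
        await_and_cache(0, shared_fut),
    )


asyncio.run(main())  # raises AssertionError
```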

I can add some logic to avoid using a lock when there is a single output channel so that we don't affect performance when MultiOutputNode isn't used. Also, is there a tool I can use to measure the throughput?

@stephanie-wang (Contributor) commented:

> Thanks for the review, @stephanie-wang! The lock protects _result_buffer from being accessed and modified by multiple CompiledDAGFutures originating from the same execution index, and it prevents the same underlying asyncio.Future from being awaited multiple times. This is only a problem when the DAG has multiple output channels.

> Imagine there are two output channels, and execute_async returns fut1 and fut2. While awaiting fut1 and fut2, there is a race condition when they both attempt to store the execution result in _result_buffer. Since the DAG has not yet stored the execution result, both await the same underlying future and write to _result_buffer. After either fut1 or fut2 writes to _result_buffer, the later write triggers an assertion error. We could remove the assertion, but that alone would not solve the issue of awaiting the same underlying future more than once.

Ah I see, thanks for the explanation! Seems like it could also be fixed by checking again if the result is still not cached after await fut?

> I can add some logic to avoid using a lock when there is a single output channel so that we don't affect performance when MultiOutputNode isn't used. Also, is there a tool I can use to measure the throughput?

Hmm, unfortunately I don't think we have an asyncio microbenchmark. But you can take a look at the tests that are run nightly for compiled graph. Actually, adding an asyncio version of this microbenchmark would be useful (I'll file an issue for this if you're interested).

@jeffreyjeffreywang (Contributor, author) commented:

> Ah I see, thanks for the explanation! Seems like it could also be fixed by checking again if the result is still not cached after await fut?

Yup, we can simply check in _cache_execution_results whether the result has already been cached and no longer enforce the assertion. This can lead to awaiting the same future multiple times, but since creating and acquiring locks is more expensive, I think we can afford that overhead.
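A hedged sketch of that idea, continuing the toy example above rather than Ray's real _cache_execution_results: replace the assertion with an idempotent check, so whichever future finishes last simply skips the write, while later reads come from the cache.

```python
import asyncio

result_buffer = {}


async def await_and_cache(execution_index, shared_fut):
    if execution_index in result_buffer:
        # Another future from the same execution already cached the result.
        return result_buffer[execution_index]
    # Both futures may still reach this await before either caches, so the
    # same underlying future can be awaited more than once (the accepted,
    # cheaper overhead compared to a lock).
    result = await shared_fut
    result_buffer.setdefault(execution_index, result)
    return result_buffer[execution_index]


async def main():
    loop = asyncio.get_running_loop()
    shared_fut = loop.create_future()
    loop.call_soon(shared_fut.set_result, "output")
    print(await asyncio.gather(
        await_and_cache(0, shared_fut),
        await_and_cache(0, shared_fut),
    ))


asyncio.run(main())  # prints ['output', 'output'] with no assertion error
```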

jeffreyjeffreywang added 4 commits October 14, 2024 04:27
Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
@jeffreyjeffreywang (Contributor, author) commented:

Hey @stephanie-wang, this is now ready for another review.

@anyscalesam added the core (Issues that should be addressed in Ray Core), compiled-graph, and triage (Needs triage, e.g. priority, bug/not-bug, and owning component) labels on Oct 16, 2024
@stephanie-wang (Contributor) left a comment:

Oops sorry for the delay in review. This looks great, thanks!

@stephanie-wang added the go label (add ONLY when ready to merge, run all tests) and removed the @author-action-required label on Oct 23, 2024
@stephanie-wang stephanie-wang enabled auto-merge (squash) October 23, 2024 00:42
@rkooo567 (Contributor) commented:

there are test failures!

@rkooo567 (Contributor) commented:

btw, is the description still relevant? I'm not seeing the lock in the code.

@rkooo567 rkooo567 self-assigned this Oct 23, 2024
@jeffreyjeffreywang (Contributor, author) commented:

@stephanie-wang no worries, I was busy dealing with work as well. Thanks for the review!
@rkooo567 Looks like the test failures are unrelated to my changes. I will rebase my branch onto the latest master. I updated the description as well.

…-ref

Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
@stephanie-wang stephanie-wang enabled auto-merge (squash) October 24, 2024 22:00
@stephanie-wang (Contributor) commented:

Hey @jeffreyjeffreywang, FYI I re-merged master; not sure why there was a core test failure in your earlier merge.

@stephanie-wang stephanie-wang enabled auto-merge (squash) October 24, 2024 22:16
@stephanie-wang stephanie-wang merged commit 2bc594a into ray-project:master Oct 24, 2024
6 checks passed
Jay-ju pushed a commit to Jay-ju/ray that referenced this pull request Nov 5, 2024
…ay-project#47684) (ray-project#47860)

Currently, awaiting an individual `CompiledDAGFuture` works, but calling
`asyncio.gather` on a list of `CompiledDAGFuture`s does not, because there is
a race condition between `CompiledDAGFuture`s that originate from the same
execution of an aDAG. One solution is to relax the assertion in
`_cache_execution_results`, allowing multiple `CompiledDAGFuture`s
originating from the same execution to attempt to cache the result for that
execution. The other is to protect the critical section with an
`asyncio.Lock`. The first solution can result in awaiting the same future
multiple times, but since that overhead is lower than the second solution's
(locks introduce more overhead), we go with the first solution.

Resolves ray-project#47684. 

---------

Signed-off-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
Co-authored-by: jeffreyjeffreywang <jeffjeffreywang@gmail.com>
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Labels
compiled-graph
core (Issues that should be addressed in Ray Core)
go (add ONLY when ready to merge, run all tests)
triage (Needs triage, e.g. priority, bug/not-bug, and owning component)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core][aDAG] Multi ref output doesn't work with asyncio.gather
4 participants