-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Posts CoreWorkerMemoryStore callbacks onto io_context. #47833
Conversation
CoreWorkerMemoryStore has GetAsync method which calls callbacks when the object is ready, immediately or on a Put() call. However despite the name, the call is not `Async`: it just calls on the GetAsync() or Put() call stack. If a mutex is held by any call frame in the callstack, it remains held in the callback, ripe for deadlocks. Adds another ctor argument io_context and posts any callbacks to the context to avoid those deadlocks. Note many cpp tests creates CoreWorkerMemoryStore without a io_context so I kept the option of creating an owned io_context with thread. Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
…e task id Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@MengjinYan could you take a first pass. |
src/ray/core_worker/store_provider/memory_store/memory_store.cc
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatted with @rynewang offline. Here the PR changes the task id to request id because the corresponding tests (in dependency_resolver_test.cc) fail after the async change in memory store.
From the current understanding, the tests happen to use Tasks with the same TaskID to submit dependency resolve requests. With the async replication, the task state will be overridden and the tests will fail. At the same time, in reality, we are thinking although task execution can have multiple attempts, because the task object will be the same, so it might work with the original implementation.
Therefore, @jjyao we would like your opinion here to see whether the change here makes sense or we should just update the tests to make it more like real scenarios.
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@jjyao pls take a look since Mengjin already reviewed |
@@ -59,11 +59,15 @@ void InlineDependencies( | |||
|
|||
void LocalDependencyResolver::CancelDependencyResolution(const TaskID &task_id) { | |||
absl::MutexLock lock(&mu_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changes in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the change Mengjin mentioned. Problem is we used TaskID to track dependency resolution requests. But at least in cpp test, a same taskID may call ResolveDependencies
multiple times, a new request overrides and deletes an old dependency resolution request. But the test still assumes the old request finishes with callback called.
This PR fixes it by tracing each request not only by TaskID but also by an auto increment id.
I am not sure if the current behavior (forgetting old reqs) is expected, so I want some input from you.
// ID of the dependency resolution request. Used to index pending_tasks_ because a | ||
// same TaskID may be requested multiple times and we treat each request independently. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a separate issue? Seems the PR description doesn't cover this change.
todo: core_worker.cc GetAsync post removal |
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Friendly ping @jjyao |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LG
src/ray/core_worker/store_provider/memory_store/memory_store.cc
Outdated
Show resolved
Hide resolved
src/ray/core_worker/store_provider/memory_store/memory_store.cc
Outdated
Show resolved
Hide resolved
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com> Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com> Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@jjyao ready to merge |
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@jjyao ready to review |
@@ -26,8 +26,9 @@ | |||
namespace ray { | |||
namespace internal { | |||
LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime) | |||
: local_mode_ray_tuntime_(local_mode_ray_tuntime) { | |||
memory_store_ = std::make_unique<CoreWorkerMemoryStore>(); | |||
: io_context_("LocalModeObjectStore"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually where this cause thread issue for cpp since now the callback is executed in a different thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have an answer since I don't know ray-cpp that much
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
std::unique_ptr<InstrumentedIOContextWithThread> io_context = | ||
std::make_unique<InstrumentedIOContextWithThread>( | ||
"DefaultCoreWorkerMemoryStoreWithThread"); | ||
return std::shared_ptr<DefaultCoreWorkerMemoryStoreWithThread>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not std::make_shared
? Your implementation leads to two memory allocations instead of one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because the ctor is private and thus not visible to make_shared. you can make a private tag type trick but for sake of simplicity I did not do that.
…oject#47833) CoreWorkerMemoryStore has GetAsync method which calls callbacks when the object is ready, immediately or on a Put() call. However despite the name, the call is not `Async`: it just calls on the GetAsync() or Put() call stack. If a mutex is held by any call frame in the callstack, it remains held in the callback, ripe for deadlocks. Adds another ctor argument io_context and posts any callbacks to the context to avoid those deadlocks. Note many cpp tests creates CoreWorkerMemoryStore without a io_context so I kept the option of creating an owned io_context with thread. Signed-off-by: Ruiyang Wang <rywang014@gmail.com> Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
CoreWorkerMemoryStore has GetAsync method which calls callbacks when the object is ready, immediately or on a Put() call. However despite the name, the call is not
Async
: it just calls on the GetAsync() or Put() call stack. If a mutex is held by any call frame in the callstack, it remains held in the callback, ripe for deadlocks.Adds another ctor argument io_context and posts any callbacks to the context to avoid those deadlocks. Note many cpp tests creates CoreWorkerMemoryStore without a io_context so I kept the option of creating an owned io_context with thread.
Closes #47649