Skip to content

Commit

Permalink
[core] Erases PopWorkerRequest on registration timeout. (#48858)
Browse files Browse the repository at this point in the history
This fixes a bug from #47694. When a PopWorkerRequest is time out, we
should erase it from bookkeeping and invoke the callback with a
`WorkerPendingRegistration` status. However only the latter is done and
we forgot to erase, causing segfaults on a second call.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
  • Loading branch information
rynewang authored Nov 22, 2024
1 parent 8d35885 commit be0bbd0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,8 @@ void WorkerPool::MonitorPopWorkerRequestForRegistration(
auto &requests = state.pending_registration_requests;
auto it = std::find(requests.begin(), requests.end(), pop_worker_request);
if (it != requests.end()) {
// Fail the task...
// Pop and fail the task...
requests.erase(it);
PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration;
PopWorkerCallbackAsync(pop_worker_request->callback, nullptr, status);
}
Expand Down
13 changes: 13 additions & 0 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,19 @@ TEST_F(WorkerPoolDriverRegisteredTest, PopWorkerStatus) {
worker_pool_->ClearProcesses();
}

TEST_F(WorkerPoolDriverRegisteredTest, WorkerPendingRegistrationErasesRequest) {
std::shared_ptr<WorkerInterface> popped_worker;
PopWorkerStatus status;
auto task_spec = ExampleTaskSpec();
// Create a task without push worker. It should time out (WorkerPendingRegistration).
popped_worker = worker_pool_->PopWorkerSync(task_spec, false, &status);
ASSERT_EQ(popped_worker, nullptr);
ASSERT_EQ(status, PopWorkerStatus::WorkerPendingRegistration);
// The request should be erased.
ASSERT_EQ(worker_pool_->NumPendingRegistrationRequests(), 0);
worker_pool_->ClearProcesses();
}

TEST_F(WorkerPoolDriverRegisteredTest, TestIOWorkerFailureAndSpawn) {
std::unordered_set<std::shared_ptr<WorkerInterface>> spill_worker_set;
auto spill_worker_callback =
Expand Down

0 comments on commit be0bbd0

Please sign in to comment.