From 077376021e939b36bdc49fcc19b486928f001411 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 9 Sep 2024 17:39:47 -0700 Subject: [PATCH] [Core] Reconstruct actor to run lineage reconstruction triggered actor task (#47396) Currently if we need to rerun an actor task to recover a lost object but the actor is dead, the actor task will fail immediately. This PR allows the actor to be restarted (if it doesn't violate max_restarts) so that the actor task can run to recover lost objects. In terms of the state machine, we add a state transition from DEAD to RESTARTING. Signed-off-by: Jiajun Yao --- BUILD.bazel | 1 + python/ray/tests/BUILD | 1 + .../test_actor_lineage_reconstruction.py | 109 +++++++ python/ray/tests/test_gcs_fault_tolerance.py | 2 +- python/ray/util/state/common.py | 6 + src/mock/ray/core_worker/actor_creator.h | 12 + src/mock/ray/core_worker/core_worker.h | 6 +- src/mock/ray/core_worker/reference_count.h | 56 ++++ src/mock/ray/rpc/worker/core_worker_client.h | 6 +- src/ray/core_worker/actor_creator.h | 22 ++ src/ray/core_worker/actor_manager.cc | 68 ++-- src/ray/core_worker/actor_manager.h | 17 +- src/ray/core_worker/core_worker.cc | 24 +- src/ray/core_worker/core_worker.h | 4 +- src/ray/core_worker/reference_count.cc | 34 +- src/ray/core_worker/reference_count.h | 26 +- .../core_worker/test/actor_manager_test.cc | 65 +--- .../test/dependency_resolver_test.cc | 12 + .../test/direct_actor_transport_mock_test.cc | 24 +- .../test/direct_actor_transport_test.cc | 114 +++++-- .../test/normal_task_submitter_test.cc | 12 + .../core_worker/test/reference_count_test.cc | 26 +- .../transport/actor_submit_queue.h | 1 + .../transport/actor_task_submitter.cc | 145 +++++++-- .../transport/actor_task_submitter.h | 54 +++- .../out_of_order_actor_submit_queue.cc | 4 + .../out_of_order_actor_submit_queue.h | 1 + .../sequential_actor_submit_queue.cc | 2 + .../transport/sequential_actor_submit_queue.h | 1 + src/ray/gcs/gcs_client/accessor.cc | 36 +++ 
src/ray/gcs/gcs_client/accessor.h | 11 + .../gcs/gcs_client/test/gcs_client_test.cc | 4 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 306 +++++++++++++----- src/ray/gcs/gcs_server/gcs_actor_manager.h | 37 ++- .../gcs_actor_manager_export_event_test.cc | 15 +- .../gcs_server/test/gcs_actor_manager_test.cc | 14 +- src/ray/gcs/pb_util.h | 9 + src/ray/protobuf/common.proto | 11 + src/ray/protobuf/core_worker.proto | 14 +- src/ray/protobuf/gcs.proto | 2 + src/ray/protobuf/gcs_service.proto | 25 ++ src/ray/rpc/gcs_server/gcs_rpc_client.h | 10 + src/ray/rpc/gcs_server/gcs_rpc_server.h | 11 + src/ray/rpc/worker/core_worker_client.h | 10 +- src/ray/rpc/worker/core_worker_server.h | 4 +- 45 files changed, 1056 insertions(+), 318 deletions(-) create mode 100644 python/ray/tests/test_actor_lineage_reconstruction.py create mode 100644 src/mock/ray/core_worker/reference_count.h diff --git a/BUILD.bazel b/BUILD.bazel index 8f348be2c0ab0..49c2a47da6958 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -991,6 +991,7 @@ ray_cc_test( tags = ["team:core"], deps = [ ":core_worker_lib", + ":ray_mock", "@com_google_googletest//:gtest_main", ], ) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index eac0dbeba0b52..0f3df97031336 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -447,6 +447,7 @@ py_test_module_list( py_test_module_list( files = [ "test_actor.py", + "test_actor_lineage_reconstruction.py", "test_actor_retry1.py", "test_actor_retry2.py", "test_actor_failures.py", diff --git a/python/ray/tests/test_actor_lineage_reconstruction.py b/python/ray/tests/test_actor_lineage_reconstruction.py new file mode 100644 index 0000000000000..eb487471ba33d --- /dev/null +++ b/python/ray/tests/test_actor_lineage_reconstruction.py @@ -0,0 +1,109 @@ +import os +import gc +import sys + +import pytest + +import ray +from ray._private.test_utils import wait_for_condition +from ray.core.generated import gcs_pb2 +from ray.core.generated import common_pb2 + + +def 
test_actor_reconstruction_triggered_by_lineage_reconstruction(ray_start_cluster): + # Test the sequence of events: + # actor goes out of scope and killed + # -> lineage reconstruction triggered by object lost + # -> actor is restarted + # -> actor goes out of scope again after lineage reconstruction is done + # -> actor is permanently dead when there is no reference. + cluster = ray_start_cluster + cluster.add_node(resources={"head": 1}) + ray.init(address=cluster.address) + worker1 = cluster.add_node(resources={"worker": 1}) + + @ray.remote( + num_cpus=1, resources={"worker": 1}, max_restarts=-1, max_task_retries=-1 + ) + class Actor: + def ping(self): + return [1] * 1024 * 1024 + + actor = Actor.remote() + actor_id = actor._actor_id + + obj1 = actor.ping.remote() + obj2 = actor.ping.remote() + + # Make the actor out of scope + actor = None + + def verify1(): + gc.collect() + actor_info = ray._private.state.state.global_state_accessor.get_actor_info( + actor_id + ) + assert actor_info is not None + actor_info = gcs_pb2.ActorTableData.FromString(actor_info) + assert actor_info.state == gcs_pb2.ActorTableData.ActorState.DEAD + assert ( + actor_info.death_cause.actor_died_error_context.reason + == common_pb2.ActorDiedErrorContext.Reason.OUT_OF_SCOPE + ) + assert actor_info.num_restarts_due_to_lineage_reconstruction == 0 + return True + + wait_for_condition(lambda: verify1()) + + # objs will be lost and recovered + # during the process, actor will be reconstructed + # and dead again after lineage reconstruction finishes + cluster.remove_node(worker1) + cluster.add_node(resources={"worker": 1}) + + assert ray.get(obj1) == [1] * 1024 * 1024 + assert ray.get(obj2) == [1] * 1024 * 1024 + + def verify2(): + actor_info = ray._private.state.state.global_state_accessor.get_actor_info( + actor_id + ) + assert actor_info is not None + actor_info = gcs_pb2.ActorTableData.FromString(actor_info) + assert actor_info.state == gcs_pb2.ActorTableData.ActorState.DEAD + assert ( + 
actor_info.death_cause.actor_died_error_context.reason + == common_pb2.ActorDiedErrorContext.Reason.OUT_OF_SCOPE + ) + # 1 restart recovers two objects + assert actor_info.num_restarts_due_to_lineage_reconstruction == 1 + return True + + wait_for_condition(lambda: verify2()) + + # actor can be permanently dead since no lineage reconstruction will happen + del obj1 + del obj2 + + def verify3(): + actor_info = ray._private.state.state.global_state_accessor.get_actor_info( + actor_id + ) + assert actor_info is not None + actor_info = gcs_pb2.ActorTableData.FromString(actor_info) + assert actor_info.state == gcs_pb2.ActorTableData.ActorState.DEAD + assert ( + actor_info.death_cause.actor_died_error_context.reason + == common_pb2.ActorDiedErrorContext.Reason.REF_DELETED + ) + assert actor_info.num_restarts_due_to_lineage_reconstruction == 1 + return True + + wait_for_condition(lambda: verify3()) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 44f35afc036db..e79d959b3573a 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -288,7 +288,7 @@ def condition(): # Wait for the actor dead. wait_for_condition(condition, timeout=10) - # If `PollOwnerForActorOutOfScope` was successfully called, + # If `ReportActorOutOfScope` was successfully called, # name should be properly deleted. 
with pytest.raises(ValueError): ray.get_actor("abc") diff --git a/python/ray/util/state/common.py b/python/ray/util/state/common.py index effb28638de69..365a49e1625fe 100644 --- a/python/ray/util/state/common.py +++ b/python/ray/util/state/common.py @@ -461,6 +461,12 @@ class ActorState(StateSchema): placement_group_id: Optional[str] = state_column(detail=True, filterable=True) #: Actor's repr name if a customized __repr__ method exists, else empty string. repr_name: Optional[str] = state_column(detail=True, filterable=True) + #: Number of restarts that have been tried on this actor. + num_restarts: int = state_column(filterable=False, detail=True) + #: Number of times this actor is restarted due to lineage reconstructions. + num_restarts_due_to_lineage_reconstruction: int = state_column( + filterable=False, detail=True + ) @dataclass(init=not IS_PYDANTIC_2) diff --git a/src/mock/ray/core_worker/actor_creator.h b/src/mock/ray/core_worker/actor_creator.h index e4f62014315ec..afb31406f5b5f 100644 --- a/src/mock/ray/core_worker/actor_creator.h +++ b/src/mock/ray/core_worker/actor_creator.h @@ -32,6 +32,18 @@ class MockActorCreatorInterface : public ActorCreatorInterface { (const TaskSpecification &task_spec, const rpc::ClientCallback &callback), (override)); + MOCK_METHOD(Status, + AsyncRestartActor, + (const ActorID &actor_id, + uint64_t num_restarts, + gcs::StatusCallback callback), + (override)); + MOCK_METHOD(Status, + AsyncReportActorOutOfScope, + (const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + gcs::StatusCallback callback), + (override)); MOCK_METHOD(void, AsyncWaitForActorRegisterFinish, (const ActorID &actor_id, gcs::StatusCallback callback), diff --git a/src/mock/ray/core_worker/core_worker.h b/src/mock/ray/core_worker/core_worker.h index aa64d1d0d2afa..60817fb7af1c6 100644 --- a/src/mock/ray/core_worker/core_worker.h +++ b/src/mock/ray/core_worker/core_worker.h @@ -58,9 +58,9 @@ class MockCoreWorker : public CoreWorker { 
rpc::SendReplyCallback send_reply_callback), (override)); MOCK_METHOD(void, - HandleWaitForActorOutOfScope, - (rpc::WaitForActorOutOfScopeRequest request, - rpc::WaitForActorOutOfScopeReply *reply, + HandleWaitForActorRefDeleted, + (rpc::WaitForActorRefDeletedRequest request, + rpc::WaitForActorRefDeletedReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); MOCK_METHOD(void, diff --git a/src/mock/ray/core_worker/reference_count.h b/src/mock/ray/core_worker/reference_count.h new file mode 100644 index 0000000000000..c0679dec135f5 --- /dev/null +++ b/src/mock/ray/core_worker/reference_count.h @@ -0,0 +1,56 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once +#include "gmock/gmock.h" +#include "ray/core_worker/reference_count.h" +namespace ray { +namespace core { + +class MockReferenceCounter : public ReferenceCounterInterface { + public: + MockReferenceCounter() : ReferenceCounterInterface() {} + + MOCK_METHOD2(AddLocalReference, + void(const ObjectID &object_id, const std::string &call_sit)); + + MOCK_METHOD4(AddBorrowedObject, + bool(const ObjectID &object_id, + const ObjectID &outer_id, + const rpc::Address &owner_address, + bool foreign_owner_already_monitoring)); + + MOCK_METHOD8(AddOwnedObject, + void(const ObjectID &object_id, + const std::vector &contained_ids, + const rpc::Address &owner_address, + const std::string &call_site, + const int64_t object_size, + bool is_reconstructable, + bool add_local_ref, + const absl::optional &pinned_at_raylet_id)); + + MOCK_METHOD2(AddObjectPrimaryCopyDeleteCallback, + bool(const ObjectID &object_id, + const std::function callback)); + + MOCK_METHOD2(SetObjectRefDeletedCallback, + bool(const ObjectID &object_id, + const std::function callback)); + + virtual ~MockReferenceCounter() {} +}; + +} // namespace core +} // namespace ray diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index c9316228c2f1c..abc82eb42999d 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -46,9 +46,9 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn const ClientCallback &callback), (override)); MOCK_METHOD(void, - WaitForActorOutOfScope, - (const WaitForActorOutOfScopeRequest &request, - const ClientCallback &callback), + WaitForActorRefDeleted, + (const WaitForActorRefDeletedRequest &request, + const ClientCallback &callback), (override)); MOCK_METHOD(void, PubsubLongPolling, diff --git a/src/ray/core_worker/actor_creator.h b/src/ray/core_worker/actor_creator.h index 49206e95f1b74..cf0d7d1a0e63b 100644 --- 
a/src/ray/core_worker/actor_creator.h +++ b/src/ray/core_worker/actor_creator.h @@ -38,6 +38,15 @@ class ActorCreatorInterface { virtual Status AsyncRegisterActor(const TaskSpecification &task_spec, gcs::StatusCallback callback) = 0; + virtual Status AsyncRestartActor(const ActorID &actor_id, + uint64_t num_restarts, + gcs::StatusCallback callback) = 0; + + virtual Status AsyncReportActorOutOfScope( + const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstructions, + gcs::StatusCallback callback) = 0; + /// Asynchronously request GCS to create the actor. /// /// \param task_spec The specification for the actor creation task. @@ -96,6 +105,19 @@ class DefaultActorCreator : public ActorCreatorInterface { }); } + Status AsyncRestartActor(const ActorID &actor_id, + uint64_t num_restarts, + gcs::StatusCallback callback) override { + return gcs_client_->Actors().AsyncRestartActor(actor_id, num_restarts, callback); + } + + Status AsyncReportActorOutOfScope(const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + gcs::StatusCallback callback) override { + return gcs_client_->Actors().AsyncReportActorOutOfScope( + actor_id, num_restarts_due_to_lineage_reconstruction, callback); + } + bool IsActorInRegistering(const ActorID &actor_id) const override { return registering_actors_->find(actor_id) != registering_actors_->end(); } diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 30087e711be93..02a89a7c65c91 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -37,7 +37,8 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_han actor_id, actor_creation_return_id, add_local_ref, - is_self)); + is_self, + /*owned*/ false)); ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address); return actor_id; @@ -135,7 +136,9 @@ bool 
ActorManager::AddNewActorHandle(std::unique_ptr actor_handle, caller_address, actor_id, actor_creation_return_id, - /*add_local_ref=*/false); + /*add_local_ref=*/false, + /*is_self*/ false, + owned); } bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, @@ -144,7 +147,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, const ActorID &actor_id, const ObjectID &actor_creation_return_id, bool add_local_ref, - bool is_self) { + bool is_self, + bool owned) { if (add_local_ref) { reference_counter_->AddLocalReference(actor_creation_return_id, call_site); } @@ -152,7 +156,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, actor_id, actor_handle->MaxPendingCalls(), actor_handle->ExecuteOutOfOrder(), - /*fail_if_actor_unreachable=*/actor_handle->MaxTaskRetries() == 0); + /*fail_if_actor_unreachable=*/actor_handle->MaxTaskRetries() == 0, + owned); bool inserted; { absl::MutexLock lock(&mutex_); @@ -166,6 +171,13 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, actor_task_submitter_->ConnectActor(actor_id, caller_address, /*num_restarts=*/0); } + if (inserted && owned) { + RAY_CHECK(reference_counter_->AddObjectPrimaryCopyDeleteCallback( + actor_creation_return_id, [this, actor_id](const ObjectID &object_id) { + MarkActorKilledOrOutOfScope(GetActorHandle(actor_id)); + })); + } + return inserted; } @@ -173,33 +185,23 @@ void ActorManager::OnActorKilled(const ActorID &actor_id) { MarkActorKilledOrOutOfScope(GetActorHandle(actor_id)); } -void ActorManager::WaitForActorOutOfScope( +void ActorManager::WaitForActorRefDeleted( const ActorID &actor_id, - std::function actor_out_of_scope_callback) { - absl::MutexLock lock(&mutex_); - auto it = actor_handles_.find(actor_id); - if (it == actor_handles_.end()) { - actor_out_of_scope_callback(actor_id); - } else { - auto actor_handle = it->second; - // GCS actor manager will wait until the actor has been created before polling the - // owner. 
This should avoid any asynchronous problems. - auto callback = [this, actor_id, actor_handle, actor_out_of_scope_callback]( - const ObjectID &object_id) { - MarkActorKilledOrOutOfScope(actor_handle); - actor_out_of_scope_callback(actor_id); - }; - - // Returns true if the object was present and the callback was added. It might have - // already been evicted by the time we get this request, in which case we should - // respond immediately so the gcs server can destroy the actor. - const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); - if (!reference_counter_->SetObjectPrimaryCopyDeleteCallback(actor_creation_return_id, - callback)) { - RAY_LOG(DEBUG) << "ActorID reference already gone for " << actor_id; - MarkActorKilledOrOutOfScope(actor_handle); - actor_out_of_scope_callback(actor_id); - } + std::function actor_ref_deleted_callback) { + // GCS actor manager will wait until the actor has been created before polling the + // owner. This should avoid any asynchronous problems. + auto callback = [actor_id, actor_ref_deleted_callback](const ObjectID &object_id) { + actor_ref_deleted_callback(actor_id); + }; + + // Returns true if the object was present and the callback was added. It might have + // already been evicted by the time we get this request, in which case we should + // respond immediately so the gcs server can destroy the actor. 
+ const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); + if (!reference_counter_->SetObjectRefDeletedCallback(actor_creation_return_id, + callback)) { + RAY_LOG(DEBUG) << "ActorID reference already gone for " << actor_id; + callback(actor_creation_return_id); } } @@ -223,13 +225,15 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id, actor_task_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), /*is_dead=*/false, - actor_data.death_cause()); + actor_data.death_cause(), + /*is_restartable=*/true); } else if (actor_data.state() == rpc::ActorTableData::DEAD) { OnActorKilled(actor_id); actor_task_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), /*is_dead=*/true, - actor_data.death_cause()); + actor_data.death_cause(), + gcs::IsActorRestartable(actor_data)); // We cannot erase the actor handle here because clients can still // submit tasks to dead actors. This also means we defer unsubscription, // otherwise we crash when bulk unsubscribing all actor handles. diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 26e98e0139ff7..dcfe8e11a68d6 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -114,14 +114,14 @@ class ActorManager { const rpc::Address &caller_address, bool owned); - /// Wait for actor out of scope. + /// Wait for actor reference deletion. /// /// \param actor_id The actor id that owns the callback. - /// \param actor_out_of_scope_callback The callback function that will be called when - /// an actor_id goes out of scope. - void WaitForActorOutOfScope( + /// \param actor_ref_deleted_callback The callback function that will be called when + /// an actor_id has no references. + void WaitForActorRefDeleted( const ActorID &actor_id, - std::function actor_out_of_scope_callback); + std::function actor_ref_deleted_callback); /// Get a list of actor_ids from existing actor handles. 
/// This is used for debugging purpose. @@ -150,10 +150,10 @@ class ActorManager { /// \param[in] call_site The caller's site. /// \param[in] actor_id The id of an actor /// \param[in] actor_creation_return_id object id of this actor creation - /// \param[in] Whether to add a local reference for this actor. + /// \param[in] add_local_ref Whether to add a local reference for this actor. /// \param[in] is_self Whether this handle is current actor's handle. If true, actor - /// to the same actor. /// manager won't subscribe actor info from GCS. + /// \param[in] owned Whether the actor is owned by the current process. /// \return True if the handle was added and False if we already had a handle /// to the same actor. bool AddActorHandle(std::unique_ptr actor_handle, @@ -162,7 +162,8 @@ class ActorManager { const ActorID &actor_id, const ObjectID &actor_creation_return_id, bool add_local_ref, - bool is_self = false); + bool is_self, + bool owned); /// Check if named actor is cached locally. /// If it has been cached, core worker will not get actor id by name from GCS. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 972cf9dd07b86..36f6daea238ca 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -522,7 +522,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ *task_manager_, *actor_creator_, on_excess_queueing, - io_service_)); + io_service_, + reference_counter_)); auto node_addr_factory = [this](const NodeID &node_id) { absl::optional addr; @@ -2349,7 +2350,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, root_detached_actor_id); // Add the actor handle before we submit the actor creation task, since the // actor handle must be in scope by the time the GCS sends the - // WaitForActorOutOfScopeRequest. + // WaitForActorRefDeletedRequest. 
RAY_CHECK(actor_manager_->AddNewActorHandle( std::move(actor_handle), CurrentCallSite(), rpc_address_, /*owned=*/!is_detached)) << "Actor " << actor_id << " already exists"; @@ -3650,11 +3651,10 @@ void CoreWorker::PopulateObjectStatus(const ObjectID &object_id, } } -void CoreWorker::HandleWaitForActorOutOfScope( - rpc::WaitForActorOutOfScopeRequest request, - rpc::WaitForActorOutOfScopeReply *reply, +void CoreWorker::HandleWaitForActorRefDeleted( + rpc::WaitForActorRefDeletedRequest request, + rpc::WaitForActorRefDeletedReply *reply, rpc::SendReplyCallback send_reply_callback) { - // Currently WaitForActorOutOfScope is only used when GCS actor service is enabled. if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -3663,7 +3663,7 @@ void CoreWorker::HandleWaitForActorOutOfScope( // Send a response to trigger cleaning up the actor state once the handle is // no longer in scope. auto respond = [send_reply_callback](const ActorID &actor_id) { - RAY_LOG(DEBUG).WithField(actor_id) << "Replying to HandleWaitForActorOutOfScope"; + RAY_LOG(DEBUG).WithField(actor_id) << "Replying to HandleWaitForActorRefDeleted"; send_reply_callback(Status::OK(), nullptr, nullptr); }; @@ -3674,13 +3674,13 @@ void CoreWorker::HandleWaitForActorOutOfScope( if (!status.ok()) { respond(actor_id); } else { - RAY_LOG(DEBUG).WithField(actor_id) << "Received HandleWaitForActorOutOfScope"; - actor_manager_->WaitForActorOutOfScope(actor_id, std::move(respond)); + RAY_LOG(DEBUG).WithField(actor_id) << "Received HandleWaitForActorRefDeleted"; + actor_manager_->WaitForActorRefDeleted(actor_id, std::move(respond)); } }); } else { - RAY_LOG(DEBUG).WithField(actor_id) << "Received HandleWaitForActorOutOfScope"; - actor_manager_->WaitForActorOutOfScope(actor_id, std::move(respond)); + RAY_LOG(DEBUG).WithField(actor_id) << "Received HandleWaitForActorRefDeleted"; + actor_manager_->WaitForActorRefDeleted(actor_id, std::move(respond)); } } @@ -3729,7 
+3729,7 @@ void CoreWorker::ProcessSubscribeForObjectEviction( // Returns true if the object was present and the callback was added. It might have // already been evicted by the time we get this request, in which case we should // respond immediately so the raylet unpins the object. - if (!reference_counter_->SetObjectPrimaryCopyDeleteCallback(object_id, unpin_object)) { + if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback(object_id, unpin_object)) { // If the object is already evicted (callback cannot be set), unregister the // subscription & publish the message so that the subscriber knows it. unpin_object(object_id); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e281c7879df9a..a0c08bc4b7b00 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1276,8 +1276,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::SendReplyCallback send_reply_callback) override; /// Implements gRPC server handler. - void HandleWaitForActorOutOfScope(rpc::WaitForActorOutOfScopeRequest request, - rpc::WaitForActorOutOfScopeReply *reply, + void HandleWaitForActorRefDeleted(rpc::WaitForActorRefDeletedRequest request, + rpc::WaitForActorRefDeletedReply *reply, rpc::SendReplyCallback send_reply_callback) override; // Implements gRPC server handler. 
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 1e984f03753b3..fecb97071b298 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -741,6 +741,9 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) { num_objects_owned_by_us_--; } } + if (it->second.on_object_ref_delete) { + it->second.on_object_ref_delete(it->first); + } object_id_refs_.erase(it); ShutdownIfNeeded(); } @@ -762,11 +765,13 @@ int64_t ReferenceCounter::EvictLineage(int64_t min_bytes_to_evict) { } void ReferenceCounter::DeleteObjectPrimaryCopy(ReferenceTable::iterator it) { - if (it->second.on_object_primary_copy_delete) { - RAY_LOG(DEBUG) << "Calling on_object_primary_copy_delete for object " << it->first; - it->second.on_object_primary_copy_delete(it->first); - it->second.on_object_primary_copy_delete = nullptr; + RAY_LOG(DEBUG) << "Calling on_object_primary_copy_delete for object " << it->first + << " num callbacks: " + << it->second.on_object_primary_copy_delete_callbacks.size(); + for (const auto &callback : it->second.on_object_primary_copy_delete_callbacks) { + callback(it->first); } + it->second.on_object_primary_copy_delete_callbacks.clear(); it->second.pinned_at_raylet_id.reset(); if (it->second.spilled && !it->second.spilled_node_id.IsNil()) { // The spilled copy of the object should get deleted during the @@ -779,7 +784,18 @@ void ReferenceCounter::DeleteObjectPrimaryCopy(ReferenceTable::iterator it) { } } -bool ReferenceCounter::SetObjectPrimaryCopyDeleteCallback( +bool ReferenceCounter::SetObjectRefDeletedCallback( + const ObjectID &object_id, const std::function callback) { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + if (it == object_id_refs_.end()) { + return false; + } + it->second.on_object_ref_delete = callback; + return true; +} + +bool ReferenceCounter::AddObjectPrimaryCopyDeleteCallback( const ObjectID &object_id, const std::function 
callback) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); @@ -796,13 +812,7 @@ bool ReferenceCounter::SetObjectPrimaryCopyDeleteCallback( return false; } - // NOTE: In two cases, `GcsActorManager` will send `WaitForActorOutOfScope` request more - // than once, causing the delete callback to be set repeatedly. - // 1.If actors have not been registered successfully before GCS restarts, gcs client - // will resend the registration request after GCS restarts. - // 2.After GCS restarts, GCS will send `WaitForActorOutOfScope` request to owned actors - // again. - it->second.on_object_primary_copy_delete = callback; + it->second.on_object_primary_copy_delete_callbacks.emplace_back(callback); return true; } diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index b188e21106935..f7122f4e414d1 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -49,7 +49,10 @@ class ReferenceCounterInterface { bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id = absl::optional()) = 0; - virtual bool SetObjectPrimaryCopyDeleteCallback( + virtual bool AddObjectPrimaryCopyDeleteCallback( + const ObjectID &object_id, + const std::function callback) = 0; + virtual bool SetObjectRefDeletedCallback( const ObjectID &object_id, const std::function callback) = 0; @@ -316,13 +319,19 @@ class ReferenceCounter : public ReferenceCounterInterface, void FreePlasmaObjects(const std::vector &object_ids) ABSL_LOCKS_EXCLUDED(mutex_); - /// Sets the callback that will be run when the object goes out of scope. + /// Adds the callback that will be run when the object goes out of scope + /// (Reference.OutOfScope() returns true). /// Returns true if the object was in scope and the callback was added, else false. 
- bool SetObjectPrimaryCopyDeleteCallback( + bool AddObjectPrimaryCopyDeleteCallback( const ObjectID &object_id, const std::function callback) ABSL_LOCKS_EXCLUDED(mutex_); - void ResetDeleteCallbacks(const std::vector &object_ids) + /// Sets the callback that will be run when the object reference is deleted + /// from the reference table (all refs including lineage ref count go to 0). + /// Returns true if the object was in the reference table and the callback was added + /// else false. + bool SetObjectRefDeletedCallback(const ObjectID &object_id, + const std::function callback) ABSL_LOCKS_EXCLUDED(mutex_); /// Set a callback for when we are no longer borrowing this object (when our @@ -778,7 +787,14 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Callback that will be called when this Object's primary copy /// should be deleted: out of scope or internal_api.free - std::function on_object_primary_copy_delete; + /// Note: when an object is out of scope, it can still + /// have lineage ref count and on_object_ref_delete + /// will be called when lineage ref count is also 0. + std::vector> + on_object_primary_copy_delete_callbacks; + /// Callback that will be called when the object ref is deleted + /// from the reference table (all refs including lineage ref count go to 0). + std::function on_object_ref_delete; /// Callback that is called when this process is no longer a borrower /// (RefCount() == 0). 
std::function on_ref_removed; diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 2e9b236a4be28..b5c938c6c0ceb 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -16,9 +16,9 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "mock/ray/core_worker/reference_count.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" -#include "ray/core_worker/reference_count.h" #include "ray/core_worker/transport/task_receiver.h" #include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_client/gcs_client.h" @@ -97,27 +97,22 @@ class MockGcsClient : public gcs::GcsClient { class MockActorTaskSubmitter : public ActorTaskSubmitterInterface { public: MockActorTaskSubmitter() : ActorTaskSubmitterInterface() {} - void AddActorQueueIfNotExists(const ActorID &actor_id, - int32_t max_pending_calls, - bool execute_out_of_order = false, - bool fail_if_actor_unreachable = true) override { - AddActorQueueIfNotExists_( - actor_id, max_pending_calls, execute_out_of_order, fail_if_actor_unreachable); - } - MOCK_METHOD4(AddActorQueueIfNotExists_, + MOCK_METHOD5(AddActorQueueIfNotExists, void(const ActorID &actor_id, int32_t max_pending_calls, bool execute_out_of_order, - bool fail_if_actor_unreachable)); + bool fail_if_actor_unreachable, + bool owned)); MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts)); - MOCK_METHOD4(DisconnectActor, + MOCK_METHOD5(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts, bool dead, - const rpc::ActorDeathCause &death_cause)); + const rpc::ActorDeathCause &death_cause, + bool is_restartable)); MOCK_METHOD0(CheckTimeoutTasks, void()); @@ -126,36 +121,6 @@ class MockActorTaskSubmitter : public ActorTaskSubmitterInterface { virtual ~MockActorTaskSubmitter() {} }; -class MockReferenceCounter : public ReferenceCounterInterface { - 
public: - MockReferenceCounter() : ReferenceCounterInterface() {} - - MOCK_METHOD2(AddLocalReference, - void(const ObjectID &object_id, const std::string &call_sit)); - - MOCK_METHOD4(AddBorrowedObject, - bool(const ObjectID &object_id, - const ObjectID &outer_id, - const rpc::Address &owner_address, - bool foreign_owner_already_monitoring)); - - MOCK_METHOD8(AddOwnedObject, - void(const ObjectID &object_id, - const std::vector &contained_ids, - const rpc::Address &owner_address, - const std::string &call_site, - const int64_t object_size, - bool is_reconstructable, - bool add_local_ref, - const absl::optional &pinned_at_raylet_id)); - - MOCK_METHOD2(SetObjectPrimaryCopyDeleteCallback, - bool(const ObjectID &object_id, - const std::function callback)); - - virtual ~MockReferenceCounter() {} -}; - class ActorManagerTest : public ::testing::Test { public: ActorManagerTest() @@ -203,7 +168,7 @@ class ActorManagerTest : public ::testing::Test { ray_namespace, -1, false); - EXPECT_CALL(*reference_counter_, SetObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); actor_manager_->AddNewActorHandle(std::move(actor_handle), call_site, @@ -242,7 +207,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { "", -1, false); - EXPECT_CALL(*reference_counter_, SetObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); // Add an actor handle. @@ -285,7 +250,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); // Now actor state is updated to DEAD. Make sure it is disconnected. 
- EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _)).Times(1); + EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _, _)).Times(1); actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::DEAD); actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); @@ -319,7 +284,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { "", -1, false); - EXPECT_CALL(*reference_counter_, SetObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); ObjectID outer_object_id = ObjectID::Nil(); @@ -344,7 +309,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationPending) { ActorID actor_id = AddActorHandle(); // Nothing happens if state is pending. EXPECT_CALL(*actor_task_submitter_, ConnectActor(_, _, _)).Times(0); - EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _)).Times(0); + EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::PENDING_CREATION); @@ -356,7 +321,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) { ActorID actor_id = AddActorHandle(); // Should disconnect to an actor when actor is restarting. EXPECT_CALL(*actor_task_submitter_, ConnectActor(_, _, _)).Times(0); - EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _)).Times(1); + EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::RESTARTING); @@ -368,7 +333,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationDead) { ActorID actor_id = AddActorHandle(); // Should disconnect to an actor when actor is dead. 
EXPECT_CALL(*actor_task_submitter_, ConnectActor(_, _, _)).Times(0); - EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _)).Times(1); + EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::DEAD); @@ -380,7 +345,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationAlive) { ActorID actor_id = AddActorHandle(); // Should connect to an actor when actor is alive. EXPECT_CALL(*actor_task_submitter_, ConnectActor(_, _, _)).Times(1); - EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _)).Times(0); + EXPECT_CALL(*actor_task_submitter_, DisconnectActor(_, _, _, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::ALIVE); diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index ae01ca1c0653e..4059c1b84b3b9 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -140,6 +140,18 @@ class MockActorCreator : public ActorCreatorInterface { return Status::OK(); } + Status AsyncRestartActor(const ActorID &actor_id, + uint64_t num_restarts, + gcs::StatusCallback callback) override { + return Status::OK(); + } + + Status AsyncReportActorOutOfScope(const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + gcs::StatusCallback callback) override { + return Status::OK(); + } + void AsyncWaitForActorRegisterFinish(const ActorID &, gcs::StatusCallback callback) override { callbacks.push_back(callback); diff --git a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc index c588ac1005635..ace066245a497 100644 --- a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc 
+++ b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc @@ -19,6 +19,7 @@ #include "ray/core_worker/actor_creator.h" #include "mock/ray/core_worker/task_manager.h" #include "mock/ray/gcs/gcs_client/gcs_client.h" +#include "mock/ray/core_worker/reference_count.h" // clang-format on namespace ray { @@ -36,8 +37,14 @@ class DirectTaskTransportTest : public ::testing::Test { client_pool = std::make_shared( [&](const rpc::Address &) { return nullptr; }); memory_store = std::make_unique(); - actor_task_submitter = std::make_unique( - *client_pool, *memory_store, *task_finisher, *actor_creator, nullptr, io_context); + reference_counter = std::make_shared(); + actor_task_submitter = std::make_unique(*client_pool, + *memory_store, + *task_finisher, + *actor_creator, + nullptr, + io_context, + reference_counter); } TaskSpecification GetActorTaskSpec(const ActorID &actor_id) { @@ -74,6 +81,7 @@ class DirectTaskTransportTest : public ::testing::Test { std::shared_ptr task_finisher; std::unique_ptr actor_creator; std::shared_ptr gcs_client; + std::shared_ptr reference_counter; }; TEST_F(DirectTaskTransportTest, ActorCreationOk) { @@ -122,7 +130,11 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) { ::testing::Return(Status::OK()))); ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok()); ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); - actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1); + actor_task_submitter->AddActorQueueIfNotExists(actor_id, + -1, + /*execute_out_of_order*/ false, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); ASSERT_TRUE(CheckSubmitTask(task_spec)); EXPECT_CALL( *task_finisher, @@ -147,7 +159,11 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) { ::testing::Return(Status::OK()))); ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok()); ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); - actor_task_submitter->AddActorQueueIfNotExists(actor_id, 
-1); + actor_task_submitter->AddActorQueueIfNotExists(actor_id, + -1, + /*execute_out_of_order*/ false, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); ASSERT_TRUE(CheckSubmitTask(task_spec)); EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _, _)).Times(0); register_cb(Status::OK()); diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 468e8292faa1c..2c73107acae6a 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -24,6 +24,7 @@ #include "ray/rpc/worker/core_worker_client.h" #include "mock/ray/core_worker/actor_creator.h" #include "mock/ray/core_worker/task_manager.h" +#include "mock/ray/core_worker/reference_count.h" // clang-format on // clang-format off @@ -114,6 +115,7 @@ class ActorTaskSubmitterTest : public ::testing::TestWithParam { store_(std::make_shared()), task_finisher_(std::make_shared()), io_work(io_context), + reference_counter_(std::make_shared()), submitter_( *client_pool_, *store_, @@ -122,7 +124,8 @@ class ActorTaskSubmitterTest : public ::testing::TestWithParam { [this](const ActorID &actor_id, int64_t num_queued) { last_queue_warning_ = num_queued; }, - io_context) {} + io_context, + reference_counter_) {} void TearDown() override { io_context.stop(); } @@ -135,6 +138,7 @@ class ActorTaskSubmitterTest : public ::testing::TestWithParam { std::shared_ptr task_finisher_; instrumented_io_context io_context; boost::asio::io_service::work io_work; + std::shared_ptr reference_counter_; ActorTaskSubmitter submitter_; protected: @@ -150,7 +154,11 @@ TEST_P(ActorTaskSubmitterTest, TestSubmitTask) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + 
-1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); auto task = CreateActorTaskHelper(actor_id, worker_id, 0); ASSERT_TRUE(CheckSubmitTask(task)); @@ -184,7 +192,11 @@ TEST_P(ActorTaskSubmitterTest, TestQueueingWarning) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); submitter_.ConnectActor(actor_id, addr, 0); for (int i = 0; i < 7500; i++) { @@ -215,7 +227,11 @@ TEST_P(ActorTaskSubmitterTest, TestDependencies) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -250,7 +266,11 @@ TEST_P(ActorTaskSubmitterTest, TestOutOfOrderDependencies) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -300,7 +320,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorDead) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - 
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -323,11 +347,13 @@ TEST_P(ActorTaskSubmitterTest, TestActorDead) { EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _, _)).Times(0); const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // Actor marked as dead. All queued tasks should get failed. EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _)) .Times(1); - submitter_.DisconnectActor(actor_id, 2, /*dead=*/true, death_cause); + submitter_.DisconnectActor( + actor_id, 2, /*dead=*/true, death_cause, /*is_restartable=*/false); } TEST_P(ActorTaskSubmitterTest, TestActorRestartNoRetry) { @@ -336,7 +362,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartNoRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -363,7 +393,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartNoRetry) { // Simulate the actor failing. const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // Third task fails after the actor is disconnected. 
It should not get // retried. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); @@ -389,7 +420,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -419,7 +454,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartRetry) { // Simulate the actor failing. const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // Third task fails after the actor is disconnected. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); @@ -450,7 +486,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -476,7 +516,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderRetry) { // Simulate the actor failing. 
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""), /*index=*/0)); const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // Actor gets restarted. addr.set_port(1); @@ -511,7 +552,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -536,7 +581,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { // We receive the RESTART message late. Nothing happens. const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 2); @@ -545,7 +591,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); // The actor dies twice. We receive the last RESTART message first. - submitter_.DisconnectActor(actor_id, 3, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 3, /*dead=*/false, death_cause, /*is_restartable=*/true); ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 3); @@ -560,15 +607,18 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { // We receive the late messages. Nothing happens. 
addr.set_port(2); submitter_.ConnectActor(actor_id, addr, 2); - submitter_.DisconnectActor(actor_id, 2, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 2, /*dead=*/false, death_cause, /*is_restartable=*/true); ASSERT_EQ(num_clients_connected_, 2); // The actor dies permanently. - submitter_.DisconnectActor(actor_id, 3, /*dead=*/true, death_cause); + submitter_.DisconnectActor( + actor_id, 3, /*dead=*/true, death_cause, /*is_restartable=*/false); ASSERT_EQ(num_clients_connected_, 2); // We receive more late messages. Nothing happens because the actor is dead. - submitter_.DisconnectActor(actor_id, 4, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 4, /*dead=*/false, death_cause, /*is_restartable=*/true); addr.set_port(3); submitter_.ConnectActor(actor_id, addr, 4); ASSERT_EQ(num_clients_connected_, 2); @@ -585,7 +635,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartFailInflightTasks) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -610,7 +664,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartFailInflightTasks) { EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _, _)) .Times(1); const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // The task replies are now received. Since the tasks are already failed, they will not // be marked as failed or finished again. 
@@ -632,7 +687,11 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartFastFail) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + -1, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -646,7 +705,8 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartFastFail) { // Actor failed and is now restarting. const auto death_cause = CreateMockDeathCause(); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); + submitter_.DisconnectActor( + actor_id, 1, /*dead=*/false, death_cause, /*is_restartable=*/true); // Submit a new task. This task should fail immediately because "max_task_retries" is 0. auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); @@ -664,7 +724,11 @@ TEST_P(ActorTaskSubmitterTest, TestPendingTasks) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, max_pending_calls, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, + max_pending_calls, + execute_out_of_order, + /*fail_if_actor_unreachable*/ true, + /*owned*/ false); addr.set_port(0); // Submit number of `max_pending_calls` tasks would be OK. 
diff --git a/src/ray/core_worker/test/normal_task_submitter_test.cc b/src/ray/core_worker/test/normal_task_submitter_test.cc index 1c1bc33b16b16..4b3ccda7f4a93 100644 --- a/src/ray/core_worker/test/normal_task_submitter_test.cc +++ b/src/ray/core_worker/test/normal_task_submitter_test.cc @@ -338,6 +338,18 @@ class MockActorCreator : public ActorCreatorInterface { return Status::OK(); } + Status AsyncRestartActor(const ActorID &actor_id, + uint64_t num_restarts, + gcs::StatusCallback callback) override { + return Status::OK(); + } + + Status AsyncReportActorOutOfScope(const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + gcs::StatusCallback callback) override { + return Status::OK(); + } + Status AsyncCreateActor( const TaskSpecification &task_spec, const rpc::ClientCallback &callback) override { diff --git a/src/ray/core_worker/test/reference_count_test.cc b/src/ray/core_worker/test/reference_count_test.cc index 50d2915142a80..7374c4798ecee 100644 --- a/src/ray/core_worker/test/reference_count_test.cc +++ b/src/ray/core_worker/test/reference_count_test.cc @@ -573,9 +573,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // The object goes out of scope once it has no more refs. std::vector out; - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); ASSERT_TRUE(*out_of_scope); @@ -583,9 +583,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // Unreconstructable objects go out of scope even if they have a nonzero // lineage ref count. 
*out_of_scope = false; - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({}, {id}); ASSERT_FALSE(*out_of_scope); rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out); @@ -2437,9 +2437,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // The object goes out of scope once it has no more refs. std::vector out; - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); @@ -2450,9 +2450,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // Unreconstructable objects stay in scope if they have a nonzero lineage ref // count. 
*out_of_scope = false; - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({return_id}, {id}); ASSERT_TRUE(rc->IsObjectPendingCreation(return_id)); ASSERT_FALSE(*out_of_scope); @@ -2541,7 +2541,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out); // We should fail to set the deletion callback because the object has // already gone out of scope. - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback( + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback( id, [&](const ObjectID &object_id) { ASSERT_FALSE(true); })); ASSERT_EQ(out.size(), 1); @@ -2658,7 +2658,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); @@ -2674,7 +2674,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { deleted->clear(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); rc->ResetObjectsOnRemovedNode(node_id); auto objects = rc->FlushObjectsToRecover(); @@ -2699,7 +2699,7 @@ 
TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); - ASSERT_FALSE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); bool owned_by_us; @@ -2714,7 +2714,7 @@ TEST_F(ReferenceCountTest, TestFree) { // Test free after receiving information about where the object is pinned. rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->SetObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->FreePlasmaObjects({id}); diff --git a/src/ray/core_worker/transport/actor_submit_queue.h b/src/ray/core_worker/transport/actor_submit_queue.h index 96991d9ac887e..a359239b46759 100644 --- a/src/ray/core_worker/transport/actor_submit_queue.h +++ b/src/ray/core_worker/transport/actor_submit_queue.h @@ -80,6 +80,7 @@ class IActorSubmitQueue { /// Mark a task has been executed on the receiver side. 
virtual void MarkSeqnoCompleted(uint64_t sequence_no, const TaskSpecification &task_spec) = 0; + virtual bool Empty() = 0; }; } // namespace core } // namespace ray diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index c4bda9007ca10..c3ce03d9ce1ab 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -25,19 +25,68 @@ using namespace ray::gcs; namespace ray { namespace core { +void ActorTaskSubmitter::NotifyGCSWhenActorOutOfScope( + const ActorID &actor_id, uint64_t num_restarts_due_to_lineage_reconstruction) { + const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); + auto actor_out_of_scope_callback = [this, + actor_id, + num_restarts_due_to_lineage_reconstruction]( + const ObjectID &object_id) { + { + absl::MutexLock lock(&mu_); + if (auto iter = client_queues_.find(actor_id); iter != client_queues_.end()) { + if (iter->second.state != rpc::ActorTableData::DEAD) { + iter->second.pending_out_of_scope_death = true; + } + } + } + RAY_CHECK_OK(actor_creator_.AsyncReportActorOutOfScope( + actor_id, num_restarts_due_to_lineage_reconstruction, [actor_id](Status status) { + if (!status.ok()) { + RAY_LOG(ERROR).WithField(actor_id) + << "Failed to report actor out of scope: " << status + << ". 
The actor will not be killed"; + } + })); + }; + + if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback( + actor_creation_return_id, + [actor_out_of_scope_callback](const ObjectID &object_id) { + actor_out_of_scope_callback(object_id); + })) { + RAY_LOG(DEBUG).WithField(actor_id) << "Actor already out of scope"; + actor_out_of_scope_callback(actor_creation_return_id); + } +} + void ActorTaskSubmitter::AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls, bool execute_out_of_order, - bool fail_if_actor_unreachable) { - absl::MutexLock lock(&mu_); - // No need to check whether the insert was successful, since it is possible - // for this worker to have multiple references to the same actor. - RAY_LOG(INFO).WithField(actor_id) - << "Set actor max pending calls to " << max_pending_calls; - client_queues_.emplace( - actor_id, - ClientQueue( - actor_id, execute_out_of_order, max_pending_calls, fail_if_actor_unreachable)); + bool fail_if_actor_unreachable, + bool owned) { + bool inserted; + { + absl::MutexLock lock(&mu_); + // No need to check whether the insert was successful, since it is possible + // for this worker to have multiple references to the same actor. + RAY_LOG(INFO).WithField(actor_id) + << "Set actor max pending calls to " << max_pending_calls; + inserted = client_queues_ + .emplace(actor_id, + ClientQueue(actor_id, + execute_out_of_order, + max_pending_calls, + fail_if_actor_unreachable, + owned)) + .second; + } + if (owned && inserted) { + // Actor owner is responsible for notifying GCS when the + // actor is out of scope so that GCS can kill the actor. 
+ NotifyGCSWhenActorOutOfScope(actor_id, + /*num_restarts_due_to_lineage_reconstruction*/ 0); + } } Status ActorTaskSubmitter::SubmitActorCreationTask(TaskSpecification task_spec) { @@ -124,6 +173,10 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); + if (queue->second.state == rpc::ActorTableData::DEAD && + queue->second.is_restartable && queue->second.owned) { + RestartActor(actor_id); + } if (queue->second.state != rpc::ActorTableData::DEAD) { // We must fix the send order prior to resolving dependencies, which may // complete out of order. This ensures that we will not deadlock due to @@ -281,10 +334,37 @@ void ActorTaskSubmitter::ConnectActor(const ActorID &actor_id, FailInflightTasks(inflight_task_callbacks); } +void ActorTaskSubmitter::RestartActor(const ActorID &actor_id) { + RAY_LOG(INFO).WithField(actor_id) << "Reconstructing actor"; + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + RAY_CHECK(queue->second.owned) << "Only owner can restart the dead actor"; + RAY_CHECK(queue->second.is_restartable) << "This actor is no longer restartable"; + queue->second.state = rpc::ActorTableData::RESTARTING; + queue->second.num_restarts_due_to_lineage_reconstructions += 1; + RAY_CHECK_OK(actor_creator_.AsyncRestartActor( + actor_id, + queue->second.num_restarts_due_to_lineage_reconstructions, + [this, + actor_id, + num_restarts_due_to_lineage_reconstructions = + queue->second.num_restarts_due_to_lineage_reconstructions](Status status) { + if (!status.ok()) { + RAY_LOG(ERROR).WithField(actor_id) + << "Failed to reconstruct actor. Error message: " << status.ToString(); + } else { + // Notify GCS when the actor is out of scope again. 
+ NotifyGCSWhenActorOutOfScope(actor_id, + num_restarts_due_to_lineage_reconstructions); + } + })); +} + void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead, - const rpc::ActorDeathCause &death_cause) { + const rpc::ActorDeathCause &death_cause, + bool is_restartable) { RAY_LOG(DEBUG).WithField(actor_id) << "Disconnecting from actor, death context type=" << GetActorDeathCauseString(death_cause); @@ -297,7 +377,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); if (!dead) { - RAY_CHECK(num_restarts > 0); + RAY_CHECK_GT(num_restarts, 0); } if (num_restarts <= queue->second.num_restarts && !dead) { // This message is about an old version of the actor that has already been @@ -317,16 +397,29 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, if (dead) { queue->second.state = rpc::ActorTableData::DEAD; queue->second.death_cause = death_cause; - // If there are pending requests, treat the pending tasks as failed. - RAY_LOG(INFO).WithField(actor_id) - << "Failing pending tasks for actor because the actor is already dead."; - - task_ids_to_fail = queue->second.actor_submit_queue->ClearAllTasks(); - // We need to execute this outside of the lock to prevent deadlock. - wait_for_death_info_tasks = std::move(queue->second.wait_for_death_info_tasks); - // Reset the queue - queue->second.wait_for_death_info_tasks = - std::deque>(); + queue->second.pending_out_of_scope_death = false; + queue->second.is_restartable = is_restartable; + + if (queue->second.is_restartable && queue->second.owned) { + // Actor is out of scope so there should be no inflight actor tasks. + RAY_CHECK(queue->second.wait_for_death_info_tasks.empty()); + RAY_CHECK(inflight_task_callbacks.empty()); + if (!queue->second.actor_submit_queue->Empty()) { + // There are pending lineage reconstruction tasks. 
+ RestartActor(actor_id); + } + } else { + // If there are pending requests, treat the pending tasks as failed. + RAY_LOG(INFO).WithField(actor_id) + << "Failing pending tasks for actor because the actor is already dead."; + + task_ids_to_fail = queue->second.actor_submit_queue->ClearAllTasks(); + // We need to execute this outside of the lock to prevent deadlock. + wait_for_death_info_tasks = std::move(queue->second.wait_for_death_info_tasks); + // Reset the queue + queue->second.wait_for_death_info_tasks = + std::deque>(); + } } else if (queue->second.state != rpc::ActorTableData::DEAD) { // Only update the actor's state if it is not permanently dead. The actor // will eventually get restarted or marked as permanently dead. @@ -383,6 +476,7 @@ void ActorTaskSubmitter::FailTaskWithError(const PendingTaskWaitingForDeathInfo // preempted and it's dead. auto actor_death_cause = error_info.mutable_actor_died_error(); auto actor_died_error_context = actor_death_cause->mutable_actor_died_error_context(); + actor_died_error_context->set_reason(rpc::ActorDiedErrorContext::NODE_DIED); actor_died_error_context->set_actor_id(task.task_spec.ActorId().Binary()); auto node_death_info = actor_died_error_context->mutable_node_death_info(); node_death_info->set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED); @@ -427,6 +521,13 @@ void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { RAY_CHECK(it != client_queues_.end()); auto &client_queue = it->second; auto &actor_submit_queue = client_queue.actor_submit_queue; + if (client_queue.pending_out_of_scope_death) { + // Wait until the actor is dead and then decide + // whether we should fail pending tasks or restart the actor. + // If the actor is restarted, ConnectActor will be called + // and pending tasks will be sent at that time. 
+ return; + } if (!client_queue.rpc_client) { if (client_queue.state == rpc::ActorTableData::RESTARTING && client_queue.fail_if_actor_unreachable) { diff --git a/src/ray/core_worker/transport/actor_task_submitter.h b/src/ray/core_worker/transport/actor_task_submitter.h index b5d470730ac4d..ebee8ebceaab7 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.h +++ b/src/ray/core_worker/transport/actor_task_submitter.h @@ -50,15 +50,17 @@ class ActorTaskSubmitterInterface { public: virtual void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls, - bool execute_out_of_order = false, - bool fail_if_actor_unreachable = true) = 0; + bool execute_out_of_order, + bool fail_if_actor_unreachable, + bool owned) = 0; virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts) = 0; virtual void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead, - const rpc::ActorDeathCause &death_cause) = 0; + const rpc::ActorDeathCause &death_cause, + bool is_restartable) = 0; virtual void CheckTimeoutTasks() = 0; @@ -77,13 +79,15 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator, std::function warn_excess_queueing, - instrumented_io_context &io_service) + instrumented_io_context &io_service, + std::shared_ptr reference_counter) : core_worker_client_pool_(core_worker_client_pool), actor_creator_(actor_creator), resolver_(store, task_finisher, actor_creator), task_finisher_(task_finisher), warn_excess_queueing_(warn_excess_queueing), - io_service_(io_service) { + io_service_(io_service), + reference_counter_(reference_counter) { next_queueing_warn_threshold_ = ::RayConfig::instance().actor_excess_queueing_warn_threshold(); } @@ -104,11 +108,13 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// \param[in] max_pending_calls The max pending calls for the actor to be added. 
/// \param[in] execute_out_of_order Whether to execute tasks out of order. /// \param[in] fail_if_actor_unreachable Whether to fail newly submitted tasks /// immediately when the actor is unreachable. + /// \param[in] owned Whether the actor is owned by the current process. void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls, - bool execute_out_of_order = false, - bool fail_if_actor_unreachable = true); + bool execute_out_of_order, + bool fail_if_actor_unreachable, + bool owned); /// Submit a task to an actor for execution. /// @@ -137,13 +143,15 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// \param[in] num_restarts How many times this actor has been restarted /// before. If we've already seen a later incarnation of the actor, we will /// ignore the command to connect. - /// \param[in] dead Whether the actor is permanently dead. In this case, all + /// \param[in] dead Whether the actor is dead. In this case, all /// pending tasks for the actor should be failed. /// \param[in] death_cause Context about why this actor is dead. + /// \param[in] is_restartable Whether the dead actor is restartable. void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead, - const rpc::ActorDeathCause &death_cause); + const rpc::ActorDeathCause &death_cause, + bool is_restartable); /// Set the timerstamp for the caller. 
void SetCallerCreationTimestamp(int64_t timestamp); @@ -267,9 +275,11 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { ClientQueue(ActorID actor_id, bool execute_out_of_order, int32_t max_pending_calls, - bool fail_if_actor_unreachable) + bool fail_if_actor_unreachable, + bool owned) : max_pending_calls(max_pending_calls), - fail_if_actor_unreachable(fail_if_actor_unreachable) { + fail_if_actor_unreachable(fail_if_actor_unreachable), + owned(owned) { if (execute_out_of_order) { actor_submit_queue = std::make_unique(actor_id); } else { @@ -288,6 +298,9 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// indicate that the actor is not yet created. This is used to drop stale /// messages from the GCS. int64_t num_restarts = -1; + /// How many times this actor has been lineage reconstructed. + /// This is used to drop stale messages. + int64_t num_restarts_due_to_lineage_reconstructions = 0; /// Whether this actor exits by spot preemption. bool preempted = false; /// The RPC client. We use shared_ptr to enable shared_from_this for @@ -295,6 +308,11 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { std::shared_ptr rpc_client = nullptr; /// The intended worker ID of the actor. std::string worker_id = ""; + /// The actor is out of scope but the death info is not published + /// to this worker yet. + bool pending_out_of_scope_death = false; + /// If the actor is dead, whether it can be restarted. + bool is_restartable = false; /// The queue that orders actor requests. std::unique_ptr actor_submit_queue; @@ -331,6 +349,9 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// Whether to fail newly submitted tasks immediately when the actor is unreachable. bool fail_if_actor_unreachable = true; + /// Whether the current process is owner of the actor. + bool owned; + /// Returns debug string for class. /// /// \return string. 
@@ -365,6 +386,9 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// Send all pending tasks for an actor. /// + /// If the actor is pending out-of-scope death notification, pending tasks will + /// wait until the notification is received to decide whether we should + /// fail pending tasks or restart the actor. /// \param[in] actor_id Actor ID. /// \return Void. void SendPendingTasks(const ActorID &actor_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -389,6 +413,12 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { const absl::flat_hash_map> &inflight_task_callbacks) ABSL_LOCKS_EXCLUDED(mu_); + /// Restart the actor from DEAD by sending a RestartActor rpc to GCS. + void RestartActor(const ActorID &actor_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + void NotifyGCSWhenActorOutOfScope(const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstructions); + /// Pool for producing new core worker clients. rpc::CoreWorkerClientPool &core_worker_client_pool_; @@ -415,6 +445,8 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// The event loop where the actor task events are handled. instrumented_io_context &io_service_; + std::shared_ptr reference_counter_; + friend class CoreWorkerTest; }; diff --git a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc index 33572d98b7461..db3e2df27316c 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc +++ b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc @@ -54,6 +54,10 @@ void OutofOrderActorSubmitQueue::MarkTaskCanceled(uint64_t position) { sending_queue_.erase(position); } +bool OutofOrderActorSubmitQueue::Empty() { + return pending_queue_.empty() && sending_queue_.empty(); +} + void OutofOrderActorSubmitQueue::MarkDependencyResolved(uint64_t position) { // move the task from pending_requests queue to sending_requests queue. 
auto it = pending_queue_.find(position); diff --git a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h index 70b740a79b046..090e30006264a 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h +++ b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h @@ -68,6 +68,7 @@ class OutofOrderActorSubmitQueue : public IActorSubmitQueue { uint64_t GetSequenceNumber(const TaskSpecification &task_spec) const override; /// Mark a task has been executed on the receiver side. void MarkSeqnoCompleted(uint64_t position, const TaskSpecification &task_spec) override; + bool Empty() override; private: ActorID kActorId; diff --git a/src/ray/core_worker/transport/sequential_actor_submit_queue.cc b/src/ray/core_worker/transport/sequential_actor_submit_queue.cc index 2bb71a84f84b8..35de4e41bb99f 100644 --- a/src/ray/core_worker/transport/sequential_actor_submit_queue.cc +++ b/src/ray/core_worker/transport/sequential_actor_submit_queue.cc @@ -30,6 +30,8 @@ bool SequentialActorSubmitQueue::Contains(uint64_t sequence_no) const { return requests.find(sequence_no) != requests.end(); } +bool SequentialActorSubmitQueue::Empty() { return requests.empty(); } + const std::pair &SequentialActorSubmitQueue::Get( uint64_t sequence_no) const { auto it = requests.find(sequence_no); diff --git a/src/ray/core_worker/transport/sequential_actor_submit_queue.h b/src/ray/core_worker/transport/sequential_actor_submit_queue.h index 4c787de73fd0a..3e5e42a8119a4 100644 --- a/src/ray/core_worker/transport/sequential_actor_submit_queue.h +++ b/src/ray/core_worker/transport/sequential_actor_submit_queue.h @@ -65,6 +65,7 @@ class SequentialActorSubmitQueue : public IActorSubmitQueue { /// Mark a task has been executed on the receiver side. void MarkSeqnoCompleted(uint64_t sequence_no, const TaskSpecification &task_spec) override; + bool Empty() override; private: /// The ID of the actor. 
diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index 650f29b63df01..50b1326eb3f2d 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -284,6 +284,22 @@ Status ActorInfoAccessor::SyncListNamedActors( return status; } +Status ActorInfoAccessor::AsyncRestartActor(const ray::ActorID &actor_id, + uint64_t num_restarts, + const ray::gcs::StatusCallback &callback, + int64_t timeout_ms) { + rpc::RestartActorRequest request; + request.set_actor_id(actor_id.Binary()); + request.set_num_restarts(num_restarts); + client_impl_->GetGcsRpcClient().RestartActor( + request, + [callback](const Status &status, rpc::RestartActorReply &&reply) { + callback(status); + }, + timeout_ms); + return Status::OK(); +} + Status ActorInfoAccessor::AsyncRegisterActor(const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback, int64_t timeout_ms) { @@ -342,6 +358,26 @@ Status ActorInfoAccessor::AsyncCreateActor( return Status::OK(); } +Status ActorInfoAccessor::AsyncReportActorOutOfScope( + const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + const StatusCallback &callback, + int64_t timeout_ms) { + rpc::ReportActorOutOfScopeRequest request; + request.set_actor_id(actor_id.Binary()); + request.set_num_restarts_due_to_lineage_reconstruction( + num_restarts_due_to_lineage_reconstruction); + client_impl_->GetGcsRpcClient().ReportActorOutOfScope( + request, + [callback](const Status &status, rpc::ReportActorOutOfScopeReply &&reply) { + if (callback) { + callback(status); + } + }, + timeout_ms); + return Status::OK(); +} + Status ActorInfoAccessor::AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index fbb877e532434..b62912debac73 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -119,6 +119,12 @@ class ActorInfoAccessor { 
const std::string &ray_namespace, std::vector> &actors); + virtual Status AsyncReportActorOutOfScope( + const ActorID &actor_id, + uint64_t num_restarts_due_to_lineage_reconstruction, + const StatusCallback &callback, + int64_t timeout_ms = -1); + /// Register actor to GCS asynchronously. /// /// \param task_spec The specification for the actor creation task. @@ -129,6 +135,11 @@ class ActorInfoAccessor { const StatusCallback &callback, int64_t timeout_ms = -1); + virtual Status AsyncRestartActor(const ActorID &actor_id, + uint64_t num_restarts, + const StatusCallback &callback, + int64_t timeout_ms = -1); + /// Register actor to GCS synchronously. /// /// The RPC will timeout after the default GCS RPC timeout is exceeded. diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index e69422a05ac71..6a75ca6d1cc25 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -244,7 +244,7 @@ class GcsClientTest : public ::testing::TestWithParam { message.mutable_actor_creation_task_spec()->set_actor_id(actor_id.Binary()); message.mutable_actor_creation_task_spec()->set_is_detached(is_detached); message.mutable_actor_creation_task_spec()->set_ray_namespace("test"); - // If the actor is non-detached, the `WaitForActorOutOfScope` function of the core + // If the actor is non-detached, the `WaitForActorRefDeleted` function of the core // worker client is called during the actor registration process. In order to simulate // the scenario of registration failure, we set the address to an illegal value. 
if (!is_detached) { @@ -743,7 +743,7 @@ TEST_P(GcsClientTest, TestActorTableResubscribe) { auto expected_num_subscribe_one_notifications = num_subscribe_one_notifications + 1; // NOTE: In the process of actor registration, if the callback function of - // `WaitForActorOutOfScope` is executed first, and then the callback function of + // `WaitForActorRefDeleted` is executed first, and then the callback function of // `ActorTable().Put` is executed, the actor registration fails, we will receive one // notification message; otherwise, the actor registration succeeds, we will receive // two notification messages. So we can't assert whether the actor is registered diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 5b4c936557076..83f4bd41ea332 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -59,6 +59,7 @@ const ray::rpc::ActorDeathCause GenWorkerDiedCause( oom_ctx->set_error_message(disconnect_detail); } else { auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::WORKER_DIED); AddActorInfo(actor, actor_died_error_ctx); actor_died_error_ctx->set_error_message(absl::StrCat( "The actor is dead because its worker process has died. Worker exit type: ", @@ -77,6 +78,7 @@ const ray::rpc::ActorDeathCause GenOwnerDiedCause( const std::string owner_ip_address) { ray::rpc::ActorDeathCause death_cause; auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::OWNER_DIED); AddActorInfo(actor, actor_died_error_ctx); actor_died_error_ctx->set_error_message( absl::StrCat("The actor is dead because its owner has died. 
Owner Id: ", @@ -94,6 +96,7 @@ const ray::rpc::ActorDeathCause GenKilledByApplicationCause( const ray::gcs::GcsActor *actor) { ray::rpc::ActorDeathCause death_cause; auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::RAY_KILL); AddActorInfo(actor, actor_died_error_ctx); actor_died_error_ctx->set_error_message( "The actor is dead because it was killed by `ray.kill`."); @@ -103,46 +106,60 @@ const ray::rpc::ActorDeathCause GenKilledByApplicationCause( const ray::rpc::ActorDeathCause GenActorOutOfScopeCause(const ray::gcs::GcsActor *actor) { ray::rpc::ActorDeathCause death_cause; auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::OUT_OF_SCOPE); AddActorInfo(actor, actor_died_error_ctx); actor_died_error_ctx->set_error_message( "The actor is dead because all references to the actor were removed."); return death_cause; } -// Returns true if an actor should be dead and should not be loaded to registered_actors_. -// `true` Cases: -// 0. state==DEAD +const ray::rpc::ActorDeathCause GenActorRefDeletedCause(const ray::gcs::GcsActor *actor) { + ray::rpc::ActorDeathCause death_cause; + auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::REF_DELETED); + AddActorInfo(actor, actor_died_error_ctx); + actor_died_error_ctx->set_error_message( + "The actor is dead because all references to the actor were removed including " + "lineage ref count."); + return death_cause; +} + +// Returns true if an actor should be loaded to registered_actors_. +// `false` Cases: +// 0. state is DEAD, and is not restartable // 1. root owner is job, and job is dead // 2. 
root owner is another detached actor, and that actor is dead -bool OnInitializeActorShouldDie(const ray::gcs::GcsInitData &gcs_init_data, - ray::ActorID actor_id) { +bool OnInitializeActorShouldLoad(const ray::gcs::GcsInitData &gcs_init_data, + ray::ActorID actor_id) { const auto &jobs = gcs_init_data.Jobs(); const auto &actors = gcs_init_data.Actors(); const auto &actor_task_specs = gcs_init_data.ActorTaskSpecs(); const auto &actor_table_data = actors.find(actor_id); - if (actor_table_data == actors.end() || - actor_table_data->second.state() == ray::rpc::ActorTableData::DEAD) { - return true; + if (actor_table_data == actors.end()) { + return false; + } + if (actor_table_data->second.state() == ray::rpc::ActorTableData::DEAD && + !ray::gcs::IsActorRestartable(actor_table_data->second)) { + return false; } - // Since state!=DEAD it should have a task spec. const auto &actor_task_spec = ray::map_find_or_die(actor_task_specs, actor_id); ActorID root_detached_actor_id = ray::TaskSpecification(actor_task_spec).RootDetachedActorId(); if (root_detached_actor_id.IsNil()) { // owner is job, NOT detached actor, should die with job auto job_iter = jobs.find(actor_id.JobId()); - return job_iter == jobs.end() || job_iter->second.is_dead(); + return job_iter != jobs.end() && !job_iter->second.is_dead(); } else if (actor_id == root_detached_actor_id) { // owner is itself, just live on - return false; + return true; } else { // owner is another detached actor, should die with the owner actor // Root detached actor can be dead only if state() == DEAD. 
auto root_detached_actor_iter = actors.find(root_detached_actor_id); - return root_detached_actor_iter == actors.end() || - root_detached_actor_iter->second.state() == ray::rpc::ActorTableData::DEAD; + return root_detached_actor_iter != actors.end() && + root_detached_actor_iter->second.state() != ray::rpc::ActorTableData::DEAD; } }; @@ -275,6 +292,7 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause( ray::rpc::ActorDeathCause death_cause; auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context(); + actor_died_error_ctx->set_reason(ray::rpc::ActorDiedErrorContext::NODE_DIED); AddActorInfo(actor, actor_died_error_ctx); auto node_death_info = actor_died_error_ctx->mutable_node_death_info(); node_death_info->CopyFrom(node->death_info()); @@ -338,6 +356,35 @@ GcsActorManager::GcsActorManager( }); } +void GcsActorManager::HandleReportActorOutOfScope( + rpc::ReportActorOutOfScopeRequest request, + rpc::ReportActorOutOfScopeReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto actor_id = ActorID::FromBinary(request.actor_id()); + auto it = registered_actors_.find(actor_id); + if (it != registered_actors_.end()) { + auto actor = GetActor(actor_id); + if (actor->GetActorTableData().num_restarts_due_to_lineage_reconstruction() > + request.num_restarts_due_to_lineage_reconstruction()) { + // Stale report + RAY_LOG(INFO).WithField(actor_id) + << "The out of scope report is stale, the actor has been restarted."; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + return; + } + + DestroyActor(actor_id, + GenActorOutOfScopeCause(actor), + /*force_kill=*/true, + [reply, send_reply_callback]() { + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }); + } else { + RAY_LOG(INFO).WithField(actor_id) << "The out of scope actor is already dead"; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + } +} + void GcsActorManager::HandleRegisterActor(rpc::RegisterActorRequest request, 
rpc::RegisterActorReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -362,6 +409,68 @@ void GcsActorManager::HandleRegisterActor(rpc::RegisterActorRequest request, ++counts_[CountType::REGISTER_ACTOR_REQUEST]; } +void GcsActorManager::HandleRestartActor(rpc::RestartActorRequest request, + rpc::RestartActorReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto actor_id = ActorID::FromBinary(request.actor_id()); + RAY_LOG(INFO).WithField(actor_id.JobId()).WithField(actor_id) << "HandleRestartActor"; + auto iter = registered_actors_.find(actor_id); + if (iter == registered_actors_.end()) { + GCS_RPC_SEND_REPLY( + send_reply_callback, reply, Status::Invalid("Actor is permanently dead.")); + return; + } + + auto success_callback = [reply, send_reply_callback, actor_id]( + const std::shared_ptr &actor) { + RAY_LOG(INFO).WithField(actor_id.JobId()).WithField(actor_id) << "Restarted actor"; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }; + auto pending_restart_iter = actor_to_restart_callbacks_.find(actor_id); + if (pending_restart_iter != actor_to_restart_callbacks_.end()) { + pending_restart_iter->second.emplace_back(std::move(success_callback)); + return; + } + + auto actor = iter->second; + if (request.num_restarts() <= actor->GetActorTableData().num_restarts()) { + // This is a stale message. 
+ success_callback(actor); + return; + } + RAY_CHECK_EQ(request.num_restarts(), actor->GetActorTableData().num_restarts() + 1); + RAY_CHECK_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + RAY_CHECK(IsActorRestartable(actor->GetActorTableData())); + + actor_to_restart_callbacks_[actor_id].emplace_back(std::move(success_callback)); + actor->GetMutableActorTableData()->set_num_restarts_due_to_lineage_reconstruction( + actor->GetActorTableData().num_restarts_due_to_lineage_reconstruction() + 1); + RestartActor( + actor_id, + /*need_reschedule=*/true, + GenActorOutOfScopeCause(actor.get()), + [this, actor]() { + // If a creator dies before this callback is called, the actor could have + // been already destroyed. It is okay not to invoke a callback because we + // don't need to reply to the creator as it is already dead. + auto registered_actor_it = registered_actors_.find(actor->GetActorID()); + if (registered_actor_it == registered_actors_.end()) { + // NOTE(sang): This logic assumes that the ordering of backend call is + // guaranteed. It is currently true because we use a single TCP socket + // to call the default Redis backend. If ordering is not guaranteed, we + // should overwrite the actor state to DEAD to avoid race condition. 
+ return; + } + auto iter = actor_to_restart_callbacks_.find(actor->GetActorID()); + RAY_CHECK(iter != actor_to_restart_callbacks_.end() && !iter->second.empty()); + auto callbacks = std::move(iter->second); + actor_to_restart_callbacks_.erase(iter); + for (auto &callback : callbacks) { + callback(actor); + } + }); +} + void GcsActorManager::HandleCreateActor(rpc::CreateActorRequest request, rpc::CreateActorReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -543,7 +652,8 @@ void GcsActorManager::HandleGetNamedActorInfo( Status status = Status::OK(); auto iter = registered_actors_.find(actor_id); - if (actor_id.IsNil() || iter == registered_actors_.end()) { + if (actor_id.IsNil() || iter == registered_actors_.end() || + iter->second->GetState() == rpc::ActorTableData::DEAD) { // The named actor was not found or the actor is already removed. std::stringstream stream; stream << "Actor with name '" << name << "' was not found."; @@ -674,7 +784,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ if (!actor->IsDetached()) { // This actor is owned. Send a long polling request to the actor's // owner to determine when the actor should be removed. - PollOwnerForActorOutOfScope(actor); + PollOwnerForActorRefDeleted(actor); } else { // If it's a detached actor, we need to register the runtime env it used to GC. 
runtime_env_manager_.AddURIReference(actor->GetActorID().Hex(), @@ -837,21 +947,29 @@ std::vector> GcsActorManager::ListNamedActor if (all_namespaces) { for (const auto &namespace_it : named_actors_) { for (const auto &actor_it : namespace_it.second) { - actors.push_back(std::make_pair(namespace_it.first, actor_it.first)); + auto iter = registered_actors_.find(actor_it.second); + if (iter != registered_actors_.end() && + iter->second->GetState() != rpc::ActorTableData::DEAD) { + actors.push_back(std::make_pair(namespace_it.first, actor_it.first)); + } } } } else { auto namespace_it = named_actors_.find(ray_namespace); if (namespace_it != named_actors_.end()) { for (const auto &actor_it : namespace_it->second) { - actors.push_back(std::make_pair(namespace_it->first, actor_it.first)); + auto iter = registered_actors_.find(actor_it.second); + if (iter != registered_actors_.end() && + iter->second->GetState() != rpc::ActorTableData::DEAD) { + actors.push_back(std::make_pair(namespace_it->first, actor_it.first)); + } } } } return actors; } -void GcsActorManager::PollOwnerForActorOutOfScope( +void GcsActorManager::PollOwnerForActorRefDeleted( const std::shared_ptr &actor) { const auto &actor_id = actor->GetActorID(); const auto &owner_node_id = actor->GetOwnerNodeID(); @@ -867,20 +985,20 @@ void GcsActorManager::PollOwnerForActorOutOfScope( } it->second.children_actor_ids.insert(actor_id); - rpc::WaitForActorOutOfScopeRequest wait_request; + rpc::WaitForActorRefDeletedRequest wait_request; wait_request.set_intended_worker_id(owner_id.Binary()); wait_request.set_actor_id(actor_id.Binary()); - it->second.client->WaitForActorOutOfScope( + it->second.client->WaitForActorRefDeleted( wait_request, [this, owner_node_id, owner_id, actor_id]( - Status status, const rpc::WaitForActorOutOfScopeReply &reply) { + Status status, const rpc::WaitForActorRefDeletedReply &reply) { if (!status.ok()) { RAY_LOG(INFO) << "Worker " << owner_id << " failed, destroying actor child, job id = " << 
actor_id.JobId(); } else { RAY_LOG(INFO) << "Actor " << actor_id - << " is out of scope, destroying actor, job id = " + << " has no references, destroying actor, job id = " << actor_id.JobId(); } @@ -888,57 +1006,41 @@ void GcsActorManager::PollOwnerForActorOutOfScope( if (node_it != owners_.end() && node_it->second.count(owner_id)) { // Only destroy the actor if its owner is still alive. The actor may // have already been destroyed if the owner died. - DestroyActor( - actor_id, GenActorOutOfScopeCause(GetActor(actor_id)), /*force_kill=*/true); + DestroyActor(actor_id, + GenActorRefDeletedCause(GetActor(actor_id)), + /*force_kill=*/true); } }); } void GcsActorManager::DestroyActor(const ActorID &actor_id, const rpc::ActorDeathCause &death_cause, - bool force_kill) { + bool force_kill, + std::function done_callback) { RAY_LOG(INFO).WithField(actor_id.JobId()).WithField(actor_id) << "Destroying actor"; actor_to_register_callbacks_.erase(actor_id); + actor_to_restart_callbacks_.erase(actor_id); auto it = registered_actors_.find(actor_id); if (it == registered_actors_.end()) { RAY_LOG(INFO).WithField(actor_id) << "Tried to destroy actor that does not exist"; + if (done_callback) { + done_callback(); + } return; } gcs_actor_scheduler_->OnActorDestruction(it->second); it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); - AddDestroyedActorToCache(it->second); - const auto actor = std::move(it->second); + const auto actor = it->second; - registered_actors_.erase(it); RAY_LOG(DEBUG) << "Try to kill actor " << actor->GetActorID() << ", with status " << actor->GetState() << ", name " << actor->GetName(); - // Clean up the client to the actor's owner, if necessary. 
- if (!actor->IsDetached()) { - RemoveActorFromOwner(actor); - } else { - runtime_env_manager_.RemoveURIReference(actor_id.Hex()); - } - function_manager_.RemoveJobReference(actor_id.JobId()); - RemoveActorNameFromRegistry(actor); - // The actor is already dead, most likely due to process or node failure. - if (actor->GetState() == rpc::ActorTableData::DEAD) { - RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << "is already dead," - << "skipping kill request."; - // Inform all creation callbacks that the actor is dead and that actor creation is - // therefore cancelled. - RunAndClearActorCreationCallbacks( - actor, - rpc::PushTaskReply(), - Status::SchedulingCancelled("Actor creation cancelled.")); - return; - } if (actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) { // The actor creation task still has unresolved dependencies. Remove from the // unresolved actors map. RemoveUnresolvedActor(actor); - } else { + } else if (actor->GetState() != rpc::ActorTableData::DEAD) { // The actor is still alive or pending creation. Clean up all remaining // state. const auto &node_id = actor->GetNodeID(); @@ -967,11 +1069,37 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, // TODO(swang): We can skip this step and delete the actor table entry // entirely if the callers check directly whether the owner is still alive. 
auto mutable_actor_table_data = actor->GetMutableActorTableData(); - actor->UpdateState(rpc::ActorTableData::DEAD); auto time = current_sys_time_ms(); - mutable_actor_table_data->set_end_time(time); + if (actor->GetState() != rpc::ActorTableData::DEAD) { + actor->UpdateState(rpc::ActorTableData::DEAD); + mutable_actor_table_data->set_end_time(time); + mutable_actor_table_data->mutable_death_cause()->CopyFrom(death_cause); + } else if (mutable_actor_table_data->death_cause().context_case() == + ContextCase::kActorDiedErrorContext && + death_cause.context_case() == ContextCase::kActorDiedErrorContext && + mutable_actor_table_data->death_cause() + .actor_died_error_context() + .reason() == rpc::ActorDiedErrorContext::OUT_OF_SCOPE && + death_cause.actor_died_error_context().reason() == + rpc::ActorDiedErrorContext::REF_DELETED) { + // Update death cause from restartable OUT_OF_SCOPE to non-restartable REF_DELETED + mutable_actor_table_data->mutable_death_cause()->CopyFrom(death_cause); + } mutable_actor_table_data->set_timestamp(time); - mutable_actor_table_data->mutable_death_cause()->CopyFrom(death_cause); + + const bool is_restartable = IsActorRestartable(*mutable_actor_table_data); + if (!is_restartable) { + AddDestroyedActorToCache(it->second); + registered_actors_.erase(it); + function_manager_.RemoveJobReference(actor_id.JobId()); + RemoveActorNameFromRegistry(actor); + // Clean up the client to the actor's owner, if necessary. 
+ if (!actor->IsDetached()) { + RemoveActorFromOwner(actor); + } else { + runtime_env_manager_.RemoveURIReference(actor_id.Hex()); + } + } auto actor_table_data = std::make_shared(*mutable_actor_table_data); @@ -979,10 +1107,21 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor_table_data, - [this, actor, actor_id, actor_table_data](Status status) { + [this, + actor, + actor_id, + actor_table_data, + is_restartable, + done_callback = std::move(done_callback)](Status status) { + if (done_callback) { + done_callback(); + } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); - RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + if (!is_restartable) { + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + } actor->WriteActorExportEvent(); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); @@ -1123,7 +1262,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id, } // Otherwise, try to reconstruct the actor that was already created or in the creation // process. - ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause); + RestartActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause); } void GcsActorManager::OnNodeDead(std::shared_ptr node, @@ -1153,9 +1292,9 @@ void GcsActorManager::OnNodeDead(std::shared_ptr node, // Cancel scheduling actors that haven't been created on the node. 
auto scheduling_actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id); for (auto &actor_id : scheduling_actor_ids) { - ReconstructActor(actor_id, - /*need_reschedule=*/true, - GenNodeDiedCause(GetActor(actor_id), node_ip_address, node)); + RestartActor(actor_id, + /*need_reschedule=*/true, + GenNodeDiedCause(GetActor(actor_id), node_ip_address, node)); } // Try reconstructing all workers created on the node. @@ -1166,9 +1305,9 @@ void GcsActorManager::OnNodeDead(std::shared_ptr node, created_actors_.erase(iter); for (auto &entry : created_actors) { // Reconstruct the removed actor. - ReconstructActor(entry.second, - /*need_reschedule=*/true, - GenNodeDiedCause(GetActor(entry.second), node_ip_address, node)); + RestartActor(entry.second, + /*need_reschedule=*/true, + GenNodeDiedCause(GetActor(entry.second), node_ip_address, node)); } } @@ -1216,15 +1355,19 @@ void GcsActorManager::SetPreemptedAndPublish(const NodeID &node_id) { } } -void GcsActorManager::ReconstructActor(const ActorID &actor_id, - bool need_reschedule, - const rpc::ActorDeathCause &death_cause) { +void GcsActorManager::RestartActor(const ActorID &actor_id, + bool need_reschedule, + const rpc::ActorDeathCause &death_cause, + std::function done_callback) { // If the owner and this actor is dead at the same time, the actor - // could've been destroyed and dereigstered before reconstruction. + // could've been destroyed and deregistered before restart.
auto iter = registered_actors_.find(actor_id); if (iter == registered_actors_.end()) { - RAY_LOG(DEBUG) << "Actor is destroyed before reconstruction, actor id = " << actor_id + RAY_LOG(DEBUG) << "Actor is destroyed before restart, actor id = " << actor_id << ", job id = " << actor_id.JobId(); + if (done_callback) { + done_callback(); + } return; } auto &actor = iter->second; @@ -1267,7 +1410,10 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor, actor_id, mutable_actor_table_data](Status status) { + [this, actor, actor_id, mutable_actor_table_data, done_callback](Status status) { + if (done_callback) { + done_callback(); + } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); actor->WriteActorExportEvent(); @@ -1285,13 +1431,17 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor, actor_id, mutable_actor_table_data, death_cause](Status status) { + [this, actor, actor_id, mutable_actor_table_data, death_cause, done_callback]( + Status status) { // If actor was an detached actor, make sure to destroy it. // We need to do this because detached actors are not destroyed // when its owners are dead because it doesn't have owners. if (actor->IsDetached()) { DestroyActor(actor_id, death_cause); } + if (done_callback) { + done_callback(); + } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); RAY_CHECK_OK( @@ -1300,8 +1450,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, })); // The actor is dead, but we should not remove the entry from the // registered actors yet. 
If the actor is owned, we will destroy the actor - // once the owner fails or notifies us that the actor's handle has gone out - // of scope. + // once the owner fails or notifies us that the actor has no references. } } @@ -1357,10 +1506,12 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac auto actor_id = actor->GetActorID(); liftime_num_created_actors_++; // NOTE: If an actor is deleted immediately after the user creates the actor, reference - // counter may return a reply to the request of WaitForActorOutOfScope to GCS server, + // counter may ReportActorOutOfScope to GCS server, // and GCS server will destroy the actor. The actor creation is asynchronous, it may be // destroyed before the actor creation is completed. - if (registered_actors_.count(actor_id) == 0) { + auto registered_actor_it = registered_actors_.find(actor_id); + if (registered_actor_it == registered_actors_.end() || + registered_actor_it->second->GetState() == rpc::ActorTableData::DEAD) { return; } @@ -1432,7 +1583,7 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { // - Actors whose state != DEAD. // - Non-deatched actors whose owner (job or root_detached_actor) is alive. // - Detached actors which lives even when their original owner is dead. - if (!OnInitializeActorShouldDie(gcs_init_data, actor_id)) { + if (OnInitializeActorShouldLoad(gcs_init_data, actor_id)) { const auto &actor_task_spec = map_find_or_die(actor_task_specs, actor_id); auto actor = std::make_shared( actor_table_data, actor_task_spec, actor_state_counter_); @@ -1458,7 +1609,7 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { if (!actor->IsDetached()) { // This actor is owned. Send a long polling request to the actor's // owner to determine when the actor should be removed. 
- PollOwnerForActorOutOfScope(actor); + PollOwnerForActorRefDeleted(actor); } if (!actor->GetWorkerID().IsNil()) { @@ -1604,9 +1755,9 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) { actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill); } CancelActorInScheduling(actor, task_id); - ReconstructActor(actor_id, - /*need_reschedule=*/true, - GenKilledByApplicationCause(GetActor(actor_id))); + RestartActor(actor_id, + /*need_reschedule=*/true, + GenKilledByApplicationCause(GetActor(actor_id))); } } @@ -1726,6 +1877,7 @@ std::string GcsActorManager::DebugString() const { << "\n- Created actors count: " << created_actors_.size() << "\n- owners_: " << owners_.size() << "\n- actor_to_register_callbacks_: " << actor_to_register_callbacks_.size() + << "\n- actor_to_restart_callbacks_: " << actor_to_restart_callbacks_.size() << "\n- actor_to_create_callbacks_: " << actor_to_create_callbacks_.size() << "\n- sorted_destroyed_actor_list_: " << sorted_destroyed_actor_list_.size(); return stream.str(); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index c26834599671b..c05ba9ebd0dcb 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -88,6 +88,7 @@ class GcsActor { actor_table_data_.set_job_id(task_spec.job_id()); actor_table_data_.set_max_restarts(actor_creation_task_spec.max_actor_restarts()); actor_table_data_.set_num_restarts(0); + actor_table_data_.set_num_restarts_due_to_lineage_reconstruction(0); actor_table_data_.mutable_function_descriptor()->CopyFrom( task_spec.function_descriptor()); @@ -254,6 +255,7 @@ class GcsActor { }; using RegisterActorCallback = std::function)>; +using RestartActorCallback = std::function)>; using CreateActorCallback = std::function, const rpc::PushTaskReply &reply, const Status &status)>; @@ -263,10 +265,11 @@ using CreateActorCallback = std::function /// 
--->DEPENDENCIES_UNREADY--->PENDING_CREATION--->ALIVE RESTARTING -/// | | | <--- | -/// 8 | 7 | 6 | 4 | 5 +/// | | | <--- ^ +/// 8 | 7 | 6 | 4 | 9 /// | v | | /// ------------------> DEAD <------------------------- +/// 5 /// /// 0: When GCS receives a `RegisterActor` request from core worker, it will add an actor /// to `registered_actors_` and `unresolved_actors_`. @@ -300,6 +303,7 @@ using CreateActorCallback = std::function &actor); + void PollOwnerForActorRefDeleted(const std::shared_ptr &actor); /// Destroy an actor that has gone out of scope. This cleans up all local /// state associated with the actor and marks the actor as dead. For owned @@ -512,9 +524,11 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param[in] actor_id The actor id to destroy. /// \param[in] death_cause The reason why actor is destroyed. /// \param[in] force_kill Whether destory the actor forcelly. + /// \param[in] done_callback Called when destroy finishes. void DestroyActor(const ActorID &actor_id, const rpc::ActorDeathCause &death_cause, - bool force_kill = true); + bool force_kill = true, + std::function done_callback = nullptr); /// Get unresolved actors that were submitted from the specified node. absl::flat_hash_map> @@ -532,9 +546,10 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// again. /// \param death_cause Context about why this actor is dead. Should only be set when /// need_reschedule=false. - void ReconstructActor(const ActorID &actor_id, - bool need_reschedule, - const rpc::ActorDeathCause &death_cause); + void RestartActor(const ActorID &actor_id, + bool need_reschedule, + const rpc::ActorDeathCause &death_cause, + std::function done_callback = nullptr); /// Remove the specified actor from `unresolved_actors_`. 
/// @@ -574,6 +589,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { actor_delta->mutable_death_cause()->CopyFrom(actor.death_cause()); actor_delta->mutable_address()->CopyFrom(actor.address()); actor_delta->set_num_restarts(actor.num_restarts()); + actor_delta->set_max_restarts(actor.max_restarts()); actor_delta->set_timestamp(actor.timestamp()); actor_delta->set_pid(actor.pid()); actor_delta->set_start_time(actor.start_time()); @@ -632,6 +648,11 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// messages from a driver/worker caused by some network problems. absl::flat_hash_map> actor_to_register_callbacks_; + /// Callbacks of pending `RestartActor` requests. + /// Maps actor ID to actor restart callbacks, which is used to filter duplicated + /// messages from a driver/worker caused by some network problems. + absl::flat_hash_map> + actor_to_restart_callbacks_; /// Callbacks of actor creation requests. /// Maps actor ID to actor creation callbacks, which is used to filter duplicated /// messages come from a Driver/Worker caused by some network problems. 
diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc index 0d2dc48cc7631..2e91f34bcf73b 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc @@ -75,9 +75,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} - void WaitForActorOutOfScope( - const rpc::WaitForActorOutOfScopeRequest &request, - const rpc::ClientCallback &callback) override { + void WaitForActorRefDeleted( + const rpc::WaitForActorRefDeletedRequest &request, + const rpc::ClientCallback &callback) override { callbacks_.push_back(callback); } @@ -93,12 +93,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The created_actors_ of gcs actor manager will be modified in io_service thread. // In order to avoid multithreading reading and writing created_actors_, we also - // send the `WaitForActorOutOfScope` callback operation to io_service thread. + // send the `WaitForActorRefDeleted` callback operation to io_service thread. 
std::promise promise; io_service_.post( [this, status, &promise]() { auto callback = callbacks_.front(); - auto reply = rpc::WaitForActorOutOfScopeReply(); + auto reply = rpc::WaitForActorRefDeletedReply(); callback(status, std::move(reply)); promise.set_value(false); }, @@ -109,7 +109,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return true; } - std::list> callbacks_; + std::list> callbacks_; std::vector killed_actors_; instrumented_io_context &io_service_; }; @@ -308,7 +308,8 @@ TEST_F(GcsActorManagerTest, TestBasic) { // Verify death cause for last actor DEAD event ASSERT_EQ( event_data["death_cause"]["actor_died_error_context"]["error_message"], - "The actor is dead because all references to the actor were removed."); + "The actor is dead because all references to the actor were removed " + "including lineage ref count."); } } return; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index cb4f1bec4deaa..4297a43a9f0bf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -72,9 +72,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} - void WaitForActorOutOfScope( - const rpc::WaitForActorOutOfScopeRequest &request, - const rpc::ClientCallback &callback) override { + void WaitForActorRefDeleted( + const rpc::WaitForActorRefDeletedRequest &request, + const rpc::ClientCallback &callback) override { callbacks_.push_back(callback); } @@ -90,12 +90,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The created_actors_ of gcs actor manager will be modified in io_service thread. // In order to avoid multithreading reading and writing created_actors_, we also - // send the `WaitForActorOutOfScope` callback operation to io_service thread. 
+ // send the `WaitForActorRefDeleted` callback operation to io_service thread. std::promise promise; io_service_.post( [this, status, &promise]() { auto callback = callbacks_.front(); - auto reply = rpc::WaitForActorOutOfScopeReply(); + auto reply = rpc::WaitForActorRefDeletedReply(); callback(status, std::move(reply)); promise.set_value(false); }, @@ -106,7 +106,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return true; } - std::list> callbacks_; + std::list> callbacks_; std::vector killed_actors_; instrumented_io_context &io_service_; }; @@ -904,7 +904,7 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { auto actor = mock_actor_scheduler_->actors.back(); mock_actor_scheduler_->actors.clear(); - // Simulate the reply of WaitForActorOutOfScope request to trigger actor destruction. + // Simulate the reply of WaitForActorRefDeleted request to trigger actor destruction. ASSERT_TRUE(worker_client_->Reply()); // Check that the actor is in state `DEAD`. 
diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 92de8dec69158..cb3c518072b2e 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -189,6 +189,15 @@ inline std::string GenErrorMessageFromDeathCause( } } +inline bool IsActorRestartable(const rpc::ActorTableData &actor) { + RAY_CHECK_EQ(actor.state(), rpc::ActorTableData::DEAD); + return actor.death_cause().context_case() == ContextCase::kActorDiedErrorContext && + actor.death_cause().actor_died_error_context().reason() == + rpc::ActorDiedErrorContext::OUT_OF_SCOPE && + ((actor.max_restarts() == -1) || + (static_cast(actor.num_restarts()) < actor.max_restarts())); +} + inline std::string RayErrorInfoToString(const ray::rpc::RayErrorInfo &error_info) { std::stringstream ss; ss << "Error type " << error_info.error_type() << " exception string " diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index b89f8665b7b39..dbd2fee683ee9 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -358,6 +358,16 @@ message ActorUnschedulableContext { // Note: ActorDiedErrorContext is used in the export API and state API so it is public. // Any modifications must be backward compatible. message ActorDiedErrorContext { + enum Reason { + UNSPECIFIED = 0; + WORKER_DIED = 1; + OWNER_DIED = 2; + NODE_DIED = 3; + RAY_KILL = 4; + OUT_OF_SCOPE = 5; + REF_DELETED = 6; + } + string error_message = 1; // The id of owner of the actor. bytes owner_id = 2; @@ -380,6 +390,7 @@ message ActorDiedErrorContext { bool never_started = 10; // The node death info, if node death is the cause of actor death. optional NodeDeathInfo node_death_info = 11; + Reason reason = 12; } // Context for task OOM. 
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index c31278f5784f2..caa869ce18e98 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -180,14 +180,14 @@ message GetObjectStatusReply { uint64 object_size = 4; } -message WaitForActorOutOfScopeRequest { +message WaitForActorRefDeletedRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; - // ActorID of the actor in scope. + // ActorID of the actor to wait for. bytes actor_id = 2; } -message WaitForActorOutOfScopeReply {} +message WaitForActorRefDeletedReply {} message UpdateObjectLocationBatchRequest { bytes intended_worker_id = 1; @@ -447,11 +447,11 @@ service CoreWorkerService { returns (DirectActorCallArgWaitCompleteReply); // Ask the object's owner about the object's current status. rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply); - // Wait for the actor's owner to decide that the actor has gone out of scope. + // Wait for the actor's owner to decide that the actor has no references. // Replying to this message indicates that the client should force-kill the - // actor process, if still alive. - rpc WaitForActorOutOfScope(WaitForActorOutOfScopeRequest) - returns (WaitForActorOutOfScopeReply); + // actor process, if still alive and mark the actor as permanently dead. + rpc WaitForActorRefDeleted(WaitForActorRefDeletedRequest) + returns (WaitForActorRefDeletedReply); /// The long polling request sent to the core worker for pubsub operations. /// It is replied once there are batch of objects that need to be published to /// the caller (subscriber). 
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index ccc5807c6ff87..b34f5bd9f39b4 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -153,6 +153,8 @@ message ActorTableData { string repr_name = 31; // Whether the actor was on a preempted node bool preempted = 32; + // Number of times this actor is restarted due to lineage reconstructions. + uint64 num_restarts_due_to_lineage_reconstruction = 33; } message ErrorTableData { diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 68b3fb5f8d4e4..ddd7f331ddd91 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -159,10 +159,23 @@ message KillActorViaGcsReply { GcsStatus status = 1; } +message ReportActorOutOfScopeRequest { + // ID of this actor. + bytes actor_id = 1; + // The actor after this number of lineage reconstructions is out of scope. + // This is used to filter out stale messages. + uint64 num_restarts_due_to_lineage_reconstruction = 2; +} + +message ReportActorOutOfScopeReply { + GcsStatus status = 1; +} + // Service for actor info access. service ActorInfoGcsService { // Register actor to gcs service. rpc RegisterActor(RegisterActorRequest) returns (RegisterActorReply); + rpc RestartActor(RestartActorRequest) returns (RestartActorReply); // Create actor which local dependencies are resolved. rpc CreateActor(CreateActorRequest) returns (CreateActorReply); // Get actor data from GCS Service by actor id. @@ -175,6 +188,8 @@ service ActorInfoGcsService { rpc GetAllActorInfo(GetAllActorInfoRequest) returns (GetAllActorInfoReply); // Kill actor via GCS Service.
rpc KillActorViaGcs(KillActorViaGcsRequest) returns (KillActorViaGcsReply); + rpc ReportActorOutOfScope(ReportActorOutOfScopeRequest) + returns (ReportActorOutOfScopeReply); } message GetClusterIdRequest {} @@ -377,6 +392,16 @@ message RegisterActorReply { GcsStatus status = 1; } +message RestartActorRequest { + bytes actor_id = 1; + // The target number of restarts. + uint64 num_restarts = 2; +} + +message RestartActorReply { + GcsStatus status = 1; +} + message CreatePlacementGroupRequest { PlacementGroupSpec placement_group_spec = 1; } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 08c111d7463a1..c46fbeb5059b6 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -322,6 +322,16 @@ class GcsRpcClient { actor_info_grpc_client_, /*method_timeout_ms*/ -1, ) + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, + ReportActorOutOfScope, + actor_info_grpc_client_, + /*method_timeout_ms*/ -1, ) + + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, + RestartActor, + actor_info_grpc_client_, + /*method_timeout_ms*/ -1, ) + /// Create actor via GCS Service. 
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, CreateActor, diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 133301676b88e..be7247bff4bf0 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -221,6 +221,10 @@ class ActorInfoGcsServiceHandler { RegisterActorReply *reply, SendReplyCallback send_reply_callback) = 0; + virtual void HandleRestartActor(RestartActorRequest request, + RestartActorReply *reply, + SendReplyCallback send_reply_callback) = 0; + virtual void HandleCreateActor(CreateActorRequest request, CreateActorReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -244,6 +248,10 @@ class ActorInfoGcsServiceHandler { virtual void HandleKillActorViaGcs(KillActorViaGcsRequest request, KillActorViaGcsReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleReportActorOutOfScope(ReportActorOutOfScopeRequest request, + ReportActorOutOfScopeReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `ActorInfoGcsService`. @@ -266,6 +274,7 @@ class ActorInfoGrpcService : public GrpcService { /// Register/Create Actor RPC takes long time, we shouldn't limit them to avoid /// distributed deadlock. ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActor, -1); + ACTOR_INFO_SERVICE_RPC_HANDLER(RestartActor, -1); ACTOR_INFO_SERVICE_RPC_HANDLER(CreateActor, -1); /// Others need back pressure. 
@@ -279,6 +288,8 @@ class ActorInfoGrpcService : public GrpcService { GetAllActorInfo, RayConfig::instance().gcs_max_active_rpcs_per_handler()); ACTOR_INFO_SERVICE_RPC_HANDLER( KillActorViaGcs, RayConfig::instance().gcs_max_active_rpcs_per_handler()); + ACTOR_INFO_SERVICE_RPC_HANDLER( + ReportActorOutOfScope, RayConfig::instance().gcs_max_active_rpcs_per_handler()); } private: diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 5de94fcc2dd22..3dc55da1c8dbf 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -122,10 +122,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { virtual void GetObjectStatus(const GetObjectStatusRequest &request, const ClientCallback &callback) {} - /// Ask the actor's owner to reply when the actor has gone out of scope. - virtual void WaitForActorOutOfScope( - const WaitForActorOutOfScopeRequest &request, - const ClientCallback &callback) {} + /// Ask the actor's owner to reply when the actor has no references. + virtual void WaitForActorRefDeleted( + const WaitForActorRefDeletedRequest &request, + const ClientCallback &callback) {} /// Send a long polling request to a core worker for pubsub operations. 
virtual void PubsubLongPolling(const PubsubLongPollingRequest &request, @@ -256,7 +256,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, override) VOID_RPC_CLIENT_METHOD(CoreWorkerService, - WaitForActorOutOfScope, + WaitForActorRefDeleted, grpc_client_, /*method_timeout_ms*/ -1, override) diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 29d382d1910c8..468a5ceba426a 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -37,7 +37,7 @@ namespace rpc { RAY_CORE_WORKER_RPC_SERVICE_HANDLER(DirectActorCallArgWaitComplete) \ RAY_CORE_WORKER_RPC_SERVICE_HANDLER(RayletNotifyGCSRestart) \ RAY_CORE_WORKER_RPC_SERVICE_HANDLER(GetObjectStatus) \ - RAY_CORE_WORKER_RPC_SERVICE_HANDLER(WaitForActorOutOfScope) \ + RAY_CORE_WORKER_RPC_SERVICE_HANDLER(WaitForActorRefDeleted) \ RAY_CORE_WORKER_RPC_SERVICE_HANDLER(PubsubLongPolling) \ RAY_CORE_WORKER_RPC_SERVICE_HANDLER(PubsubCommandBatch) \ RAY_CORE_WORKER_RPC_SERVICE_HANDLER(UpdateObjectLocationBatch) \ @@ -63,7 +63,7 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DirectActorCallArgWaitComplete) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RayletNotifyGCSRestart) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetObjectStatus) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForActorOutOfScope) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForActorRefDeleted) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PubsubLongPolling) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PubsubCommandBatch) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(UpdateObjectLocationBatch) \