From ed81edd01821c05a69c7046658e07c118424d70b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Sat, 6 Jul 2024 06:51:02 -0700 Subject: [PATCH 1/7] Only delete object from memory store if recovery happens Signed-off-by: Jiajun Yao --- src/ray/core_worker/core_worker.cc | 7 +------ src/ray/core_worker/object_recovery_manager.cc | 10 +++++++++- src/ray/core_worker/object_recovery_manager.h | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e18955146e45..eec6131da3af 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -663,17 +663,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_LOG(ERROR) << ":info_message: Attempting to recover " << lost_objects.size() << " lost objects by resubmitting their tasks. To disable " << "object reconstruction, set @ray.remote(max_retries=0)."; - // Delete the objects from the in-memory store to indicate that they are not - // available. The object recovery manager will guarantee that a new value - // will eventually be stored for the objects (either an - // UnreconstructableError or a value reconstructed from lineage). - memory_store_->Delete(lost_objects); for (const auto &object_id : lost_objects) { // NOTE(swang): There is a race condition where this can return false if // the reference went out of scope since the call to the ref counter to get // the lost objects. It's okay to not mark the object as failed or recover // the object since there are no reference holders. - RAY_UNUSED(object_recovery_manager_->RecoverObject(object_id)); + RAY_UNUSED(object_recovery_manager_->RecoverObject(object_id, true)); } } }, diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 8ba0b2128181..2e801dbfb47a 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -19,7 +19,8 @@ namespace ray { namespace core { -bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { +bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id, + bool delete_from_memory_store_if_recovering) { if (object_id.TaskId().IsForActorCreationTask()) { // The GCS manages all actor restarts, so we should never try to // reconstruct an actor here. @@ -51,6 +52,13 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { // duplicate restarts of the same object. already_pending_recovery = !objects_pending_recovery_.insert(object_id).second; } + if (delete_from_memory_store_if_recovering) { + // Delete the objects from the in-memory store to indicate that they are not + // available. The object recovery manager will guarantee that a new value + // will eventually be stored for the objects (either an + // UnreconstructableError or a value reconstructed from lineage). + in_memory_store_->Delete({object_id}); + } } if (!already_pending_recovery) { diff --git a/src/ray/core_worker/object_recovery_manager.h b/src/ray/core_worker/object_recovery_manager.h index 15cf18503f09..15ac2b2a23a6 100644 --- a/src/ray/core_worker/object_recovery_manager.h +++ b/src/ray/core_worker/object_recovery_manager.h @@ -87,7 +87,8 @@ class ObjectRecoveryManager { /// about the object. If this returns true, then eventually recovery will /// either succeed (a value will be put into the memory store) or fail (the /// reconstruction failure callback will be called for this object). - bool RecoverObject(const ObjectID &object_id); + bool RecoverObject(const ObjectID &object_id, + bool delete_from_memory_store_if_recovering = false); private: /// Pin a new copy for a lost object from the given locations or, if that From ca58085af103492abc677b586d0afc7c08b735c6 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Sat, 6 Jul 2024 22:32:50 -0700 Subject: [PATCH 2/7] up Signed-off-by: Jiajun Yao --- src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/object_recovery_manager.cc | 16 +++++++--------- src/ray/core_worker/object_recovery_manager.h | 3 +-- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index eec6131da3af..4a70f95a17a9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -668,7 +668,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // the reference went out of scope since the call to the ref counter to get // the lost objects. It's okay to not mark the object as failed or recover // the object since there are no reference holders. - RAY_UNUSED(object_recovery_manager_->RecoverObject(object_id, true)); + RAY_UNUSED(object_recovery_manager_->RecoverObject(object_id)); } } }, diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 2e801dbfb47a..b02d17e65367 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -19,8 +19,7 @@ namespace ray { namespace core { -bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id, - bool delete_from_memory_store_if_recovering) { +bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { if (object_id.TaskId().IsForActorCreationTask()) { // The GCS manages all actor restarts, so we should never try to // reconstruct an actor here. @@ -46,19 +45,18 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id, bool already_pending_recovery = true; bool requires_recovery = pinned_at.IsNil() && !spilled; if (requires_recovery) { + // Delete the objects from the in-memory store to indicate that they are not + // available. The object recovery manager will guarantee that a new value + // will eventually be stored for the objects (either an + // UnreconstructableError or a value reconstructed from lineage). + in_memory_store_->Delete({object_id}); + { absl::MutexLock lock(&mu_); // Mark that we are attempting recovery for this object to prevent // duplicate restarts of the same object. already_pending_recovery = !objects_pending_recovery_.insert(object_id).second; } - if (delete_from_memory_store_if_recovering) { - // Delete the objects from the in-memory store to indicate that they are not - // available. The object recovery manager will guarantee that a new value - // will eventually be stored for the objects (either an - // UnreconstructableError or a value reconstructed from lineage). - in_memory_store_->Delete({object_id}); - } } if (!already_pending_recovery) { diff --git a/src/ray/core_worker/object_recovery_manager.h b/src/ray/core_worker/object_recovery_manager.h index 15ac2b2a23a6..15cf18503f09 100644 --- a/src/ray/core_worker/object_recovery_manager.h +++ b/src/ray/core_worker/object_recovery_manager.h @@ -87,8 +87,7 @@ class ObjectRecoveryManager { /// about the object. If this returns true, then eventually recovery will /// either succeed (a value will be put into the memory store) or fail (the /// reconstruction failure callback will be called for this object). - bool RecoverObject(const ObjectID &object_id, - bool delete_from_memory_store_if_recovering = false); + bool RecoverObject(const ObjectID &object_id); private: /// Pin a new copy for a lost object from the given locations or, if that From 1ddedc4db2bc0b2da0aa98e109acbaf128caa351 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Sun, 7 Jul 2024 14:06:58 -0700 Subject: [PATCH 3/7] debug Signed-off-by: Jiajun Yao --- python/ray/tests/test_reconstruction_2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/tests/test_reconstruction_2.py b/python/ray/tests/test_reconstruction_2.py index 19e6adce0faa..bed15ae2e91d 100644 --- a/python/ray/tests/test_reconstruction_2.py +++ b/python/ray/tests/test_reconstruction_2.py @@ -335,6 +335,7 @@ def test_memory_util(config, ray_start_cluster): node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8 ) + print(f"node1 {node_to_kill.address_info} {node_to_kill.node_manager_port}") cluster.wait_for_nodes() @ray.remote @@ -386,6 +387,7 @@ def stats(): node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8 ) + print(f"node2 {node_to_kill.address_info} {node_to_kill.node_manager_port}") ref = dependent_task.remote(x, sema) wait_for_condition(lambda: stats() == (1, 1, 0)) From 2bc7714f82d702e7e40e6781aba4e1e6f80bbdb4 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 8 Jul 2024 08:01:10 -0700 Subject: [PATCH 4/7] up Signed-off-by: Jiajun Yao --- python/ray/tests/test_reconstruction_2.py | 2 -- src/ray/core_worker/core_worker.cc | 5 +++++ src/ray/core_worker/object_recovery_manager.cc | 8 ++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/python/ray/tests/test_reconstruction_2.py b/python/ray/tests/test_reconstruction_2.py index bed15ae2e91d..19e6adce0faa 100644 --- a/python/ray/tests/test_reconstruction_2.py +++ b/python/ray/tests/test_reconstruction_2.py @@ -335,7 +335,6 @@ def test_memory_util(config, ray_start_cluster): node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8 ) - print(f"node1 {node_to_kill.address_info} {node_to_kill.node_manager_port}") cluster.wait_for_nodes() @ray.remote @@ -387,7 +386,6 @@ def stats(): node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8 ) - print(f"node2 {node_to_kill.address_info} {node_to_kill.node_manager_port}") ref = dependent_task.remote(x, sema) wait_for_condition(lambda: stats() == (1, 1, 0)) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 4a70f95a17a9..e18955146e45 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -663,6 +663,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_LOG(ERROR) << ":info_message: Attempting to recover " << lost_objects.size() << " lost objects by resubmitting their tasks. To disable " << "object reconstruction, set @ray.remote(max_retries=0)."; + // Delete the objects from the in-memory store to indicate that they are not + // available. The object recovery manager will guarantee that a new value + // will eventually be stored for the objects (either an + // UnreconstructableError or a value reconstructed from lineage). + memory_store_->Delete(lost_objects); for (const auto &object_id : lost_objects) { // NOTE(swang): There is a race condition where this can return false if // the reference went out of scope since the call to the ref counter to get diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index b02d17e65367..ecf4260c1315 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -45,12 +45,6 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { bool already_pending_recovery = true; bool requires_recovery = pinned_at.IsNil() && !spilled; if (requires_recovery) { - // Delete the objects from the in-memory store to indicate that they are not - // available. The object recovery manager will guarantee that a new value - // will eventually be stored for the objects (either an - // UnreconstructableError or a value reconstructed from lineage). - in_memory_store_->Delete({object_id}); - { absl::MutexLock lock(&mu_); // Mark that we are attempting recovery for this object to prevent @@ -78,6 +72,8 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } else { RAY_LOG(DEBUG) << "Object " << object_id << " has a pinned or spilled location, skipping recovery"; + RAY_CHECK( + in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); } return true; } From 81f6dcde5239c72374f33207c1f704df841953f1 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 8 Jul 2024 10:27:56 -0700 Subject: [PATCH 5/7] up Signed-off-by: Jiajun Yao --- .../core_worker/object_recovery_manager.cc | 4 ++++ .../test/object_recovery_manager_test.cc | 24 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index ecf4260c1315..6bb9136cb273 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -72,6 +72,10 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } else { RAY_LOG(DEBUG) << "Object " << object_id << " has a pinned or spilled location, skipping recovery"; + // If the object doesn't exist in the memory store + // (core_worker.cc removes the object from memory store before calling this method), + // we need to add it back to indicate that it's available. + // If the object is already in the memory store then the put is a no-op. RAY_CHECK( in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); } diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index 52b921d44b58..8afa814aa07a 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -398,6 +398,30 @@ TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) { rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED); } +TEST_F(ObjectRecoveryManagerTest, TestReconstructionSkipped) { + // Test that if the object is already pinned or spilled, + // reconstruction is skipped. + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, + {}, + rpc::Address(), + "", + 0, + true, + /*add_local_ref=*/true); + ref_counter_->UpdateObjectPinnedAtRaylet(object_id, NodeID::FromRandom()); + + memory_store_->Delete({object_id}); + ASSERT_TRUE(manager_.RecoverObject(object_id)); + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_EQ(object_directory_->Flush(), 0); + ASSERT_EQ(raylet_client_->Flush(), 0); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); + bool in_plasma = false; + ASSERT_TRUE(memory_store_->Contains(object_id, &in_plasma)); + ASSERT_TRUE(in_plasma); +} + } // namespace core } // namespace ray From fe2350bd166fc2b7232e3c17e5b6f204d0195a60 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 8 Jul 2024 12:14:37 -0700 Subject: [PATCH 6/7] up Signed-off-by: Jiajun Yao --- src/ray/core_worker/object_recovery_manager.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 6bb9136cb273..840f3b3e1a31 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -70,8 +70,8 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } else if (requires_recovery) { RAY_LOG(DEBUG) << "Recovery already started for object " << object_id; } else { - RAY_LOG(DEBUG) << "Object " << object_id - << " has a pinned or spilled location, skipping recovery"; + RAY_LOG(INFO) << "Object " << object_id + << " has a pinned or spilled location, skipping recovery " << pinned_at; // If the object doesn't exist in the memory store // (core_worker.cc removes the object from memory store before calling this method), // we need to add it back to indicate that it's available. From 2d7868293466e3065487c0d508bcfe557a548a8d Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 8 Jul 2024 12:20:52 -0700 Subject: [PATCH 7/7] up Signed-off-by: Jiajun Yao --- src/ray/core_worker/test/object_recovery_manager_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index 8afa814aa07a..274c63e3245a 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -417,6 +417,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionSkipped) { ASSERT_EQ(object_directory_->Flush(), 0); ASSERT_EQ(raylet_client_->Flush(), 0); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); + // The object should be added back to the memory store + // indicating the object is available again. bool in_plasma = false; ASSERT_TRUE(memory_store_->Contains(object_id, &in_plasma)); ASSERT_TRUE(in_plasma);