Fault tolerance for actor creation #3422

Merged (6 commits, Nov 29, 2018)
Changes from 3 commits
22 changes: 21 additions & 1 deletion src/ray/raylet/node_manager.cc
@@ -541,6 +541,13 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
<< " already removed from the lineage cache. This is most "
"likely due to reconstruction.";
}
// Maintain the invariant that if a task is in the
// MethodsWaitingForActorCreation queue, then it is subscribed to its
// respective actor creation task. Since the actor location is now known,
Collaborator: The invariant is not just that it is subscribed to its actor creation task, but also that the ONLY task it is subscribed to is its actor creation task, right? Can you add that to the comment?

Contributor Author: Yes, thanks!

// we can remove the task from the queue and forget its dependency on the
// actor creation task.
RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(
method.GetTaskSpecification().TaskId()));
// The task's uncommitted lineage was already added to the local lineage
// cache upon the initial submission, so it's okay to resubmit it with an
// empty lineage this time.
@@ -1154,6 +1161,14 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
// Keep the task queued until we discover the actor's location.
// (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueMethodsWaitingForActorCreation({task});
// The actor has not yet been created and may have failed. To make sure
// that the actor is eventually recreated, we maintain the invariant that
// if a task is in the MethodsWaitingForActorCreation queue, then it is
// subscribed to its respective actor creation task. Once the actor has
Collaborator: Double space -> single space

// been created and this method removed from the waiting queue, the
// caller must make the corresponding call to UnsubscribeDependencies.
task_dependency_manager_.SubscribeDependencies(spec.TaskId(),
{spec.ActorCreationDummyObjectId()});
// Mark the task as pending. It will be canceled once we discover the
// actor's location and either execute the task ourselves or forward it
// to another node.
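
The hunk above pairs a SubscribeDependencies call (made when an actor method is queued before the actor's location is known) with the UnsubscribeDependencies call in the earlier HandleActorStateTransition hunk. Below is a minimal Python sketch of that bookkeeping invariant; the names (DependencyManagerSketch, NodeManagerSketch, submit_actor_method, handle_actor_alive) are hypothetical stand-ins for illustration, not the raylet's C++ API.

# Sketch of the invariant documented in the comments above: a task is in the
# waiting-for-actor-creation queue if and only if it is subscribed to exactly
# one dependency, its actor creation task's dummy object.


class DependencyManagerSketch(object):
    def __init__(self):
        self._subscriptions = {}  # task_id -> set of object IDs

    def subscribe(self, task_id, object_ids):
        self._subscriptions[task_id] = set(object_ids)

    def unsubscribe(self, task_id):
        # Returns False if the task was not subscribed, mirroring the
        # RAY_CHECK around UnsubscribeDependencies above.
        return self._subscriptions.pop(task_id, None) is not None


class NodeManagerSketch(object):
    def __init__(self):
        self.waiting_for_actor_creation = {}  # task_id -> actor_id
        self.deps = DependencyManagerSketch()

    def submit_actor_method(self, task_id, actor_id, creation_dummy_object_id):
        # Actor location unknown: queue the method and subscribe it to the
        # actor creation task's dummy object, so that creation is eventually
        # re-triggered if the node it was forwarded to dies.
        self.waiting_for_actor_creation[task_id] = actor_id
        self.deps.subscribe(task_id, [creation_dummy_object_id])

    def handle_actor_alive(self, actor_id):
        # Actor location now known: dequeue its waiting methods and forget
        # their dependency on the creation task. The creation dummy object is
        # the ONLY subscription, so unsubscribing must succeed.
        ready = [task_id for task_id, waiting_actor_id
                 in self.waiting_for_actor_creation.items()
                 if waiting_actor_id == actor_id]
        for task_id in ready:
            assert self.deps.unsubscribe(task_id)
            del self.waiting_for_actor_creation[task_id]
        # These tasks would now be resubmitted with an empty lineage.
        return ready
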
@@ -1431,7 +1446,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {

// Publish the actor creation event to all other nodes so that methods for
// the actor will be forwarded directly to this node.
RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end());
RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end())
<< "Created an actor that already exists";
auto actor_data = std::make_shared<ActorTableDataT>();
actor_data->actor_id = actor_id.binary();
actor_data->actor_creation_dummy_object_id =
@@ -1447,6 +1463,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
// index in the log should succeed.
auto failure_callback = [](gcs::AsyncGcsClient *client, const ActorID &id,
const ActorTableDataT &data) {
// TODO(swang): Instead of making this a fatal check, we could just kill
// the duplicate actor process. If we do this, we must make sure to
// either resubmit the tasks that went to the duplicate actor, or wait
// for success before handling the actor state transition to ALIVE.
RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
};
RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(
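
The failure_callback above runs when the AppendAt call cannot record the actor as ALIVE. A toy Python sketch of the assumed append-at-index semantics follows; it is not the gcs client API, only an illustration of why at most one node can publish a given actor's ALIVE entry.

class ActorLogSketch(object):
    """Toy log with assumed AppendAt-style semantics: a write at index i
    succeeds only if the log currently has exactly i entries."""

    def __init__(self):
        self._entries = []

    def append_at(self, index, data, done_callback, failure_callback):
        if index == len(self._entries):
            self._entries.append(data)
            done_callback(data)
        else:
            # Some other raylet already published an entry at this index,
            # e.g. a duplicate actor created during reconstruction. The PR
            # keeps this fatal; the TODO above suggests killing the duplicate
            # actor process instead and resubmitting its tasks.
            failure_callback(data)
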
67 changes: 67 additions & 0 deletions test/component_failures_test.py
@@ -5,11 +5,14 @@
import os
import json
import signal
import sys
import time

import numpy as np
import pytest

import ray
from ray.test.cluster_utils import Cluster
from ray.test.test_utils import run_string_as_driver_nonblocking


@@ -33,6 +36,27 @@ def shutdown_only():
ray.shutdown()


@pytest.fixture
def ray_start_cluster():
num_workers = 8
node_args = {
"resources": dict(CPU=num_workers),
"_internal_config": json.dumps({
"initial_reconstruction_timeout_milliseconds": 1000,
"num_heartbeats_timeout": 10
})
}
# Start with 4 workers and 4 cores.
Collaborator: Should this say "Start cluster with 4 worker nodes, each with 8 cores."?

g = Cluster(initialize_head=True, connect=True, head_node_args=node_args)
workers = []
for _ in range(4):
workers.append(g.add_node(**node_args))
g.wait_for_nodes()
yield g
ray.shutdown()
g.shutdown()


# This test checks that when a worker dies in the middle of a get, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
@@ -347,6 +371,49 @@ def test_plasma_store_failed():
ray.shutdown()


def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster

@ray.remote
class Child(object):
def __init__(self, death_probability):
self.death_probability = death_probability

def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)

num_children = 100
# Children actors will die about half the time.
death_probability = 0.5

children = [Child.remote(death_probability) for _ in range(num_children)]
while len(cluster.list_all_nodes()) > 1:
for j in range(3):
# Submit some tasks on the actors. About half of the actors will
# fail.
children_out = [child.ping.remote() for child in children]
# Wait for all the tasks to complete. This should trigger
# reconstruction for any actor creation tasks that were forwarded
# to nodes that then failed.
ready, _ = ray.wait(
children_out, num_returns=len(children_out), timeout=30000)
assert len(ready) == len(children_out)
Collaborator: I'm seeing this fail with

>               assert len(ready) == len(children_out)
E               assert 75 == 100

Maybe the timeout is too short? How about we do it without a timeout?

Contributor Author (@stephanie-wang, Nov 28, 2018): Hmm, I prefer to keep it with a timeout, since this test will hang if it doesn't pass. If it hangs, we won't be able to get the stderr. I can increase the timeout to something much longer, though.


# Replace any actors that died.
for i, out in enumerate(children_out):
try:
ray.get(out)
except ray.worker.RayGetError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
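
On the timeout discussion above: the trade-off is between a test that can hang (ray.wait with no timeout) and one that can flake if the timeout is too tight. A small, self-contained sketch of the pattern follows, assuming this Ray version's millisecond-based ray.wait timeout; the task name ping and the 5-minute value are illustrative, not part of the PR.

import time

import ray

ray.init()


@ray.remote
def ping():
    # Stand-in for an actor method that may take a while (or never return if
    # the actor's node died and reconstruction is still in progress).
    time.sleep(0.1)
    return 1


object_ids = [ping.remote() for _ in range(100)]

# Reviewer's suggestion: no timeout. ray.wait blocks until every result is
# ready, so a lost task hangs the test and hides its stderr.
# ready, _ = ray.wait(object_ids, num_returns=len(object_ids))

# Pattern kept in the PR: a generous timeout (milliseconds in this Ray
# version), then an assertion on the count, so a failure produces a readable
# error instead of a hang.
ready, remaining = ray.wait(
    object_ids, num_returns=len(object_ids), timeout=5 * 60 * 1000)
assert len(ready) == len(object_ids), (
    "{} of {} tasks did not finish in time".format(
        len(remaining), len(object_ids)))
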


@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")