[Core] Fix check failure RAY_CHECK(it != current_tasks_.end()); #47659

Draft
wants to merge 20 commits into base: master
5 changes: 3 additions & 2 deletions .buildkite/core.rayci.yml
@@ -61,7 +61,7 @@ steps:

- label: ":ray: core: python {{matrix.python}} tests ({{matrix.worker_id}})"
if: build.pull_request.labels includes "continuous-build" || pipeline.id == "0189e759-8c96-4302-b6b5-b4274406bf89" || pipeline.id == "018f4f1e-1b73-4906-9802-92422e3badaa"
tags:
tags:
- python
- dashboard
instance_type: large
@@ -345,7 +345,8 @@ steps:
instance_type: medium
commands:
- bazel run //ci/ray_ci:build_in_docker -- docker --platform cpu --canonical-tag ha_integration
- bazel run //ci/ray_ci:test_in_docker -- //python/ray/tests/... core --only-tags ha_integration
- bazel run //ci/ray_ci:test_in_docker -- //python/ray/tests/... core --only-tags ha_integration || true
- sleep 100000000
depends_on:
- manylinux
- forge
10 changes: 9 additions & 1 deletion python/ray/tests/conftest_docker.py
@@ -3,6 +3,7 @@
from pytest_docker_tools import container, fetch, network, volume
from pytest_docker_tools import wrappers
import subprocess
import docker
from typing import List

# If you need to debug tests using fixtures in this file,
@@ -65,7 +66,12 @@ def print_logs(self):
print(content.decode())


gcs_network = network(driver="bridge")
ipam_config = docker.types.IPAMConfig(
Contributor:
Can you add a comment explaining how this works? (How the simulated poor-networking condition works, how often it happens, etc.)

Btw, it's very nice to have a config like this; it will be very useful for unit tests.

pool_configs=[
docker.types.IPAMPool(subnet="192.168.52.0/24", gateway="192.168.52.254")
]
)
gcs_network = network(driver="bridge", ipam=ipam_config)

redis_image = fetch(repository="redis:latest")

@@ -96,6 +102,8 @@ def gen_head_node(envs):
# ip:port is treated as a different raylet.
"--node-manager-port",
"9379",
"--dashboard-host",
"0.0.0.0",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment=envs,
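A note on the IPAM change above, in response to the review comment: Docker only allows a container to be attached at a specific ipv4_address when the network has a user-defined subnet, so pinning the bridge network's IPAM pool is what lets the new test record a worker's address, disconnect it to simulate a transient partition, and reattach it at the same IP. Below is a minimal, self-contained sketch of that pattern using the docker SDK directly; the network name and image are illustrative and not taken from this PR.

import time

import docker

client = docker.from_env()

# Fixed subnet so the container's address is predictable and can be reused.
ipam = docker.types.IPAMConfig(
    pool_configs=[
        docker.types.IPAMPool(subnet="192.168.52.0/24", gateway="192.168.52.254")
    ]
)
net = client.networks.create("ray-test-net", driver="bridge", ipam=ipam)

container = client.containers.run("redis:latest", detach=True, network=net.name)
container.reload()
ip = container.attrs["NetworkSettings"]["Networks"][net.name]["IPAddress"]

net.disconnect(container, force=True)    # simulate a transient partition
time.sleep(2)
net.connect(container, ipv4_address=ip)  # rejoin at the original address

container.remove(force=True)
net.remove()

The same disconnect/connect sequence appears in inject_transient_network_failure in test_network_failure_e2e.py below.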
118 changes: 118 additions & 0 deletions python/ray/tests/test_network_failure_e2e.py
@@ -3,6 +3,7 @@

from time import sleep
import pytest
import threading
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest_docker import * # noqa
from ray.tests.conftest_docker import gen_head_node, gen_worker_node
@@ -124,6 +125,123 @@ def check_task_pending(n=0):
wait_for_condition(lambda: check_task_pending(2))


head2 = gen_head_node(
{
"RAY_grpc_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_timeout_ms": "1000",
"RAY_health_check_initial_delay_ms": "1000",
"RAY_health_check_period_ms": "1000",
"RAY_health_check_timeout_ms": "100000",
"RAY_health_check_failure_threshold": "20",
}
)

worker2 = gen_worker_node(
{
"RAY_grpc_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_timeout_ms": "1000",
"RAY_health_check_initial_delay_ms": "1000",
"RAY_health_check_period_ms": "1000",
"RAY_health_check_timeout_ms": "100000",
"RAY_health_check_failure_threshold": "20",
}
)

DRIVER_SCRIPT = """
import asyncio
import ray
from ray.util.state import list_tasks

ray.init(namespace="test")

@ray.remote(num_cpus=0.1, name="counter", lifetime="detached")
class Counter:
def __init__(self):
self.count = 0

def inc(self):
self.count = self.count + 1
return self.count

@ray.method(max_task_retries=-1)
def get(self):
return self.count

@ray.remote(num_cpus=0.1, max_task_retries=-1)
class AsyncActor:
def __init__(self, counter):
self.counter = counter

async def run(self):
count = await self.counter.get.remote()
if count == 0:
# first attempt
await self.counter.inc.remote()
while len(list_tasks(
filters=[("name", "=", "AsyncActor.run")])) != 2:
# wait for second attempt to be made
await asyncio.sleep(1)
# wait until the second attempt reaches the actor
await asyncio.sleep(2)
await self.counter.inc.remote()
return "first"
else:
# second attempt
# make sure second attempt only runs
# after first attempt finishes
assert count == 2
return "second"

counter = Counter.remote()
async_actor = AsyncActor.remote(counter)
assert ray.get(async_actor.run.remote()) == "second"
"""

CHECK_ASYNC_ACTOR_RUN_IS_CALLED_SCRIPT = """
import sys
import ray
ray.init(namespace="test")

counter = ray.get_actor("counter")
if ray.get(counter.get.remote()) == 1:
# AsyncActor.run is called.
sys.exit(0)
else:
sys.exit(1)
"""


def test_async_actor_task_retry(head2, worker2, gcs_network):
network = gcs_network

def inject_transient_network_failure():
try:
wait_for_condition(
lambda: head2.exec_run(
cmd=f"python -c '{CHECK_ASYNC_ACTOR_RUN_IS_CALLED_SCRIPT}'"
).exit_code
== 0
)
worker_ip = worker2._container.attrs["NetworkSettings"]["Networks"][
network.name
]["IPAddress"]
network.disconnect(worker2.name, force=True)
sleep(2)
network.connect(worker2.name, ipv4_address=worker_ip)
except Exception as e:
print(f"Network failure injection failed {e}")

t = threading.Thread(target=inject_transient_network_failure, daemon=True)
t.start()

result = head2.exec_run(
cmd=f"python -c '{DRIVER_SCRIPT}'",
)
assert result.exit_code == 0, result.output.decode("utf-8")


if __name__ == "__main__":
import os

5 changes: 4 additions & 1 deletion src/ray/common/ray_syncer/ray_syncer.cc
@@ -223,6 +223,8 @@ void RaySyncer::Connect(const std::string &node_id,
[this, channel](const std::string &node_id, bool restart) {
sync_reactors_.erase(node_id);
if (restart) {
RAY_LOG(INFO) << "Connection is broken. Will reconnect to node: "
<< NodeID::FromBinary(node_id);
execute_after(
io_context_,
[this, node_id, channel]() {
@@ -246,7 +248,7 @@ void RaySyncer::Connect(RaySyncerBidiReactor *reactor) {
boost::asio::dispatch(
io_context_.get_executor(), std::packaged_task<void()>([this, reactor]() {
RAY_CHECK(sync_reactors_.find(reactor->GetRemoteNodeID()) == sync_reactors_.end())
<< reactor->GetRemoteNodeID();
<< NodeID::FromBinary(reactor->GetRemoteNodeID());
sync_reactors_[reactor->GetRemoteNodeID()] = reactor;
// Send the view for new connections.
for (const auto &[_, messages] : node_state_->GetClusterView()) {
@@ -352,6 +354,7 @@ ServerBidiReactor *RaySyncerService::StartSync(grpc::CallbackServerContext *cont
[this](const std::string &node_id, bool reconnect) mutable {
// No need to reconnect for server side.
RAY_CHECK(!reconnect);
RAY_LOG(INFO) << "Connection is broken." << NodeID::FromBinary(node_id);
syncer_.sync_reactors_.erase(node_id);
syncer_.node_state_->RemoveNode(node_id);
});
4 changes: 3 additions & 1 deletion src/ray/core_worker/transport/actor_scheduling_queue.cc
@@ -77,6 +77,7 @@ void ActorSchedulingQueue::Add(
const std::string &concurrency_group_name,
const ray::FunctionDescriptor &function_descriptor,
TaskID task_id,
uint64_t attempt_number,
const std::vector<rpc::ObjectReference> &dependencies) {
// A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order.
RAY_CHECK(seq_no != -1);
@@ -93,7 +94,8 @@
std::move(reject_request),
std::move(send_reply_callback),
task_id,
dependencies.size() > 0,
attempt_number,
dependencies,
concurrency_group_name,
function_descriptor);
{
1 change: 1 addition & 0 deletions src/ray/core_worker/transport/actor_scheduling_queue.h
@@ -64,6 +64,7 @@ class ActorSchedulingQueue : public SchedulingQueue {
const std::string &concurrency_group_name,
const ray::FunctionDescriptor &function_descriptor,
TaskID task_id = TaskID::Nil(),
uint64_t attempt_number = 0,
const std::vector<rpc::ObjectReference> &dependencies = {}) override;

/// Cancel the actor task in the queue.
14 changes: 10 additions & 4 deletions src/ray/core_worker/transport/actor_scheduling_util.cc
@@ -24,31 +24,37 @@ InboundRequest::InboundRequest(
std::function<void(const Status &, rpc::SendReplyCallback)> reject_callback,
rpc::SendReplyCallback send_reply_callback,
class TaskID task_id,
bool has_dependencies,
uint64_t attempt_number,
const std::vector<rpc::ObjectReference> &dependencies,
const std::string &concurrency_group_name,
const ray::FunctionDescriptor &function_descriptor)
: accept_callback_(std::move(accept_callback)),
reject_callback_(std::move(reject_callback)),
send_reply_callback_(std::move(send_reply_callback)),
task_id_(task_id),
attempt_number_(attempt_number),
concurrency_group_name_(concurrency_group_name),
function_descriptor_(function_descriptor),
has_pending_dependencies_(has_dependencies) {}
pending_dependencies_(dependencies) {}

void InboundRequest::Accept() { accept_callback_(std::move(send_reply_callback_)); }
void InboundRequest::Cancel(const Status &status) {
reject_callback_(status, std::move(send_reply_callback_));
}

bool InboundRequest::CanExecute() const { return !has_pending_dependencies_; }
bool InboundRequest::CanExecute() const { return pending_dependencies_.empty(); }
ray::TaskID InboundRequest::TaskID() const { return task_id_; }
uint64_t InboundRequest::AttemptNumber() const { return attempt_number_; }
const std::string &InboundRequest::ConcurrencyGroupName() const {
return concurrency_group_name_;
}
const ray::FunctionDescriptor &InboundRequest::FunctionDescriptor() const {
return function_descriptor_;
}
void InboundRequest::MarkDependenciesSatisfied() { has_pending_dependencies_ = false; }
const std::vector<rpc::ObjectReference> &InboundRequest::PendingDependencies() const {
return pending_dependencies_;
};
void InboundRequest::MarkDependenciesSatisfied() { pending_dependencies_.clear(); }

DependencyWaiterImpl::DependencyWaiterImpl(DependencyWaiterInterface &dependency_client)
: dependency_client_(dependency_client) {}
8 changes: 6 additions & 2 deletions src/ray/core_worker/transport/actor_scheduling_util.h
@@ -32,27 +32,31 @@ class InboundRequest {
std::function<void(const Status &, rpc::SendReplyCallback)> reject_callback,
rpc::SendReplyCallback send_reply_callback,
TaskID task_id,
bool has_dependencies,
uint64_t attempt_number,
const std::vector<rpc::ObjectReference> &dependencies,
const std::string &concurrency_group_name,
const ray::FunctionDescriptor &function_descriptor);

void Accept();
void Cancel(const Status &status);
bool CanExecute() const;
ray::TaskID TaskID() const;
uint64_t AttemptNumber() const;
const std::string &ConcurrencyGroupName() const;
const ray::FunctionDescriptor &FunctionDescriptor() const;
void MarkDependenciesSatisfied();
const std::vector<rpc::ObjectReference> &PendingDependencies() const;

private:
std::function<void(rpc::SendReplyCallback)> accept_callback_;
std::function<void(const Status &, rpc::SendReplyCallback)> reject_callback_;
rpc::SendReplyCallback send_reply_callback_;

ray::TaskID task_id_;
uint64_t attempt_number_;
std::string concurrency_group_name_;
ray::FunctionDescriptor function_descriptor_;
bool has_pending_dependencies_;
std::vector<rpc::ObjectReference> pending_dependencies_;
};

/// Waits for an object dependency to become available. Abstract for testing.
4 changes: 3 additions & 1 deletion src/ray/core_worker/transport/normal_scheduling_queue.cc
@@ -44,6 +44,7 @@ void NormalSchedulingQueue::Add(
const std::string &concurrency_group_name,
const FunctionDescriptor &function_descriptor,
TaskID task_id,
uint64_t attempt_number,
const std::vector<rpc::ObjectReference> &dependencies) {
absl::MutexLock lock(&mu_);
// Normal tasks should not have ordering constraints.
@@ -54,7 +55,8 @@
std::move(reject_request),
std::move(send_reply_callback),
task_id,
dependencies.size() > 0,
attempt_number,
dependencies,
/*concurrency_group_name=*/"",
function_descriptor));
}
1 change: 1 addition & 0 deletions src/ray/core_worker/transport/normal_scheduling_queue.h
@@ -46,6 +46,7 @@ class NormalSchedulingQueue : public SchedulingQueue {
const std::string &concurrency_group_name,
const FunctionDescriptor &function_descriptor,
TaskID task_id,
uint64_t attempt_number,
const std::vector<rpc::ObjectReference> &dependencies) override;

// Search for an InboundRequest associated with the task that we are trying to cancel.