Skip to content

Commit

Permalink
[Core] Fix check failure: sync_reactors_.find(reactor->GetRemoteNodeI…
Browse files Browse the repository at this point in the history
…D()) == sync_reactors_.end() (ray-project#47861)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
  • Loading branch information
jjyao authored and ujjawal-khare committed Oct 15, 2024
1 parent 0871f4d commit ffc1746
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 119 deletions.
119 changes: 1 addition & 118 deletions python/ray/tests/test_network_failure_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def test_transient_network_error(head2, worker2, gcs_network):
network = gcs_network

check_two_nodes = """
import sys
import ray
from ray._private.test_utils import wait_for_condition
Expand Down Expand Up @@ -196,124 +197,6 @@ def ping(self):
assert result.exit_code == 0, result.output.decode("utf-8")


# Head node consumed by test_async_actor_task_retry. The environment
# variables below configure gRPC keepalive and health-check settings.
head3 = gen_head_node(
    dict(
        RAY_grpc_keepalive_time_ms="1000",
        RAY_grpc_client_keepalive_time_ms="1000",
        RAY_grpc_client_keepalive_timeout_ms="1000",
        RAY_health_check_initial_delay_ms="1000",
        RAY_health_check_period_ms="1000",
        RAY_health_check_timeout_ms="100000",
        RAY_health_check_failure_threshold="20",
    )
)

# Worker node consumed by test_async_actor_task_retry. It is given 2 CPUs
# and the same gRPC keepalive / health-check environment as head3.
worker3 = gen_worker_node(
    envs=dict(
        RAY_grpc_keepalive_time_ms="1000",
        RAY_grpc_client_keepalive_time_ms="1000",
        RAY_grpc_client_keepalive_timeout_ms="1000",
        RAY_health_check_initial_delay_ms="1000",
        RAY_health_check_period_ms="1000",
        RAY_health_check_timeout_ms="100000",
        RAY_health_check_failure_threshold="20",
    ),
    num_cpus=2,
)


def test_async_actor_task_retry(head3, worker3, gcs_network):
# Test that if a transient network error happens
# after an async actor task is submitted and is being executed,
# a second attempt will be submitted and executed after the
# first attempt finishes.
network = gcs_network

# Driver script, executed on the head node via `python -c` below.
# It creates a detached "counter" actor and an AsyncActor whose run()
# uses the counter value to tell the first attempt from the retry, and
# it asserts that the value finally returned comes from the second attempt.
driver = """
import asyncio
import ray
from ray.util.state import list_tasks
ray.init(namespace="test")
@ray.remote(num_cpus=0.1, name="counter", lifetime="detached")
class Counter:
def __init__(self):
self.count = 0
def inc(self):
self.count = self.count + 1
return self.count
@ray.method(max_task_retries=-1)
def get(self):
return self.count
@ray.remote(num_cpus=0.1, max_task_retries=-1)
class AsyncActor:
def __init__(self, counter):
self.counter = counter
async def run(self):
count = await self.counter.get.remote()
if count == 0:
# first attempt
await self.counter.inc.remote()
while len(list_tasks(
filters=[("name", "=", "AsyncActor.run")])) != 2:
# wait for second attempt to be made
await asyncio.sleep(1)
# wait until the second attempt reaches the actor
await asyncio.sleep(2)
await self.counter.inc.remote()
return "first"
else:
# second attempt
# make sure second attempt only runs
# after first attempt finishes
assert count == 2
return "second"
counter = Counter.remote()
async_actor = AsyncActor.remote(counter)
assert ray.get(async_actor.run.remote()) == "second"
"""

# Helper script that blocks until the "counter" actor exists and its
# count equals 1, i.e. the first attempt of AsyncActor.run has performed
# its first increment.
check_async_actor_run_is_called = """
import ray
from ray._private.test_utils import wait_for_condition
ray.init(namespace="test")
wait_for_condition(lambda: ray.get_actor("counter") is not None)
counter = ray.get_actor("counter")
wait_for_condition(lambda: ray.get(counter.get.remote()) == 1)
"""

# Runs in a background thread: waits (on the head node) for the first
# attempt to start, then disconnects worker3 from the network for
# 2 seconds and reconnects it with the same IP address.
def inject_transient_network_failure():
try:
result = head3.exec_run(
cmd=f"python -c '{check_async_actor_run_is_called}'"
)
assert result.exit_code == 0, result.output.decode("utf-8")

# Remember the worker's current address so it can be restored on reconnect.
worker_ip = worker3._container.attrs["NetworkSettings"]["Networks"][
network.name
]["IPAddress"]
network.disconnect(worker3.name, force=True)
sleep(2)
network.connect(worker3.name, ipv4_address=worker_ip)
except Exception as e:
# NOTE(review): any failure here (including the assert above) is only
# printed; the test outcome is decided by the driver's exit code below.
print(f"Network failure injection failed {e}")

# Start the failure injector before launching the driver so that it is
# already polling for the first attempt.
t = threading.Thread(target=inject_transient_network_failure, daemon=True)
t.start()

# Run the driver on the head node; a non-zero exit code fails the test
# and reports the driver's output.
result = head3.exec_run(
cmd=f"python -c '{driver}'",
)
assert result.exit_code == 0, result.output.decode("utf-8")


if __name__ == "__main__":
import os

Expand Down
1 change: 0 additions & 1 deletion src/ray/common/ray_syncer/ray_syncer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ ServerBidiReactor *RaySyncerService::StartSync(grpc::CallbackServerContext *cont
// 4. OnDone method of the old reactor is called which calls this cleanup_cb_
return;
}
RAY_LOG(INFO).WithField(NodeID::FromBinary(node_id)) << "Connection is broken.";
syncer_.sync_reactors_.erase(node_id);
syncer_.node_state_->RemoveNode(node_id);
});
Expand Down

0 comments on commit ffc1746

Please sign in to comment.