Skip to content

Commit

Permalink
[Core] Add test case where there is dead node for /nodes?view=summary…
Browse files Browse the repository at this point in the history
… endpoint (#47727)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Sep 18, 2024
1 parent ceceb68 commit eaee1cc
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions python/ray/dashboard/modules/node/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,19 @@ def getpid(self):


@pytest.mark.parametrize(
"ray_start_cluster_head", [{"include_dashboard": True}], indirect=True
"ray_start_cluster_head",
[
{
"include_dashboard": True,
"_system_config": {
"health_check_initial_delay_ms": 0,
"health_check_timeout_ms": 100,
"health_check_failure_threshold": 3,
"health_check_period_ms": 100,
},
}
],
indirect=True,
)
def test_multi_nodes_info(
enable_test_module, disable_aiohttp_cache, ray_start_cluster_head
Expand All @@ -152,6 +164,8 @@ def test_multi_nodes_info(
webui_url = format_web_url(webui_url)
cluster.add_node()
cluster.add_node()
dead_node = cluster.add_node()
cluster.remove_node(dead_node, allow_graceful=False)

def _check_nodes():
try:
Expand All @@ -160,15 +174,18 @@ def _check_nodes():
summary = response.json()
assert summary["result"] is True, summary["msg"]
summary = summary["data"]["summary"]
assert len(summary) == 3
assert len(summary) == 4
for node_info in summary:
node_id = node_info["raylet"]["nodeId"]
response = requests.get(webui_url + f"/nodes/{node_id}")
response.raise_for_status()
detail = response.json()
assert detail["result"] is True, detail["msg"]
detail = detail["data"]["detail"]
assert detail["raylet"]["state"] == "ALIVE"
if node_id != dead_node.node_id:
assert detail["raylet"]["state"] == "ALIVE"
else:
assert detail["raylet"]["state"] == "DEAD"
response = requests.get(webui_url + "/test/dump?key=agents")
response.raise_for_status()
agents = response.json()
Expand All @@ -191,40 +208,44 @@ def test_multi_node_churn(
assert wait_until_server_available(cluster.webui_url) is True
webui_url = format_web_url(cluster.webui_url)

def cluster_chaos_monkey():
worker_nodes = []
success = True

def verify():
nonlocal success
while True:
time.sleep(5)
if len(worker_nodes) < 2:
worker_nodes.append(cluster.add_node())
continue
should_add_node = random.randint(0, 1)
if should_add_node:
worker_nodes.append(cluster.add_node())
else:
node_index = random.randrange(0, len(worker_nodes))
node_to_remove = worker_nodes.pop(node_index)
cluster.remove_node(node_to_remove)

def get_index():
resp = requests.get(webui_url)
resp.raise_for_status()

def get_nodes():
resp = requests.get(webui_url + "/nodes?view=summary")
resp.raise_for_status()
summary = resp.json()
assert summary["result"] is True, summary["msg"]
assert summary["data"]["summary"]

t = threading.Thread(target=cluster_chaos_monkey, daemon=True)
try:
resp = requests.get(webui_url)
resp.raise_for_status()
resp = requests.get(webui_url + "/nodes?view=summary")
resp.raise_for_status()
summary = resp.json()
assert summary["result"] is True, summary["msg"]
assert summary["data"]["summary"]
time.sleep(1)
except Exception:
success = False
break

t = threading.Thread(target=verify, daemon=True)
t.start()

t_st = datetime.now()
duration = timedelta(seconds=60)
worker_nodes = []
while datetime.now() < t_st + duration:
get_index()
time.sleep(2)
time.sleep(5)
if len(worker_nodes) < 2:
worker_nodes.append(cluster.add_node())
continue
should_add_node = random.randint(0, 1)
if should_add_node:
worker_nodes.append(cluster.add_node())
else:
node_index = random.randrange(0, len(worker_nodes))
node_to_remove = worker_nodes.pop(node_index)
cluster.remove_node(node_to_remove)

assert success


if __name__ == "__main__":
Expand Down

0 comments on commit eaee1cc

Please sign in to comment.