modify cluster status checks tests
Alexandra Belousov authored and committed on Aug 26, 2024
1 parent a1e13a8 · commit 9e540f6
Showing 9 changed files with 263 additions and 182 deletions.
4 changes: 4 additions & 0 deletions runhouse/resources/hardware/on_demand_cluster.py
@@ -557,6 +557,7 @@ def teardown(self):
Example:
>>> rh.ondemand_cluster("rh-cpu").teardown()
"""
status_code_den_request = None
# TODO [SB]: remove the den_auth check once we can get the status of clusters without den_auth as well.
if self.den_auth:
try:
@@ -577,13 +578,16 @@ def teardown(self):
logger.warning(
"Failed to update Den with terminated cluster status"
)
status_code_den_request = status_resp.status_code
except Exception as e:
logger.warning(e)

# Stream logs
sky.down(self.name)
self.address = None

return status_code_den_request

def teardown_and_delete(self):
"""Teardown cluster and delete it from configs.
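The teardown change above surfaces the result of the Den status update to callers. A minimal sketch of how a caller might use the new return value (cluster name taken from the docstring example; the value is None when no Den request was attempted):

import runhouse as rh

cluster = rh.ondemand_cluster("rh-cpu")
status_code = cluster.teardown()  # Den response code, or None if no request was made
if status_code == 200:
    print("Den acknowledged the terminated cluster status")
elif status_code is None:
    print("No Den status update was attempted (e.g. den_auth is disabled)")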
161 changes: 102 additions & 59 deletions runhouse/servers/cluster_servlet.py
@@ -2,6 +2,7 @@
import copy
import datetime
import json
import os.path
import threading
from typing import Any, Dict, List, Optional, Set, Tuple, Union

@@ -70,10 +71,10 @@ async def __init__(
)

logger.info("Creating periodic_cluster_checks thread.")
cluster_checks_thread = threading.Thread(
self.cluster_checks_thread = threading.Thread(
target=self.periodic_cluster_checks, daemon=True
)
cluster_checks_thread.start()
self.cluster_checks_thread.start()

##############################################
# Cluster config state storage methods
@@ -87,6 +88,16 @@ async def aset_cluster_config(self, cluster_config: Dict[str, Any]):

self.cluster_config = cluster_config

new_cluster_name = self.cluster_config.get("name", None)

if self._cluster_name != new_cluster_name:
self._cluster_name = new_cluster_name
self._cluster_uri = (
rns_client.format_rns_address(self._cluster_name)
if self._cluster_name
else None
)

# Propagate the changes to all other process's obj_stores
await asyncio.gather(
*[
@@ -235,7 +246,9 @@ async def asave_status_metrics_to_den(self, status: dict):

status_data = {
"status": ResourceServerStatus.running,
"resource_type": status_copy.get("cluster_config").pop("resource_type"),
"resource_type": status_copy.get("cluster_config").pop(
"resource_type", "cluster"
),
"resource_info": status_copy,
"env_servlet_processes": env_servlet_processes,
}
@@ -251,6 +264,66 @@ async def asave_status_metrics_to_den(self, status: dict):
def save_status_metrics_to_den(self, status: dict):
return sync_function(self.asave_status_metrics_to_den)(status)

async def acheck_cluster_status_and_logs(
self, interval_size: int, send_to_den: bool = True
):

logger.debug("Performing cluster checks")
status, den_resp_status = await self.astatus(send_to_den=send_to_den)
logs_resp_status_code, new_start_log_line, new_end_log_line = None, None, None

if not send_to_den:
return (
den_resp_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
)

status_code_cluster_status = den_resp_status.status_code

if status_code_cluster_status == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
)
elif status_code_cluster_status != 200:
logger.error(
f"Failed to send cluster status to Den: {den_resp_status.json()}"
)
else:
logger.debug("Successfully sent cluster status to Den.")

cluster_config = await self.aget_cluster_config()
prev_end_log_line = cluster_config.get("end_log_line", 0)
(
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
) = await self.send_cluster_logs_to_den(
prev_end_log_line=prev_end_log_line,
)
if not logs_resp_status_code:
logger.debug(
f"No logs were generated in the past {interval_size} minute(s), logs were not sent to Den."
)

elif logs_resp_status_code == 200:
logger.debug("Successfully sent cluster logs to Den.")
await self.aset_cluster_config_value(
key="start_log_line", value=new_start_log_line
)
await self.aset_cluster_config_value(
key="end_log_line", value=new_end_log_line
)
return (
status_code_cluster_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
)

async def aperiodic_cluster_checks(self):
"""Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory,
and save it to Den."""
@@ -260,71 +333,34 @@ async def aperiodic_cluster_checks(self):
"status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
)
while True:
try:
# Only if one of these is true, do we actually need to get the status from each EnvServlet
should_send_status_and_logs_to_den: bool = (
configs.token is not None
and interval_size != -1
and self._cluster_uri is not None
)
should_update_autostop: bool = self.autostop_helper is not None
should_send_status_and_logs_to_den: bool = (
configs.token is not None
and interval_size != -1
and self._cluster_uri is not None
)
should_update_autostop: bool = self.autostop_helper is not None

if (
not should_send_status_and_logs_to_den
and not should_update_autostop
):
break
if not should_send_status_and_logs_to_den and not should_update_autostop:
break

logger.debug("Performing cluster checks")
status, den_resp = await self.astatus(
send_to_den=should_send_status_and_logs_to_den
try:
(
status_code_cluster_status,
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
status,
) = await self.acheck_cluster_status_and_logs(
interval_size=interval_size,
send_to_den=should_send_status_and_logs_to_den,
)

if should_update_autostop:
logger.debug("Updating autostop")
await self._update_autostop(status)

if not should_send_status_and_logs_to_den:
elif not should_send_status_and_logs_to_den:
break

status_code = den_resp.status_code

if status_code == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
)
elif status_code != 200:
logger.error(
f"Failed to send cluster status to Den: {den_resp.json()}"
)
else:
logger.debug("Successfully sent cluster status to Den.")

prev_end_log_line = cluster_config.get("end_log_line", 0)
(
logs_resp_status_code,
new_start_log_line,
new_end_log_line,
) = await self.send_cluster_logs_to_den(
prev_end_log_line=prev_end_log_line,
)
if not logs_resp_status_code:
logger.debug(
f"No logs were generated in the past {interval_size} minute(s), logs were not sent to Den."
)

elif logs_resp_status_code == 200:
logger.debug("Successfully sent cluster logs to Den.")
await self.aset_cluster_config_value(
key="start_log_line", value=new_start_log_line
)
await self.aset_cluster_config_value(
key="end_log_line", value=new_end_log_line
)
# Since we set new values in the cluster_config, reload it so the next
# cluster check iteration references the updated cluster config.
cluster_config = await self.aget_cluster_config()

except Exception:
self.logger.error(
"Cluster checks have failed.\n"
@@ -525,6 +561,10 @@ def status(self, send_to_den: bool = False):
# Save cluster logs to Den
##############################################
def _get_logs(self):

if not os.path.exists(SERVER_LOGFILE):
return ""

with open(SERVER_LOGFILE) as log_file:
log_lines = log_file.readlines()
cleaned_log_lines = [ColoredFormatter.format_log(line) for line in log_lines]
@@ -573,3 +613,6 @@ async def send_cluster_logs_to_den(
)

return resp_status_code, prev_end_log_line, new_end_log_line

def cluster_periodic_thread_is_alive(self):
return self.cluster_checks_thread.is_alive()
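The periodic-check logic is now factored into acheck_cluster_status_and_logs, and the checks thread is stored on self so its liveness can be probed. A minimal test-style sketch under assumed harness wiring (the servlet handle and event-loop setup are hypothetical):

import asyncio

# `servlet` is assumed to be a ClusterServlet instance obtained from the test harness.
assert servlet.cluster_periodic_thread_is_alive()  # thread handle now lives on self

# With send_to_den=False the helper collects the local status and short-circuits,
# so the log-related return values stay None.
den_resp, logs_code, start_line, end_line, status = asyncio.run(
    servlet.acheck_cluster_status_and_logs(interval_size=1, send_to_den=False)
)
assert logs_code is None and start_line is None and end_line is None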
4 changes: 2 additions & 2 deletions runhouse/servers/env_servlet.py
@@ -202,7 +202,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None):

if not cluster_config.get("resource_subtype") == "Cluster":
stable_internal_external_ips = cluster_config.get(
"stable_internal_external_ips"
"stable_internal_external_ips", []
)
for ips_set in stable_internal_external_ips:
internal_ip, external_ip = ips_set[0], ips_set[1]
@@ -214,7 +214,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None):
node_name = f"worker_{stable_internal_external_ips.index(ips_set)} ({external_ip})"
else:
# For a BYO cluster, assume the first IP in the ips list is the head.
ips = cluster_config.get("ips")
ips = cluster_config.get("ips", [])
if len(ips) == 1 or node_ip == ips[0]:
node_name = f"head ({node_ip})"
else:
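The added defaults in _get_env_cpu_usage guard against cluster configs that omit these keys: dict.get without a default returns None, and iterating over or taking len() of None raises a TypeError. A tiny illustration:

cluster_config = {}  # e.g. a config missing the "ips" key

ips = cluster_config.get("ips")      # None
# len(ips) -> TypeError: object of type 'NoneType' has no len()

ips = cluster_config.get("ips", [])  # [] -> len() is 0 and loops simply don't run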
3 changes: 2 additions & 1 deletion runhouse/servers/http/http_utils.py
@@ -9,7 +9,8 @@
import requests

from fastapi import HTTPException
from pydantic import BaseModel, validator
from pydantic import BaseModel
from pydantic.v1 import validator
from ray import cloudpickle as pickle
from ray.exceptions import RayTaskError

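Pydantic 2.x deprecates the v1-style `validator` decorator in favor of `field_validator`; the `pydantic.v1` namespace keeps the old behavior importable. A minimal sketch of a v1-style validator via that compatibility namespace (model and field names hypothetical; note the sketch pairs the v1 validator with pydantic.v1.BaseModel so the decorator takes effect):

from pydantic.v1 import BaseModel, validator

class CheckInterval(BaseModel):
    minutes: int

    @validator("minutes")
    def check_minutes(cls, v):
        # -1 disables the periodic check; otherwise require a positive interval
        if v != -1 and v <= 0:
            raise ValueError("interval must be positive or -1")
        return v

CheckInterval(minutes=5)   # ok
# CheckInterval(minutes=0) # raises a ValidationError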
105 changes: 8 additions & 97 deletions tests/test_resources/test_clusters/test_on_demand_cluster.py
@@ -2,13 +2,8 @@
import time

import pytest
import requests

import runhouse as rh
from runhouse.constants import SERVER_LOGFILE_PATH
from runhouse.globals import rns_client
from runhouse.logger import ColoredFormatter
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_clusters.test_cluster
from tests.utils import friend_account
@@ -201,97 +196,13 @@ def test_fn_to_docker_container(self, ondemand_aws_cluster):
# Status tests
####################################################################################################

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_status_scheduler_basic_flow(self, cluster):

cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
api_server_url = rh.globals.rns_client.api_server_url
headers = rh.globals.rns_client.request_headers()

get_status_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/cluster/status?limit=1",
headers=headers,
)

assert get_status_data_resp.status_code == 200
resp_data = get_status_data_resp.json().get("data")[0]
assert resp_data.get("status") == ResourceServerStatus.running

# TODO [sb]: call cluster's /logs endpoint once introduced.
cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"], stream_logs=False)[
0
][1].split(
"\n"
) # create list of lines
cluster_logs = [
ColoredFormatter.format_log(log) for log in cluster_logs
] # clean log formatting
cluster_logs = "\n".join(cluster_logs) # make logs list into one string

assert "Cluster checks have failed" not in cluster_logs
assert "Failed to send cluster status to Den: " not in cluster_logs

@pytest.mark.level("minimal")
@pytest.mark.skip("Test requires terminating the cluster")
def test_set_status_after_teardown(self, cluster):

def test_set_status_after_teardown(self, cluster, mocker):
mock_function = mocker.patch("sky.down")
response = cluster.teardown()
assert isinstance(response, int)
assert (
response == 200
)  # i.e., the POST of the terminated status to Den's status endpoint succeeded
mock_function.assert_called_once()
assert cluster.is_up()
cluster_config = cluster.config()
cluster_uri = rns_client.format_rns_address(cluster.rns_address)
api_server_url = cluster_config.get("api_server_url", rns_client.api_server_url)
cluster.teardown()
get_status_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/cluster/status",
headers=rns_client.request_headers(),
)

assert get_status_data_resp.status_code == 200
# For UI display purposes, the cluster/status endpoint returns the cluster status history.
# The latest status info is the first element in the list returned by the endpoint.
get_status_data = get_status_data_resp.json()["data"][0]
assert get_status_data["resource_type"] == cluster_config.get("resource_type")
assert get_status_data["status"] == ResourceServerStatus.terminated

####################################################################################################
# Logs surfacing tests
####################################################################################################
@pytest.mark.level("minimal")
def test_logs_surfacing_scheduler_basic_flow(self, cluster):
cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
headers = rh.globals.rns_client.request_headers()
api_server_url = rh.globals.rns_client.api_server_url

get_logs_data_resp = requests.get(
f"{api_server_url}/resource/{cluster_uri}/logs",
headers=headers,
)

# TODO [sb]: call cluster's /logs endpoint once introduced.
cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"], stream_logs=False)[
0
][1].split(
"\n"
) # create list of lines
cluster_logs = [
ColoredFormatter.format_log(log) for log in cluster_logs
] # clean log formatting
cluster_logs = "\n".join(cluster_logs) # make logs list into one string

assert "Cluster checks have failed" not in cluster_logs
assert "Failed to send cluster logs to Den: " not in cluster_logs

cluster_status = cluster.status()
logs_start_line = cluster_status.get("cluster_config").get(
"start_log_line", None
)
logs_end_line = cluster_status.get("cluster_config").get("end_log_line", None)

assert logs_start_line
assert logs_end_line
assert logs_end_line > logs_start_line

resp_data = get_logs_data_resp.json().get("data")
assert get_logs_data_resp.status_code == 200
cluster_logs_from_s3 = resp_data["logs_text"][0][1:].replace("\n ", "\n")
assert cluster_logs_from_s3 in cluster_logs
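The rewritten teardown test relies on the `mocker` fixture from the pytest-mock plugin to stub out `sky.down`, so the teardown path runs without destroying a real cluster. A standalone sketch of the same pattern (requires pytest-mock; the cluster name is hypothetical):

import sky  # skypilot must be installed for the patch target to resolve

def test_sky_down_is_stubbed(mocker):
    mock_down = mocker.patch("sky.down")  # replaces sky.down with a MagicMock
    sky.down("my-cluster")                # resolves to the mock; no real teardown
    mock_down.assert_called_once_with("my-cluster")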