diff --git a/runhouse/constants.py b/runhouse/constants.py
index 5a192cbd3..1dfeafa62 100644
--- a/runhouse/constants.py
+++ b/runhouse/constants.py
@@ -76,3 +76,11 @@
 DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
 INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
 STATUS_CHECK_DELAY = 1 * MINUTE
+
+# Constants for surfacing logs to Den
+DEFAULT_LOG_SURFACING_INTERVAL = 2 * MINUTE
+S3_LOGS_FILE_NAME = "server.log"
+DEFAULT_SURFACED_LOG_LENGTH = 20
+# Constants for schedulers
+SCHEDULERS_DELAY = 2 * MINUTE
+INCREASED_INTERVAL = 1 * HOUR
diff --git a/runhouse/logger.py b/runhouse/logger.py
index b2c846adf..db15fb343 100644
--- a/runhouse/logger.py
+++ b/runhouse/logger.py
@@ -1,4 +1,5 @@
 import logging
+import re
 from datetime import datetime, timezone
 from typing import List
 
@@ -20,6 +21,12 @@ class ColoredFormatter:
     def get_color(cls, color: str):
         return cls.COLORS.get(color, "")
 
+    # TODO: This method is a temporary workaround until the logging architecture is updated. Remove once logging is cleaned up.
+    @classmethod
+    def format_log(cls, text):
+        ansi_escape = re.compile(r"(?:\x1B[@-_][0-?]*[ -/]*[@-~])")
+        return ansi_escape.sub("", text)
+
 
 class ClusterLogsFormatter:
     def __init__(self, system):
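For reference, `format_log` is a plain ANSI-escape stripper. A minimal standalone sketch of what it does, using the same regex as the diff above (the colored sample line is made up):

```python
import re

# Same pattern as ColoredFormatter.format_log: matches ANSI escape sequences
# (ESC, an opener byte, optional parameter and intermediate bytes, a final byte).
ansi_escape = re.compile(r"(?:\x1B[@-_][0-?]*[ -/]*[@-~])")

# Hypothetical colored log line, as a terminal logger might emit it.
colored = "\x1b[33mWARNING\x1b[0m | cluster_servlet | status check failed"

print(ansi_escape.sub("", colored))
# -> "WARNING | cluster_servlet | status check failed"
```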
" - f"Next status check will be in {round(INCREASED_STATUS_CHECK_INTERVAL / 60, 2)} minutes. " + f"Next status check will be in {round(INCREASED_INTERVAL / 60, 2)} minutes. " f"For changing the interval size, please run cluster._enable_or_update_status_check(new_interval). " f"If a value is not provided, interval size will be set to {DEFAULT_STATUS_CHECK_INTERVAL}" ) @@ -356,3 +370,66 @@ async def astatus(self): def status(self): return sync_function(self.astatus)() + + ############################################## + # Surface cluster logs to Den + ############################################## + def _get_logs(self): + with open(SERVER_LOGFILE) as log_file: + log_lines = log_file.readlines() + cleaned_log_lines = [ColoredFormatter.format_log(line) for line in log_lines] + return " ".join(cleaned_log_lines) + + async def asend_cluster_logs_to_den(self): + # Delay the start of post_logs_thread, so we'll finish the cluster startup properly + await asyncio.sleep(SCHEDULERS_DELAY) + + while True: + logger.info("Trying to send cluster logs to Den") + try: + interval_size = DEFAULT_LOG_SURFACING_INTERVAL + latest_logs = self._get_logs() + logs_data = {"file_name": S3_LOGS_FILE_NAME, "logs": latest_logs} + + cluster_config = await self.aget_cluster_config() + cluster_uri = rns_client.format_rns_address(cluster_config.get("name")) + api_server_url = cluster_config.get( + "api_server_url", rns_client.api_server_url + ) + + post_logs_resp = requests.post( + f"{api_server_url}/resource/{cluster_uri}/logs", + data=json.dumps(logs_data), + headers=rns_client.request_headers(), + ) + + post_logs_resp_json = post_logs_resp.json() + + if post_logs_resp.status_code != 200: + post_logs_error = ( + post_logs_resp_json.get("detail") + if post_logs_resp_json.get("detail") + else "" + ) + logger.error( + f"({post_logs_resp.status_code}) Failed to send cluster logs to Den: {post_logs_error}" + ) + else: + logger.info( + f"Successfully sent cluster logs to Den. Next status check will be in {round(interval_size / 60, 2)} minutes." + ) + except Exception as e: + logger.error( + f"Sending cluster logs to den has failed: {e}. Please check cluster logs for more info." + ) + logger.warning( + f"Temporarily increasing the interval between two consecutive log retrievals." + f"Next log retrieval will be in {round(INCREASED_INTERVAL / 60, 2)} minutes. " + f"For changing the interval size, please run cluster.restart_server(). 
" + f"Interval size will be set to {interval_size}" + ) + finally: + await asyncio.sleep(interval_size) + + def send_cluster_logs_to_den(self): + asyncio.run(self.asend_cluster_logs_to_den()) diff --git a/tests/test_resources/test_clusters/test_on_demand_cluster.py b/tests/test_resources/test_clusters/test_on_demand_cluster.py index bab680204..e937a54f8 100644 --- a/tests/test_resources/test_clusters/test_on_demand_cluster.py +++ b/tests/test_resources/test_clusters/test_on_demand_cluster.py @@ -7,6 +7,7 @@ import runhouse as rh from runhouse.constants import SERVER_LOGFILE_PATH from runhouse.globals import rns_client +from runhouse.logger import ColoredFormatter from runhouse.resources.hardware.utils import ResourceServerStatus import tests.test_resources.test_clusters.test_cluster @@ -234,3 +235,37 @@ def test_set_status_after_teardown(self, cluster): get_status_data = get_status_data_resp.json()["data"][0] assert get_status_data["resource_type"] == cluster_config.get("resource_type") assert get_status_data["status"] == ResourceServerStatus.terminated + + @pytest.mark.level("minimal") + def test_logs_surfacing_scheduler_basic_flow(self, cluster): + if not cluster.den_auth: + pytest.skip( + "This test checking pinging cluster status to den, this could be done only on clusters " + "with den_auth that can be saved to den." + ) + + time.sleep(120) + cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address) + headers = rh.globals.rns_client.request_headers() + api_server_url = rh.globals.rns_client.api_server_url + + get_status_data_resp = requests.get( + f"{api_server_url}/resource/{cluster_uri}/logs", + headers=headers, + ) + + cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"])[0][1].split( + "\n" + ) # create list of lines + cluster_logs = [ + ColoredFormatter.format_log(log) for log in cluster_logs + ] # clean log formatting + cluster_logs = "\n".join(cluster_logs) # make logs list into one string + + assert "Trying to send cluster logs to Den" in cluster_logs + + assert get_status_data_resp.status_code == 200 + cluster_logs_from_s3 = get_status_data_resp.json()["data"][0][1:].replace( + "\n ", "\n" + ) + assert cluster_logs_from_s3 in cluster_logs