Skip to content

Commit

Permalink
Periodically send cluster logs to den (#842)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandra Belousov <sashabelousovrh@Alexandras-MacBook-Pro.local>
  • Loading branch information
BelSasha and Alexandra Belousov authored Jun 23, 2024
1 parent e971ee4 commit e04e8f1
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 3 deletions.
8 changes: 8 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE  # default pause between status posts to Den
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR  # backoff pause after a failed status post
STATUS_CHECK_DELAY = 1 * MINUTE  # startup delay before the status scheduler begins

# Constants Surfacing Logs to Den
DEFAULT_LOG_SURFACING_INTERVAL = 2 * MINUTE  # default pause between log uploads to Den
S3_LOGS_FILE_NAME = "server.log"  # file name used when posting cluster logs to Den
DEFAULT_SURFACED_LOG_LENGTH = 20  # presumably the number of log lines surfaced by default — not referenced in this chunk, confirm at call sites
# Constants for schedulers
SCHEDULERS_DELAY = 2 * MINUTE  # startup delay shared by the background scheduler threads
INCREASED_INTERVAL = 1 * HOUR  # backoff pause after a scheduler iteration fails
7 changes: 7 additions & 0 deletions runhouse/logger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from datetime import datetime, timezone
from typing import List

Expand All @@ -20,6 +21,12 @@ class ColoredFormatter:
def get_color(cls, color: str):
return cls.COLORS.get(color, "")

# TODO: This method is a temp solution, until we'll update logging architecture. Remove once logging is cleaned up.
@classmethod
def format_log(cls, text):
ansi_escape = re.compile(r"(?:\x1B[@-_][0-?]*[ -/]*[@-~])")
return ansi_escape.sub("", text)


class ClusterLogsFormatter:
def __init__(self, system):
Expand Down
3 changes: 3 additions & 0 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,9 @@ def _check_for_child_configs(cls, config: dict):

return config

##############################################
# Send Cluster status to Den methods
##############################################
def _disable_status_check(self):
"""
Stopping sending status to Den.
Expand Down
83 changes: 80 additions & 3 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import asyncio
import copy
import json
import logging
import threading
from typing import Any, Dict, List, Optional, Set, Union

import requests

import runhouse

from runhouse.constants import (
DEFAULT_LOG_SURFACING_INTERVAL,
DEFAULT_STATUS_CHECK_INTERVAL,
INCREASED_INTERVAL,
INCREASED_STATUS_CHECK_INTERVAL,
STATUS_CHECK_DELAY,
S3_LOGS_FILE_NAME,
SCHEDULERS_DELAY,
SERVER_LOGFILE,
)

from runhouse.globals import configs, obj_store, rns_client
from runhouse.logger import ColoredFormatter
from runhouse.resources.hardware import load_cluster_config_from_file
from runhouse.rns.rns_client import ResourceStatusData
from runhouse.rns.utils.api import ResourceAccess
Expand Down Expand Up @@ -56,6 +64,12 @@ async def __init__(
)
post_status_thread.start()

logger.debug("Creating send_logs_to_den thread.")
send_logs_thread = threading.Thread(
target=self.send_cluster_logs_to_den, daemon=True
)
send_logs_thread.start()

##############################################
# Cluster config state storage methods
##############################################
Expand Down Expand Up @@ -206,7 +220,7 @@ async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str)

async def aperiodic_status_check(self):
# Delay the start of post_status_thread, so we'll finish the cluster startup properly
await asyncio.sleep(STATUS_CHECK_DELAY)
await asyncio.sleep(SCHEDULERS_DELAY)
while True:
try:

Expand Down Expand Up @@ -250,7 +264,7 @@ async def aperiodic_status_check(self):
)
logger.warning(
f"Temporarily increasing the interval between two consecutive status checks. "
f"Next status check will be in {round(INCREASED_STATUS_CHECK_INTERVAL / 60, 2)} minutes. "
f"Next status check will be in {round(INCREASED_INTERVAL / 60, 2)} minutes. "
f"For changing the interval size, please run cluster._enable_or_update_status_check(new_interval). "
f"If a value is not provided, interval size will be set to {DEFAULT_STATUS_CHECK_INTERVAL}"
)
Expand Down Expand Up @@ -356,3 +370,66 @@ async def astatus(self):

def status(self):
    """Blocking wrapper around `astatus`: run the async status collection synchronously."""
    blocking_status = sync_function(self.astatus)
    return blocking_status()

##############################################
# Surface cluster logs to Den
##############################################
def _get_logs(self):
    """Read the server logfile and return its contents as a single string.

    ANSI color codes are stripped from every line before joining.
    NOTE: lines (which already end in "\n") are joined with a single space,
    producing "\n " separators — consumers compensate for this when comparing.
    """
    with open(SERVER_LOGFILE) as log_file:
        raw_lines = log_file.readlines()
    cleaned = map(ColoredFormatter.format_log, raw_lines)
    return " ".join(cleaned)

async def asend_cluster_logs_to_den(self):
    """Periodically upload the cluster's server logfile to Den.

    Runs forever in its own thread's event loop (see send_cluster_logs_to_den).
    Waits SCHEDULERS_DELAY before the first upload so cluster startup can finish,
    then posts the cleaned logfile contents to Den every
    DEFAULT_LOG_SURFACING_INTERVAL seconds, backing off to INCREASED_INTERVAL
    after a failed iteration.
    """
    # Delay the start of post_logs_thread, so we'll finish the cluster startup properly
    await asyncio.sleep(SCHEDULERS_DELAY)

    while True:
        logger.info("Trying to send cluster logs to Den")
        # Default cadence; bumped to INCREASED_INTERVAL below if this iteration fails.
        interval_size = DEFAULT_LOG_SURFACING_INTERVAL
        try:
            latest_logs = self._get_logs()
            logs_data = {"file_name": S3_LOGS_FILE_NAME, "logs": latest_logs}

            cluster_config = await self.aget_cluster_config()
            cluster_uri = rns_client.format_rns_address(cluster_config.get("name"))
            api_server_url = cluster_config.get(
                "api_server_url", rns_client.api_server_url
            )

            post_logs_resp = requests.post(
                f"{api_server_url}/resource/{cluster_uri}/logs",
                data=json.dumps(logs_data),
                headers=rns_client.request_headers(),
            )

            if post_logs_resp.status_code != 200:
                # Only parse the body on failure; surface Den's error detail when present.
                post_logs_error = post_logs_resp.json().get("detail") or ""
                logger.error(
                    f"({post_logs_resp.status_code}) Failed to send cluster logs to Den: {post_logs_error}"
                )
            else:
                logger.info(
                    f"Successfully sent cluster logs to Den. Next log retrieval will be in {round(interval_size / 60, 2)} minutes."
                )
        except Exception as e:
            # Back off so a persistent failure doesn't hammer Den every cycle.
            # (Previously the message claimed the interval was increased but the
            # sleep still used the default — reassigning here makes it true.)
            interval_size = INCREASED_INTERVAL
            logger.error(
                f"Sending cluster logs to den has failed: {e}. Please check cluster logs for more info."
            )
            logger.warning(
                f"Temporarily increasing the interval between two consecutive log retrievals. "
                f"Next log retrieval will be in {round(interval_size / 60, 2)} minutes. "
                f"For changing the interval size, please run cluster.restart_server(). "
                f"Interval size will be set to {interval_size}"
            )
        finally:
            await asyncio.sleep(interval_size)

def send_cluster_logs_to_den(self):
    """Thread entry point: drive the async log-surfacing loop to completion."""
    surfacing_loop = self.asend_cluster_logs_to_den()
    asyncio.run(surfacing_loop)
35 changes: 35 additions & 0 deletions tests/test_resources/test_clusters/test_on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import runhouse as rh
from runhouse.constants import SERVER_LOGFILE_PATH
from runhouse.globals import rns_client
from runhouse.logger import ColoredFormatter
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_clusters.test_cluster
Expand Down Expand Up @@ -234,3 +235,37 @@ def test_set_status_after_teardown(self, cluster):
get_status_data = get_status_data_resp.json()["data"][0]
assert get_status_data["resource_type"] == cluster_config.get("resource_type")
assert get_status_data["status"] == ResourceServerStatus.terminated

@pytest.mark.level("minimal")
def test_logs_surfacing_scheduler_basic_flow(self, cluster):
    """End-to-end check that the log-surfacing scheduler ships server logs to Den.

    Waits one full surfacing interval, then verifies both that the scheduler ran
    (its log line appears in the server logfile) and that the logs Den serves back
    match what is on the cluster.
    """
    if not cluster.den_auth:
        pytest.skip(
            "This test checks sending cluster logs to Den; it can only run on "
            "clusters with den_auth that can be saved to Den."
        )

    # Give the scheduler time for its first upload (DEFAULT_LOG_SURFACING_INTERVAL is 2 minutes).
    time.sleep(120)
    cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
    headers = rh.globals.rns_client.request_headers()
    api_server_url = rh.globals.rns_client.api_server_url

    get_logs_resp = requests.get(
        f"{api_server_url}/resource/{cluster_uri}/logs",
        headers=headers,
    )

    # Pull the raw server logfile off the cluster and normalize it the same
    # way the scheduler does before uploading.
    cluster_logs = cluster.run([f"cat {SERVER_LOGFILE_PATH}"])[0][1].split(
        "\n"
    )  # create list of lines
    cluster_logs = [
        ColoredFormatter.format_log(log) for log in cluster_logs
    ]  # clean log formatting
    cluster_logs = "\n".join(cluster_logs)  # make logs list into one string

    assert "Trying to send cluster logs to Den" in cluster_logs

    assert get_logs_resp.status_code == 200
    # Uploaded logs were joined with " " (see _get_logs), leaving "\n " artifacts;
    # undo them before comparing against the raw logfile text.
    cluster_logs_from_s3 = get_logs_resp.json()["data"][0][1:].replace(
        "\n ", "\n"
    )
    assert cluster_logs_from_s3 in cluster_logs

0 comments on commit e04e8f1

Please sign in to comment.