Skip to content

Commit

Permalink
cluster list filters (#1233)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandra Belousov <sashabelousovrh@Alexandras-MacBook-Pro.local>
  • Loading branch information
BelSasha and Alexandra Belousov authored Sep 25, 2024
1 parent 99766c9 commit e465b15
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 141 deletions.
5 changes: 5 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
SECOND = 1
MINUTE = 60
HOUR = 3600
DAY = HOUR * 24
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
DEFAULT_AUTOSTOP_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
Expand All @@ -105,3 +106,7 @@

# Constants runhouse cluster list
LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR
MAX_CLUSTERS_DISPLAY = 50

# in seconds
TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
189 changes: 54 additions & 135 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import typer
import yaml
from rich.console import Console
from rich.table import Table

import runhouse as rh

Expand All @@ -27,7 +26,7 @@
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
LAST_ACTIVE_AT_TIMEFRAME,
MAX_CLUSTERS_DISPLAY,
RAY_KILL_CMD,
RAY_START_CMD,
SERVER_LOGFILE,
Expand All @@ -42,8 +41,12 @@
check_for_existing_ray_instance,
kill_actors,
)

from runhouse.utils import StatusColors
from runhouse.resources.hardware.utils import ClustersListStatus
from runhouse.utils import (
add_clusters_to_output_table,
create_output_table,
print_sky_clusters_msg,
)

# create an explicit Typer application
app = typer.Typer(add_completion=False)
Expand Down Expand Up @@ -590,159 +593,75 @@ def status(
_print_status(cluster_status, current_cluster)


def get_clusters_from_den():
get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username}
clusters_in_den_resp = rns_client.session.get(
f"{rns_client.api_server_url}/resource",
params=get_clusters_params,
headers=rns_client.request_headers(),
)

return clusters_in_den_resp


@cluster_app.command("list")
def cluster_list():
"""List all Runhouse clusters saved in Den"""
import sky
def cluster_list(
show_all: bool = typer.Option(
False,
"-a",
"--all",
help=f"Get all clusters saved in Den. Up to {MAX_CLUSTERS_DISPLAY} most recently active clusters will be displayed.",
),
since: Optional[str] = typer.Option(
None,
"--since",
help="Time duration to filter on. Minimum allowable filter is 1 minute. You may filter by seconds (s), "
"minutes (m), hours (h) or days (s). Examples: 30s, 15m, 2h, 3d.",
),
cluster_status: Optional[ClustersListStatus] = typer.Option(
None,
"--status",
help="Cluster status to filter on. Supported filter values: running, terminated (cluster is not live), down (Runhouse server is down, but the cluster might be live).",
),
):
"""Load Runhouse clusters"""

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"Runhouse token required to view all clusters. Please run `runhouse login` to load your token. To view on-demand clusters run {sky_cli_command_formatted}"
f"Listing clusters requires a Runhouse token. Please run `runhouse login` to get your token, or run {sky_cli_command_formatted} to list locally stored on-demand clusters."
)
return

sky_clusters = sky.status()
clusters = Cluster.list(show_all=show_all, since=since, status=cluster_status)

clusters_in_den_resp = get_clusters_from_den()
all_clusters = clusters.get("all_clusters", None)
running_clusters = clusters.get("running_clusters", None)
sky_clusters = clusters.get("sky_clusters", None)

if clusters_in_den_resp.status_code != 200:
logger.error(
f"Failed to load clusters from Den for username: {rns_client.username}"
)
clusters_in_den = []
else:
clusters_in_den = clusters_in_den_resp.json().get("data")
if not all_clusters:
no_clusters_msg = f"No clusters found in Den for {rns_client.username}"
if show_all or since or cluster_status:
no_clusters_msg += " that match the provided filters"
console.print(no_clusters_msg)
print_sky_clusters_msg(len(sky_clusters))
return

clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]
filters_requested: bool = show_all or since or cluster_status

if not sky_clusters and not clusters_in_den:
console.print("No existing clusters.")
clusters_to_print = all_clusters if filters_requested else running_clusters

if sky_clusters:
# getting the on-demand clusters that are not saved in den.
sky_clusters = [
cluster
for cluster in sky_clusters
if f'/{rns_client.username}/{cluster.get("name")}'
not in clusters_in_den_names
if show_all:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[
: (min(len(clusters_to_print), MAX_CLUSTERS_DISPLAY))
]

total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]Clusters for {rns_client.username} (Total: {total_clusters})[/bold cyan]"

table = Table(title=table_title, title_justify="left")

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")

running_clusters = []
not_running_clusters = []

for den_cluster in clusters_in_den:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else None
)

# currently relying on status pings to den as a sign of cluster activity.
# The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
# (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)

last_active_at = den_cluster.get("status_last_checked")
last_active_at = (
datetime.datetime.fromisoformat(last_active_at.split(".")[0])
if isinstance(last_active_at, str)
else None
)
last_active_at = (
last_active_at.replace(tzinfo=datetime.timezone.utc)
if last_active_at
else None
)

if cluster_status == "running" and not last_active_at:
# For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
cluster_status = "unknown"

cluster_info = {
"Name": cluster_name,
"Cluster Type": cluster_type,
"Status": cluster_status,
"Last Active (UTC)": last_active_at,
}
running_clusters.append(
cluster_info
) if cluster_status == "running" else not_running_clusters.append(cluster_info)

# TODO: will be used if we'll need to print not-running clusters
# Sort not-running clusters by the 'Status' column
# not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])

# Sort clusters by the 'Last Active (UTC)' column
not_running_clusters = sorted(
not_running_clusters,
key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]),
reverse=True,
)
running_clusters = sorted(
running_clusters,
key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]),
reverse=True,
# creating the clusters table
table = create_output_table(
total_clusters=len(all_clusters),
running_clusters=len(running_clusters),
displayed_clusters=len(clusters_to_print),
filters_requested=filters_requested,
)

# creating the clusters table
total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")

# TODO: will be used if we'll need to print not-running clusters
# all_clusters = running_clusters + not_running_clusters

for rh_cluster in running_clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")

# Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
StatusColors.get_status_color(rh_cluster.get("Status")),
str(last_active_at).split("+")[
0
], # The split is required to remove the offset (according to UTC)
)
add_clusters_to_output_table(table=table, clusters=clusters_to_print)

console.print(table)

if len(sky_clusters) > 0:
console.print(
f"There are {len(sky_clusters)} live cluster(s) that are not saved in Den. For more information, please run [bold italic]sky status -r[/bold italic]."
)
# print msg about un-saved live sky clusters.
print_sky_clusters_msg(len(sky_clusters))


# Register the 'cluster' command group with the main runhouse application
Expand Down
87 changes: 87 additions & 0 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@

import yaml

from runhouse.resources.hardware.utils import (
ClustersListStatus,
get_clusters_from_den,
get_running_and_not_running_clusters,
get_unsaved_live_clusters,
parse_filters,
)

from runhouse.rns.utils.api import ResourceAccess, ResourceVisibility
from runhouse.servers.http.certs import TLSCertConfig
from runhouse.utils import (
Expand Down Expand Up @@ -1946,3 +1954,82 @@ def _folder_mv(

def _folder_exists(self, path: Union[str, Path]):
return self.client.folder_exists(path=path)

###############################
# Cluster list
###############################
@classmethod
def list(
cls,
show_all: Optional[bool] = False,
since: Optional[str] = None,
status: Optional[ClustersListStatus] = None,
) -> Dict[str, List[Dict]]:
"""
Returns user's runhouse clusters saved in Den and locally via Sky. If filters are provided, only clusters that are matching the
filters are returned. If no filters are provided, clusters that were active in the last 24 hours are returned.
Args:
show_all (bool, optional): Whether to list all clusters saved in Den. Maximum of 50 will be listed. (Default: False).
since (str, optional): Clusters that were active in the specified time period will be returned. Value can be in seconds, minutes, hours or days.
status (ResourceServerStatus, optional): Clusters with the provided status will be returned.
Examples:
>>> Cluster.list(since="75s")
>>> Cluster.list(since="3m")
>>> Cluster.list(since="2h")
>>> Cluster.list(since="7d")
"""

try:
import sky

# get sky live clusters
sky_live_clusters = [
{
"Name": sky_cluster.get("name"),
"Cluster Type": "OnDemandCluster (Sky)",
"Status": sky_cluster.get("status").value,
}
for sky_cluster in sky.status()
]
except Exception:
logger.debug("Failed to load sky live clusters.")
sky_live_clusters = []

cluster_filters = (
parse_filters(since=since, cluster_status=status)
if not show_all
else {"all": "all"}
)

# get clusters from den
den_clusters_resp = get_clusters_from_den(cluster_filters=cluster_filters)
if den_clusters_resp.status_code != 200:
logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
den_clusters = []
else:
den_clusters = den_clusters_resp.json().get("data")

if not sky_live_clusters and not den_clusters:
return {}

# sky_live_clusters = sky clusters found in the local sky DB but not saved in den
sky_live_clusters = get_unsaved_live_clusters(
den_clusters=den_clusters, sky_live_clusters=sky_live_clusters
)

# running_clusters: running clusters which are saved in Den
# not running clusters: clusters that are terminated / unknown / down which are also saved in Den.
(
running_clusters,
not_running_clusters,
) = get_running_and_not_running_clusters(clusters=den_clusters)
all_clusters = running_clusters + not_running_clusters

clusters = {
"all_clusters": all_clusters,
"running_clusters": running_clusters,
"sky_clusters": sky_live_clusters,
}
return clusters
Loading

0 comments on commit e465b15

Please sign in to comment.