Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Fix spot pending status #2044

Merged
merged 16 commits into from
Jun 8, 2023
19 changes: 13 additions & 6 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,21 +1538,28 @@ def _get_tpu_vm_pod_ips(ray_config: Dict[str, Any],
@timeline.event
def get_head_ip(
handle: 'cloud_vm_ray_backend.CloudVmRayResourceHandle',
use_cached_head_ip: bool = True,
use_cached_head_ip: Optional[bool] = True,
max_attempts: int = 1,
) -> str:
"""Returns the ip of the head node."""
"""Returns the ip of the head node.

use_cached_head_ip: If True, use the cached head ip if it exists. If False,
query the head ip from the cloud provider. If None, use the cached head
ip if it exists, otherwise query the head ip from the cloud provider.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
"""
head_ip = handle.head_ip
if use_cached_head_ip:
if handle.head_ip is None:
if head_ip is None:
# This happens for INIT clusters (e.g., exit 1 in setup).
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Cluster\'s head IP not found; is it up? To fix: '
'run a successful launch first (`sky launch`) to ensure'
' the cluster status is UP (`sky status`).')
head_ip = handle.head_ip
else:
head_ip = _query_head_ip_with_retries(handle.cluster_yaml, max_attempts)
return head_ip
if use_cached_head_ip is None and head_ip is not None:
return head_ip
head_ip = _query_head_ip_with_retries(handle.cluster_yaml, max_attempts)
return head_ip


Expand Down
62 changes: 37 additions & 25 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,7 @@ def __init__(self):
# job_id = get_output(run_on_cluster(code))
self.job_id = None

def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
is_local: bool = False) -> None:
def add_prologue(self, job_id: int, is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
self.job_id = job_id
Expand Down Expand Up @@ -275,14 +272,6 @@ def add_prologue(self,
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
if spot_task is not None:
# Add the spot job to spot queue table.
resources_str = backend_utils.get_task_resources_str(spot_task)
self._code += [
'from sky.spot import spot_state',
f'spot_state.set_pending('
f'{job_id}, {spot_task.name!r}, {resources_str!r})',
]

def add_gang_scheduling_placement_group_and_setup(
self,
Expand Down Expand Up @@ -1821,7 +1810,7 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle',
# At this state, an erroneous cluster may not have cached
# handle.head_ip (global_user_state.add_or_update_cluster(...,
# ready=True)).
use_cached_head_ip=False)
use_cached_head_ip=None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple places in this PR where previously we always query the cloud for IPs, but with this PR we always try to use cached IPs first. Why is that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the function argument does not have a way to do the "cached IP first and fallback to query". We had to be conservative to always query the IPs as the cached IP may not exist.

Now, we make the use_cached_head_ip=None to use the cached IP first and then fallback to query, so that we can reduce the overhead for querying the IP address multiple times, even though we have the IP cached already.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like all callers now use None for this arg. Ok to eliminate the arg?

Copy link
Collaborator Author

@Michaelvll Michaelvll Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Just removed the argument. Testing:

  • pytest tests/test_smoke.py

if returncode == 0:
return
launched_resources = handle.launched_resources
Expand All @@ -1832,7 +1821,7 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle',
'The command `ray status` errored out on the head node '
'of the local cluster. Check if ray[default]==2.4.0 '
'is installed or running correctly.')
backend.run_on_head(handle, 'ray stop', use_cached_head_ip=False)
backend.run_on_head(handle, 'ray stop', use_cached_head_ip=None)

# Runs `ray up <kwargs>` with our monkey-patched launch hash
# calculation. See the monkey patch file for why.
Expand Down Expand Up @@ -2519,7 +2508,7 @@ def _update_after_cluster_provisioned(
self.run_on_head(
handle,
_MAYBE_SKYLET_RESTART_CMD,
use_cached_head_ip=False,
use_cached_head_ip=None,
)

# Update job queue to avoid stale jobs (when restarted), before
Expand Down Expand Up @@ -2742,6 +2731,7 @@ def _exec_code_on_head(
job_id: int,
executable: str,
detach_run: bool = False,
extra_cmd: Optional[str] = None,
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Executes generated code on the head node."""
style = colorama.Style
Expand Down Expand Up @@ -2790,6 +2780,9 @@ def _exec_code_on_head(
mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} && '
f'touch {remote_log_path}')
code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd)
if extra_cmd is not None:
code = f'{code} && {extra_cmd}'
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved

job_submit_cmd = mkdir_code + ' && ' + code

returncode, stdout, stderr = self.run_on_head(handle,
Expand Down Expand Up @@ -3575,15 +3568,21 @@ def run_on_head(
log_path: str = '/dev/null',
process_stream: bool = True,
stream_logs: bool = False,
use_cached_head_ip: bool = True,
use_cached_head_ip: Optional[bool] = True,
ssh_mode: command_runner.SshMode = command_runner.SshMode.
NON_INTERACTIVE,
under_remote_workdir: bool = False,
require_outputs: bool = False,
separate_stderr: bool = False,
**kwargs,
) -> Union[int, Tuple[int, str, str]]:
"""Runs 'cmd' on the cluster's head node."""
"""Runs 'cmd' on the cluster's head node.

use_cached_head_ip: If True, use the cached head IP address. If False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add other Args too? (I know, code gardening...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. The other arguments are listed in the docstr of SSHCommandRunner.run and log_lib.run. The document here is a bit duplicated. We can also change it a sentence referring to the docstr of those two functions. Wdyt?

fetch the head IP address from the cloud provider. If None, use
the cached head IP address if it exists, otherwise fetch the head
IP address from the cloud provider.
"""
head_ip = backend_utils.get_head_ip(handle, use_cached_head_ip,
_FETCH_IP_MAX_ATTEMPTS)
ssh_credentials = backend_utils.ssh_credential_from_yaml(
Expand Down Expand Up @@ -3892,6 +3891,19 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle,
end = time.time()
logger.debug(f'Storage mount sync took {end - start} seconds.')

@classmethod
def _maybe_set_spot_state_code(cls, job_id: int,
task: task_lib.Task) -> Optional[str]:
if task.spot_task is not None:
# Add the spot job to spot queue table.
resources_str = backend_utils.get_task_resources_str(task.spot_task)
codegen = spot_lib.SpotCodeGen()
spot_name = task.spot_task.name
assert spot_name is not None, task
code = codegen.set_pending_state(job_id, spot_name, resources_str)
return code
return None

def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
task: task_lib.Task, job_id: int,
detach_run: bool) -> None:
Expand All @@ -3904,9 +3916,7 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,

codegen = RayCodeGen()
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
is_local=is_local)
codegen.add_prologue(job_id, is_local=is_local)
codegen.add_gang_scheduling_placement_group_and_setup(
1,
accelerator_dict,
Expand Down Expand Up @@ -3946,7 +3956,9 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
codegen.build(),
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
job_id,
executable='python3',
detach_run=detach_run)
detach_run=detach_run,
extra_cmd=self._maybe_set_spot_state_code(
job_id, task))

def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
task: task_lib.Task, job_id: int,
Expand All @@ -3972,9 +3984,7 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,

codegen = RayCodeGen()
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
is_local=is_local)
codegen.add_prologue(job_id, is_local=is_local)
codegen.add_gang_scheduling_placement_group_and_setup(
num_actual_nodes,
accelerator_dict,
Expand Down Expand Up @@ -4021,4 +4031,6 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
codegen.build(),
job_id,
executable='python3',
detach_run=detach_run)
detach_run=detach_run,
extra_cmd=self._maybe_set_spot_state_code(
job_id, task))
11 changes: 10 additions & 1 deletion sky/spot/spot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
"""Stream logs by job id."""
controller_status = job_lib.get_status(job_id)
status_msg = ('[bold cyan]Waiting for controller process to be RUNNING '
'{status_str}[/]. It may take a few minutes.')
'{status_str}[/].')
status_display = log_utils.safe_rich_status(
status_msg.format(status_str=''))
with status_display:
Expand Down Expand Up @@ -530,6 +530,15 @@ def stream_logs_by_id(cls,
]
return cls._build(code)

@classmethod
def set_pending_state(cls, job_id: int, name: str,
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
resources_str: str) -> str:
code = [
f'spot_state.set_pending('
f'{job_id}, {name!r}, {resources_str!r})',
]
return cls._build(code)

@classmethod
def _build(cls, code: List[str]) -> str:
code = cls._PREFIX + code
Expand Down