From 9d031e0406d9f0ccf32cb05f93faaf0a06af9dce Mon Sep 17 00:00:00 2001 From: Caroline Date: Thu, 22 Aug 2024 20:23:30 -0400 Subject: [PATCH] Update and reuse rsync method --- .../resources/hardware/sky/command_runner.py | 12 ++- runhouse/resources/hardware/sky_ssh_runner.py | 97 ++++--------------- 2 files changed, 27 insertions(+), 82 deletions(-) diff --git a/runhouse/resources/hardware/sky/command_runner.py b/runhouse/resources/hardware/sky/command_runner.py index 12017f0dd..421bf25e8 100644 --- a/runhouse/resources/hardware/sky/command_runner.py +++ b/runhouse/resources/hardware/sky/command_runner.py @@ -230,7 +230,10 @@ def _rsync( stream_logs: bool = True, max_retry: int = 1, prefix_command: Optional[str] = None, - get_remote_home_dir: Callable[[], str] = lambda: '~') -> None: + get_remote_home_dir: Callable[[], str] = lambda: '~', + filter_options: Optional[str] = None, # RH MODIFIED, + return_cmd: bool = False, # RH MODIFIED, + ) -> None: """Builds the rsync command.""" # Build command. rsync_command = [] @@ -239,7 +242,8 @@ def _rsync( rsync_command += ['rsync', RSYNC_DISPLAY_OPTION] # --filter - rsync_command.append(RSYNC_FILTER_OPTION) + addtl_filter_options = f" --filter='{filter_options}'" if filter_options else "" # RH MODIFIED + rsync_command.append(RSYNC_FILTER_OPTION + addtl_filter_options) if up: # Build --exclude-from argument. @@ -281,6 +285,10 @@ def _rsync( command = ' '.join(rsync_command) logger.debug(f'Running rsync command: {command}') + # RH MODIFIED: return command instead of running it + if return_cmd: + return command + backoff = common_utils.Backoff(initial_backoff=5, max_backoff_factor=5) assert max_retry > 0, f'max_retry {max_retry} must be positive.' while max_retry >= 0: diff --git a/runhouse/resources/hardware/sky_ssh_runner.py b/runhouse/resources/hardware/sky_ssh_runner.py index b81185274..bff7503c4 100644 --- a/runhouse/resources/hardware/sky_ssh_runner.py +++ b/runhouse/resources/hardware/sky_ssh_runner.py @@ -1,23 +1,17 @@ import logging import os -import pathlib import shlex -import time from typing import Dict, List, Optional, Tuple, Union from runhouse.constants import DEFAULT_DOCKER_CONTAINER_NAME from runhouse.logger import get_logger -from runhouse.resources.hardware.sky import common_utils, log_lib, subprocess_utils +from runhouse.resources.hardware.sky import log_lib, subprocess_utils from runhouse.resources.hardware.sky.command_runner import ( _DEFAULT_CONNECT_TIMEOUT, - GIT_EXCLUDE, KubernetesCommandRunner, - RSYNC_DISPLAY_OPTION, - RSYNC_EXCLUDE_OPTION, - RSYNC_FILTER_OPTION, ssh_options_list, SSHCommandRunner, SshMode, @@ -257,29 +251,6 @@ def rsync( Raises: exceptions.CommandError: rsync command failed. """ - # Build command. - # TODO(zhwu): This will print a per-file progress bar (with -P), - # shooting a lot of messages to the output. --info=progress2 is used - # to get a total progress bar, but it requires rsync>=3.1.0 and Mac - # OS has a default rsync==2.6.9 (16 years old). - rsync_command = ["rsync", RSYNC_DISPLAY_OPTION] - - # RH MODIFIED: add --filter option - addtl_filter_options = f" --filter='{filter_options}'" if filter_options else "" - rsync_command.append(RSYNC_FILTER_OPTION + addtl_filter_options) - - if up: - # The source is a local path, so we need to resolve it. - # --exclude-from - resolved_source = pathlib.Path(source).expanduser().resolve() - if (resolved_source / GIT_EXCLUDE).exists(): - # Ensure file exists; otherwise, rsync will error out. - rsync_command.append( - RSYNC_EXCLUDE_OPTION.format( - shlex.quote(str(resolved_source / GIT_EXCLUDE)) - ) - ) - if self._docker_ssh_proxy_command is not None: docker_ssh_proxy_command = self._docker_ssh_proxy_command(["ssh"]) else: @@ -294,54 +265,18 @@ def rsync( disable_control_master=self.disable_control_master, ) ) - rsync_command.append(f'-e "ssh {ssh_options}"') - # To support spaces in the path, we need to quote source and target. - # rsync doesn't support '~' in a quoted local path, but it is ok to - # have '~' in a quoted remote path. - if up: - full_source_str = str(resolved_source) - if resolved_source.is_dir(): - full_source_str = os.path.join(full_source_str, "") - rsync_command.extend( - [ - f"{full_source_str!r}", - f"{self.ssh_user}@{self.ip}:{target!r}", - ] - ) - else: - rsync_command.extend( - [ - f"{self.ssh_user}@{self.ip}:{source!r}", - f"{os.path.expanduser(target)!r}", - ] - ) - command = " ".join(rsync_command) - - # RH MODIFIED: return command instead of running it - if return_cmd: - return command - - backoff = common_utils.Backoff(initial_backoff=5, max_backoff_factor=5) - while max_retry >= 0: - returncode, _, stderr = log_lib.run_with_log( - command, - log_path=log_path, - stream_logs=stream_logs, - shell=True, - require_outputs=True, - ) - if returncode == 0: - break - max_retry -= 1 - time.sleep(backoff.current_backoff()) - - direction = "up" if up else "down" - error_msg = ( - f"Failed to rsync {direction}: {source} -> {target}. " - "Ensure that the network is stable, then retry." - ) - subprocess_utils.handle_returncode( - returncode, command, error_msg, stderr=stderr, stream_logs=stream_logs + rsh_option = f"ssh {ssh_options}" + return self._rsync( + source, + target, + node_destination=f"{self.ssh_user}@{self.ip}", + up=up, + rsh_option=rsh_option, + log_path=log_path, + stream_logs=stream_logs, + max_retry=max_retry, + filter_options=filter_options, + return_cmd=return_cmd, ) @@ -489,8 +424,8 @@ def rsync( log_path: str = os.devnull, stream_logs: bool = True, max_retry: int = 1, - filter_options: bool = False, # RH MODIFIED # TODO - return_cmd: bool = False, # RH MODIFIED # TODO + filter_options: bool = False, # RH MODIFIED + return_cmd: bool = False, # RH MODIFIED ) -> None: """Uses 'rsync' to sync 'source' to 'target'. Args: @@ -539,4 +474,6 @@ def get_remote_home_dir() -> str: # /~/xx, so we need to replace ~ with the remote home directory. We # only need to do this when ~ is at the beginning of the path. get_remote_home_dir=get_remote_home_dir, + filter_options=filter_options, + return_cmd=return_cmd, )