From f60fac366689b324c70480e37f824a946e5d2224 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Sun, 23 Oct 2022 08:54:44 -0700 Subject: [PATCH 1/7] WIP --- docs/source/reference/api.rst | 3 ++ sky/task.py | 66 ++++++++++++++++++++++------------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst index 923e9560bfa..ca98ab649b3 100644 --- a/docs/source/reference/api.rst +++ b/docs/source/reference/api.rst @@ -63,6 +63,9 @@ sky.Task .. autoclass:: sky.Task :members: + :exclude-members: estimate_runtime + + .. automethod:: __init__ sky.Dag ~~~~~~~~~ diff --git a/sky/task.py b/sky/task.py index f4207cdedea..c7f39547bb2 100644 --- a/sky/task.py +++ b/sky/task.py @@ -70,7 +70,7 @@ def _is_valid_env_var(name: str) -> bool: class Task: - """Task: a coarse-grained stage in an application.""" + """Task: a coarse-grained stage in an application DAG.""" def __init__( self, @@ -86,35 +86,48 @@ def __init__( ): """Initializes a Task. - All fields are optional. `Task.run` is the actual program: either a + All fields are optional. ``Task.run`` is the actual program: either a shell command to run (str) or a command generator for different nodes (lambda; see below). - Before executing a Task, it is required to call Task.set_resources() to - assign resource requirements to this task. + Optionally, call ``Task.set_resources()`` to set the resource + requirements for this task. If not set, a default CPU-only requirement + is assumed (the same as ``sky cpunode``). + + Example: + >>> # A Task that will sync up local workdir '.', containing + >>> # requirements.txt and train.py. + >>> sky.Task(setup='pip install requirements.txt', + >>> run='python train.py', + >>> workdir='.') + >>> + >>> # A Task for provisioning a cluster. + >>> sky.Task(num_nodes=n).set_resources(...) Args: - name: A string name for the Task. - setup: A setup command, run under 'workdir' and before actually - executing the run command, 'run'. - run: Either a shell command (str) or a command generator (callable). - If latter, it must take a node rank and a list of node addresses as - input and return a shell command (str) (valid to return None for - some nodes, in which case no commands are run on them). Commands - will be run under 'workdir'. Note the command generator should be - self-contained. + name: A string name for the Task for display purposes. + setup: A setup command, which will be run before executing the run + commands ``run``, and executed under ``workdir``. + run: The actual command for the task. If not None, either a shell + command (str) or a command generator (callable). If latter, it + must take a node rank and a list of node addresses as input and + return a shell command (str) (valid to return None for some nodes, + in which case no commands are run on them). Run commands will be + run under ``workdir``. Note the command generator should be a + self-contained lambda. envs: A dictionary of environment variables to set before running the - setup and run command. - workdir: The local working directory. This directory and its files - will be synced to a location on the remote VM(s), and 'setup' and - 'run' commands will be run under that location (thus, they can rely - on relative paths when invoking binaries). + setup and run commands. + workdir: The local working directory. This directory will be synced + to a location on the remote VM(s), and ``setup`` and ``run`` + commands will be run under that location (thus, they can rely on + relative paths when invoking binaries). num_nodes: The number of nodes to provision for this Task. If None, treated as 1 node. If > 1, each node will execute its own - setup/run command; 'run' can either be a str, meaning all nodes get - the same command, or a lambda, as documented above. - docker_image: The base docker image that this Task will be built on. - In effect when LocalDockerBackend is used. Defaults to + setup/run command, where ``run`` can either be a str, meaning all + nodes get the same command, or a lambda, with the semantics + documented above. + docker_image: (Only in effect when LocalDockerBackend is used.) The + base docker image that this Task will be built on. Defaults to 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'. """ self.name = name @@ -207,7 +220,8 @@ def _validate(self): f'a symlink to a directory). {self.workdir} not found.') @staticmethod - def from_yaml(yaml_path): + def from_yaml(yaml_path: str) -> 'Task': + """Initializes a task from a task YAML.""" with open(os.path.expanduser(yaml_path), 'r') as f: # TODO(zongheng): use # https://github.com/yaml/pyyaml/issues/165#issuecomment-430074049 @@ -344,8 +358,10 @@ def num_nodes(self) -> int: def envs(self) -> Dict[str, str]: return self._envs - def set_envs(self, envs: Union[None, Tuple[Tuple[str, str]], Dict[str, - str]]): + def set_envs( + self, envs: Union[None, Tuple[Tuple[str, str]], Dict[str, + str]]) -> None: + """Sets the environment variables for the setup and run commands.""" if envs is None: self._envs = None return From 0fdcd643240b4d9c9395673abf9d06fd6e022b8f Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 24 Oct 2022 21:00:55 -0700 Subject: [PATCH 2/7] Polish sky.Task doc strings. --- sky/execution.py | 14 ++-- sky/task.py | 204 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 152 insertions(+), 66 deletions(-) diff --git a/sky/execution.py b/sky/execution.py index 4a2bbdb178a..250a5240fc6 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -299,12 +299,12 @@ def launch( (when specified, it is synced to remote cluster). The task undergoes job queue scheduling on the cluster. - Currently, the first argument must be a sky.Task, or (more advanced usage) - a sky.Dag; in the latter case, currently it must be a single task). - Support for pipelines/general DAGs are in experimental branches. + Currently, the first argument must be a sky.Task, or (EXPERIMENTAL advanced + usage) a sky.Dag. In the latter case, currently it must contain a single + task; support for pipelines/general DAGs are in experimental branches. Args: - task: sky.Task or sky.Dag to launch. + task: sky.Task, or sky.Dag (experimental; 1-task only) to launch. cluster_name: name of the cluster to create/reuse. If None, auto-generate a name. retry_until_up: whether to retry launching the cluster until it is @@ -397,7 +397,8 @@ def exec( # pylint: disable=redefined-builtin Use ``ssh my_cluster`` instead. Args: - task: sky.Task or sky.Dag containing the task to execute. + task: sky.Task, or sky.Dag (experimental; 1-task only) containing the + task to execute. cluster_name: name of an existing cluster to execute the task. down: Tear down the cluster after all jobs finish (successfully or abnormally). If --idle-minutes-to-autostop is also set, the @@ -460,7 +461,8 @@ def spot_launch( Please refer to the sky.cli.spot_launch for the document. Args: - task: sky.Task or sky.Dag to launch as a managed spot job. + task: sky.Task, or sky.Dag (experimental; 1-task only) to launch as a + managed spot job. name: Name of the spot job. detach_run: Whether to detach the run. diff --git a/sky/task.py b/sky/task.py index c7f39547bb2..280a0b9b337 100644 --- a/sky/task.py +++ b/sky/task.py @@ -70,7 +70,7 @@ def _is_valid_env_var(name: str) -> bool: class Task: - """Task: a coarse-grained stage in an application DAG.""" + """Task: a computation to be run on the cloud.""" def __init__( self, @@ -94,6 +94,9 @@ def __init__( requirements for this task. If not set, a default CPU-only requirement is assumed (the same as ``sky cpunode``). + All setters of this class, ``Task.set_*()``, return ``self``, i.e., + they are fluent APIs and can be chained together. + Example: >>> # A Task that will sync up local workdir '.', containing >>> # requirements.txt and train.py. @@ -101,8 +104,11 @@ def __init__( >>> run='python train.py', >>> workdir='.') >>> - >>> # A Task for provisioning a cluster. - >>> sky.Task(num_nodes=n).set_resources(...) + >>> # An empty Task for provisioning a cluster. + >>> task = sky.Task(num_nodes=n).set_resources(...) + >>> + >>> # Chaining setters. + >>> sky.Task().set_resources(...).set_file_mounts(...) Args: name: A string name for the Task for display purposes. @@ -221,7 +227,15 @@ def _validate(self): @staticmethod def from_yaml(yaml_path: str) -> 'Task': - """Initializes a task from a task YAML.""" + """Initializes a task from a task YAML. + + Args: + yaml_path: file path to a valid task yaml file. + + Raises: + ValueError: if the path gets loaded into a str instead of a dict; or + if there are any other parsing errors. + """ with open(os.path.expanduser(yaml_path), 'r') as f: # TODO(zongheng): use # https://github.com/yaml/pyyaml/issues/165#issuecomment-430074049 @@ -359,12 +373,19 @@ def envs(self) -> Dict[str, str]: return self._envs def set_envs( - self, envs: Union[None, Tuple[Tuple[str, str]], Dict[str, - str]]) -> None: - """Sets the environment variables for the setup and run commands.""" + self, envs: Union[None, Tuple[Tuple[str, str]], + Dict[str, str]]) -> 'Task': + """Sets the environment variables for the setup and run commands. + + Returns: + self: The current task, with envs set. + + Raises: + ValueError: if various invalid inputs errors are detected. + """ if envs is None: self._envs = None - return + return self if isinstance(envs, (list, tuple)): keys = set(env[0] for env in envs) if len(keys) != len(envs): @@ -385,6 +406,7 @@ def set_envs( 'envs must be List[Tuple[str, str]] or Dict[str, str]: ' f'{envs}') self._envs = envs + return self @property def need_spot_recovery(self) -> bool: @@ -400,8 +422,8 @@ def num_nodes(self, num_nodes: Optional[int]) -> None: f'num_nodes should be a positive int. Got: {num_nodes}') self._num_nodes = num_nodes - # E.g., 's3://bucket', 'gs://bucket', or None. - def set_inputs(self, inputs, estimated_size_gigabytes): + def set_inputs(self, inputs, estimated_size_gigabytes) -> 'Task': + # E.g., 's3://bucket', 'gs://bucket', or None. self.inputs = inputs self.estimated_inputs_size_gigabytes = estimated_size_gigabytes return self @@ -413,7 +435,7 @@ def get_estimated_inputs_size_gigabytes(self): return self.estimated_inputs_size_gigabytes def get_inputs_cloud(self): - """Returns the cloud my inputs live in.""" + """EXPERIMENTAL: Returns the cloud my inputs live in.""" assert isinstance(self.inputs, str), self.inputs if self.inputs.startswith('s3:'): return clouds.AWS() @@ -423,7 +445,7 @@ def get_inputs_cloud(self): with ux_utils.print_exception_no_traceback(): raise ValueError(f'cloud path not supported: {self.inputs}') - def set_outputs(self, outputs, estimated_size_gigabytes): + def set_outputs(self, outputs, estimated_size_gigabytes) -> 'Task': self.outputs = outputs self.estimated_outputs_size_gigabytes = estimated_size_gigabytes return self @@ -434,14 +456,22 @@ def get_outputs(self): def get_estimated_outputs_size_gigabytes(self): return self.estimated_outputs_size_gigabytes - def set_resources(self, resources: Union['resources_lib.Resources', - Set['resources_lib.Resources']]): + def set_resources( + self, resources: Union['resources_lib.Resources', + Set['resources_lib.Resources']] + ) -> 'Task': """Sets the required resources to execute this task. + If this function is not called for a Task, default resource + requirements will be used (8 vCPUs). + Args: resources: either a sky.Resources, or a set of them. The latter case - indicates the user intent "pick any one of these resources" to run - a task. + is EXPERIMENTAL and indicates asking the optimizer to "pick any one + of these resources" to run this task. + + Returns: + self: The current task, with resources set. """ if isinstance(resources, sky.Resources): resources = {resources} @@ -451,13 +481,19 @@ def set_resources(self, resources: Union['resources_lib.Resources', def get_resources(self): return self.resources - def set_time_estimator(self, func): - """Sets a func mapping resources to estimated time (secs).""" + def set_time_estimator(self, func) -> 'Task': + """Sets a func mapping resources to estimated time (secs). + + This is EXPERIMENTAL. + """ self.time_estimator_func = func return self def estimate_runtime(self, resources): - """Returns a func mapping resources to estimated time (secs).""" + """Returns a func mapping resources to estimated time (secs). + + This is EXPERIMENTAL. + """ if self.time_estimator_func is None: raise NotImplementedError( 'Node [{}] does not have a cost model set; ' @@ -467,23 +503,32 @@ def estimate_runtime(self, resources): def set_storage_mounts( self, storage_mounts: Optional[Dict[str, storage_lib.Storage]], - ): - """Sets the storage mounts for this Task + ) -> 'Task': + """Sets the storage mounts for this task. - Advanced method for users. Storage mounts map a mount path on the Cloud - VM to a Storage object (see data/storage.py). The storage object can be - from a local folder or from an existing cloud bucket. + Storage mounts are a dictionary: ``{mount_path: sky.Storage object}``, + each of which mounts a sky.Storage object (a cloud object store bucket) + to a path inside the remote cluster. + + A sky.Storage object can be created by uploading from a local directory + (setting ``source``), or backed by an existing cloud bucket (setting + ``name`` to the bucket name; or setting ``source`` to the bucket URI). Example: - task.set_storage_mounts({ - '/tmp/imagenet/': \ - Storage(name='imagenet', source='s3://imagenet-bucket'): - }) + >>> task.set_storage_mounts({ + >>> '/tmp/imagenet/': sky.Storage(source='/data/imagenet'), + >>> }) Args: - storage_mounts: a dict of {mount_path: Storage}, where mount_path - is the path on the Cloud VM where the Storage object will be - mounted on + storage_mounts: an optional dict of ``{mount_path: sky.Storage + object}``, where mount_path is the path inside the remote VM(s) + where the Storage object will be mounted on. + + Returns: + self: The current task, with storage mounts set. + + Raises: + ValueError: if input paths are invalid. """ if storage_mounts is None: self.storage_mounts = None @@ -510,9 +555,27 @@ def set_storage_mounts( self.storage_mounts = storage_mounts return self - def update_storage_mounts(self, storage_mounts: Dict[str, - storage_lib.Storage]): - """Updates the storage mounts for this Task""" + def update_storage_mounts( + self, storage_mounts: Dict[str, storage_lib.Storage]) -> 'Task': + """Updates the storage mounts for this task. + + Different from set_storage_mounts(), this function updates into the + existing storage_mounts (calls ``dict.update()``), rather than + overwritting it. + + This should be called before provisioning in order to take effect. + + Args: + storage_mounts: an optional dict of ``{mount_path: sky.Storage + object}``, where mount_path is the path inside the remote VM(s) + where the Storage object will be mounted on. + + Returns: + self: The current task, with storage mounts updated. + + Raises: + ValueError: if input paths are invalid. + """ if not storage_mounts: return self task_storage_mounts = self.storage_mounts if self.storage_mounts else {} @@ -555,8 +618,12 @@ def get_preferred_store_type(self) -> storage_lib.StoreType: return store_type def sync_storage_mounts(self) -> None: - """Syncs storage mounts: sync files/dirs to cloud storage.""" + """(INTERNAL) Eagerly syncs storage mounts to cloud storage. + After syncing up, COPY-mode storage mounts are translated into regular + file_mounts of the form ``{ /remote/path: {s3,gs,..}:// + }``. + """ for storage in self.storage_mounts.values(): if len(storage.stores) == 0: store_type = self.get_preferred_store_type() @@ -597,29 +664,36 @@ def sync_storage_mounts(self) -> None: raise ValueError(f'Storage Type {store_type} ' 'does not exist!') - def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> None: - """Sets the file mounts for this Task. + def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> 'Task': + """Sets the file mounts for this task. - File mounts are a dictionary of { remote_path: local_path/cloud URI }. + Useful for syncing datasets, dotfiles, etc. + + File mounts are a dictionary: ``{remote_path: local_path/cloud URI}``. Local (or cloud) files/directories will be synced to the specified paths on the remote VM(s) where this Task will run. - Used for syncing datasets, dotfiles, etc. - - Paths cannot end with a slash (for clarity). + Neither source or destimation paths can end with a slash. Example: - task.set_file_mounts({ - '~/.dotfile': '/local/.dotfile', - # /remote/dir/ will contain the contents of /local/dir/. - '/remote/dir': '/local/dir', - }) + >>> task.set_file_mounts({ + >>> '~/.dotfile': '/local/.dotfile', + >>> # /remote/dir/ will contain the contents of /local/dir/. + >>> '/remote/dir': '/local/dir', + >>> }) Args: - file_mounts: either None or a dict of { remote_path: local_path/cloud - URI }, where remote is the VM on which this Task will eventually - run on, and local is the node from which the task is launched. + file_mounts: an optional dict of ``{remote_path: local_path/cloud + URI}``, where remote means the VM(s) on which this Task will + eventually run on, and local means the node from which the task is + launched. + + Returns: + self: the current task, with file mounts set. + + Raises: + ValueError: if input paths are invalid. """ if file_mounts is None: self.file_mounts = None @@ -658,22 +732,32 @@ def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> None: self.file_mounts = file_mounts return self - def update_file_mounts(self, file_mounts: Dict[str, str]): - """Updates the file mounts for this Task. + def update_file_mounts(self, file_mounts: Dict[str, str]) -> 'Task': + """Updates the file mounts for this task. - This should be run before provisioning. + Different from set_file_mounts(), this function updates into the + existing file_mounts (calls ``dict.update()``), rather than + overwritting it. + + This should be called before provisioning in order to take effect. Example: - task.update_file_mounts({ - '~/.config': '~/Documents/config', - '/tmp/workdir': '/local/workdir/cnn-cifar10', - }) + >>> task.update_file_mounts({ + >>> '~/.config': '~/Documents/config', + >>> '/tmp/workdir': '/local/workdir/cnn-cifar10', + >>> }) Args: - file_mounts: a dict of { remote_path: local_path }, where remote is - the VM on which this Task will eventually run on, and local is the - node from which the task is launched. + file_mounts: a dict of ``{remote_path: local_path/cloud URI}``, where + remote means the VM(s) on which this Task will eventually run on, + and local means the node from which the task is launched. + + Returns: + self: the current task, with file mounts updated. + + Raises: + ValueError: if input paths are invalid. """ if self.file_mounts is None: self.file_mounts = {} @@ -684,7 +768,7 @@ def update_file_mounts(self, file_mounts: Dict[str, str]): def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: """Returns file mounts of the form (dst=VM path, src=local path). - Any cloud object store URLs (gs://, s3://, etc.), either as source or + Any cloud object store URIs (gs://, s3://, etc.), either as source or destination, are not included. """ if self.file_mounts is None: From 0f48992560d3e7b5ab10426d4ac5076d1951b908 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 24 Oct 2022 21:01:07 -0700 Subject: [PATCH 3/7] docs: expose Task (a subset of methods); hide Dag. --- docs/source/reference/api.rst | 16 +++------------- docs/source/reference/cli.rst | 2 +- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst index ca98ab649b3..52ca64453ba 100644 --- a/docs/source/reference/api.rst +++ b/docs/source/reference/api.rst @@ -18,7 +18,7 @@ Core API ----------- sky.launch -~~~~~~~~ +~~~~~~~~~~ .. autofunction:: sky.launch @@ -54,21 +54,11 @@ sky.autostop .. _sky-dag-ref: -Task and DAG +Task ----------------- - -sky.Task -~~~~~~~~~ - .. autoclass:: sky.Task :members: - :exclude-members: estimate_runtime + :exclude-members: estimate_runtime, get_inputs_cloud, set_time_estimator, sync_storage_mounts .. automethod:: __init__ - -sky.Dag -~~~~~~~~~ - -.. autoclass:: sky.Dag - :members: diff --git a/docs/source/reference/cli.rst b/docs/source/reference/cli.rst index ce1d2a7d707..2f5382ca3fc 100644 --- a/docs/source/reference/cli.rst +++ b/docs/source/reference/cli.rst @@ -1,7 +1,7 @@ .. _cli: Command Line Interface -============= +========================== Core CLI --------- From 1a4fa741667010e09e5b5ddc1721f0b036f6bab7 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Mon, 24 Oct 2022 21:25:33 -0700 Subject: [PATCH 4/7] Tweak Task method order; in docs display methods by source order. --- docs/source/conf.py | 23 ++-- sky/task.py | 292 ++++++++++++++++++++++---------------------- 2 files changed, 160 insertions(+), 155 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 8d19677b652..6f7ceb4e9f7 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -52,17 +52,22 @@ autosummary_generate = True napolean_use_rtype = False +# -- Options for autodoc + +# Python methods should be presented in source code order +autodoc_member_order = 'bysource' + # -- Options for HTML output html_theme = 'sphinx_book_theme' html_theme_options = { # 'show_toc_level': 2, 'logo_only': True, - "repository_url": "https://github.com/skypilot-org/skypilot", - "use_repository_button": True, - "use_issues_button": True, - "use_edit_page_button": True, - "path_to_docs": "docs/source", + 'repository_url': 'https://github.com/skypilot-org/skypilot', + 'use_repository_button': True, + 'use_issues_button': True, + 'use_edit_page_button': True, + 'path_to_docs': 'docs/source', } # -- Options for EPUB output @@ -76,13 +81,13 @@ # The name of an image file (relative to this directory) to place at the top # of the sidebar. -html_logo = "images/skypilot-wide-light-1k.png" +html_logo = 'images/skypilot-wide-light-1k.png' # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico), 16x16 or 32x32 pixels. -html_favicon = "_static/favicon.ico" +html_favicon = '_static/favicon.ico' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, -# so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +# so a file named 'default.css' will overwrite the builtin 'default.css'. +html_static_path = ['_static'] diff --git a/sky/task.py b/sky/task.py index 280a0b9b337..461cd5334ff 100644 --- a/sky/task.py +++ b/sky/task.py @@ -132,9 +132,9 @@ def __init__( setup/run command, where ``run`` can either be a str, meaning all nodes get the same command, or a lambda, with the semantics documented above. - docker_image: (Only in effect when LocalDockerBackend is used.) The - base docker image that this Task will be built on. Defaults to - 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'. + docker_image: (EXPERIMENTAL: Only in effect when LocalDockerBackend + is used.) The base docker image that this Task will be built on. + Defaults to 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'. """ self.name = name self.run = run @@ -323,47 +323,6 @@ def from_yaml(yaml_path: str) -> 'Task': assert not config, f'Invalid task args: {config.keys()}' return task - def to_yaml_config(self) -> Dict[str, Any]: - """Returns a yaml-style dict representation of the task.""" - config = dict() - - def add_if_not_none(key, value): - if value is not None: - config[key] = value - - add_if_not_none('name', self.name) - - if self.resources is not None: - assert len(self.resources) == 1 - resources = list(self.resources)[0] - add_if_not_none('resources', resources.to_yaml_config()) - add_if_not_none('num_nodes', self.num_nodes) - - if self.inputs is not None: - add_if_not_none('inputs', - {self.inputs: self.estimated_inputs_size_gigabytes}) - if self.outputs is not None: - add_if_not_none( - 'outputs', - {self.outputs: self.estimated_outputs_size_gigabytes}) - - add_if_not_none('setup', self.setup) - add_if_not_none('workdir', self.workdir) - add_if_not_none('run', self.run) - add_if_not_none('envs', self.envs) - - add_if_not_none('file_mounts', dict()) - - if self.file_mounts is not None: - config['file_mounts'].update(self.file_mounts) - - if self.storage_mounts is not None: - config['file_mounts'].update({ - mount_path: storage.to_yaml_config() - for mount_path, storage in self.storage_mounts.items() - }) - return config - @property def num_nodes(self) -> int: return self._num_nodes @@ -375,7 +334,7 @@ def envs(self) -> Dict[str, str]: def set_envs( self, envs: Union[None, Tuple[Tuple[str, str]], Dict[str, str]]) -> 'Task': - """Sets the environment variables for the setup and run commands. + """Sets the environment variables for use inside the setup/run commands. Returns: self: The current task, with envs set. @@ -500,6 +459,107 @@ def estimate_runtime(self, resources): 'call set_time_estimator() first'.format(self)) return self.time_estimator_func(resources) + def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> 'Task': + """Sets the file mounts for this task. + + Useful for syncing datasets, dotfiles, etc. + + File mounts are a dictionary: ``{remote_path: local_path/cloud URI}``. + Local (or cloud) files/directories will be synced to the specified + paths on the remote VM(s) where this Task will run. + + Neither source or destimation paths can end with a slash. + + Example: + + >>> task.set_file_mounts({ + >>> '~/.dotfile': '/local/.dotfile', + >>> # /remote/dir/ will contain the contents of /local/dir/. + >>> '/remote/dir': '/local/dir', + >>> }) + + Args: + file_mounts: an optional dict of ``{remote_path: local_path/cloud + URI}``, where remote means the VM(s) on which this Task will + eventually run on, and local means the node from which the task is + launched. + + Returns: + self: the current task, with file mounts set. + + Raises: + ValueError: if input paths are invalid. + """ + if file_mounts is None: + self.file_mounts = None + return self + for target, source in file_mounts.items(): + if target.endswith('/') or source.endswith('/'): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'File mount paths cannot end with a slash ' + '(try "/mydir: /mydir" or "/myfile: /myfile"). ' + f'Found: target={target} source={source}') + if data_utils.is_cloud_store_url(target): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'File mount destination paths cannot be cloud storage') + if not data_utils.is_cloud_store_url(source): + if not os.path.exists( + os.path.abspath(os.path.expanduser(source))): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'File mount source {source!r} does not exist ' + 'locally. To fix: check if it exists, and correct ' + 'the path.') + # TODO(zhwu): /home/username/sky_workdir as the target path need + # to be filtered out as well. + if (target == constants.SKY_REMOTE_WORKDIR and + self.workdir is not None): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Cannot use {constants.SKY_REMOTE_WORKDIR!r} as a ' + 'destination path of a file mount, as it will be used ' + 'by the workdir. If uploading a file/folder to the ' + 'workdir is needed, please specify the full path to ' + 'the file/folder.') + + self.file_mounts = file_mounts + return self + + def update_file_mounts(self, file_mounts: Dict[str, str]) -> 'Task': + """Updates the file mounts for this task. + + Different from set_file_mounts(), this function updates into the + existing file_mounts (calls ``dict.update()``), rather than + overwritting it. + + This should be called before provisioning in order to take effect. + + Example: + + >>> task.update_file_mounts({ + >>> '~/.config': '~/Documents/config', + >>> '/tmp/workdir': '/local/workdir/cnn-cifar10', + >>> }) + + Args: + file_mounts: a dict of ``{remote_path: local_path/cloud URI}``, where + remote means the VM(s) on which this Task will eventually run on, + and local means the node from which the task is launched. + + Returns: + self: the current task, with file mounts updated. + + Raises: + ValueError: if input paths are invalid. + """ + if self.file_mounts is None: + self.file_mounts = {} + self.file_mounts.update(file_mounts) + # For validation logic: + return self.set_file_mounts(self.file_mounts) + def set_storage_mounts( self, storage_mounts: Optional[Dict[str, storage_lib.Storage]], @@ -664,107 +724,6 @@ def sync_storage_mounts(self) -> None: raise ValueError(f'Storage Type {store_type} ' 'does not exist!') - def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> 'Task': - """Sets the file mounts for this task. - - Useful for syncing datasets, dotfiles, etc. - - File mounts are a dictionary: ``{remote_path: local_path/cloud URI}``. - Local (or cloud) files/directories will be synced to the specified - paths on the remote VM(s) where this Task will run. - - Neither source or destimation paths can end with a slash. - - Example: - - >>> task.set_file_mounts({ - >>> '~/.dotfile': '/local/.dotfile', - >>> # /remote/dir/ will contain the contents of /local/dir/. - >>> '/remote/dir': '/local/dir', - >>> }) - - Args: - file_mounts: an optional dict of ``{remote_path: local_path/cloud - URI}``, where remote means the VM(s) on which this Task will - eventually run on, and local means the node from which the task is - launched. - - Returns: - self: the current task, with file mounts set. - - Raises: - ValueError: if input paths are invalid. - """ - if file_mounts is None: - self.file_mounts = None - return self - for target, source in file_mounts.items(): - if target.endswith('/') or source.endswith('/'): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'File mount paths cannot end with a slash ' - '(try "/mydir: /mydir" or "/myfile: /myfile"). ' - f'Found: target={target} source={source}') - if data_utils.is_cloud_store_url(target): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'File mount destination paths cannot be cloud storage') - if not data_utils.is_cloud_store_url(source): - if not os.path.exists( - os.path.abspath(os.path.expanduser(source))): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - f'File mount source {source!r} does not exist ' - 'locally. To fix: check if it exists, and correct ' - 'the path.') - # TODO(zhwu): /home/username/sky_workdir as the target path need - # to be filtered out as well. - if (target == constants.SKY_REMOTE_WORKDIR and - self.workdir is not None): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - f'Cannot use {constants.SKY_REMOTE_WORKDIR!r} as a ' - 'destination path of a file mount, as it will be used ' - 'by the workdir. If uploading a file/folder to the ' - 'workdir is needed, please specify the full path to ' - 'the file/folder.') - - self.file_mounts = file_mounts - return self - - def update_file_mounts(self, file_mounts: Dict[str, str]) -> 'Task': - """Updates the file mounts for this task. - - Different from set_file_mounts(), this function updates into the - existing file_mounts (calls ``dict.update()``), rather than - overwritting it. - - This should be called before provisioning in order to take effect. - - Example: - - >>> task.update_file_mounts({ - >>> '~/.config': '~/Documents/config', - >>> '/tmp/workdir': '/local/workdir/cnn-cifar10', - >>> }) - - Args: - file_mounts: a dict of ``{remote_path: local_path/cloud URI}``, where - remote means the VM(s) on which this Task will eventually run on, - and local means the node from which the task is launched. - - Returns: - self: the current task, with file mounts updated. - - Raises: - ValueError: if input paths are invalid. - """ - if self.file_mounts is None: - self.file_mounts = {} - self.file_mounts.update(file_mounts) - # For validation logic: - return self.set_file_mounts(self.file_mounts) - def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: """Returns file mounts of the form (dst=VM path, src=local path). @@ -795,6 +754,47 @@ def get_cloud_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: d[k] = v return d + def to_yaml_config(self) -> Dict[str, Any]: + """Returns a yaml-style dict representation of the task.""" + config = dict() + + def add_if_not_none(key, value): + if value is not None: + config[key] = value + + add_if_not_none('name', self.name) + + if self.resources is not None: + assert len(self.resources) == 1 + resources = list(self.resources)[0] + add_if_not_none('resources', resources.to_yaml_config()) + add_if_not_none('num_nodes', self.num_nodes) + + if self.inputs is not None: + add_if_not_none('inputs', + {self.inputs: self.estimated_inputs_size_gigabytes}) + if self.outputs is not None: + add_if_not_none( + 'outputs', + {self.outputs: self.estimated_outputs_size_gigabytes}) + + add_if_not_none('setup', self.setup) + add_if_not_none('workdir', self.workdir) + add_if_not_none('run', self.run) + add_if_not_none('envs', self.envs) + + add_if_not_none('file_mounts', dict()) + + if self.file_mounts is not None: + config['file_mounts'].update(self.file_mounts) + + if self.storage_mounts is not None: + config['file_mounts'].update({ + mount_path: storage.to_yaml_config() + for mount_path, storage in self.storage_mounts.items() + }) + return config + def __rshift__(self, b): sky.dag.get_current_dag().add_edge(self, b) From 31898c576caa798628447b4cab892b14c379abcf Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 25 Oct 2022 10:08:16 -0700 Subject: [PATCH 5/7] CLI docs: tweak order; tweak `spot launch`. --- docs/source/reference/cli.rst | 65 +++++++++++++++++------------------ sky/cli.py | 22 ++++++++++-- 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/docs/source/reference/cli.rst b/docs/source/reference/cli.rst index 2f5382ca3fc..7b131e36b39 100644 --- a/docs/source/reference/cli.rst +++ b/docs/source/reference/cli.rst @@ -34,24 +34,6 @@ Core CLI :prog: sky autostop :nested: full - -Interactive Node CLI ------------------------ - -.. click:: sky.cli:cpunode - :prog: sky cpunode - :nested: full - -.. _sky-gpunode: -.. click:: sky.cli:gpunode - :prog: sky gpunode - :nested: full - -.. click:: sky.cli:tpunode - :prog: sky tpunode - :nested: full - - Job Queue CLI -------------- @@ -68,17 +50,6 @@ Job Queue CLI :nested: full -Storage CLI ------------- - -.. click:: sky.cli:storage_ls - :prog: sky storage ls - :nested: full - -.. click:: sky.cli:storage_delete - :prog: sky storage delete - :nested: full - Managed Spot Jobs CLI --------------------------- @@ -98,14 +69,42 @@ Managed Spot Jobs CLI :prog: sky spot logs :nested: full -Miscellaneous -------------- +Interactive Node CLI +----------------------- -.. click:: sky.cli:check - :prog: sky check +.. click:: sky.cli:cpunode + :prog: sky cpunode + :nested: full + +.. _sky-gpunode: +.. click:: sky.cli:gpunode + :prog: sky gpunode + :nested: full + +.. click:: sky.cli:tpunode + :prog: sky tpunode + :nested: full + + +Storage CLI +------------ + +.. click:: sky.cli:storage_ls + :prog: sky storage ls :nested: full +.. click:: sky.cli:storage_delete + :prog: sky storage delete + :nested: full + +Utils: ``show-gpus``, ``check`` +--------------------------------------- + .. click:: sky.cli:show_gpus :prog: sky show-gpus :nested: full + +.. click:: sky.cli:check + :prog: sky check + :nested: full diff --git a/sky/cli.py b/sky/cli.py index 025010bf68c..1ccb421c0ff 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -2361,10 +2361,15 @@ def check(): def show_gpus(gpu_name: Optional[str], all: bool, cloud: Optional[str]): # pylint: disable=redefined-builtin """Show supported GPU/TPU/accelerators. + The names and counts shown can be set in the ``accelerators`` field in task + YAMLs, or in the ``--gpus`` flag in CLI commands. For example, if this + table shows 8x V100s are supported, then the string ``V100:8`` will be + accepted by the above. + To show the detailed information of a GPU/TPU type (which clouds offer it, the quantity in each VM type, etc.), use ``sky show-gpus ``. - To show all GPUs, including less common ones and their detailed + To show all accelerators, including less common ones and their detailed information, use ``sky show-gpus --all``. NOTE: The price displayed for each instance type is the lowest across all @@ -2691,7 +2696,20 @@ def spot_launch( retry_until_up: bool, yes: bool, ): - """Launch a managed spot job.""" + """Launch a managed spot job from a YAML or a command. + + If ENTRYPOINT points to a valid YAML file, it is read in as the task + specification. Otherwise, it is interpreted as a bash command. + + Examples: + + .. code-block:: bash + + # You can use normal task YAMLs. + sky spot launch task.yaml + + sky spot launch 'echo hello!' + """ if name is None: name = backend_utils.generate_cluster_name() else: From fb9fa7e72f9758f49e42ae4f4932dbacf9f21674 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 25 Oct 2022 14:47:07 -0700 Subject: [PATCH 6/7] Address comments. --- docs/source/reference/api.rst | 2 +- sky/task.py | 27 ++++++++++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst index 52ca64453ba..de8e9aea4c5 100644 --- a/docs/source/reference/api.rst +++ b/docs/source/reference/api.rst @@ -59,6 +59,6 @@ Task .. autoclass:: sky.Task :members: - :exclude-members: estimate_runtime, get_inputs_cloud, set_time_estimator, sync_storage_mounts + :exclude-members: estimate_runtime, get_cloud_to_remote_file_mounts, get_inputs_cloud, get_local_to_remote_file_mounts, set_time_estimator, sync_storage_mounts, to_yaml_config .. automethod:: __init__ diff --git a/sky/task.py b/sky/task.py index 461cd5334ff..d0f87ad8502 100644 --- a/sky/task.py +++ b/sky/task.py @@ -229,6 +229,10 @@ def _validate(self): def from_yaml(yaml_path: str) -> 'Task': """Initializes a task from a task YAML. + Example: + + >>> task = sky.Task.from_yaml('/path/to/task.yaml') + Args: yaml_path: file path to a valid task yaml file. @@ -336,6 +340,10 @@ def set_envs( Dict[str, str]]) -> 'Task': """Sets the environment variables for use inside the setup/run commands. + Args: + envs: (optional) either a list of ``(env_name, value)`` or a dict + ``{env_name: value}``. + Returns: self: The current task, with envs set. @@ -426,8 +434,8 @@ def set_resources( Args: resources: either a sky.Resources, or a set of them. The latter case - is EXPERIMENTAL and indicates asking the optimizer to "pick any one - of these resources" to run this task. + is EXPERIMENTAL and indicates asking the optimizer to "pick the + best of these resources" to run this task. Returns: self: The current task, with resources set. @@ -440,7 +448,8 @@ def set_resources( def get_resources(self): return self.resources - def set_time_estimator(self, func) -> 'Task': + def set_time_estimator(self, func: Callable[['sky.Resources'], + int]) -> 'Task': """Sets a func mapping resources to estimated time (secs). This is EXPERIMENTAL. @@ -576,7 +585,8 @@ def set_storage_mounts( Example: >>> task.set_storage_mounts({ - >>> '/tmp/imagenet/': sky.Storage(source='/data/imagenet'), + >>> '/remote/imagenet/': sky.Storage(name='my-bucket', + >>> source='/local/imagenet'), >>> }) Args: @@ -729,6 +739,8 @@ def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: Any cloud object store URIs (gs://, s3://, etc.), either as source or destination, are not included. + + INTERNAL: this method is internal-facing. """ if self.file_mounts is None: return None @@ -744,6 +756,8 @@ def get_cloud_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: Local-to-remote file mounts are excluded (handled by get_local_to_remote_file_mounts()). + + INTERNAL: this method is internal-facing. """ if self.file_mounts is None: return None @@ -755,7 +769,10 @@ def get_cloud_to_remote_file_mounts(self) -> Optional[Dict[str, str]]: return d def to_yaml_config(self) -> Dict[str, Any]: - """Returns a yaml-style dict representation of the task.""" + """Returns a yaml-style dict representation of the task. + + INTERNAL: this method is internal-facing. + """ config = dict() def add_if_not_none(key, value): From 9431625269e8bc74b262c4e5c244168dc404cb7a Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 25 Oct 2022 22:09:27 -0700 Subject: [PATCH 7/7] Code block formatting. --- sky/execution.py | 16 ++++++-------- sky/task.py | 57 +++++++++++++++++++++++++++--------------------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/sky/execution.py b/sky/execution.py index 250a5240fc6..5cf12e6d960 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -333,15 +333,13 @@ def launch( no_setup: if True, do not re-run setup commands. Example: - >>> import sky - >>> task = sky.Task(run='echo hello SkyPilot') - >>> task.set_resources( - ... sky.Resources( - ... cloud=sky.AWS(), - ... accelerators='V100:4' - ... ) - ... ) - >>> sky.launch(task, cluster_name='my-cluster') + .. code-block:: python + + import sky + task = sky.Task(run='echo hello SkyPilot') + task.set_resources( + sky.Resources(cloud=sky.AWS(), accelerators='V100:4')) + sky.launch(task, cluster_name='my-cluster') """ entrypoint = task diff --git a/sky/task.py b/sky/task.py index d0f87ad8502..416300bb25e 100644 --- a/sky/task.py +++ b/sky/task.py @@ -98,17 +98,19 @@ def __init__( they are fluent APIs and can be chained together. Example: - >>> # A Task that will sync up local workdir '.', containing - >>> # requirements.txt and train.py. - >>> sky.Task(setup='pip install requirements.txt', - >>> run='python train.py', - >>> workdir='.') - >>> - >>> # An empty Task for provisioning a cluster. - >>> task = sky.Task(num_nodes=n).set_resources(...) - >>> - >>> # Chaining setters. - >>> sky.Task().set_resources(...).set_file_mounts(...) + .. code-block:: python + + # A Task that will sync up local workdir '.', containing + # requirements.txt and train.py. + sky.Task(setup='pip install requirements.txt', + run='python train.py', + workdir='.') + + # An empty Task for provisioning a cluster. + task = sky.Task(num_nodes=n).set_resources(...) + + # Chaining setters. + sky.Task().set_resources(...).set_file_mounts(...) Args: name: A string name for the Task for display purposes. @@ -230,8 +232,9 @@ def from_yaml(yaml_path: str) -> 'Task': """Initializes a task from a task YAML. Example: + .. code-block:: python - >>> task = sky.Task.from_yaml('/path/to/task.yaml') + task = sky.Task.from_yaml('/path/to/task.yaml') Args: yaml_path: file path to a valid task yaml file. @@ -480,12 +483,13 @@ def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> 'Task': Neither source or destimation paths can end with a slash. Example: + .. code-block:: python - >>> task.set_file_mounts({ - >>> '~/.dotfile': '/local/.dotfile', - >>> # /remote/dir/ will contain the contents of /local/dir/. - >>> '/remote/dir': '/local/dir', - >>> }) + task.set_file_mounts({ + '~/.dotfile': '/local/.dotfile', + # /remote/dir/ will contain the contents of /local/dir/. + '/remote/dir': '/local/dir', + }) Args: file_mounts: an optional dict of ``{remote_path: local_path/cloud @@ -546,11 +550,12 @@ def update_file_mounts(self, file_mounts: Dict[str, str]) -> 'Task': This should be called before provisioning in order to take effect. Example: + .. code-block:: python - >>> task.update_file_mounts({ - >>> '~/.config': '~/Documents/config', - >>> '/tmp/workdir': '/local/workdir/cnn-cifar10', - >>> }) + task.update_file_mounts({ + '~/.config': '~/Documents/config', + '/tmp/workdir': '/local/workdir/cnn-cifar10', + }) Args: file_mounts: a dict of ``{remote_path: local_path/cloud URI}``, where @@ -584,10 +589,12 @@ def set_storage_mounts( ``name`` to the bucket name; or setting ``source`` to the bucket URI). Example: - >>> task.set_storage_mounts({ - >>> '/remote/imagenet/': sky.Storage(name='my-bucket', - >>> source='/local/imagenet'), - >>> }) + .. code-block:: python + + task.set_storage_mounts({ + '/remote/imagenet/': sky.Storage(name='my-bucket', + source='/local/imagenet'), + }) Args: storage_mounts: an optional dict of ``{mount_path: sky.Storage