Skip to content

Commit

Permalink
convert folder to a module
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed Jul 11, 2024
1 parent cf92f10 commit 4f96651
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 123 deletions.
2 changes: 1 addition & 1 deletion runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import List

RESERVED_SYSTEM_NAMES: List[str] = ["file", "s3", "gs", "azure", "here", "ssh", "sftp"]
RESERVED_SYSTEM_NAMES: List[str] = ["file", "s3", "gs", "azure", "here"]
CLUSTER_CONFIG_PATH: str = "~/.rh/cluster_config.json"
CONFIG_YAML_PATH: str = "~/.rh/config.yaml"
SERVER_LOGFILE_PATH = "~/.rh/server.log"
Expand Down
3 changes: 2 additions & 1 deletion runhouse/resources/envs/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from runhouse.logger import logger
from runhouse.resources.envs.utils import _process_env_vars, run_setup_command
from runhouse.resources.folders import Folder
from runhouse.resources.hardware import _get_cluster_from, Cluster
from runhouse.resources.packages import Package
from runhouse.resources.resource import Resource
Expand Down Expand Up @@ -107,6 +106,8 @@ def reqs(self, reqs):

def _reqs_to(self, system: Union[str, Cluster], path=None, mount=False):
"""Send self.reqs to the system (cluster or file system)"""
from runhouse.resources.folders import Folder

new_reqs = []
for req in self.reqs:
if isinstance(req, str):
Expand Down
134 changes: 89 additions & 45 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import os
import pickle
import shlex
import shutil
import subprocess
Expand All @@ -11,12 +12,13 @@

from runhouse.logger import logger
from runhouse.resources.hardware import _current_cluster, _get_cluster_from, Cluster
from runhouse.resources.module import Module
from runhouse.resources.resource import Resource
from runhouse.rns.utils.api import generate_uuid
from runhouse.utils import locate_working_dir


class Folder(Resource):
class Folder(Module):
RESOURCE_TYPE = "folder"
DEFAULT_FS = "file"
CLUSTER_FS = "ssh"
Expand All @@ -39,19 +41,10 @@ def __init__(
.. note::
To build a folder, please use the factory method :func:`folder`.
"""
super().__init__(name=name, dryrun=dryrun)
super().__init__(name=name, dryrun=dryrun, system=system)

self._system = None
self._filesystem = None

current_cluster_config = _current_cluster(key="config")
if current_cluster_config and system is None:
self.system = Cluster.from_config(current_cluster_config)
elif isinstance(system, dict):
self.system = Cluster.from_config(system)
else:
self.system = system or self.DEFAULT_FS

# TODO [DG] Should we ever be allowing this to be None?
if path is None:
self._path = self.default_path(self.rns_address, system)
Expand Down Expand Up @@ -126,7 +119,7 @@ def path(self):
if self._path is not None:
if self.system == Folder.DEFAULT_FS:
return str(Path(self._path).expanduser())
elif self._fs_str == self.CLUSTER_FS and self._path.startswith("~/"):
elif self._fs_str == self.CLUSTER_FS and str(self._path).startswith("~/"):
# sftp takes relative paths to the home directory but doesn't understand '~'
return str(self._path[2:])
return str(self._path)
Expand All @@ -138,15 +131,7 @@ def path(self, path):
self._path = path
self._local_mount_path = None

@property
def system(self):
return self._system

@system.setter
def system(self, data_source):
self._system = data_source
self._filesystem = None

# TODO [JL] we can probably kill this entirely
@property
def data_config(self):
if isinstance(self.system, Resource): # if system is a cluster
Expand Down Expand Up @@ -268,7 +253,13 @@ def to(
path: Optional[Union[str, Path]] = None,
data_config: Optional[dict] = None,
):
"""Copy the folder to a new filesystem, and return a new Folder object pointing to the new location."""
"""Copy the folder to a new filesystem.
Currently supported: ``here``, ``file``, ``gs``, ``s3``, or a cluster.
Example:
>>> local_folder = rh.folder(path="/my/local/folder")
>>> s3_folder = local_folder.to("s3")
"""
if system == "here":
current_cluster_config = _current_cluster(key="config")
if current_cluster_config:
Expand All @@ -277,6 +268,20 @@ def to(
system = "file"
path = str(Path.cwd() / self.path.split("/")[-1]) if path is None else path

if isinstance(system, Cluster):
# (the folder's system may already be a cluster, which would skip the install)
# Make sure the top level directory exists on the cluster before creating the module on the cluster

# TODO: find a better way to ensure the top level dir exists on the cluster before sending the module
if self.path.startswith("/") or self.path.startswith("~"):
relative_path = os.path.relpath(self.path, str(Path.home()))
remote_path = f"~/{relative_path}"
system.run([f"mkdir -p {remote_path}"])

# Note: setting `force_install` to ensure the module gets installed on the cluster
# TODO: this copies the files to the wrong location on the cluster
return super().to(system=system, force_install=True)

path = str(
path or self.default_path(self.rns_address, system)
) # Make sure it's a string and not a Path
Expand All @@ -294,19 +299,14 @@ def to(

if system == "file":
return self._to_local(dest_path=path, data_config=data_config)
elif isinstance(system, Cluster): # If system is a cluster
return self._to_cluster(dest_cluster=system, path=path)
elif system in ["s3", "gs", "azure"]:
elif system in ["s3", "gs"]:
return self._to_data_store(
system=system, data_store_path=path, data_config=data_config
)
else:
self._fsspec_copy(system, path, data_config)
new_folder = copy.deepcopy(self)
new_folder.path = path
new_folder.system = system
new_folder.data_config = data_config or {}
return new_folder
raise ValueError(
f"System '{system}' not currently supported as a destination system."
)

def _fsspec_copy(self, system: str, path: str, data_config: dict):
"""Copy the fsspec filesystem to the given new filesystem and path."""
Expand All @@ -319,11 +319,31 @@ def _destination_folder(
data_config: Optional[dict] = None,
):
"""Returns a new Folder object pointing to the destination folder."""
raise NotImplementedError
folder_config = self.config()
folder_config["system"] = dest_system
folder_config["path"] = dest_path
folder_config["data_config"] = data_config
new_folder = Folder.from_config(folder_config)

return new_folder

def _to_local(self, dest_path: str, data_config: dict):
"""Copies folder to local."""
raise NotImplementedError
"""Copies folder to local. Only relevant for the base Folder if its system is a cluster."""
if isinstance(self.system, Cluster):
# Cluster --> local copying
logger.debug(
f"Copying folder from cluster {self.system.name} to local path: {dest_path}"
)
self._cluster_to_local(self.system, dest_path)
return self

if self.system == self.DEFAULT_FS:
# Local --> local copying
logger.debug(f"Copying folder to local path: {dest_path}")
self.mv(system=self.system, path=dest_path, data_config=data_config)
return self

raise TypeError(f"Cannot copy from {self.system} to local.")

def _to_data_store(
self,
Expand Down Expand Up @@ -484,7 +504,21 @@ def _cluster_to_local(self, cluster, dest_path):
This function rsyncs down the data and returns a folder with system=='file'.
"""
raise NotImplementedError
if not cluster.address:
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster._rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
contents=True,
)
new_folder = copy.deepcopy(self)
new_folder.path = dest_path
new_folder.system = "file"
# Don't need to do anything with _data_config because cluster creds are injected virtually through the
# data_config property
return new_folder

def is_local(self):
"""Whether the folder is on the local filesystem.
Expand All @@ -506,7 +540,7 @@ def _upload_command(self, src: str, dest: str):
"""CLI command for uploading folder to remote bucket. Needed when uploading a folder from a cluster."""
raise NotImplementedError

def _run_upload_cli_cmd(self, command: str):
def _upload_folder_to_bucket(self, command: str):
"""Uploads a folder to a remote bucket.
Based on the CLI command skypilot uses to upload the folder"""
# Adapted from: https://github.com/skypilot-org/skypilot/blob/983f5fa3197fe7c4b5a28be240f7b027f7192b15/sky/data/data_utils.py#L165 # noqa
Expand Down Expand Up @@ -606,7 +640,8 @@ def ls(self, full_paths: bool = True, sort: bool = False) -> List:
sort (Optional[bool]): Whether to sort the folder contents by time modified.
Defaults to ``False``.
"""
paths = [p for p in Path(self.path).iterdir()]
path = Path(self.path).expanduser()
paths = [p for p in path.iterdir()]

# Sort the paths by modification time if sort is True
if sort:
Expand Down Expand Up @@ -738,7 +773,8 @@ def locate(self, name_or_path) -> (str, str):
return None, None

def open(self, name, mode="rb", encoding=None):
"""Returns the specified file as a stream, which must be used as a content manager to be opened.
"""Returns the specified file as a stream (`botocore.response.StreamingBody`), which must be used as a
context manager to be opened.
Example:
>>> with my_folder.open('obj_name') as my_file:
Expand Down Expand Up @@ -830,7 +866,7 @@ def put(
Defaults to ``False``.
mode (Optional(str)): Write mode to use for fsspec. Defaults to ``wb``.
write_fn (Optional(Callable)): Function to use for writing file contents.
Example: ``write_fn = lambda f, data: json.dump(data, f)
Example: ``write_fn = lambda f, data: json.dump(data, f)``
Example:
>>> my_folder.put(contents={"filename.txt": data})
Expand Down Expand Up @@ -868,23 +904,31 @@ def put(
)

for filename, file_obj in contents.items():
file_path = os.path.join(self.path, filename)

if not overwrite and os.path.exists(file_path):
file_obj = self._serialize_file_obj(file_obj)
file_path = Path(self.path) / filename
if not overwrite and file_path.exists():
raise FileExistsError(f"File {file_path} already exists.")

try:
with open(file_path, mode) as f:
if write_fn:
write_fn(f, file_obj)
else:
f.write(file_obj) if isinstance(file_obj, bytes) else f.write(
file_obj.read()
)
f.write(file_obj)

except Exception as e:
raise RuntimeError(f"Failed to write {filename} to {file_path}: {e}")

@staticmethod
def _serialize_file_obj(file_obj):
if not isinstance(file_obj, bytes):
try:
file_obj = pickle.dumps(file_obj)
except (pickle.PicklingError, TypeError) as e:
raise ValueError(f"Cannot serialize file contents: {e}")

return file_obj

@staticmethod
def _bucket_name_from_path(path: str) -> str:
"""Extract the bucket name from a path (e.g. '/my-bucket/my-folder/my-file.txt' -> 'my-bucket')"""
Expand Down
19 changes: 10 additions & 9 deletions runhouse/resources/folders/folder_factory.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import warnings
from pathlib import Path
from typing import Dict, Optional, Union

from runhouse.logger import logger

from runhouse.resources.folders.folder import Folder
from runhouse.resources.hardware.utils import _get_cluster_from
from runhouse.resources.resource import Resource


def folder(
Expand All @@ -22,7 +22,6 @@ def folder(
path (Optional[str or Path]): Path (or path) that the folder is located at.
system (Optional[str or Cluster]): File system or cluster name. If providing a file system this must be one of:
[``file``, ``s3``, ``gs``].
We are working to add additional file system support.
dryrun (bool): Whether to create the Folder if it doesn't exist, or load a Folder object as a dryrun.
(Default: ``False``)
local_mount (bool): Whether or not to mount the folder locally. (Default: ``False``)
Expand Down Expand Up @@ -53,6 +52,7 @@ def folder(
if system == "s3":
from .s3_folder import S3Folder

logger.debug(f"Creating a S3 folder with name: {name}")
return S3Folder(
system=system,
path=path,
Expand All @@ -64,6 +64,7 @@ def folder(
elif system == "gs":
from .gcs_folder import GCSFolder

logger.debug(f"Creating a GS folder with name: {name}")
return GCSFolder(
system=system,
path=path,
Expand All @@ -72,11 +73,9 @@ def folder(
name=name,
dryrun=dryrun,
)
elif system == "azure":
warnings.warn("Azure folders are not currently supported.")

if system == Folder.DEFAULT_FS:
# Create the base Folder to interact with the local filesystem
logger.debug(f"Creating local folder with name: {name}")
return Folder(
system=system,
path=path,
Expand All @@ -86,9 +85,11 @@ def folder(
dryrun=dryrun,
)

cluster_system = _get_cluster_from(system, dryrun=dryrun)
if isinstance(cluster_system, Resource):
from runhouse import Cluster

cluster_system = _get_cluster_from(system, dryrun=dryrun)
if isinstance(cluster_system, Cluster):
logger.debug(f"Creating folder {name} for cluster: {cluster_system.name}")
return Folder(
system=cluster_system,
path=path,
Expand All @@ -99,6 +100,6 @@ def folder(
)

raise ValueError(
f"File system '{system}' not currently supported. If the file system "
f"System '{system}' not currently supported. If the file system "
f"is a cluster (ex: /my-user/rh-cpu), make sure the cluster config has been saved."
)
Loading

0 comments on commit 4f96651

Please sign in to comment.