From a7794b9aa894b982da5b93569f155a9e5f4f348f Mon Sep 17 00:00:00 2001
From: tianwei
Date: Mon, 12 Dec 2022 13:32:52 +0800
Subject: [PATCH] support cloud remote dataset build by dataset sdk

---
 client/starwhale/api/_impl/dataset/builder.py |  28 +++-
 client/starwhale/api/_impl/dataset/model.py   | 124 +++++++++++++-----
 client/starwhale/api/_impl/wrapper.py         |   4 +-
 client/starwhale/base/bundle_copy.py          |  45 ++++---
 client/starwhale/core/dataset/copy.py         |  22 +---
 client/starwhale/core/dataset/model.py        |   5 +-
 client/starwhale/core/dataset/tabular.py      |   6 +-
 client/starwhale/core/model/copy.py           |   3 +
 8 files changed, 161 insertions(+), 76 deletions(-)

diff --git a/client/starwhale/api/_impl/dataset/builder.py b/client/starwhale/api/_impl/dataset/builder.py
index ef7aa56c7a..33d087e68f 100644
--- a/client/starwhale/api/_impl/dataset/builder.py
+++ b/client/starwhale/api/_impl/dataset/builder.py
@@ -16,10 +16,20 @@
 import jsonlines
 from loguru import logger
 
-from starwhale.consts import AUTH_ENV_FNAME, DEFAULT_PROJECT, SWDS_DATA_FNAME_FMT
+from starwhale.consts import (
+    AUTH_ENV_FNAME,
+    DEFAULT_PROJECT,
+    STANDALONE_INSTANCE,
+    SWDS_DATA_FNAME_FMT,
+)
 from starwhale.base.uri import URI
 from starwhale.utils.fs import empty_dir, ensure_dir
-from starwhale.base.type import DataFormatType, DataOriginType, ObjectStoreType
+from starwhale.base.type import (
+    InstanceType,
+    DataFormatType,
+    DataOriginType,
+    ObjectStoreType,
+)
 from starwhale.utils.error import FormatError, NoSupportError
 from starwhale.core.dataset import model
 from starwhale.core.dataset.type import (
@@ -61,6 +71,7 @@ def __init__(
         append_from_version: str = "",
         append_from_uri: t.Optional[URI] = None,
         data_mime_type: MIMEType = MIMEType.UNDEFINED,
+        instance_name: str = STANDALONE_INSTANCE,
     ) -> None:
         # TODO: add more docstring for args
         # TODO: validate group upper and lower?
@@ -80,11 +91,20 @@ def __init__(
         self.dataset_name = dataset_name
         self.dataset_version = dataset_version
         self.tabular_dataset = TabularDataset(
-            dataset_name, dataset_version, project_name
+            dataset_name,
+            dataset_version,
+            project_name,
+            instance_name=instance_name,
         )
         self._forked_summary: t.Optional[DatasetSummary]
 
         if append and append_from_uri:
+            # TODO: controller supports cloud dataset fork api
+            if append_from_uri.instance_type == InstanceType.CLOUD:
+                raise NoSupportError(
+                    f"Can't build dataset from an existing cloud dataset: {append_from_uri}"
+                )
+
             self._forked_last_seq_id, self._forked_rows = self.tabular_dataset.fork(
                 append_from_version
             )
@@ -546,6 +566,7 @@ def __init__(
         append_from_version: str = "",
         append_from_uri: t.Optional[URI] = None,
         append_with_swds_bin: bool = True,
+        instance_name: str = STANDALONE_INSTANCE,
     ) -> None:
         super().__init__(
             name=f"RowWriter-{dataset_name}-{dataset_version}-{project_name}"
@@ -561,6 +582,7 @@ def __init__(
             "append": append,
             "append_from_version": append_from_version,
             "append_from_uri": append_from_uri,
+            "instance_name": instance_name,
         }
 
         self._queue: queue.Queue[t.Optional[DataRow]] = queue.Queue()
diff --git a/client/starwhale/api/_impl/dataset/model.py b/client/starwhale/api/_impl/dataset/model.py
index 15da09b024..7c330722c6 100644
--- a/client/starwhale/api/_impl/dataset/model.py
+++ b/client/starwhale/api/_impl/dataset/model.py
@@ -6,12 +6,19 @@
 from types import TracebackType
 from pathlib import Path
 from functools import wraps
+from contextlib import ExitStack
 
 from loguru import logger
 
 from starwhale.utils import gen_uniq_version
-from starwhale.consts import HTTPMethod, DEFAULT_PAGE_IDX, DEFAULT_PAGE_SIZE
+from starwhale.consts import (
+    HTTPMethod,
+    DEFAULT_PAGE_IDX,
+    DEFAULT_PAGE_SIZE,
+    STANDALONE_INSTANCE,
+)
 from starwhale.base.uri import URI, URIType
+from starwhale.utils.fs import move_dir, empty_dir
 from starwhale.base.type import InstanceType
 from starwhale.base.cloud import CloudRequestMixed
 from starwhale.utils.error import ExistedError, NotFoundError, NoSupportError
@@ -104,6 +111,17 @@ def __init__(
         _summary = None
         if self._check_uri_exists(_origin_uri):
             if create:
+                # TODO: support building a cloud dataset from an existing dataset
+                if _origin_uri.instance_type == InstanceType.CLOUD:
+                    raise NoSupportError(
+                        f"Can't build dataset from the existing cloud dataset uri: {_origin_uri}"
+                    )
+
+                if self.uri.instance_type == InstanceType.CLOUD:
+                    raise NoSupportError(
+                        f"Can't build cloud dataset({self.uri}) from an existing dataset uri"
+                    )
+
                 self._append_from_version = version
                 self._create_by_append = True
                 self._fork_dataset()
@@ -139,10 +157,21 @@ def __init__(
     def _fork_dataset(self) -> None:
         # TODO: support cloud dataset prepare in the tmp dir
         # TODO: lazy fork dataset
-        self.__core_dataset._prepare_snapshot()
-        self.__core_dataset._fork_swds(
-            self._create_by_append, self._append_from_version
-        )
+        if not isinstance(self.__core_dataset, StandaloneDataset):
+            raise NoSupportError(
+                f"only supports standalone dataset fork: {self.__core_dataset}"
+            )
+
+        def _when_exit() -> None:
+            self.__core_dataset.store.building = False
+
+        with ExitStack() as stack:
+            stack.callback(_when_exit)
+            self.__core_dataset.store.building = True
+            self.__core_dataset._prepare_snapshot()
+            self.__core_dataset._fork_swds(
+                self._create_by_append, self._append_from_version
+            )
 
     def _auto_complete_version(self, version: str) -> str:
         version = version.strip()
@@ -431,11 +460,6 @@ def __setitem__(
     ) -> None:
         # TODO: tune the performance of getitem by cache
         self._trigger_icode_build = True
-        if not isinstance(self.__core_dataset, StandaloneDataset):
-            raise NoSupportError(
-                f"setitem only supports for standalone dataset: {self.__core_dataset}"
-            )
-
         _row_writer = self._get_row_writer()
 
         if not isinstance(key, (int, str)):
@@ -487,20 +511,16 @@ def _get_row_writer(self) -> RowWriter:
             append_from_version = ""
 
         # TODO: support alignment_bytes_size, volume_bytes_size arguments
-        if not isinstance(self.__core_dataset, StandaloneDataset):
-            raise NoSupportError(
-                f"setitem only supports for standalone dataset: {self.__core_dataset}"
-            )
-
         self._row_writer = RowWriter(
             dataset_name=self.name,
             dataset_version=self.version,
             project_name=self.project_uri.project,
-            workdir=self.__core_dataset.store.snapshot_workdir,  # TODO: use tmpdir which is same as dataset build command
+            workdir=self.__core_dataset.store.tmp_dir,
             append=self._create_by_append,
             append_from_version=append_from_version,
             append_from_uri=append_from_uri,
             append_with_swds_bin=self._append_use_swds_bin,
+            instance_name=self.project_uri.instance,
         )
         return self._row_writer
 
@@ -573,28 +593,64 @@ def build_with_copy_src(
     @_check_readonly
     @_forbid_handler_build
     def _do_build_from_interactive_code(self) -> None:
-        ds = self.__core_dataset
-        if isinstance(ds, StandaloneDataset):
-            if self._row_writer is None:
-                raise RuntimeError("row writer is none, no data was written")
+        if self._row_writer is None:
+            raise RuntimeError("row writer is none, no data was written")
 
-            self.flush()
-            self._row_writer.close()
-            # TODO: use the elegant method to refactor manifest update
-            self._summary = self._row_writer.summary
-            self._summary.rows = len(self)
-            ds._manifest["dataset_summary"] = self._summary.asdict()
-            ds._calculate_signature()
-            ds._render_manifest()
-            ds._make_swds_meta_tar()
-            ds._make_auto_tags()
+        self.flush()
+        self._row_writer.close()
+        self._summary = self._row_writer.summary
+
+        # TODO: use a more elegant method to refactor the manifest update
+        self._summary.rows = len(self)
+
+        if isinstance(self.__core_dataset, StandaloneDataset):
+            local_ds = self.__core_dataset
+            local_uri = self.uri
         else:
-            # TODO: support cloud dataset build
-            raise NoSupportError("only support standalone dataset build")
+            local_uri = URI.capsulate_uri(
+                instance=STANDALONE_INSTANCE,
+                project=self.uri.project,
+                obj_type=self.uri.object.typ,
+                obj_name=self.uri.object.name,
+                obj_ver=self.uri.object.version,
+            )
+            local_ds = StandaloneDataset(local_uri)
+            local_ds.store._tmp_dir = self.__core_dataset.store.tmp_dir
+            setattr(local_ds, "_version", self.version)
+
+        def _when_standalone_exit() -> None:
+            local_ds._make_auto_tags()
+            move_dir(local_ds.store.tmp_dir, local_ds.store.snapshot_workdir)
+
+        def _when_cloud_exit() -> None:
+            from starwhale.core.dataset.copy import DatasetCopy
+
+            dc = DatasetCopy(str(local_uri), str(self.uri), URIType.DATASET)
+            dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir)
+            empty_dir(local_ds.store.tmp_dir)
+
+        def _when_exit() -> None:
+            local_ds.store.building = False
+            if isinstance(self.__core_dataset, StandaloneDataset):
+                _when_standalone_exit()
+            else:
+                _when_cloud_exit()
+
+        with ExitStack() as stack:
+            stack.callback(_when_exit)
+            local_ds.store.building = True
+            local_ds._manifest["dataset_summary"] = self._summary.asdict()
+            local_ds._calculate_signature()
+            local_ds._render_manifest()
+            local_ds._make_swds_meta_tar()
 
     @_check_readonly
     @_forbid_icode_build
     def _do_build_from_handler(self) -> None:
+        # TODO: support building a dataset for a cloud uri directly
+        if self.project_uri.instance_type == InstanceType.CLOUD:
+            raise NoSupportError("building a cloud dataset directly is not supported")
+
         self._trigger_icode_build = True
         config = DatasetConfig(
             name=self.name,
@@ -620,10 +676,6 @@ def _do_build_from_handler(self) -> None:
 
     @_check_readonly
     def build(self) -> None:
-        # TODO: support build dataset for cloud uri directly
-        if self.project_uri.instance_type == InstanceType.CLOUD:
-            raise NoSupportError("no support to build cloud dataset directly")
-
         if self._trigger_icode_build:
             self._do_build_from_interactive_code()
         elif self._trigger_handler_build and self.build_handler:
diff --git a/client/starwhale/api/_impl/wrapper.py b/client/starwhale/api/_impl/wrapper.py
index bd04bb5102..e46f1641ff 100644
--- a/client/starwhale/api/_impl/wrapper.py
+++ b/client/starwhale/api/_impl/wrapper.py
@@ -161,7 +161,7 @@ def flush(self, table_name: str) -> None:
 
 class Dataset(Logger):
     def __init__(
-        self, dataset_id: str, project: str, instance_uri: str = "", token: str = ""
+        self, dataset_id: str, project: str, instance_name: str = "", token: str = ""
     ) -> None:
         if not dataset_id:
             raise RuntimeError("id should not be None")
@@ -172,7 +172,7 @@ def __init__(
         self.dataset_id = dataset_id
         self.project = project
         self._meta_table_name = f"project/{self.project}/dataset/{self.dataset_id}/meta"
-        self._data_store = data_store.get_data_store(instance_uri, token)
+        self._data_store = data_store.get_data_store(instance_name, token)
         self._init_writers([self._meta_table_name])
 
     def put(self, data_id: Union[str, int], **kwargs: Any) -> None:
diff --git a/client/starwhale/base/bundle_copy.py b/client/starwhale/base/bundle_copy.py
index a4c5d77620..ca106b3bac 100644
--- a/client/starwhale/base/bundle_copy.py
+++ b/client/starwhale/base/bundle_copy.py
@@ -302,15 +302,19 @@ def _download(_tid: TaskID, fd: FileDesc) -> None:
 
     def _do_ubd_bundle_prepare(
         self,
-        progress: Progress,
+        progress: t.Optional[Progress],
         workdir: Path,
         url_path: str,
     ) -> t.Any:
         manifest_path = workdir / DEFAULT_MANIFEST_NAME
-        task_id = progress.add_task(
-            f":arrow_up: {manifest_path.name}",
-            total=manifest_path.stat().st_size,
-        )
+        if progress is None:
+            task_id = TaskID(0)
+        else:
+            task_id = progress.add_task(
+                f":arrow_up: {manifest_path.name}",
+                total=manifest_path.stat().st_size,
+            )
+
         # TODO: use rich progress
         r = self.do_multipart_upload_file(
             url_path=url_path,
@@ -331,7 +335,7 @@ def _do_ubd_bundle_prepare(
 
     def _do_ubd_blobs(
         self,
-        progress: Progress,
+        progress: t.Optional[Progress],
         workdir: Path,
         upload_id: str,
         url_path: str,
@@ -348,7 +352,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
 
             _upload_headers["X-SW-UPLOAD-TYPE"] = fd.file_type.name
             _upload_headers["X-SW-UPLOAD-OBJECT-HASH"] = fd.signature
-            progress.update(_tid, visible=True)
+            if progress is not None:
+                progress.update(_tid, visible=True)
+
             self.do_multipart_upload_file(
                 url_path=url_path,
                 file_path=fd.path,
@@ -364,14 +370,18 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
             )
 
         _p_map = {}
-        for _f in self.upload_files(workdir=workdir):
+        for _id, _f in enumerate(self.upload_files(workdir=workdir)):
             if existed_files and _f.signature in existed_files:
                 continue
-            _tid = progress.add_task(
-                f":arrow_up: {_f.path.name}",
-                total=float(_f.size),
-                visible=False,
-            )
+
+            if progress is None:
+                _tid = TaskID(_id)
+            else:
+                _tid = progress.add_task(
+                    f":arrow_up: {_f.path.name}",
+                    total=float(_f.size),
+                    visible=False,
+                )
             _p_map[_tid] = _f
 
         with ThreadPoolExecutor(
@@ -383,6 +393,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
             ]
             wait(futures)
 
+    def _do_ubd_datastore(self) -> None:
+        raise NotImplementedError
+
     def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None:
         phase = _UploadPhase.END if ok else _UploadPhase.CANCEL
         self.do_http_request(
@@ -401,9 +414,10 @@ def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None:
 
     def _do_upload_bundle_dir(
         self,
-        progress: Progress,
+        progress: t.Optional[Progress] = None,
+        workdir: t.Optional[Path] = None,
     ) -> None:
-        workdir: Path = self._get_target_path(self.src_uri)
+        workdir = workdir or self._get_target_path(self.src_uri)
         url_path = self._get_remote_instance_rc_url()
 
         res_data = self._do_ubd_bundle_prepare(
@@ -416,6 +430,7 @@ def _do_upload_bundle_dir(
             raise Exception("upload_id is empty")
         exists_files: list = res_data.get("existed", [])
         try:
+            self._do_ubd_datastore()
             self._do_ubd_blobs(
                 progress=progress,
                 workdir=workdir,
diff --git a/client/starwhale/core/dataset/copy.py b/client/starwhale/core/dataset/copy.py
index f2ed0dc8a5..bfcfae2f4c 100644
--- a/client/starwhale/core/dataset/copy.py
+++ b/client/starwhale/core/dataset/copy.py
@@ -1,5 +1,5 @@
 import os
-from typing import List, Iterator, Optional
+from typing import Iterator
 from pathlib import Path
 
 from rich.progress import Progress
@@ -79,24 +79,17 @@ def download_files(self, workdir: Path) -> Iterator[FileDesc]:
                 file_type=FileType.SRC_TAR,
             )
 
-    def _do_ubd_blobs(
-        self,
-        progress: Progress,
-        workdir: Path,
-        upload_id: str,
-        url_path: str,
-        existed_files: Optional[List] = None,
-    ) -> None:
+    def _do_ubd_datastore(self) -> None:
         with TabularDataset(
             name=self.bundle_name,
             version=self.bundle_version,
             project=self.src_uri.project,
-            instance_uri=STANDALONE_INSTANCE,
+            instance_name=STANDALONE_INSTANCE,
         ) as local, TabularDataset(
             name=self.bundle_name,
             version=self.bundle_version,
             project=self.dest_uri.project,
-            instance_uri=self.dest_uri.instance,
+            instance_name=self.dest_uri.instance,
         ) as remote:
             console.print(
                 f":bear_face: dump dataset meta from standalone to cloud({remote._ds_wrapper._meta_table_name})"
@@ -105,20 +98,17 @@ def _do_ubd_blobs(
             for row in local.scan():
                 remote.put(row)
 
-        super()._do_ubd_blobs(progress, workdir, upload_id, url_path, existed_files)
-
     def _do_download_bundle_dir(self, progress: Progress) -> None:
-
         with TabularDataset(
             name=self.bundle_name,
             version=self.bundle_version,
             project=self.dest_uri.project,
-            instance_uri=STANDALONE_INSTANCE,
+            instance_name=STANDALONE_INSTANCE,
         ) as local, TabularDataset(
             name=self.bundle_name,
             version=self.bundle_version,
             project=self.src_uri.project,
-            instance_uri=self.src_uri.instance,
+            instance_name=self.src_uri.instance,
        ) as remote:
             console.print(
                 f":bird: load dataset meta from cloud({remote._ds_wrapper._meta_table_name}) to standalone"
diff --git a/client/starwhale/core/dataset/model.py b/client/starwhale/core/dataset/model.py
index 24833d3b62..16f2149901 100644
--- a/client/starwhale/core/dataset/model.py
+++ b/client/starwhale/core/dataset/model.py
@@ -39,6 +39,10 @@
 
 
 class Dataset(BaseBundle, metaclass=ABCMeta):
+    def __init__(self, uri: URI) -> None:
+        self.store = DatasetStorage(uri)
+        super().__init__(uri)
+
     def __str__(self) -> str:
         return f"Starwhale Dataset: {self.uri}"
 
@@ -125,7 +129,6 @@ class StandaloneDataset(Dataset, LocalStorageBundleMixin):
     def __init__(self, uri: URI) -> None:
         super().__init__(uri)
         self.typ = InstanceType.STANDALONE
-        self.store = DatasetStorage(uri)
         self.tag = StandaloneTag(uri)
         self._manifest: t.Dict[
             str, t.Any
         ] = {}
diff --git a/client/starwhale/core/dataset/tabular.py b/client/starwhale/core/dataset/tabular.py
index 122bc76738..32e9bcd00e 100644
--- a/client/starwhale/core/dataset/tabular.py
+++ b/client/starwhale/core/dataset/tabular.py
@@ -176,7 +176,7 @@ def __init__(
         project: str,
         start: t.Optional[t.Any] = None,
         end: t.Optional[t.Any] = None,
-        instance_uri: str = "",
+        instance_name: str = "",
         token: str = "",
     ) -> None:
         self.name = name
@@ -184,7 +184,7 @@ def __init__(
         self.project = project
         self.table_name = f"{name}/{version[:VERSION_PREFIX_CNT]}/{version}"
         self._ds_wrapper = DatastoreWrapperDataset(
-            self.table_name, project, instance_uri=instance_uri, token=token
+            self.table_name, project, instance_name=instance_name, token=token
         )
 
         self.start = start
@@ -306,7 +306,7 @@ def from_uri(
             uri.project,
             start=start,
             end=end,
-            instance_uri=uri.instance,
+            instance_name=uri.instance,
         )
 
diff --git a/client/starwhale/core/model/copy.py b/client/starwhale/core/model/copy.py
index 2a6b15547d..8b0b5e0cfa 100644
--- a/client/starwhale/core/model/copy.py
+++ b/client/starwhale/core/model/copy.py
@@ -68,3 +68,6 @@ def download_files(self, workdir: Path) -> Iterator[FileDesc]:
             # Path(workdir / _m["path"]).symlink_to(
             #    _dest  # the unify dir
             # )
+
+    def _do_ubd_datastore(self) -> None:
+        ...
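
A minimal sketch of the cloud publish step this patch wires into Dataset.build(): when the dataset URI points at a cloud instance, the SDK assembles the bundle under a derived standalone URI and then pushes the bundle files plus the datastore meta table through DatasetCopy. The sketch below only uses calls that appear in the diff above (URI.capsulate_uri, DatasetCopy, _do_upload_bundle_dir); the instance alias, project, dataset name, version string, and tmp workdir are placeholders, and the public SDK entry point may differ by release.

# sketch only; all URIs/names below are placeholders
from pathlib import Path

from starwhale.base.uri import URI, URIType
from starwhale.consts import STANDALONE_INSTANCE
from starwhale.core.dataset.copy import DatasetCopy

# 1. the SDK first lays the bundle out under a standalone (local) URI ...
local_uri = URI.capsulate_uri(
    instance=STANDALONE_INSTANCE,
    project="self",
    obj_type=URIType.DATASET,
    obj_name="mnist-mini",            # placeholder dataset name
    obj_ver="abcdefgh1234",           # placeholder version
)

# 2. ... then copies the bundle files and dumps the datastore meta table to the
#    cloud project, which is what _when_cloud_exit() does inside build()
remote_uri = "cloud://server-alias/project/self/dataset/mnist-mini/version/abcdefgh1234"
dc = DatasetCopy(str(local_uri), remote_uri, URIType.DATASET)
dc._do_upload_bundle_dir(workdir=Path("/tmp/swds-build"))  # tmp dir kept by the dataset store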