Commit 56716b2

support cloud remote dataset build by dataset sdk
tianweidut committed Dec 12, 2022
1 parent 1f67118 commit 56716b2
Showing 8 changed files with 150 additions and 71 deletions.
28 changes: 25 additions & 3 deletions client/starwhale/api/_impl/dataset/builder.py
@@ -16,10 +16,20 @@
import jsonlines
from loguru import logger

from starwhale.consts import AUTH_ENV_FNAME, DEFAULT_PROJECT, SWDS_DATA_FNAME_FMT
from starwhale.consts import (
AUTH_ENV_FNAME,
DEFAULT_PROJECT,
STANDALONE_INSTANCE,
SWDS_DATA_FNAME_FMT,
)
from starwhale.base.uri import URI
from starwhale.utils.fs import empty_dir, ensure_dir
from starwhale.base.type import DataFormatType, DataOriginType, ObjectStoreType
from starwhale.base.type import (
InstanceType,
DataFormatType,
DataOriginType,
ObjectStoreType,
)
from starwhale.utils.error import FormatError, NoSupportError
from starwhale.core.dataset import model
from starwhale.core.dataset.type import (
@@ -61,6 +71,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
data_mime_type: MIMEType = MIMEType.UNDEFINED,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
# TODO: add more docstring for args
# TODO: validate group upper and lower?
@@ -80,11 +91,20 @@ def __init__(
self.dataset_name = dataset_name
self.dataset_version = dataset_version
self.tabular_dataset = TabularDataset(
dataset_name, dataset_version, project_name
dataset_name,
dataset_version,
project_name,
instance_name=instance_name,
)

self._forked_summary: t.Optional[DatasetSummary]
if append and append_from_uri:
# TODO: controller supports cloud dataset fork api
if append_from_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from existed cloud dataset: {append_from_uri}"
)

self._forked_last_seq_id, self._forked_rows = self.tabular_dataset.fork(
append_from_version
)
@@ -546,6 +566,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
append_with_swds_bin: bool = True,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
super().__init__(
name=f"RowWriter-{dataset_name}-{dataset_version}-{project_name}"
@@ -561,6 +582,7 @@ def __init__(
"append": append,
"append_from_version": append_from_version,
"append_from_uri": append_from_uri,
"instance_name": instance_name,
}

self._queue: queue.Queue[t.Optional[DataRow]] = queue.Queue()
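The builder.py changes thread a new `instance_name` argument (defaulting to `STANDALONE_INSTANCE`) through the build executor and `RowWriter` down to `TabularDataset`, so row metadata can target a remote instance. A minimal sketch of constructing a `RowWriter` with the new argument, assuming the keyword names shown at the `_get_row_writer` call site in the model.py hunk later in this diff; the import path is taken from the file shown above and the concrete values are illustrative:

```python
from pathlib import Path

from starwhale.api._impl.dataset.builder import RowWriter

writer = RowWriter(
    dataset_name="mnist",
    dataset_version="abcdef1234",                # illustrative version id
    project_name="self",
    workdir=Path("/tmp/sw-build"),               # illustrative temp build dir
    append=False,
    instance_name="https://cloud.starwhale.ai",  # a cloud instance instead of the standalone default
)
```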
110 changes: 78 additions & 32 deletions client/starwhale/api/_impl/dataset/model.py
@@ -6,12 +6,19 @@
from types import TracebackType
from pathlib import Path
from functools import wraps
from contextlib import ExitStack

from loguru import logger

from starwhale.utils import gen_uniq_version
from starwhale.consts import HTTPMethod, DEFAULT_PAGE_IDX, DEFAULT_PAGE_SIZE
from starwhale.consts import (
HTTPMethod,
DEFAULT_PAGE_IDX,
DEFAULT_PAGE_SIZE,
STANDALONE_INSTANCE,
)
from starwhale.base.uri import URI, URIType
from starwhale.utils.fs import move_dir, empty_dir
from starwhale.base.type import InstanceType
from starwhale.base.cloud import CloudRequestMixed
from starwhale.utils.error import ExistedError, NotFoundError, NoSupportError
@@ -104,6 +111,17 @@ def __init__(
_summary = None
if self._check_uri_exists(_origin_uri):
if create:
# TODO: support build cloud dataset from the existed dataset
if _origin_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from the existed cloud dataset uri:{_origin_uri}"
)

if self.uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build cloud dataset({self.uri}) from existed dataset uri"
)

self._append_from_version = version
self._create_by_append = True
self._fork_dataset()
@@ -139,6 +157,11 @@ def __init__(
def _fork_dataset(self) -> None:
# TODO: support cloud dataset prepare in the tmp dir
# TODO: lazy fork dataset
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"only support standalone dataset fork: {self.__core_dataset}"
)

self.__core_dataset._prepare_snapshot()
self.__core_dataset._fork_swds(
self._create_by_append, self._append_from_version
@@ -431,11 +454,6 @@ def __setitem__(
) -> None:
# TODO: tune the performance of getitem by cache
self._trigger_icode_build = True
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

_row_writer = self._get_row_writer()

if not isinstance(key, (int, str)):
Expand Down Expand Up @@ -487,20 +505,16 @@ def _get_row_writer(self) -> RowWriter:
append_from_version = ""

# TODO: support alignment_bytes_size, volume_bytes_size arguments
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

self._row_writer = RowWriter(
dataset_name=self.name,
dataset_version=self.version,
project_name=self.project_uri.project,
workdir=self.__core_dataset.store.snapshot_workdir, # TODO: use tmpdir which is same as dataset build command
workdir=self.__core_dataset.store.tmp_dir,
append=self._create_by_append,
append_from_version=append_from_version,
append_from_uri=append_from_uri,
append_with_swds_bin=self._append_use_swds_bin,
instance_name=self.project_uri.instance,
)
return self._row_writer

@@ -573,28 +587,64 @@ def build_with_copy_src(
@_check_readonly
@_forbid_handler_build
def _do_build_from_interactive_code(self) -> None:
ds = self.__core_dataset
if isinstance(ds, StandaloneDataset):
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")

self.flush()
self._row_writer.close()
# TODO: use the elegant method to refactor manifest update
self._summary = self._row_writer.summary
self._summary.rows = len(self)
ds._manifest["dataset_summary"] = self._summary.asdict()
ds._calculate_signature()
ds._render_manifest()
ds._make_swds_meta_tar()
ds._make_auto_tags()
self.flush()
self._row_writer.close()
self._summary = self._row_writer.summary

# TODO: use the elegant method to refactor manifest update
self._summary.rows = len(self)

if isinstance(self.__core_dataset, StandaloneDataset):
local_ds = self.__core_dataset
local_uri = self.uri
else:
# TODO: support cloud dataset build
raise NoSupportError("only support standalone dataset build")
local_uri = URI.capsulate_uri(
instance=STANDALONE_INSTANCE,
project=self.uri.project,
obj_type=self.uri.object.typ,
obj_name=self.uri.object.name,
obj_ver=self.uri.object.version,
)
local_ds = StandaloneDataset(local_uri)
local_ds.store._tmp_dir = self.__core_dataset.store.tmp_dir
setattr(local_ds, "_version", self.version)

def _when_standalone_exit() -> None:
local_ds._make_auto_tags()
move_dir(local_ds.store.tmp_dir, local_ds.store.snapshot_workdir)

def _when_cloud_exit() -> None:
from starwhale.core.dataset.copy import DatasetCopy

dc = DatasetCopy(str(local_uri), str(self.uri), URIType.DATASET)
dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir)
empty_dir(local_ds.store.tmp_dir)

def _when_exit() -> None:
local_ds.store.building = False
if isinstance(self.__core_dataset, StandaloneDataset):
_when_standalone_exit()
else:
_when_cloud_exit()

with ExitStack() as stack:
stack.callback(_when_exit)
local_ds.store.building = True
local_ds._manifest["dataset_summary"] = self._summary.asdict()
local_ds._calculate_signature()
local_ds._render_manifest()
local_ds._make_swds_meta_tar()

@_check_readonly
@_forbid_icode_build
def _do_build_from_handler(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

self._trigger_icode_build = True
config = DatasetConfig(
name=self.name,
@@ -620,10 +670,6 @@ def _do_build_from_handler(self) -> None:

@_check_readonly
def build(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

if self._trigger_icode_build:
self._do_build_from_interactive_code()
elif self._trigger_handler_build and self.build_handler:
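The key change in model.py is that an interactive (`__setitem__`) build against a cloud URI no longer raises `NoSupportError`: the SDK builds into a temporary standalone dataset that capsulates the same project/name/version, then pushes the prepared bundle with `DatasetCopy._do_upload_bundle_dir`. A condensed sketch of that cloud branch, lifted from the diff above; `push_interactive_build` is a hypothetical wrapper and the `StandaloneDataset` import path is assumed:

```python
from pathlib import Path

from starwhale.base.uri import URI, URIType
from starwhale.consts import STANDALONE_INSTANCE
from starwhale.core.dataset.copy import DatasetCopy
from starwhale.core.dataset.model import StandaloneDataset  # import path assumed


def push_interactive_build(cloud_uri: URI, version: str, tmp_dir: Path) -> None:
    # Mirror the cloud target with a capsulated standalone URI so the existing
    # local build machinery (manifest, signature, meta tar) can run unchanged.
    local_uri = URI.capsulate_uri(
        instance=STANDALONE_INSTANCE,
        project=cloud_uri.project,
        obj_type=cloud_uri.object.typ,
        obj_name=cloud_uri.object.name,
        obj_ver=cloud_uri.object.version,
    )
    local_ds = StandaloneDataset(local_uri)
    local_ds.store._tmp_dir = tmp_dir
    setattr(local_ds, "_version", version)

    # After the local bundle dir is prepared, upload it to the cloud instance;
    # the real code then empties the temp dir (cleanup omitted here).
    dc = DatasetCopy(str(local_uri), str(cloud_uri), URIType.DATASET)
    dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir)
```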
4 changes: 2 additions & 2 deletions client/starwhale/api/_impl/wrapper.py
@@ -161,7 +161,7 @@ def flush(self, table_name: str) -> None:

class Dataset(Logger):
def __init__(
self, dataset_id: str, project: str, instance_uri: str = "", token: str = ""
self, dataset_id: str, project: str, instance_name: str = "", token: str = ""
) -> None:
if not dataset_id:
raise RuntimeError("id should not be None")
@@ -172,7 +172,7 @@ def __init__(
self.dataset_id = dataset_id
self.project = project
self._meta_table_name = f"project/{self.project}/dataset/{self.dataset_id}/meta"
self._data_store = data_store.get_data_store(instance_uri, token)
self._data_store = data_store.get_data_store(instance_name, token)
self._init_writers([self._meta_table_name])

def put(self, data_id: Union[str, int], **kwargs: Any) -> None:
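wrapper.py only renames the constructor argument from `instance_uri` to `instance_name` and forwards it to `data_store.get_data_store`. A hypothetical construction of the wrapper `Dataset` with the renamed argument; the id, instance value, and token are placeholders:

```python
from starwhale.api._impl.wrapper import Dataset

ds_logger = Dataset(
    dataset_id="mnist-abcdef1234",               # illustrative dataset id
    project="self",
    instance_name="https://cloud.starwhale.ai",  # illustrative remote instance
    token="***",                                 # auth token for the remote datastore
)
```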
45 changes: 30 additions & 15 deletions client/starwhale/base/bundle_copy.py
@@ -302,15 +302,19 @@ def _download(_tid: TaskID, fd: FileDesc) -> None:

def _do_ubd_bundle_prepare(
self,
progress: Progress,
progress: t.Optional[Progress],
workdir: Path,
url_path: str,
) -> t.Any:
manifest_path = workdir / DEFAULT_MANIFEST_NAME
task_id = progress.add_task(
f":arrow_up: {manifest_path.name}",
total=manifest_path.stat().st_size,
)
if progress is None:
task_id = TaskID(0)
else:
task_id = progress.add_task(
f":arrow_up: {manifest_path.name}",
total=manifest_path.stat().st_size,
)

# TODO: use rich progress
r = self.do_multipart_upload_file(
url_path=url_path,
@@ -331,7 +335,7 @@

def _do_ubd_blobs(
self,
progress: Progress,
progress: t.Optional[Progress],
workdir: Path,
upload_id: str,
url_path: str,
@@ -348,7 +352,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
_upload_headers["X-SW-UPLOAD-TYPE"] = fd.file_type.name
_upload_headers["X-SW-UPLOAD-OBJECT-HASH"] = fd.signature

progress.update(_tid, visible=True)
if progress is not None:
progress.update(_tid, visible=True)

self.do_multipart_upload_file(
url_path=url_path,
file_path=fd.path,
@@ -364,14 +370,18 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
)

_p_map = {}
for _f in self.upload_files(workdir=workdir):
for _id, _f in enumerate(self.upload_files(workdir=workdir)):
if existed_files and _f.signature in existed_files:
continue
_tid = progress.add_task(
f":arrow_up: {_f.path.name}",
total=float(_f.size),
visible=False,
)

if progress is None:
_tid = _id
else:
_tid = progress.add_task(
f":arrow_up: {_f.path.name}",
total=float(_f.size),
visible=False,
)
_p_map[_tid] = _f

with ThreadPoolExecutor(
@@ -383,6 +393,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None:
]
wait(futures)

def _do_ubd_datastore(self) -> None:
raise NotImplementedError

def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None:
phase = _UploadPhase.END if ok else _UploadPhase.CANCEL
self.do_http_request(
@@ -401,9 +414,10 @@ def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None:

def _do_upload_bundle_dir(
self,
progress: Progress,
progress: t.Optional[Progress] = None,
workdir: t.Optional[Path] = None,
) -> None:
workdir: Path = self._get_target_path(self.src_uri)
workdir = workdir or self._get_target_path(self.src_uri)
url_path = self._get_remote_instance_rc_url()

res_data = self._do_ubd_bundle_prepare(
@@ -416,6 +430,7 @@ def _do_upload_bundle_dir(
raise Exception("upload_id is empty")
exists_files: list = res_data.get("existed", [])
try:
self._do_ubd_datastore()
self._do_ubd_blobs(
progress=progress,
workdir=workdir,
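In bundle_copy.py the upload helpers now accept `progress=None` and an explicit `workdir`, and a `_do_ubd_datastore` hook is added, so the SDK build path above can push a bundle without a rich progress bar. A minimal sketch of driving the upload headlessly, mirroring `_when_cloud_exit` in model.py; both URI strings and the temp directory are illustrative:

```python
from pathlib import Path

from starwhale.base.uri import URIType
from starwhale.core.dataset.copy import DatasetCopy

dc = DatasetCopy(
    "local/project/self/dataset/mnist/version/abcdef1234",  # illustrative src uri
    "cloud://server/project/demo/dataset/mnist",             # illustrative dest uri
    URIType.DATASET,
)
# progress now defaults to None, so no rich.Progress is required;
# workdir overrides the default path derived from the source URI.
dc._do_upload_bundle_dir(workdir=Path("/tmp/sw-build"))
```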