Commit 06f398a
output: serialize 3.x hash name in 'hash' field
pmrowla committed Jun 6, 2023
1 parent b94f097 commit 06f398a
Showing 24 changed files with 236 additions and 67 deletions.
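
For context, the user-visible effect: outputs hashed with the 3.x default algorithm now serialize an explicit "hash: md5" field, while legacy 2.x outputs (internal hash name "md5-dos2unix") keep the old shape, with the value still keyed as "md5" and no "hash" field. A minimal sketch of the resulting Output.dumpd() dicts, based on the test expectations in this commit (hash values illustrative):

# DVC 3.x output entry as serialized to .dvc/lock files after this commit:
new_style = {
    "md5": "acbd18db4cc2f85cedef654fccc4a4d8",
    "size": 3,
    "hash": "md5",  # new field naming the 3.x hash algorithm
    "path": "foo",
}

# Legacy 2.x output (hash name "md5-dos2unix") keeps the old shape:
legacy = {
    "md5": "acbd18db4cc2f85cedef654fccc4a4d8",  # value still keyed as "md5"
    "size": 3,
    "path": "foo",
}
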
3 changes: 2 additions & 1 deletion dvc/dependency/__init__.py
@@ -36,7 +36,8 @@ def loadd_from(stage, d_list):
for d in d_list:
p = d.pop(Output.PARAM_PATH, None)
files = d.pop(Output.PARAM_FILES, None)
ret.append(_get(stage, p, d, files=files))
hash_name = d.pop(Output.PARAM_HASH, None)
ret.append(_get(stage, p, d, files=files, hash_name=hash_name))
return ret


1 change: 1 addition & 0 deletions dvc/dependency/param.py
@@ -48,6 +48,7 @@ def __init__(self, stage, path, params=None, repo=None):
repo = repo or stage.repo
path = path or os.path.join(repo.root_dir, self.DEFAULT_PARAMS_FILE)
super().__init__(stage, path, repo=repo)
self.hash_name = self.PARAM_PARAMS
self.hash_info = hash_info

def dumpd(self, **kwargs):
86 changes: 70 additions & 16 deletions dvc/output.py
@@ -27,6 +27,7 @@
from dvc_data.hashfile.build import build
from dvc_data.hashfile.checkout import checkout
from dvc_data.hashfile.db import HashFileDB, add_update_tree
from dvc_data.hashfile.hash import DEFAULT_ALGORITHM
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.istextfile import istextfile
from dvc_data.hashfile.meta import Meta
@@ -64,7 +65,7 @@

# NOTE: currently there are only 3 possible checksum names:
#
# 1) md5 (LOCAL, SSH);
# 1) md5 (LOCAL, SSH) (actually DVC 2.x md5-dos2unix)
# 2) etag (S3, GS, OSS, AZURE, HTTP);
# 3) checksum (HDFS);
#
@@ -73,7 +74,7 @@
HDFS_PARAM_CHECKSUM = "checksum"
S3_PARAM_CHECKSUM = "etag"
CHECKSUMS_SCHEMA = {
LocalFileSystem.PARAM_CHECKSUM: CHECKSUM_SCHEMA,
"md5": CHECKSUM_SCHEMA, # DVC 2.x md5-dos2unix
HDFS_PARAM_CHECKSUM: CHECKSUM_SCHEMA,
S3_PARAM_CHECKSUM: CASE_SENSITIVE_CHECKSUM_SCHEMA,
}
@@ -95,6 +96,7 @@ def loadd_from(stage, d_list):
annot = {field: d.pop(field, None) for field in ANNOTATION_FIELDS}
files = d.pop(Output.PARAM_FILES, None)
push = d.pop(Output.PARAM_PUSH, True)
hash_name = d.pop(Output.PARAM_HASH, None)
ret.append(
_get(
stage,
@@ -108,6 +110,7 @@
**annot,
files=files,
push=push,
hash_name=hash_name,
)
)
return ret
@@ -227,17 +230,29 @@ def merge_file_meta_from_cloud(entry: Dict) -> Dict:
return entry


def _serialize_tree_obj_to_files(obj: "Tree") -> List[Dict[str, Any]]:
def _serialize_tree_obj_to_files(obj: Tree) -> List[Dict[str, Any]]:
key = obj.PARAM_RELPATH
return sorted(
(
{key: posixpath.sep.join(parts), **hi.to_dict(), **meta.to_dict()}
{
key: posixpath.sep.join(parts),
**_serialize_hi_to_dict(hi),
**meta.to_dict(),
}
for parts, meta, hi in obj
),
key=itemgetter(key),
)


def _serialize_hi_to_dict(hash_info: Optional[HashInfo]) -> Dict[str, Any]:
if hash_info:
if hash_info.name == "md5-dos2unix":
return {"md5": hash_info.value}
return hash_info.to_dict()
return {}
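
A quick sketch of what this helper yields, assuming HashInfo.to_dict() serializes as {name: value} (hash values illustrative):

from dvc_data.hashfile.hash_info import HashInfo

_serialize_hi_to_dict(HashInfo("md5-dos2unix", "acbd18db4cc2f85cedef654fccc4a4d8"))
# -> {"md5": "acbd18db4cc2f85cedef654fccc4a4d8"}, legacy name mapped back to "md5"
_serialize_hi_to_dict(HashInfo("md5", "acbd18db4cc2f85cedef654fccc4a4d8"))
# -> {"md5": "acbd18db4cc2f85cedef654fccc4a4d8"}, via to_dict()
_serialize_hi_to_dict(None)
# -> {}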


class OutputDoesNotExistError(DvcException):
def __init__(self, path):
msg = f"output '{path}' does not exist"
@@ -297,6 +312,7 @@ class Output:
PARAM_REMOTE = "remote"
PARAM_PUSH = "push"
PARAM_CLOUD = "cloud"
PARAM_HASH = "hash"

METRIC_SCHEMA = Any(
None,
@@ -330,6 +346,7 @@ def __init__( # noqa: PLR0913
fs_config=None,
files: Optional[List[Dict[str, Any]]] = None,
push: bool = True,
hash_name: Optional[str] = DEFAULT_ALGORITHM,
):
self.annot = Annotation(
desc=desc, type=type, labels=labels or [], meta=meta or {}
@@ -401,16 +418,29 @@ def __init__( # noqa: PLR0913
)
self.meta.version_id = version_id

self.hash_name, self.hash_info = self._compute_hash_info_from_meta(hash_name)
self._compute_meta_hash_info_from_files()

def _compute_hash_info_from_meta(
self, hash_name: Optional[str]
) -> Tuple[str, HashInfo]:
if self.is_in_repo:
self.hash_name = "md5"
if hash_name is None:
# Legacy 2.x output, use "md5-dos2unix" but read "md5" from
# file meta
hash_name = "md5-dos2unix"
meta_name = "md5"
else:
meta_name = hash_name
else:
self.hash_name = self.fs.PARAM_CHECKSUM
hash_name = meta_name = self.fs.PARAM_CHECKSUM
assert hash_name

self.hash_info = HashInfo(
name=self.hash_name,
value=getattr(self.meta, self.hash_name, None),
hash_info = HashInfo(
name=hash_name,
value=getattr(self.meta, meta_name, None),
)
self._compute_meta_hash_info_from_files()
return hash_name, hash_info

def _compute_meta_hash_info_from_files(self) -> None:
if self.files:
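
The new _compute_hash_info_from_meta separates which algorithm an output hashes with from which meta field its value is read from. A standalone sketch of that dispatch (a restatement for illustration, not the actual method):

def resolve_hash_names(is_in_repo, hash_name, fs_checksum_param):
    """Return (hash_name, meta_name) the way _compute_hash_info_from_meta does."""
    if is_in_repo:
        if hash_name is None:
            # Legacy 2.x output: hash with md5-dos2unix, read the value from 'md5'
            return "md5-dos2unix", "md5"
        return hash_name, hash_name  # 3.x output with an explicit 'hash' field
    return fs_checksum_param, fs_checksum_param  # e.g. 'etag' for S3 outputs

assert resolve_hash_names(True, None, "etag") == ("md5-dos2unix", "md5")
assert resolve_hash_names(True, "md5", "etag") == ("md5", "md5")
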
@@ -426,7 +456,7 @@ def _compute_meta_hash_info_from_files(self) -> None:
self.meta.remote = first(f.get("remote") for f in self.files)
elif self.meta.nfiles or self.hash_info and self.hash_info.isdir:
self.meta.isdir = True
if not self.hash_info and self.hash_name != "md5":
if not self.hash_info and self.hash_name not in ("md5", "md5-dos2unix"):
md5 = getattr(self.meta, "md5", None)
if md5:
self.hash_info = HashInfo("md5", md5)
@@ -503,7 +533,12 @@ def use_scm_ignore(self):

@property
def cache(self):
odb_name = "repo" if self.is_in_repo else self.protocol
from dvc.cachemgr import LEGACY_HASH_NAMES

if self.is_in_repo:
odb_name = "legacy" if self.hash_name in LEGACY_HASH_NAMES else "repo"
else:
odb_name = self.protocol
odb = getattr(self.repo.cache, odb_name)
if self.use_cache and odb is None:
raise RemoteCacheRequiredError(self.fs.protocol, self.fs_path)
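
Legacy-hashed outputs are now routed to a separate object store. A sketch of the routing, assuming LEGACY_HASH_NAMES (imported from dvc.cachemgr, not shown in this diff) contains at least "md5-dos2unix":

LEGACY_HASH_NAMES = {"md5-dos2unix", "params"}  # assumed contents

def pick_odb_name(is_in_repo: bool, hash_name: str, protocol: str) -> str:
    # Mirrors the cache property above: in-repo legacy hashes go to the
    # "legacy" ODB, 3.x hashes to "repo", everything else to its protocol ODB.
    if is_in_repo:
        return "legacy" if hash_name in LEGACY_HASH_NAMES else "repo"
    return protocol
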
@@ -540,6 +575,11 @@ def _get_hash_meta(self):
ignore=self.dvcignore,
dry_run=not self.use_cache,
)
if self.hash_name == "md5-dos2unix" and obj.hash_info.name == "md5":
# NOTE: If build() returns a hash with differing name it means this
# was a hash loaded from state, and in this case 'md5' comes from
# a 2.x state entry
obj.hash_info.name = "md5-dos2unix"
return meta, obj.hash_info

def get_meta(self) -> Meta:
@@ -781,6 +821,8 @@ def _commit_granular_dir(self, filter_info, hardlink) -> Optional["HashFile"]:
return checkout_obj

def dumpd(self, **kwargs): # noqa: C901, PLR0912
from dvc.cachemgr import LEGACY_HASH_NAMES

ret: Dict[str, Any] = {}
with_files = (
(not self.IS_DEPENDENCY or self.stage.is_import)
@@ -791,14 +833,22 @@ def dumpd(self, **kwargs): # noqa: C901, PLR0912
if not with_files:
meta_d = self.meta.to_dict()
meta_d.pop("isdir", None)
ret.update(self.hash_info.to_dict())
if self.hash_name in LEGACY_HASH_NAMES:
# 2.x checksums get serialized with file meta
name = "md5" if self.hash_name == "md5-dos2unix" else self.hash_name
ret.update({name: self.hash_info.value})
else:
ret.update(self.hash_info.to_dict())
ret.update(split_file_meta_from_cloud(meta_d))

if self.is_in_repo:
path = self.fs.path.as_posix(relpath(self.fs_path, self.stage.wdir))
else:
path = self.def_path

if self.hash_name not in LEGACY_HASH_NAMES:
ret[self.PARAM_HASH] = self.hash_name

ret[self.PARAM_PATH] = path

if not self.IS_DEPENDENCY:
@@ -978,7 +1028,7 @@ def transfer(
odb,
from_info,
from_fs,
"md5",
DEFAULT_ALGORITHM,
upload=upload,
no_progress_bar=no_progress_bar,
)
@@ -1119,7 +1169,9 @@ def get_used_objs( # noqa: C901
return {}

if self.remote:
remote = self.repo.cloud.get_remote_odb(name=self.remote)
remote = self.repo.cloud.get_remote_odb(
name=self.remote, hash_name=self.hash_name
)
else:
remote = None

@@ -1158,13 +1210,14 @@ def _check_can_merge(self, out):
self.hash_name,
Meta.PARAM_SIZE,
Meta.PARAM_NFILES,
Output.PARAM_HASH,
]

for opt in ignored:
my.pop(opt, None)
other.pop(opt, None)

if my != other:
if my != other or self.hash_name != out.hash_name:
raise MergeError("unable to auto-merge outputs with different options")

if not out.is_dir_checksum:
@@ -1428,6 +1481,7 @@ def _merge_dir_version_meta(self, other: "Output"):
Output.PARAM_PLOT: bool,
Output.PARAM_PERSIST: bool,
Output.PARAM_CLOUD: CLOUD_SCHEMA,
Output.PARAM_HASH: str,
}

DIR_FILES_SCHEMA: Dict[str, Any] = {
1 change: 1 addition & 0 deletions dvc/schema.py
@@ -34,6 +34,7 @@
Required("path"): str,
Output.PARAM_CLOUD: CLOUD_SCHEMA,
Output.PARAM_FILES: [DIR_FILES_SCHEMA],
Output.PARAM_HASH: str,
}
LOCK_FILE_STAGE_SCHEMA = {
Required(StageParams.PARAM_CMD): Any(str, list),
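
The one-line addition above makes "hash" an accepted optional string key on serialized out entries. A reduced voluptuous sketch of the effect (a hypothetical minimal schema, not DVC's full one):

from voluptuous import Required, Schema

out_entry = Schema({Required("path"): str, "hash": str, "md5": str, "size": int})
out_entry(
    {"path": "foo", "hash": "md5", "md5": "acbd18db4cc2f85cedef654fccc4a4d8", "size": 3}
)  # validates and returns the dict
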
11 changes: 6 additions & 5 deletions dvc/stage/loader.py
@@ -9,7 +9,6 @@
from dvc import dependency, output
from dvc.parsing import FOREACH_KWD, JOIN, EntryNotFound
from dvc.utils.objects import cached_property
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.meta import Meta

from . import PipelineStage, Stage, loads_from
@@ -19,7 +18,6 @@

if TYPE_CHECKING:
from dvc.dvcfile import ProjectFile, SingleStageFile
from dvc.output import Output

logger = logging.getLogger(__name__)

@@ -52,7 +50,7 @@ def fill_from_lock(stage, lock_data=None):
if not lock_data:
return

from dvc.output import merge_file_meta_from_cloud
from dvc.output import Output, merge_file_meta_from_cloud

assert isinstance(lock_data, dict)
items: Iterable[Tuple[str, "Output"]] = chain(
@@ -73,9 +71,12 @@
info = info.copy()
info.pop("path", None)

hash_name = info.pop(Output.PARAM_HASH, None)
item.meta = Meta.from_dict(merge_file_meta_from_cloud(info))
hash_value = getattr(item.meta, item.hash_name, None)
item.hash_info = HashInfo(item.hash_name, hash_value)
# pylint: disable-next=protected-access
item.hash_name, item.hash_info = item._compute_hash_info_from_meta(
hash_name
)
files = get_in(checksums, [key, path, item.PARAM_FILES], None)
if files:
item.files = [merge_file_meta_from_cloud(f) for f in files]
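
In effect, fill_from_lock now pops the optional "hash" key from each lock entry and lets _compute_hash_info_from_meta re-derive both the hash name and the HashInfo, instead of building HashInfo directly. A sketch with a hypothetical lock entry (real entries come from the parsed lockfile):

info = {"path": "foo", "md5": "acbd18db4cc2f85cedef654fccc4a4d8", "size": 3, "hash": "md5"}
info = info.copy()
info.pop("path", None)
hash_name = info.pop("hash", None)  # "md5" -> 3.x entry; None -> legacy md5-dos2unix
# item.meta is then rebuilt from the remaining keys, and
# item._compute_hash_info_from_meta(hash_name) returns the (name, HashInfo) pair.
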
11 changes: 9 additions & 2 deletions dvc/stage/serialize.py
@@ -149,13 +149,20 @@ def to_pipeline_file(stage: "PipelineStage"):


def to_single_stage_lockfile(stage: "Stage", **kwargs) -> dict:
from dvc.output import _serialize_tree_obj_to_files, split_file_meta_from_cloud
from dvc.cachemgr import LEGACY_HASH_NAMES
from dvc.output import (
_serialize_hi_to_dict,
_serialize_tree_obj_to_files,
split_file_meta_from_cloud,
)
from dvc_data.hashfile.tree import Tree

assert stage.cmd

def _dumpd(item: "Output"):
ret: Dict[str, Any] = {item.PARAM_PATH: item.def_path}
if item.hash_name not in LEGACY_HASH_NAMES:
ret[item.PARAM_HASH] = item.hash_name
if item.hash_info.isdir and kwargs.get("with_files"):
obj = item.obj or item.get_obj()
if obj:
@@ -167,7 +174,7 @@ def _dumpd(item: "Output"):
else:
meta_d = item.meta.to_dict()
meta_d.pop("isdir", None)
ret.update(item.hash_info.to_dict())
ret.update(_serialize_hi_to_dict(item.hash_info))
ret.update(split_file_meta_from_cloud(meta_d))
return ret

3 changes: 3 additions & 0 deletions dvc/testing/workspace_tests.py
@@ -54,11 +54,13 @@ def test_import_dir(self, tmp_dir, dvc, workspace, stage_md5, dir_md5):
f"- md5: {dir_md5}\n"
" size: 11\n"
" nfiles: 2\n"
" hash: md5\n"
" path: remote://workspace/dir\n"
"outs:\n"
"- md5: b6dcab6ccd17ca0a8bf4a215a37d14cc.dir\n"
" size: 11\n"
" nfiles: 2\n"
" hash: md5\n"
" path: dir\n"
)

@@ -177,6 +179,7 @@ def test_add(self, tmp_dir, dvc, workspace, hash_name, hash_value):
"outs:\n"
f"- {hash_name}: {hash_value}\n"
" size: 4\n"
" hash: md5\n"
" path: remote://workspace/file\n"
)
assert (workspace / "file").read_text() == "file"
3 changes: 3 additions & 0 deletions tests/func/test_add.py
@@ -55,6 +55,7 @@ def test_add(tmp_dir, dvc):
"md5": "acbd18db4cc2f85cedef654fccc4a4d8",
"path": "foo",
"size": 3,
"hash": "md5",
}
]
}
@@ -74,6 +75,7 @@ def test_add_executable(tmp_dir, dvc):
"path": "foo",
"size": 3,
"isexec": True,
"hash": "md5",
}
]
}
@@ -262,6 +264,7 @@ def test_add_external_relpath(tmp_dir, dvc, local_cloud):
"outs:\n"
"- md5: 8c7dd922ad47494fc02c388e12c00eac\n"
" size: 4\n"
" hash: md5\n"
f" path: {rel}\n"
)
assert fpath.read_text() == "file"
(16 more changed files not shown)
