Commit b94f097

cache: move 3.x data objects into files/ prefix

pmrowla committed Jun 6, 2023
1 parent 3e0f2ba commit b94f097

Showing 21 changed files with 127 additions and 62 deletions.
29 changes: 27 additions & 2 deletions dvc/cachemgr.py
@@ -1,17 +1,31 @@
 import os
+from typing import Optional
 
 from dvc.fs import GitFileSystem, Schemes
 from dvc_data.hashfile.db import get_odb
 
+LEGACY_HASH_NAMES = {"md5-dos2unix", "checksum", "etag", "params"}
+
 
-def _get_odb(repo, settings, fs=None):
+def _get_odb(
+    repo,
+    settings,
+    fs=None,
+    prefix: Optional[str] = None,
+    hash_name: Optional[str] = None,
+    **kwargs,
+):
     from dvc.fs import get_cloud_fs
 
     if not settings:
         return None
 
     cls, config, fs_path = get_cloud_fs(repo, **settings)
     fs = fs or cls(**config)
+    if prefix:
+        fs_path = fs.path.join(fs_path, prefix)
+    if hash_name:
+        config["hash_name"] = hash_name
     return get_odb(fs, fs_path, state=repo.state, **config)
 
 
@@ -24,6 +38,7 @@ class CacheManager:
         Schemes.HDFS,
         Schemes.WEBHDFS,
     ]
+    FILES_DIR = "files"
 
     def __init__(self, repo):
         self._repo = repo
@@ -53,9 +68,11 @@ def __init__(self, repo):
         if not isinstance(repo.fs, GitFileSystem):
             kwargs["fs"] = repo.fs
 
-        odb = _get_odb(repo, settings, **kwargs)
+        odb = _get_odb(repo, settings, prefix=self.FILES_DIR, **kwargs)
         self._odb["repo"] = odb
         self._odb[Schemes.LOCAL] = odb
+        legacy_odb = _get_odb(repo, settings, hash_name="md5-dos2unix", **kwargs)
+        self._odb["legacy"] = legacy_odb
 
     def _init_odb(self, schemes):
         for scheme in schemes:
@@ -75,3 +92,11 @@ def __getattr__(self, name):
     def by_scheme(self):
         self._init_odb(self.CLOUD_SCHEMES)
         yield from self._odb.items()
+
+    @property
+    def local_cache_dir(self) -> str:
+        """Return base local cache directory without any prefixes.
+
+        (i.e. `dvc cache dir`).
+        """
+        return self.local.fs.path.parent(self.local.path)
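
Taken together, `FILES_DIR`, the new `prefix` argument, and the `legacy` ODB give the local cache two layouts: 3.x md5 objects under `files/`, and 2.x md5-dos2unix objects directly under the cache root. A minimal sketch of where a blob lands under each layout (not part of the commit; `cache_path_for` is a hypothetical helper, and the paths are inferred from the `files/` prefix and the test expectations further down this diff):

import hashlib
import os


def cache_path_for(cache_dir: str, data: bytes, legacy: bool = False) -> str:
    """Compute where a blob would land in the local DVC cache."""
    digest = hashlib.md5(data).hexdigest()  # content address, not a security use
    parts = [cache_dir]
    if not legacy:
        # 3.x objects live under the new files/ prefix (CacheManager.FILES_DIR)
        parts.append("files")
    # both layouts shard by the first two hex digits of the digest
    parts.extend([digest[:2], digest[2:]])
    return os.path.join(*parts)


print(cache_path_for(".dvc/cache", b"foo"))               # .dvc/cache/files/ac/bd18db4cc2f85cedef654fccc4a4d8
print(cache_path_for(".dvc/cache", b"foo", legacy=True))  # .dvc/cache/ac/bd18db4cc2f85cedef654fccc4a4d8

Note that `local_cache_dir` returns the parent of the `files/`-prefixed ODB path, i.e. the bare cache root that `dvc cache dir` reports.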
16 changes: 15 additions & 1 deletion dvc/data_cloud.py
@@ -29,14 +29,23 @@ def __init__(self, name: str, path: str, fs: "FileSystem", *, index=None, **conf

     @cached_property
     def odb(self) -> "HashFileDB":
+        from dvc.cachemgr import CacheManager
         from dvc_data.hashfile.db import get_odb
 
         path = self.path
         if self.worktree:
             path = self.fs.path.join(path, ".dvc", "cache")
-
+        else:
+            path = self.fs.path.join(path, CacheManager.FILES_DIR)
         return get_odb(self.fs, path, hash_name="md5", **self.config)
 
+    @cached_property
+    def legacy_odb(self) -> "HashFileDB":
+        from dvc_data.hashfile.db import get_odb
+
+        path = self.path
+        return get_odb(self.fs, path, hash_name="md5-dos2unix", **self.config)
+
 
 class DataCloud:
     """Class that manages dvc remotes.
@@ -101,12 +110,17 @@ def get_remote_odb(
         self,
         name: Optional[str] = None,
         command: str = "<command>",
+        hash_name: str = "md5",
     ) -> "HashFileDB":
+        from dvc.cachemgr import LEGACY_HASH_NAMES
+
         remote = self.get_remote(name=name, command=command)
         if remote.fs.version_aware or remote.worktree:
             raise NoRemoteError(
                 f"'{command}' is unsupported for cloud versioned remotes"
             )
+        if hash_name in LEGACY_HASH_NAMES:
+            return remote.legacy_odb
         return remote.odb
 
     def _log_missing(self, status: "CompareStatusResult"):
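
In short, every Remote now exposes two object databases, and get_remote_odb dispatches between them by hash name. A hedged usage sketch (`cloud` stands for a DataCloud instance; the remote name is made up):

# 3.x md5 outputs resolve to the files/-prefixed remote ODB:
odb = cloud.get_remote_odb("myremote", "push", hash_name="md5")
# 2.x md5-dos2unix outputs resolve to the bare remote root:
legacy = cloud.get_remote_odb("myremote", "push", hash_name="md5-dos2unix")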
2 changes: 1 addition & 1 deletion dvc/dependency/repo.py
@@ -80,7 +80,7 @@ def _make_fs(
         from dvc.fs import DVCFileSystem
 
         config = {"cache": self.repo.config["cache"]}
-        config["cache"]["dir"] = self.repo.cache.local.path
+        config["cache"]["dir"] = self.repo.cache.local_cache_dir
 
         return DVCFileSystem(
             url=self.def_repo[self.PARAM_URL],
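
This call site (and the similar ones below in the experiments executor and open_repo) switches from repo.cache.local.path to repo.cache.local_cache_dir because the former now ends in files/. A quick illustration of the difference, with an assumed default layout:

import os

local_odb_path = "/project/.dvc/cache/files"  # repo.cache.local.path (assumed)
cache_dir = os.path.dirname(local_odb_path)   # what local_cache_dir computes
assert cache_dir == "/project/.dvc/cache"     # the value handed to the cache.dir config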
2 changes: 1 addition & 1 deletion dvc/info.py
@@ -68,7 +68,7 @@ def _get_caches(cache):
     caches = (
         cache_type
         for cache_type, cache_instance in cache.by_scheme()
-        if cache_instance and cache_type != "repo"
+        if cache_instance and cache_type not in ("repo", "legacy")
     )
 
     # Caches will be always non-empty including the local cache
14 changes: 11 additions & 3 deletions dvc/output.py
@@ -509,6 +509,14 @@ def cache(self):
             raise RemoteCacheRequiredError(self.fs.protocol, self.fs_path)
         return odb
 
+    @property
+    def local_cache(self):
+        from dvc.cachemgr import LEGACY_HASH_NAMES
+
+        if self.hash_name in LEGACY_HASH_NAMES:
+            return self.repo.cache.legacy
+        return self.repo.cache.local
+
     @property
     def cache_path(self):
         return self.cache.fs.unstrip_protocol(
@@ -523,7 +531,7 @@ def _get_hash_meta(self):
         if self.use_cache:
             odb = self.cache
         else:
-            odb = self.repo.cache.local
+            odb = self.local_cache
         _, meta, obj = build(
             odb,
             self.fs_path,
@@ -674,7 +682,7 @@ def save(self) -> None:
             )
         else:
             _, self.meta, self.obj = build(
-                self.repo.cache.local,
+                self.local_cache,
                 self.fs_path,
                 self.fs,
                 self.hash_name,
@@ -1292,7 +1300,7 @@ def add(  # noqa: C901
         )
 
         assert self.repo
-        cache = self.cache if self.use_cache else self.repo.cache.local
+        cache = self.cache if self.use_cache else self.local_cache
         assert isinstance(cache, HashFileDB)
 
         new: "HashFile"
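
The net effect is that each output keeps resolving to the cache that matches how it was hashed. A condensed, standalone restatement of the dispatch (not commit code; `caches` stands in for repo.cache):

LEGACY_HASH_NAMES = {"md5-dos2unix", "checksum", "etag", "params"}


def pick_local_odb(hash_name: str, caches: dict):
    # .dvc files written by DVC 2.x record md5-dos2unix hashes and resolve
    # to the legacy ODB; 3.x md5 hashes resolve to the files/-prefixed one.
    return caches["legacy" if hash_name in LEGACY_HASH_NAMES else "local"]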
4 changes: 2 additions & 2 deletions dvc/repo/__init__.py
@@ -420,8 +420,8 @@ def _ignore(self):
         flist = [self.config.files["local"]]
         if tmp_dir := self.tmp_dir:
             flist.append(tmp_dir)
-        if path_isin(self.cache.repo.path, self.root_dir):
-            flist.append(self.cache.repo.path)
+        if path_isin(self.cache.legacy.path, self.root_dir):
+            flist.append(self.cache.legacy.path)
 
         for file in flist:
             self.scm_context.ignore(file)
2 changes: 1 addition & 1 deletion dvc/repo/experiments/executor/local.py
@@ -152,7 +152,7 @@ def init_cache(
         self, repo: "Repo", rev: str, run_cache: bool = True  # noqa: ARG002
     ):
         """Initialize DVC cache."""
-        self._update_config({"cache": {"dir": repo.cache.repo.path}})
+        self._update_config({"cache": {"dir": repo.cache.local_cache_dir}})
 
     def cleanup(self, infofile: Optional[str] = None):
         super().cleanup(infofile)
2 changes: 1 addition & 1 deletion dvc/repo/open_repo.py
@@ -129,7 +129,7 @@ def _get_remote_config(url):
         name = "auto-generated-upstream"
         return {
             "core": {"remote": name},
-            "remote": {name: {"url": repo.cache.local.path}},
+            "remote": {name: {"url": repo.cache.local_cache_dir}},
         }
 
     # Use original remote to make sure that we are using correct url,
26 changes: 15 additions & 11 deletions dvc/stage/cache.py
@@ -60,7 +60,7 @@ def _get_stage_hash(stage):
 class StageCache:
     def __init__(self, repo):
         self.repo = repo
-        self.cache_dir = os.path.join(self.repo.cache.local.path, "runs")
+        self.cache_dir = os.path.join(self.repo.cache.legacy.path, "runs")
 
     def _get_cache_dir(self, key):
         return os.path.join(self.cache_dir, key[:2], key)
@@ -119,12 +119,12 @@ def _create_stage(self, cache, wdir=None):

     @contextmanager
     def _cache_type_copy(self):
-        cache_types = self.repo.cache.local.cache_types
-        self.repo.cache.local.cache_types = ["copy"]
+        cache_types = self.repo.cache.legacy.cache_types
+        self.repo.cache.legacy.cache_types = ["copy"]
         try:
             yield
         finally:
-            self.repo.cache.local.cache_types = cache_types
+            self.repo.cache.legacy.cache_types = cache_types
 
     def _uncached_outs(self, stage, cache):
         # NOTE: using temporary stage to avoid accidentally modifying original
@@ -166,14 +166,14 @@ def save(self, stage):
         COMPILED_LOCK_FILE_STAGE_SCHEMA(cache)
 
         path = self._get_cache_path(cache_key, cache_value)
-        local_fs = self.repo.cache.local.fs
+        local_fs = self.repo.cache.legacy.fs
         parent = local_fs.path.parent(path)
-        self.repo.cache.local.makedirs(parent)
+        self.repo.cache.legacy.makedirs(parent)
         tmp = local_fs.path.join(parent, fs.utils.tmp_fname())
         assert os.path.exists(parent)
         assert os.path.isdir(parent)
         dump_yaml(tmp, cache)
-        self.repo.cache.local.move(tmp, path)
+        self.repo.cache.legacy.move(tmp, path)
 
     def restore(self, stage, run_cache=True, pull=False, dry=False):
         from .serialize import to_single_stage_lockfile
@@ -258,12 +258,16 @@ def transfer(self, from_odb, to_odb):
         return ret
 
     def push(self, remote: Optional[str], odb: Optional["ObjectDB"] = None):
-        dest_odb = odb or self.repo.cloud.get_remote_odb(remote, "push --run-cache")
-        return self.transfer(self.repo.cache.local, dest_odb)
+        dest_odb = odb or self.repo.cloud.get_remote_odb(
+            remote, "push --run-cache", hash_name="md5-dos2unix"
+        )
+        return self.transfer(self.repo.cache.legacy, dest_odb)
 
     def pull(self, remote: Optional[str], odb: Optional["ObjectDB"] = None):
-        odb = odb or self.repo.cloud.get_remote_odb(remote, "fetch --run-cache")
-        return self.transfer(odb, self.repo.cache.local)
+        odb = odb or self.repo.cloud.get_remote_odb(
+            remote, "fetch --run-cache", hash_name="md5-dos2unix"
+        )
+        return self.transfer(odb, self.repo.cache.legacy)
 
     def get_used_objs(self, used_run_cache, *args, **kwargs):
         """Return used cache for the specified run-cached stages."""
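
Run-cache entries are plain YAML files keyed by stage hash rather than content-addressed objects, so they stay in the legacy ODB and push/pull pin hash_name="md5-dos2unix" to land beside them on the remote. A sketch of the resulting on-disk location (the stage hash is a made-up example):

import os

cache_root = os.path.join(".dvc", "cache")  # repo.cache.legacy.path in a default layout
key = "0b5616f273015c9cc61d5eff4b04e8e1"    # hypothetical stage hash
# mirrors StageCache._get_cache_dir: runs/<first two chars>/<full key>
run_cache_dir = os.path.join(cache_root, "runs", key[:2], key)
print(run_cache_dir)  # .dvc/cache/runs/0b/0b5616f273015c9cc61d5eff4b04e8e1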
2 changes: 1 addition & 1 deletion dvc/testing/api_tests.py
@@ -11,7 +11,7 @@ class TestAPI:
     def test_get_url(self, tmp_dir, dvc, remote):
         tmp_dir.dvc_gen("foo", "foo")
 
-        expected_url = (remote / "ac/bd18db4cc2f85cedef654fccc4a4d8").url
+        expected_url = (remote / "files" / "ac/bd18db4cc2f85cedef654fccc4a4d8").url
         assert api.get_url("foo") == expected_url
 
     def test_open(self, tmp_dir, dvc, remote):
6 changes: 4 additions & 2 deletions dvc/testing/workspace_tests.py
@@ -181,7 +181,7 @@ def test_add(self, tmp_dir, dvc, workspace, hash_name, hash_value):
         )
         assert (workspace / "file").read_text() == "file"
         assert (
-            workspace / "cache" / hash_value[:2] / hash_value[2:]
+            workspace / "cache" / "files" / hash_value[:2] / hash_value[2:]
         ).read_text() == "file"
 
         assert dvc.status() == {}
@@ -191,7 +191,9 @@ def test_add_dir(self, tmp_dir, dvc, workspace, hash_name, dir_hash_value):
         workspace.gen({"dir": {"file": "file", "subdir": {"subfile": "subfile"}}})
 
         dvc.add("remote://workspace/dir")
-        assert (workspace / "cache" / dir_hash_value[:2] / dir_hash_value[2:]).is_file()
+        assert (
+            workspace / "cache" / "files" / dir_hash_value[:2] / dir_hash_value[2:]
+        ).is_file()
 
 
 def match_files(fs, entries, expected):
18 changes: 11 additions & 7 deletions tests/func/api/test_data.py
@@ -17,7 +17,7 @@ def test_get_url_external(tmp_dir, erepo_dir, cloud):

     # Using file url to force clone to tmp repo
     repo_url = f"file://{erepo_dir.as_posix()}"
-    expected_url = (cloud / "ac/bd18db4cc2f85cedef654fccc4a4d8").url
+    expected_url = (cloud / "files" / "ac/bd18db4cc2f85cedef654fccc4a4d8").url
     assert api.get_url("foo", repo=repo_url) == expected_url
 
 
@@ -166,16 +166,16 @@ def test_get_url_granular(tmp_dir, dvc, cloud):
     tmp_dir.add_remote(config=cloud.config)
     tmp_dir.dvc_gen({"dir": {"foo": "foo", "bar": "bar", "nested": {"file": "file"}}})
 
-    expected_url = (cloud / "5f" / "c28ea78987408341668eba6525ebd1.dir").url
+    expected_url = (cloud / "files" / "5f" / "c28ea78987408341668eba6525ebd1.dir").url
     assert api.get_url("dir") == expected_url
 
-    expected_url = (cloud / "ac" / "bd18db4cc2f85cedef654fccc4a4d8").url
+    expected_url = (cloud / "files" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8").url
     assert api.get_url("dir/foo") == expected_url
 
-    expected_url = (cloud / "37" / "b51d194a7513e45b56f6524f2d51f2").url
+    expected_url = (cloud / "files" / "37" / "b51d194a7513e45b56f6524f2d51f2").url
     assert api.get_url("dir/bar") == expected_url
 
-    expected_url = (cloud / "8c" / "7dd922ad47494fc02c388e12c00eac").url
+    expected_url = (cloud / "files" / "8c" / "7dd922ad47494fc02c388e12c00eac").url
     assert api.get_url(os.path.join("dir", "nested", "file")) == expected_url

Expand All @@ -186,10 +186,14 @@ def test_get_url_subrepos(tmp_dir, scm, local_cloud):
subrepo.dvc_gen({"dir": {"foo": "foo"}, "bar": "bar"}, commit="add files")
subrepo.dvc.push()

expected_url = os.fspath(local_cloud / "ac" / "bd18db4cc2f85cedef654fccc4a4d8")
expected_url = os.fspath(
local_cloud / "files" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8"
)
assert api.get_url(os.path.join("subrepo", "dir", "foo")) == expected_url

expected_url = os.fspath(local_cloud / "37" / "b51d194a7513e45b56f6524f2d51f2")
expected_url = os.fspath(
local_cloud / "files" / "37" / "b51d194a7513e45b56f6524f2d51f2"
)
assert api.get_url("subrepo/bar") == expected_url


6 changes: 3 additions & 3 deletions tests/func/test_add.py
@@ -755,10 +755,10 @@ def test_add_symlink_file(tmp_dir, dvc):
     assert (tmp_dir / "dir" / "bar").read_text() == "bar"
 
     assert (
-        tmp_dir / ".dvc" / "cache" / "37" / "b51d194a7513e45b56f6524f2d51f2"
+        tmp_dir / ".dvc" / "cache" / "files" / "37" / "b51d194a7513e45b56f6524f2d51f2"
     ).read_text() == "bar"
     assert not (
-        tmp_dir / ".dvc" / "cache" / "37" / "b51d194a7513e45b56f6524f2d51f2"
+        tmp_dir / ".dvc" / "cache" / "files" / "37" / "b51d194a7513e45b56f6524f2d51f2"
     ).is_symlink()
 
     # Test that subsequent add succeeds
@@ -816,7 +816,7 @@ def test_add_with_cache_link_error(tmp_dir, dvc, mocker, capsys):
     assert (tmp_dir / "foo").exists()
     assert (tmp_dir / "foo.dvc").exists()
     assert (
-        tmp_dir / ".dvc" / "cache" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8"
+        tmp_dir / ".dvc" / "cache" / "files" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8"
     ).read_text() == "foo"
 
 
4 changes: 2 additions & 2 deletions tests/func/test_commit.py
@@ -135,7 +135,7 @@ def test_commit_granular_output(tmp_dir, dvc):
         no_commit=True,
     )
 
-    cache = tmp_dir / ".dvc" / "cache"
+    cache = tmp_dir / ".dvc" / "cache" / "files"
     assert not list(cache.glob("*/*"))
 
     dvc.commit("foo")
@@ -176,7 +176,7 @@ def test_commit_granular_dir(tmp_dir, dvc):
     )
     dvc.add("data", no_commit=True)
 
-    cache = tmp_dir / ".dvc" / "cache"
+    cache = tmp_dir / ".dvc" / "cache" / "files"
 
     assert set(cache.glob("*/*")) == set()
 
4 changes: 2 additions & 2 deletions tests/func/test_data_cloud.py
@@ -472,11 +472,11 @@ def test_push_pull_fetch_pipeline_stages(tmp_dir, dvc, run_copy, local_remote):

     dvc.pull("copy-foo-bar")
     assert (tmp_dir / "bar").exists()
-    assert len(recurse_list_dir(dvc.cache.local.path)) == 2
+    assert len(recurse_list_dir(dvc.cache.local.path)) == 1
     clean(["bar"], dvc)
 
     dvc.fetch("copy-foo-bar")
-    assert len(recurse_list_dir(dvc.cache.local.path)) == 2
+    assert len(recurse_list_dir(dvc.cache.local.path)) == 1
 
 
 def test_pull_partial(tmp_dir, dvc, local_remote):