chunk index cache: use cache/chunks.<HASH>, see #8503
- doesn't need a separate file for the hash
- we can later write multiple partial chunkindexes to the cache

also:

add upgrade code that renames the cache written by previous borg versions.
ThomasWaldmann committed Nov 8, 2024
1 parent e9ace2d commit e9e740f
Showing 4 changed files with 68 additions and 50 deletions.
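The core of the change: the cache object's name now embeds the xxh64 digest of its payload (cache/chunks.<hash>), so the name alone is enough to validate a cached index. A minimal sketch of the scheme, using the third-party xxhash package as a stand-in for borg's internal xxh64/bin_to_hex helpers (both functions below are illustrative, not borg code):

```python
import xxhash

CHUNKINDEX_HASH_SEED = 2  # same seed value the commit uses


def chunkindex_cache_name(data: bytes) -> str:
    # the object name embeds the digest of the serialized index
    digest = xxhash.xxh64(data, seed=CHUNKINDEX_HASH_SEED).hexdigest()
    return f"cache/chunks.{digest}"


def payload_matches_name(name: str, data: bytes) -> bool:
    # a reader re-hashes the payload and compares against the name suffix;
    # a mismatch means the cached index is corrupted or out of date
    expected = name.removeprefix("cache/chunks.")
    return xxhash.xxh64(data, seed=CHUNKINDEX_HASH_SEED).hexdigest() == expected


data = b"<serialized ChunkIndex>"
name = chunkindex_cache_name(data)
assert payload_matches_name(name, data)
```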
17 changes: 4 additions & 13 deletions src/borg/archive.py
@@ -22,7 +22,7 @@
 
 from . import xattr
 from .chunker import get_chunker, Chunk
-from .cache import ChunkListEntry, build_chunkindex_from_repo
+from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
 from .crypto.key import key_factory, UnsupportedPayloadError
 from .compress import CompressionSpec
 from .constants import *  # NOQA
@@ -50,7 +50,7 @@
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError, StoreObjectNotFound
+from .repository import Repository, NoManifestError
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -2138,18 +2138,9 @@ def valid_item(obj):
 
     def finish(self):
         if self.repair:
-            # we may have deleted chunks, remove the chunks index cache!
+            # we may have deleted chunks, invalidate/remove the chunks index cache!
             logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
-            try:
-                self.repository.store_delete("cache/chunks_hash")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
-            try:
-                self.repository.store_delete("cache/chunks")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
+            delete_chunkindex_cache(self.repository)
         logger.info("Writing Manifest.")
         self.manifest.write()
 
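check --repair now delegates to delete_chunkindex_cache(), which only knows about the new cache/chunks.<hash> names; repositories written by earlier borg betas get their cache renamed by try_upgrade_to_b14() in the cache.py hunks further down. A hedged illustration of that rename, with a hypothetical in-memory stand-in for the store (the three store_* methods mirror calls visible in this commit):

```python
class FakeStore:
    """Hypothetical in-memory stand-in for borg's repository store."""

    def __init__(self):
        self.objects = {}  # name -> bytes

    def store_load(self, name):
        return self.objects[name]  # the real store raises ObjectNotFound, not KeyError

    def store_move(self, old_name, new_name):
        self.objects[new_name] = self.objects.pop(old_name)

    def store_delete(self, name):
        del self.objects[name]


store = FakeStore()
# pre-b14 layout: the index and its hash live in two separate objects
store.objects["cache/chunks"] = b"<serialized ChunkIndex>"
store.objects["cache/chunks_hash"] = bytes.fromhex("0123456789abcdef")

# what try_upgrade_to_b14() effectively does:
hash_hex = store.store_load("cache/chunks_hash").hex()
store.store_move("cache/chunks", f"cache/chunks.{hash_hex}")
store.store_delete("cache/chunks_hash")

assert list(store.objects) == ["cache/chunks.0123456789abcdef"]
```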
2 changes: 1 addition & 1 deletion src/borg/archiver/compact_cmd.py
@@ -65,7 +65,7 @@ def save_chunk_index(self):
             # as we put the wrong size in there, we need to clean up the size:
             self.chunks[id] = entry._replace(size=0)
         # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
-        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True)
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
         self.chunks = None  # nothing there (cleared!)
 
     def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
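borg compact passes the new delete_other=True so that, once the fresh index is stored, every stale cache/chunks.* object disappears and no client can pick up an outdated index. Note the ordering implied by write_chunkindex_to_repo_cache in the next file: the new object is written first, then the names collected before the write are deleted. A simplified sketch of that contract (the function and its parameters are hypothetical):

```python
def replace_cached_chunkindex(store, new_name: str, data: bytes, old_names: set[str]) -> None:
    # publish the new, valid index first ...
    store.store_store(new_name, data)
    # ... then drop the outdated objects, so a reader listing the cache at any
    # point in between still finds at least one valid index to fall back on
    for name in old_names:
        if name != new_name:  # simplification: never delete the object just written
            store.store_delete(name)
```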
97 changes: 62 additions & 35 deletions src/borg/cache.py
@@ -663,63 +663,90 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
         )
 
 
-def load_chunks_hash(repository) -> bytes:
+def try_upgrade_to_b14(repository):
+    # TODO: remove this before 2.0.0 release
     try:
         hash = repository.store_load("cache/chunks_hash")
-        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
     except (Repository.ObjectNotFound, StoreObjectNotFound):
         # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-        hash = b""
-        logger.debug("cache/chunks_hash missing!")
-    return hash
+        pass  # likely already upgraded
+    else:
+        old_name = "cache/chunks"
+        new_name = f"cache/chunks.{bin_to_hex(hash)}"
+        logger.debug(f"renaming {old_name} to {new_name}.")
+        repository.store_move(old_name, new_name)
+        repository.store_delete("cache/chunks_hash")
+
+
+def list_chunkindex_hashes(repository):
+    hashes = [
+        info.name.removeprefix("chunks.") for info in repository.store_list("cache") if info.name.startswith("chunks.")
+    ]
+    return set(hashes)
+
+
+def delete_chunkindex_cache(repository):
+    for hash in list_chunkindex_hashes(repository):
+        cache_name = f"cache/chunks.{hash}"
+        try:
+            repository.store_delete(cache_name)
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            pass
 
 
 CHUNKINDEX_HASH_SEED = 2
 
 
-def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False):
-    cached_hash = load_chunks_hash(repository)
+def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False, delete_other=False):
+    cached_hashes = list_chunkindex_hashes(repository)
     with io.BytesIO() as f:
         chunks.write(f)
         data = f.getvalue()
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
-    new_hash = xxh64(data, seed=CHUNKINDEX_HASH_SEED)
-    if force_write or new_hash != cached_hash:
-        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+    new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    if force_write or new_hash not in cached_hashes:
+        # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
-        # hash against cache/chunks_hash in the repository. if it is the same, the cache
-        # is valid. If it is different, the cache is either corrupted or out of date and
-        # has to be discarded.
-        # when some functionality is DELETING chunks from the repository, it has to either update
-        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # hash against the hash in its name. if it is the same, the cache is valid.
+        # if it is different, the cache is either corrupted or out of date and has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to delete
+        # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
         # so that all clients will discard any client-local chunks index caches.
-        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
-        repository.store_store("cache/chunks", data)
-        repository.store_store("cache/chunks_hash", new_hash)
+        cache_name = f"cache/chunks.{new_hash}"
+        logger.debug(f"caching chunks index as {cache_name} in repository...")
+        repository.store_store(cache_name, data)
+        if delete_other:
+            for hash in cached_hashes:
+                cache_name = f"cache/chunks.{hash}"
+                repository.store_delete(cache_name)
     return new_hash
 
 
 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
-    chunks = None
+    try_upgrade_to_b14(repository)
     # first, try to load a pre-computed and centrally cached chunks index:
     if not disable_caches:
-        wanted_hash = load_chunks_hash(repository)
-        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
-        try:
-            chunks_data = repository.store_load("cache/chunks")
-        except (Repository.ObjectNotFound, StoreObjectNotFound):
-            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-            logger.debug("cache/chunks not found in the repository.")
-        else:
-            if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == wanted_hash:
-                logger.debug("cache/chunks is valid.")
-                with io.BytesIO(chunks_data) as f:
-                    chunks = ChunkIndex.read(f)
-                return chunks
-            else:
-                logger.debug("cache/chunks is invalid.")
+        hashes = list_chunkindex_hashes(repository)
+        assert len(hashes) <= 1, f"chunk indexes: {hashes}"  # later we change to multiple chunkindexes...
+        for hash in hashes:
+            cache_name = f"cache/chunks.{hash}"
+            logger.debug(f"trying to load {cache_name} from the repo...")
+            try:
+                chunks_data = repository.store_load(cache_name)
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                logger.debug(f"{cache_name} not found in the repository.")
+            else:
+                if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+                    logger.debug(f"{cache_name} is valid.")
+                    with io.BytesIO(chunks_data) as f:
+                        chunks = ChunkIndex.read(f)
+                    return chunks
+                else:
+                    logger.debug(f"{cache_name} is invalid.")
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
@@ -741,7 +768,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
-        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True)
+        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
     return chunks
@@ -817,7 +844,7 @@ def add_chunk(
 
     def _write_chunks_cache(self, chunks):
         # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=True)
         self._chunks = None  # nothing there (cleared!)
 
     def refresh_lock(self, now):
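Putting the cache.py pieces together: a writer stores the index under its digest-derived name, and a reader lists cache/chunks.*, loads a candidate, and accepts it only if the payload's xxh64 matches the hex digest in the name. A self-contained, hedged round trip (MiniStore and both helper functions are illustrative; only the naming and validation rules come from the diff):

```python
from types import SimpleNamespace

import xxhash

SEED = 2  # CHUNKINDEX_HASH_SEED in the diff


class MiniStore:
    """Hypothetical in-memory store mirroring store_store/store_load/store_list."""

    def __init__(self):
        self.objects = {}  # name -> bytes

    def store_store(self, name, data):
        self.objects[name] = data

    def store_load(self, name):
        return self.objects[name]

    def store_list(self, prefix):
        # yields entries whose .name is relative to the prefix, matching the
        # diff's use of info.name.startswith("chunks.")
        for name in self.objects:
            if name.startswith(prefix + "/"):
                yield SimpleNamespace(name=name.removeprefix(prefix + "/"))


def write_index(store, data):
    name = f"cache/chunks.{xxhash.xxh64(data, seed=SEED).hexdigest()}"
    store.store_store(name, data)
    return name


def load_valid_index(store):
    for info in store.store_list("cache"):
        if not info.name.startswith("chunks."):
            continue
        data = store.store_load(f"cache/{info.name}")
        if xxhash.xxh64(data, seed=SEED).hexdigest() == info.name.removeprefix("chunks."):
            return data  # digest in the name matches the payload: cache is valid
    return None  # nothing valid cached; the caller rebuilds the index the slow way


store = MiniStore()
write_index(store, b"<serialized ChunkIndex>")
assert load_valid_index(store) == b"<serialized ChunkIndex>"
```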
2 changes: 1 addition & 1 deletion src/borg/repository.py
@@ -385,7 +385,7 @@ def check_object(obj):
             # if we did a full pass in one go, we built a complete, uptodate ChunkIndex, cache it!
             from .cache import write_chunkindex_to_repo_cache
 
-            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True)
+            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True, delete_other=True)
         except StoreObjectNotFound:
             # it can be that there is no "data/" at all, then it crashes when iterating infos.
             pass
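End to end, a client never manages the cache objects directly: build_chunkindex_from_repo() performs the b14 upgrade, tries the named caches, and falls back to a full scan. An illustrative call, assuming an already-open borg Repository instance named repository (not a runnable session on its own):

```python
from borg.cache import build_chunkindex_from_repo

# loads a valid cache/chunks.<hash> if one exists; otherwise rebuilds the
# index from the repository and re-caches it, deleting stale cache objects
chunks = build_chunkindex_from_repo(repository, cache_immediately=True)
```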
