Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of checksum files in zarr ingestion #1395

Merged
merged 7 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions dandiapi/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def mutate_configuration(configuration: type[ComposedConfiguration]):
DANDI_DANDISETS_EMBARGO_BUCKET_NAME = values.Value(environ_required=True)
DANDI_DANDISETS_EMBARGO_BUCKET_PREFIX = values.Value(default='', environ=True)
DANDI_ZARR_PREFIX_NAME = values.Value(default='zarr', environ=True)
DANDI_ZARR_CHECKSUM_PREFIX_NAME = values.Value(default='zarr-checksums', environ=True)

# Mainly applies to unembargo
DANDI_MULTIPART_COPY_MAX_WORKERS = values.IntegerValue(environ=True, default=50)
Expand Down Expand Up @@ -134,7 +133,6 @@ class TestingConfiguration(DandiMixin, TestingBaseConfiguration):
DANDI_DANDISETS_EMBARGO_BUCKET_NAME = 'test--embargo-dandiapi-dandisets'
DANDI_DANDISETS_EMBARGO_BUCKET_PREFIX = 'test-embargo-prefix/'
DANDI_ZARR_PREFIX_NAME = 'test-zarr'
DANDI_ZARR_CHECKSUM_PREFIX_NAME = 'test-zarr-checksums'
DANDI_JUPYTERHUB_URL = 'https://hub.dandiarchive.org/'

# This makes the dandischema pydantic model allow URLs with localhost in them.
Expand Down
265 changes: 29 additions & 236 deletions dandiapi/zarr/checksums.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,11 @@
from __future__ import annotations

from collections.abc import Mapping
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from datetime import datetime
import heapq
from pathlib import Path
import re
from typing import TYPE_CHECKING

from django.conf import settings
from django.core.files.base import ContentFile

if TYPE_CHECKING:
from dandiapi.zarr.models import EmbargoedZarrArchive, ZarrArchive

from dandischema.digests.zarr import (
ZarrChecksum,
ZarrChecksumListing,
ZarrChecksums,
ZarrJSONChecksumSerializer,
)
from dandischema.digests.zarr import EMPTY_CHECKSUM, ZarrChecksum, ZarrJSONChecksumSerializer

ZARR_CHECKSUM_REGEX = r'[0-9a-f]+-(\d+)--(\d+)'

Expand All @@ -35,145 +20,17 @@ def parse_checksum_string(checksum: str) -> tuple[int, int]:
return (int(count), int(size))


class ZarrChecksumFileUpdater(AbstractContextManager):
    """
    A utility class specifically for updating zarr checksum files.

    When used as a context manager, the checksum file will be loaded and written to automatically:
    checksums are read on entry, and on a clean exit the file is rewritten (or deleted when no
    checksums remain).
    """

    # Shared default serializer; reused across instances.
    _default_serializer = ZarrJSONChecksumSerializer()

    def __init__(
        self,
        zarr_archive: ZarrArchive | EmbargoedZarrArchive,
        zarr_directory_path: str | Path,
        serializer=_default_serializer,
    ):
        self.zarr_archive = zarr_archive
        # Normalize to a trailing-slash directory path; the zarr root collapses to ''.
        self.zarr_directory_path = f'{str(zarr_directory_path)}/'
        if self.zarr_directory_path in ['/', './']:
            self.zarr_directory_path = ''
        self._serializer = serializer

        # This is loaded when an instance is used as a context manager,
        # then saved when the context manager exits.
        self._checksums = None

    def __enter__(self):
        # Seed the in-memory checksums from any existing checksum file.
        existing_zarr_checksum = self.read_checksum_file()
        if existing_zarr_checksum:
            self._checksums = existing_zarr_checksum.checksums
        else:
            self._checksums = ZarrChecksums()
        return self

    def __exit__(self, exc_type, *exc):
        # If there was an exception, do not write anything
        if exc_type:
            return None  # this means throw the exception as normal
        if not self.checksum_listing.checksums.is_empty:
            self.write_checksum_file(self.checksum_listing)
        else:
            # If there are no checksums to write, simply delete the checksum file.
            self.delete_checksum_file()

    @property
    def checksum_file_path(self):
        """Generate the path of the checksum file to update."""
        # Layout: <bucket prefix>/<checksum prefix>/<zarr id>/<directory>/.checksum
        return f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}{settings.DANDI_ZARR_CHECKSUM_PREFIX_NAME}/{self.zarr_archive.zarr_id}/{self.zarr_directory_path}.checksum'  # noqa: E501

    @property
    def checksum_listing(self) -> ZarrChecksumListing:
        """Get the current state of the updater."""
        if self._checksums is None:
            raise ValueError('This method is only valid when used by a context manager')
        return self._serializer.generate_listing(self._checksums)

    def read_checksum_file(self) -> ZarrChecksumListing | None:
        """Load a checksum listing from the checksum file."""
        storage = self.zarr_archive.storage
        checksum_path = self.checksum_file_path
        if storage.exists(checksum_path):
            with storage.open(checksum_path) as f:
                x = f.read()
            return self._serializer.deserialize(x)
        else:
            # No checksum file has been written for this directory yet.
            return None

    def write_checksum_file(self, zarr_checksum: ZarrChecksumListing):
        """Write a checksum listing to the checksum file."""
        storage = self.zarr_archive.storage
        content_file = ContentFile(self._serializer.serialize(zarr_checksum).encode('utf-8'))
        # save() will never overwrite an existing file, it simply appends some garbage to ensure
        # uniqueness. _save() is an internal storage API that will overwrite existing files.
        storage._save(self.checksum_file_path, content_file)

    def delete_checksum_file(self):
        """Delete the checksum file."""
        storage = self.zarr_archive.storage
        storage.delete(self.checksum_file_path)

    def add_file_checksums(self, checksums: list[ZarrChecksum]):
        """Add a list of file checksums to the listing."""
        if self._checksums is None:
            raise ValueError('This method is only valid when used by a context manager')
        self._checksums.add_file_checksums(checksums)

    def add_directory_checksums(self, checksums: list[ZarrChecksum]):
        """Add a list of directory checksums to the listing."""
        if self._checksums is None:
            raise ValueError('This method is only valid when used by a context manager')
        self._checksums.add_directory_checksums(checksums)

    def remove_checksums(self, paths: list[str]):
        """Remove a list of paths from the listing."""
        if self._checksums is None:
            raise ValueError('This method is only valid when used by a context manager')
        # Listings key entries by final path component, so strip each path to its name.
        self._checksums.remove_checksums([Path(path).name for path in paths])


class SessionZarrChecksumFileUpdater(ZarrChecksumFileUpdater):
    """ZarrChecksumFileUpdater variant that treats files older than the session as absent."""

    def __init__(self, *args, session_start: datetime, **kwargs):
        super().__init__(*args, **kwargs)

        # Files modified before this moment are ignored by read_checksum_file.
        self._session_start = session_start

    def read_checksum_file(self) -> ZarrChecksumListing | None:
        """Load a checksum listing from the checksum file, ignoring pre-session files."""
        storage = self.zarr_archive.storage
        checksum_path = self.checksum_file_path

        # A missing file, or one not modified during this session, counts as no listing.
        if not storage.exists(checksum_path):
            return None
        if storage.modified_time(self.checksum_file_path) < self._session_start:
            return None

        with storage.open(checksum_path) as stream:
            raw = stream.read()
        return self._serializer.deserialize(raw)


@dataclass
class ZarrChecksumModification:
    """
    A set of changes to apply to a ZarrChecksumListing.

    Additions or modifications are stored in files_to_update and directories_to_update.
    Removals are stored in paths_to_remove.
    """

    # The directory whose checksum listing these changes apply to.
    path: Path
    files_to_update: list[ZarrChecksum] = field(default_factory=list)
    directories_to_update: list[ZarrChecksum] = field(default_factory=list)
    paths_to_remove: list[str] = field(default_factory=list)

    def __lt__(self, other):
        # Comparison by path string so modifications can be ordered in a heap.
        return str(self.path) < str(other.path)
Expand All @@ -192,6 +49,10 @@ def __init__(self) -> None:
self._heap: list[tuple[int, ZarrChecksumModification]] = []
self._path_map: dict[Path, ZarrChecksumModification] = {}

@property
def empty(self):
    """Whether any modifications remain queued."""
    return not self._heap

def _add_path(self, key: Path):
modification = ZarrChecksumModification(path=key)

Expand All @@ -214,105 +75,37 @@ def queue_file_update(self, key: Path, checksum: ZarrChecksum):
def queue_directory_update(self, key: Path, checksum: ZarrChecksum):
    """Queue *checksum* as a directory entry in the modification at *key*."""
    target = self._get_path(key)
    target.directories_to_update.append(checksum)

def queue_removal(self, key: Path, path: Path | str):
    """Queue *path* (stored as a string) for removal from the modification at *key*."""
    target = self._get_path(key)
    target.paths_to_remove.append(str(path))

def pop_deepest(self) -> ZarrChecksumModification:
    """Find the deepest path in the queue, and return it and its children to be updated."""
    _priority, modification = heapq.heappop(self._heap)
    self._path_map.pop(modification.path)
    return modification

@property
def empty(self):
    """Whether the modification heap has been fully drained."""
    return not self._heap


class ZarrChecksumUpdater:
"""A helper for updating batches of checksums in a zarr archive."""

def __init__(self, zarr_archive: ZarrArchive | EmbargoedZarrArchive) -> None:
    # The zarr archive whose checksum files this updater reads and writes.
    self.zarr_archive = zarr_archive
def process(self):
"""Process the queue, returning the resulting top level digest."""
while not self.empty:
# Pop the deepest directory available
modification = self.pop_deepest()

def apply_modification(self, modification: ZarrChecksumModification) -> ZarrChecksumFileUpdater:
    """Apply one directory's queued changes to its checksum file and return the updater."""
    updater = ZarrChecksumFileUpdater(self.zarr_archive, modification.path)
    with updater as file_updater:
        # Removals are applied last, so removing a checksum takes precedence
        # over adding/modifying that same checksum.
        file_updater.add_file_checksums(modification.files_to_update)
        file_updater.add_directory_checksums(modification.directories_to_update)
        file_updater.remove_checksums(modification.paths_to_remove)
    return file_updater

def modify(self, modifications: ZarrChecksumModificationQueue):
while not modifications.empty:
modification = modifications.pop_deepest()
print(
f'Applying modifications to {self.zarr_archive.zarr_id}:{modification.path} '
f'({len(modification.files_to_update)} files, '
f'{len(modification.directories_to_update)} directories, '
f'{len(modification.paths_to_remove)} removals)'
# Generates a sorted checksum listing for the current path
checksum_listing = ZarrJSONChecksumSerializer().generate_listing(
files=modification.files_to_update, directories=modification.directories_to_update
)
latest_checksum = checksum_listing

# If we have reached the root node, then we're done.
if modification.path == Path('.') or modification.path == Path('/'):
return latest_checksum.digest

# The parent needs to incorporate the checksum modification we just made.
self.queue_directory_update(
modification.path.parent,
ZarrChecksum(
name=modification.path.name,
digest=checksum_listing.digest,
size=checksum_listing.size,
),
)

# Apply modification
file_updater = self.apply_modification(modification)

# If we have reached the root node, then we obviously do not need to update the parent.
if modification.path != Path('.') and modification.path != Path('/'):
if file_updater.checksum_listing.checksums.is_empty:
# We have removed all checksums from this checksum file.
# ZarrChecksumFileUpdater will have already deleted the checksum file, so all
# we need to do is queue this checksum for removal with its parent.
modifications.queue_removal(modification.path.parent, modification.path)
else:
# The parent needs to incorporate the checksum modification we just made.
modifications.queue_directory_update(
modification.path.parent,
ZarrChecksum(
name=modification.path.name,
digest=file_updater.checksum_listing.digest,
size=file_updater.checksum_listing.size,
),
)

def update_file_checksums(self, checksums: Mapping[str, ZarrChecksum]):
    """
    Update the given checksums.

    checksums: a mapping of path to the new checksum for that path.
    """
    queue = ZarrChecksumModificationQueue()
    # Each file's checksum is queued against its parent directory's listing.
    for path, checksum in checksums.items():
        queue.queue_file_update(Path(path).parent, checksum)
    self.modify(queue)

def remove_checksums(self, paths: list[str]):
    """Queue removal of each given path from its parent listing, then apply."""
    queue = ZarrChecksumModificationQueue()
    for path in paths:
        queue.queue_removal(Path(path).parent, path)
    self.modify(queue)


class SessionZarrChecksumUpdater(ZarrChecksumUpdater):
    """ZarrChecksumUpdater to distinguish existing and new checksum files."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # Set microseconds to zero, since the timestamp from S3/Minio won't include them.
        # Keeping microseconds would make fresh files compare as older than the session
        # start, producing false negatives.
        self._session_start = datetime.now().replace(tzinfo=None, microsecond=0)

    def apply_modification(
        self, modification: ZarrChecksumModification
    ) -> SessionZarrChecksumFileUpdater:
        file_updater = SessionZarrChecksumFileUpdater(
            zarr_archive=self.zarr_archive,
            zarr_directory_path=modification.path,
            session_start=self._session_start,
        )
        with file_updater:
            file_updater.add_file_checksums(modification.files_to_update)
            file_updater.add_directory_checksums(modification.directories_to_update)
            file_updater.remove_checksums(modification.paths_to_remove)
        return file_updater
return EMPTY_CHECKSUM
21 changes: 0 additions & 21 deletions dandiapi/zarr/management/commands/migrate_zarr_checksums.py

This file was deleted.

26 changes: 1 addition & 25 deletions dandiapi/zarr/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
from django.db import models
from django.db.models import QuerySet
from django_extensions.db.models import TimeStampedModel
import pydantic
from rest_framework.exceptions import ValidationError

from dandiapi.api.models import Dandiset
from dandiapi.api.storage import get_embargo_storage, get_storage
from dandiapi.zarr.checksums import ZarrChecksum, ZarrChecksumFileUpdater
from dandiapi.zarr.checksums import ZarrChecksum

logger = logging.Logger(name=__name__)

Expand Down Expand Up @@ -153,29 +152,6 @@ def s3_url(self):
s3_url = urlunparse((parsed[0], parsed[1], parsed[2], '', '', ''))
return s3_url

def fetch_checksum(self, path: str | Path = '') -> str | None:
    """
    Fetch the zarr checksum at a specific subdirectory (defaults to root).

    If a checksum file doesn't exist at the specified path, `None` is returned.
    """
    updater = ZarrChecksumFileUpdater(self, path)
    listing = updater.read_checksum_file()
    return None if listing is None else listing.digest

def get_checksum(self) -> str | None:
    """Return the top level zarr checksum, or None if the checksum file is malformed."""
    try:
        digest = self.fetch_checksum()
    except pydantic.ValidationError as e:
        # A malformed checksum file is logged and treated as missing.
        logger.error(e, exc_info=True)
        return None
    return digest

def begin_upload(self, files):
if self.upload_in_progress:
raise ValidationError('Simultaneous uploads are not allowed.')
Expand Down
Loading