Skip to content

Commit

Permalink
Merge pull request #1387 from dandi/zarr-checksum-optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnesbitt authored Dec 8, 2022
2 parents 9dbf73a + 9848539 commit 00b93c4
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 35 deletions.
6 changes: 3 additions & 3 deletions dandiapi/api/management/commands/create_dev_dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

@click.command()
@click.option('--name', default='Development Dandiset')
@click.option('--owner', required=True, help='The email address of the owner')
def create_dev_dandiset(name: str, owner: str):
owner = User.objects.get(email=owner)
@click.option('--owner', 'email', required=True, help='The email address of the owner')
def create_dev_dandiset(name: str, email: str):
owner = User.objects.get(email=email)

version_metadata = {
'description': 'An informative description',
Expand Down
52 changes: 31 additions & 21 deletions dandiapi/zarr/checksums.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Mapping
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
import heapq
from pathlib import Path
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -132,6 +133,9 @@ class ZarrChecksumModification:
directories_to_update: list[ZarrChecksum] = field(default_factory=list)
paths_to_remove: list[str] = field(default_factory=list)

def __lt__(self, other):
return str(self.path) < str(other.path)


class ZarrChecksumModificationQueue:
"""
Expand All @@ -143,38 +147,44 @@ class ZarrChecksumModificationQueue:
"""

def __init__(self) -> None:
self._queue: dict[Path, ZarrChecksumModification] = {}
self._heap: list[tuple[int, ZarrChecksumModification]] = []
self._path_map: dict[Path, ZarrChecksumModification] = {}

def _add_path(self, key: Path):
modification = ZarrChecksumModification(path=key)

# Add link to modification
self._path_map[key] = modification

# Add modification to heap with length (negated to representa max heap)
length = len(key.parents)
heapq.heappush(self._heap, (-1 * length, modification))

def _get_path(self, key: Path):
if key not in self._path_map:
self._add_path(key)

return self._path_map[key]

def queue_file_update(self, key: Path, checksum: ZarrChecksum):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].files_to_update.append(checksum)
self._get_path(key).files_to_update.append(checksum)

def queue_directory_update(self, key: Path, checksum: ZarrChecksum):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].directories_to_update.append(checksum)
self._get_path(key).directories_to_update.append(checksum)

def queue_removal(self, key: Path, path: Path | str):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].paths_to_remove.append(str(path))
self._get_path(key).paths_to_remove.append(str(path))

def pop_deepest(self):
def pop_deepest(self) -> ZarrChecksumModification:
"""Find the deepest path in the queue, and return it and its children to be updated."""
longest_path = list(self._queue.keys())[0]
longest_path_length = str(longest_path).split('/')
# O(n) performance, consider a priority queue for optimization
for path in self._queue.keys():
path_length = str(path).split('/')
if path_length > longest_path_length:
longest_path = path
longest_path_length = path_length
return self._queue.pop(longest_path)
_, modification = heapq.heappop(self._heap)
del self._path_map[modification.path]

return modification

@property
def empty(self):
return len(self._queue) == 0
return len(self._heap) == 0


class ZarrChecksumUpdater:
Expand Down
25 changes: 14 additions & 11 deletions dandiapi/zarr/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ZarrChecksumFileUpdater,
ZarrChecksumListing,
ZarrChecksumModification,
ZarrChecksumModificationQueue,
ZarrChecksumUpdater,
)
from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus
Expand Down Expand Up @@ -134,7 +135,8 @@ def ingest_zarr_archive(

# Instantiate updater and add files as they come in
empty = True
updater = SessionZarrChecksumUpdater(zarr_archive=zarr)
queue = ZarrChecksumModificationQueue()
logger.info(f'Fetching files for zarr {zarr.zarr_id}...')
for files in yield_files(bucket=zarr.storage.bucket_name, prefix=zarr.s3_path('')):
if len(files):
empty = False
Expand All @@ -147,16 +149,17 @@ def ingest_zarr_archive(

# Update checksums
if not no_checksum:
updater.update_file_checksums(
{
file['Key'].replace(zarr.s3_path(''), ''): ZarrChecksum(
digest=file['ETag'].strip('"'),
name=Path(file['Key'].replace(zarr.s3_path(''), '')).name,
size=file['Size'],
)
for file in files
}
)
for file in files:
path = Path(file['Key'].replace(zarr.s3_path(''), ''))
checksum = ZarrChecksum(
name=path.name,
size=file['Size'],
digest=file['ETag'].strip('"'),
)
queue.queue_file_update(key=path.parent, checksum=checksum)

# Perform updates
SessionZarrChecksumUpdater(zarr_archive=zarr).modify(modifications=queue)

# Set checksum field to top level checksum, after ingestion completion
checksum = zarr.get_checksum()
Expand Down

0 comments on commit 00b93c4

Please sign in to comment.