Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize ingest_zarr_archive task #1387

Merged
merged 6 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dandiapi/api/management/commands/create_dev_dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

@click.command()
@click.option('--name', default='Development Dandiset')
@click.option('--owner', required=True, help='The email address of the owner')
def create_dev_dandiset(name: str, owner: str):
owner = User.objects.get(email=owner)
@click.option('--owner', 'email', required=True, help='The email address of the owner')
def create_dev_dandiset(name: str, email: str):
owner = User.objects.get(email=email)

version_metadata = {
'description': 'An informative description',
Expand Down
52 changes: 31 additions & 21 deletions dandiapi/zarr/checksums.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Mapping
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
import heapq
from pathlib import Path
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -132,6 +133,9 @@ class ZarrChecksumModification:
directories_to_update: list[ZarrChecksum] = field(default_factory=list)
paths_to_remove: list[str] = field(default_factory=list)

def __lt__(self, other):
return str(self.path) < str(other.path)


class ZarrChecksumModificationQueue:
"""
Expand All @@ -143,38 +147,44 @@ class ZarrChecksumModificationQueue:
"""

def __init__(self) -> None:
self._queue: dict[Path, ZarrChecksumModification] = {}
self._heap: list[tuple[int, ZarrChecksumModification]] = []
self._path_map: dict[Path, ZarrChecksumModification] = {}

def _add_path(self, key: Path):
modification = ZarrChecksumModification(path=key)

# Add link to modification
self._path_map[key] = modification

# Add modification to heap with length (negated to representa max heap)
length = len(key.parents)
heapq.heappush(self._heap, (-1 * length, modification))

def _get_path(self, key: Path):
if key not in self._path_map:
self._add_path(key)

return self._path_map[key]

def queue_file_update(self, key: Path, checksum: ZarrChecksum):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].files_to_update.append(checksum)
self._get_path(key).files_to_update.append(checksum)

def queue_directory_update(self, key: Path, checksum: ZarrChecksum):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].directories_to_update.append(checksum)
self._get_path(key).directories_to_update.append(checksum)

def queue_removal(self, key: Path, path: Path | str):
if key not in self._queue:
self._queue[key] = ZarrChecksumModification(path=key)
self._queue[key].paths_to_remove.append(str(path))
self._get_path(key).paths_to_remove.append(str(path))

def pop_deepest(self):
def pop_deepest(self) -> ZarrChecksumModification:
"""Find the deepest path in the queue, and return it and its children to be updated."""
longest_path = list(self._queue.keys())[0]
longest_path_length = str(longest_path).split('/')
# O(n) performance, consider a priority queue for optimization
for path in self._queue.keys():
path_length = str(path).split('/')
if path_length > longest_path_length:
longest_path = path
longest_path_length = path_length
return self._queue.pop(longest_path)
_, modification = heapq.heappop(self._heap)
del self._path_map[modification.path]

return modification

@property
def empty(self):
return len(self._queue) == 0
return len(self._heap) == 0


class ZarrChecksumUpdater:
Expand Down
25 changes: 14 additions & 11 deletions dandiapi/zarr/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ZarrChecksumFileUpdater,
ZarrChecksumListing,
ZarrChecksumModification,
ZarrChecksumModificationQueue,
ZarrChecksumUpdater,
)
from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus
Expand Down Expand Up @@ -134,7 +135,8 @@ def ingest_zarr_archive(

# Instantiate updater and add files as they come in
empty = True
updater = SessionZarrChecksumUpdater(zarr_archive=zarr)
queue = ZarrChecksumModificationQueue()
logger.info(f'Fetching files for zarr {zarr.zarr_id}...')
for files in yield_files(bucket=zarr.storage.bucket_name, prefix=zarr.s3_path('')):
if len(files):
empty = False
Expand All @@ -147,16 +149,17 @@ def ingest_zarr_archive(

# Update checksums
if not no_checksum:
updater.update_file_checksums(
{
file['Key'].replace(zarr.s3_path(''), ''): ZarrChecksum(
digest=file['ETag'].strip('"'),
name=Path(file['Key'].replace(zarr.s3_path(''), '')).name,
size=file['Size'],
)
for file in files
}
)
for file in files:
path = Path(file['Key'].replace(zarr.s3_path(''), ''))
checksum = ZarrChecksum(
name=path.name,
size=file['Size'],
digest=file['ETag'].strip('"'),
)
queue.queue_file_update(key=path.parent, checksum=checksum)

# Perform updates
SessionZarrChecksumUpdater(zarr_archive=zarr).modify(modifications=queue)

# Set checksum field to top level checksum, after ingestion completion
checksum = zarr.get_checksum()
Expand Down