
Commit

WIP: recover payload size from s3
jvansanten committed Feb 12, 2024
1 parent f1001b7 commit 49cdd67
Showing 3 changed files with 65 additions and 4 deletions.
16 changes: 12 additions & 4 deletions ampel/ztf/archive/server/s3.py
@@ -1,6 +1,7 @@
from functools import lru_cache
import io
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

import boto3
from botocore.exceptions import ClientError
@@ -13,8 +14,9 @@
from mypy_boto3_s3.service_resource import Bucket
from typing import BinaryIO

-class NoSuchKey(KeyError):
-    ...

+class NoSuchKey(KeyError): ...


@lru_cache(maxsize=1)
def get_s3_bucket() -> "Bucket":
@@ -38,7 +40,7 @@ def get_object(bucket: "Bucket", key: str) -> bytes:
def get_stream(bucket: "Bucket", key: str) -> "BinaryIO":
response = bucket.Object(key).get()
if response["ResponseMetadata"]["HTTPStatusCode"] <= 400:
return response["Body"] # type: ignore[return-value]
return response["Body"] # type: ignore[return-value]
else:
raise KeyError

@@ -60,10 +62,16 @@ def get_range(
schema = get_parsed_schema(read_schema(get_stream(bucket, key)))
else:
schema = ALERT_SCHEMAS[schema_key]
return response["Body"], schema # type: ignore[return-value]
return response["Body"], schema # type: ignore[return-value]
else:
raise KeyError


def get_url_for_key(bucket: "Bucket", key: str) -> str:
return f"{settings.s3_endpoint_url or ''}/{bucket.name}/{key}"


def get_key_for_url(bucket: "Bucket", uri: str) -> str:
    # NB: asserting on path[-2] assumes keys themselves contain no "/"
    path = urlsplit(uri).path.split("/")
    assert path[-2] == bucket.name
    return path[-1]
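
For orientation, a round-trip sketch of the two URL helpers (not part of the commit; the bucket name, endpoint, and stand-in Bucket object are invented for illustration):

from types import SimpleNamespace

from ampel.ztf.archive.server.s3 import get_key_for_url

# stand-in exposing only the attribute get_key_for_url touches
bucket = SimpleNamespace(name="ztf-alerts")

# with settings.s3_endpoint_url == "http://localhost:9000",
# get_url_for_key(bucket, "20240212.avro") would produce:
url = "http://localhost:9000/ztf-alerts/20240212.avro"

# ...and get_key_for_url inverts it:
assert get_key_for_url(bucket, url) == "20240212.avro"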
48 changes: 48 additions & 0 deletions migrations/02_add_archive_count.py
@@ -0,0 +1,48 @@
#!/usr/bin/env python

import io
import logging
from argparse import ArgumentParser

import fastavro
import sqlalchemy as sa

import boto3

from ampel.ztf.t0.ArchiveUpdater import ArchiveUpdater
from ampel.ztf.archive.server.s3 import get_object, get_key_for_url


logging.basicConfig(level="INFO", format="[%(asctime)s] %(message)s")
log = logging.getLogger()

parser = ArgumentParser()
parser.add_argument("uri")
parser.add_argument("--s3-bucket")
parser.add_argument("--s3-endpoint")


args = parser.parse_args()

engine = sa.engine.create_engine(args.uri)

bucket = boto3.resource("s3", endpoint_url=args.s3_endpoint).Bucket(args.s3_bucket)

with engine.connect() as connection:
meta = sa.MetaData(connection)
meta.reflect()

Alert = meta.tables["alert"]
Archive = meta.tables["avro_archive"]

    # .is_(None) renders as SQL IS NULL; the plain Python `is None`
    # comparison evaluates to False and would match nothing useful
    query = Archive.select().where(Archive.c.alerts.is_(None))

    for row in connection.execute(query):
        key = get_key_for_url(bucket, row["uri"])
        # get_object returns bytes; fastavro.reader expects a file-like object
        reader = fastavro.reader(io.BytesIO(get_object(bucket, key)))
        alerts = len(list(reader))

        print(row, alerts)

        assert False  # WIP: bail out after the first row

# TODO: update alerts field
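
A hypothetical shape for that TODO, for illustration only: the primary-key column name avro_archive_id is an assumption, and note the SQL migration below adds a column named count while the script reads alerts, so the column name is still unsettled in this WIP.

# hypothetical write-back of the recovered alert count
connection.execute(
    Archive.update()
    .where(Archive.c.avro_archive_id == row["avro_archive_id"])
    .values(alerts=alerts)
)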
5 changes: 5 additions & 0 deletions tests/test-data/initdb/archive/25_avro_archive_count.sql
@@ -0,0 +1,5 @@
BEGIN;

ALTER TABLE avro_archive ADD count INTEGER;

COMMIT;
