diff --git a/ampel/ztf/archive/server/s3.py b/ampel/ztf/archive/server/s3.py index e2f3826..3024d27 100644 --- a/ampel/ztf/archive/server/s3.py +++ b/ampel/ztf/archive/server/s3.py @@ -1,6 +1,7 @@ from functools import lru_cache import io from typing import TYPE_CHECKING +from urllib.parse import urlsplit import boto3 from botocore.exceptions import ClientError @@ -13,8 +14,9 @@ from mypy_boto3_s3.service_resource import Bucket from typing import BinaryIO -class NoSuchKey(KeyError): - ... + +class NoSuchKey(KeyError): ... + @lru_cache(maxsize=1) def get_s3_bucket() -> "Bucket": @@ -38,7 +40,7 @@ def get_object(bucket: "Bucket", key: str) -> bytes: def get_stream(bucket: "Bucket", key: str) -> "BinaryIO": response = bucket.Object(key).get() if response["ResponseMetadata"]["HTTPStatusCode"] <= 400: - return response["Body"] # type: ignore[return-value] + return response["Body"] # type: ignore[return-value] else: raise KeyError @@ -60,10 +62,16 @@ def get_range( schema = get_parsed_schema(read_schema(get_stream(bucket, key))) else: schema = ALERT_SCHEMAS[schema_key] - return response["Body"], schema # type: ignore[return-value] + return response["Body"], schema # type: ignore[return-value] else: raise KeyError def get_url_for_key(bucket: "Bucket", key: str) -> str: return f"{settings.s3_endpoint_url or ''}/{bucket.name}/{key}" + + +def get_key_for_url(bucket: "Bucket", uri: str) -> str: + path = urlsplit(uri).path.split("/") + assert path[-2] == bucket.name + return path diff --git a/migrations/02_add_archive_count.py b/migrations/02_add_archive_count.py new file mode 100644 index 0000000..1869926 --- /dev/null +++ b/migrations/02_add_archive_count.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +import logging +import time +from argparse import ArgumentParser +import sqlalchemy as sa +import fastavro + +import boto3 + +from ampel.ztf.t0.ArchiveUpdater import ArchiveUpdater +from ampel.ztf.archive.server.s3 import get_object, get_key_for_url + + +logging.basicConfig(level="INFO", format="[%(asctime)s] %(message)s") +log = logging.getLogger() + +parser = ArgumentParser() +parser.add_argument("uri") +parser.add_argument("--s3-bucket") +parser.add_argument("--s3-endpoint") + + +args = parser.parse_args() + +engine = sa.engine.create_engine(args.uri) + +bucket = boto3.resource("s3", endpoint_url=args.s3_endpoint).Bucket(args.s3_bucket) + +with engine.connect() as connection: + meta = sa.MetaData(connection) + meta.reflect() + + Alert = meta.tables["alert"] + Archive = meta.tables["avro_archive"] + + query = Archive.select(Archive.c.alerts is None) + + for row in connection.execute(query).fetchone(): + key = get_key_for_url(row["uri"]) + reader = fastavro.reader(get_object(bucket, key)) + alerts = len(list(reader)) + + print(row, alerts) + + assert False + + # TODO: update alerts field diff --git a/tests/test-data/initdb/archive/25_avro_archive_count.sql b/tests/test-data/initdb/archive/25_avro_archive_count.sql new file mode 100644 index 0000000..0539bc4 --- /dev/null +++ b/tests/test-data/initdb/archive/25_avro_archive_count.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE avro_archive ADD count INTEGER; + +COMMIT; \ No newline at end of file