-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathapp.py
36 lines (32 loc) · 1.21 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import os
import requests
from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
from stactools.core import use_fsspec
from stactools.sentinel1.grd import Format
from stactools.sentinel1.grd.stac import create_item
from stactools_pipelines.cognito.utils import get_token
@event_source(data_class=SQSEvent)
def handler(event: SQSEvent, context):
ingestor_url = os.environ["INGESTOR_URL"]
ingestions_endpoint = f"{ingestor_url.strip('/')}/ingestions"
token = get_token()
headers = {"Authorization": f"bearer {token}"}
use_fsspec()
for record in event.records:
record_body = json.loads(record.body)
sentinel_message = json.loads(record_body["Message"])
href = f"s3://sentinel-s1-l1c/{sentinel_message['path']}"
print(href)
stac = create_item(
granule_href=href, archive_format=Format.COG, requester_pays=True
)
stac.collection_id = "sentinel1-grd"
response = requests.post(
url=ingestions_endpoint, data=json.dumps(stac.to_dict()), headers=headers
)
try:
response.raise_for_status()
except Exception:
print(response.text)
raise