From 4934cb3e89fa441d1fa655669cd9794b3d33bf77 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:49:16 +0100 Subject: [PATCH] feat(dags): Add satellite dag --- local_archives/__init__.py | 5 +- local_archives/nwp/ceda/ceda_global.py | 5 +- local_archives/sat/__init__.py | 17 +-- local_archives/sat/assets/__init__.py | 3 - .../sat/assets/eumetsat/__init__.py | 1 - local_archives/sat/assets/eumetsat/common.py | 39 ------- local_archives/sat/assets/eumetsat/iodc.py | 14 --- local_archives/sat/jobs.py | 103 ------------------ 8 files changed, 17 insertions(+), 170 deletions(-) delete mode 100644 local_archives/sat/assets/__init__.py delete mode 100644 local_archives/sat/assets/eumetsat/__init__.py delete mode 100644 local_archives/sat/assets/eumetsat/common.py delete mode 100644 local_archives/sat/assets/eumetsat/iodc.py delete mode 100644 local_archives/sat/jobs.py diff --git a/local_archives/__init__.py b/local_archives/__init__.py index 4e1f105..45265eb 100644 --- a/local_archives/__init__.py +++ b/local_archives/__init__.py @@ -7,7 +7,7 @@ import resources from constants import LOCATIONS_BY_ENVIRONMENT -from . import nwp +from . import nwp, sat resources_by_env = { "leo": { @@ -36,14 +36,17 @@ all_assets: list[dg.AssetsDefinition] = [ *nwp.all_assets, + *sat.all_assets, ] all_jobs: list[dg.JobDefinition] = [ *nwp.all_jobs, + *sat.all_jobs, ] all_schedules: list[dg.ScheduleDefinition] = [ *nwp.all_schedules, + *sat.all_schedules, ] defs = dg.Definitions( diff --git a/local_archives/nwp/ceda/ceda_global.py b/local_archives/nwp/ceda/ceda_global.py index e5ae40a..a74ef59 100644 --- a/local_archives/nwp/ceda/ceda_global.py +++ b/local_archives/nwp/ceda/ceda_global.py @@ -1,9 +1,10 @@ -import dagster as dg +import datetime as dt import os from typing import Any -import datetime as dt +import dagster as dg from dagster_docker import PipesDockerClient + from constants import LOCATIONS_BY_ENVIRONMENT env = os.getenv("ENVIRONMENT", "local") diff --git a/local_archives/sat/__init__.py b/local_archives/sat/__init__.py index b2037ce..28aeb1d 100644 --- a/local_archives/sat/__init__.py +++ b/local_archives/sat/__init__.py @@ -1,11 +1,14 @@ -from dagster import Definitions, load_assets_from_modules +"""Definitions for the sat dagster code location.""" -from sat import assets, jobs +import dagster as dg +from . import eumetsat -all_assets = load_assets_from_modules([assets]) +all_assets: list[dg.AssetsDefinition] = [ + *eumetsat.all_assets, +] + +all_jobs: list[dg.JobDefinition] = [] + +all_schedules: list[dg.ScheduleDefinition] = [] -defs = Definitions( - assets=all_assets, - schedules=jobs.schedules, -) diff --git a/local_archives/sat/assets/__init__.py b/local_archives/sat/assets/__init__.py deleted file mode 100644 index eb0e438..0000000 --- a/local_archives/sat/assets/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from sat.assets.eumetsat.iodc import download_eumetsat_iodc_data -from sat.assets.eumetsat.rss import download_eumetsat_rss_data -from sat.assets.eumetsat.zero_deg import download_eumetsat_0_deg_data diff --git a/local_archives/sat/assets/eumetsat/__init__.py b/local_archives/sat/assets/eumetsat/__init__.py deleted file mode 100644 index f5b4ead..0000000 --- a/local_archives/sat/assets/eumetsat/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .common import download_product_range, EumetsatConfig \ No newline at end of file diff --git a/local_archives/sat/assets/eumetsat/common.py b/local_archives/sat/assets/eumetsat/common.py deleted file mode 100644 index e1cfcad..0000000 --- a/local_archives/sat/assets/eumetsat/common.py +++ /dev/null @@ -1,39 +0,0 @@ -"""EO:EUM:DAT:MSG:HRSEVIRI-IODC -""" -import os - -import pandas as pd -from dagster import Config -from satip.eumetsat import DownloadManager - - -class EumetsatConfig(Config): - api_key: str - api_secret: str - data_dir: str - start_date: str - end_date: str - -def download_product_range(api_key: str, api_secret: str, data_dir: str, product_id: str, start_date: pd.Timestamp, end_date: pd.Timestamp): - download_manager = DownloadManager(user_key=api_key, user_secret=api_secret, data_dir=data_dir) - start_str = start_date.strftime("%Y-%m-%d") - end_str = end_date.strftime("%Y-%m-%d") - date_range = pd.date_range(start=start_str, - end=end_str, - freq="30min") - filenames_downloaded = [] - for filename in os.listdir(data_dir): - filenames_downloaded.append(filename.split("/")[-1]) - for date in date_range: - start_date = pd.Timestamp(date) - pd.Timedelta("1min") - end_date = pd.Timestamp(date) + pd.Timedelta("1min") - datasets = download_manager.identify_available_datasets( - start_date=start_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"), - end_date=end_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"), - ) - filtered_datasets = [] - for dataset in datasets: - if dataset["id"] not in filenames_downloaded: - filtered_datasets.append(dataset) - datasets = filtered_datasets - download_manager.download_datasets(datasets, product_id=product_id) diff --git a/local_archives/sat/assets/eumetsat/iodc.py b/local_archives/sat/assets/eumetsat/iodc.py deleted file mode 100644 index 2591814..0000000 --- a/local_archives/sat/assets/eumetsat/iodc.py +++ /dev/null @@ -1,14 +0,0 @@ -import pandas as pd -from dagster import asset # import the `dagster` library - -from . import EumetsatConfig, download_product_range - - -@asset -def download_eumetsat_iodc_data(config: EumetsatConfig) -> None: - download_product_range(api_key=config.api_key, - api_secret=config.api_secret, - data_dir=config.data_dir, - product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", - start_date=pd.Timestamp(config.start_date), - end_date=pd.Timestamp(config.end_date)) diff --git a/local_archives/sat/jobs.py b/local_archives/sat/jobs.py deleted file mode 100644 index 9a89fac..0000000 --- a/local_archives/sat/jobs.py +++ /dev/null @@ -1,103 +0,0 @@ -import datetime as dt -import json -from typing import Any - -import dagster -from sat.assets import ( - download_eumetsat_0_deg_data, - download_eumetsat_iodc_data, - download_eumetsat_rss_data, -) -from sat.assets.eumetsat.common import EumetsatConfig - -jobs: list[dagster.JobDefinition] = [] -schedules: list[dagster.ScheduleDefinition] = [] - -base_path = "/mnt/storage_c/IODC/" - -# --- IODC jobs and schedules ---------------------------------------------- - -@dagster.daily_partitioned_config(start_date=dt.datetime(2017, 1, 1)) -def IODCDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_iodc_data": {"config": config}}} - - -@dagster.job( - config=IODCDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def iodc_daily_archive() -> None: - """Download IODC data for a given day.""" - download_eumetsat_iodc_data() - -jobs.append(iodc_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(iodc_daily_archive, hour_of_day=23)) - - -@dagster.daily_partitioned_config(start_date=dt.datetime(2008, 1, 1)) -def RSSDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_rss_data": {"config": config}}} - - -@dagster.job( - config=RSSDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def rss_daily_archive() -> None: - """Download RSS data for a given day.""" - download_eumetsat_rss_data() - -jobs.append(rss_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(rss_daily_archive, hour_of_day=23)) - -@dagster.daily_partitioned_config(start_date=dt.datetime(2008, 1, 1)) -def ZeroDegDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_0_deg_data": {"config": config}}} - - -@dagster.job( - config=ZeroDegDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def zero_deg_daily_archive() -> None: - """Download RSS data for a given day.""" - download_eumetsat_0_deg_data() - -jobs.append(zero_deg_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(zero_deg_daily_archive, hour_of_day=23))