From 6ac6bc4dcc918e0aa2c44d78c7a3f7e017b52593 Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 23 Nov 2023 15:35:49 +0000 Subject: [PATCH] Add CEDA download job, backdate partitions to 2017 --- nwp/jobs.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index 515a007..90b3dba 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -5,7 +5,7 @@ import dagster -from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day, fetch_cams_eu_forecast_for_day +from nwp.assets.cams import CAMSConfig, fetch_cams_eu_forecast_for_day, fetch_cams_forecast_for_day from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import ( NWPConsumerConfig, @@ -16,6 +16,9 @@ jobs: list[dagster.JobDefinition] = [] schedules: list[dagster.ScheduleDefinition] = [] +# TODO: I would like to be able to use something like this at some point: +NWP_FOLDER = "/mnt/storage_b/archives/NWP" + # --- DWD ICON jobs and schedules -------------------------------------- dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" @@ -104,6 +107,7 @@ def cams_daily_archive() -> None: @dagster.daily_partitioned_config(start_date=dt.datetime(2020, 10, 27)) def CAMSEUDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: + """Create a config dict for the CAMS EU model.""" config = CAMSConfig( date=start.strftime("%Y-%m-%d"), raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS_EU/raw", @@ -128,29 +132,40 @@ class NWPConsumerDagDefinition: """A class to define the NWPConsumerDagDefinition.""" def __init__( - self, source: str, storage_path: str | None = None, env_overrides: dict[str, str] | None = None + self, + source: str, + folder: str, + storage_path: str | None = None, + env_overrides: dict[str, str] | None = None ) -> "NWPConsumerDagDefinition": """Create a NWPConsumerDagDefinition.""" self.source = source - area = env_overrides.get("ECMWF_AREA", "no-area") self.storage_path = \ storage_path or \ - f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{area}' + f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/{folder}' self.env_overrides = env_overrides +# Create a dict of job names to their descriptions nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = { - "uk": NWPConsumerDagDefinition( + "ecmwf": NWPConsumerDagDefinition( source="ecmwf-mars", + folder="ECMWF/uk", env_overrides={"ECMWF_AREA": "uk"} ), - "india": NWPConsumerDagDefinition( + "ecmwf_india": NWPConsumerDagDefinition( source="ecmwf-mars", + folder="ECMWF/india", env_overrides={"ECMWF_AREA": "nw-india", "ECMWF_HOURS": "84"} ), - "malta": NWPConsumerDagDefinition( + "ecmwf_malta": NWPConsumerDagDefinition( source="ecmwf-mars", + folder="ECMWF/malta", env_overrides={"ECMWF_AREA": "malta"} ), + "ceda_uk": NWPConsumerDagDefinition( + source="ceda", + folder="CEDA/uk", + ), } @@ -177,9 +192,9 @@ def partitioned_config_func(partition_key: str) -> dict[str, Any]: return partitioned_config_func # Define the jobs and schedules from the above dict -for loc, dagdef in nwp_consumer_jobs.items(): +for dagname, dagdef in nwp_consumer_jobs.items(): - partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2020, 1, 1)) + partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2017, 1, 1)) config = dagster.PartitionedConfig( partitions_def=partitions_def, @@ -187,9 +202,9 @@ def partitioned_config_func(partition_key: str) -> dict[str, Any]: ) @dagster.job( - name="ecmwf_daily_local_archive" if loc=="uk" else f"ecmwf_{loc}_daily_archive", + name=f"{dagname}_daily_local_archive", config=config, - tags={"source": "ecmwf", "area": loc}, + tags={"source": dagdef.source}, ) def ecmwf_daily_partitioned_archive() -> None: """Download and convert NWP data using the consumer according to input config."""