Skip to content

Commit

Permalink
Add CEDA download job, backdate partitions to 2017
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 23, 2023
1 parent 9d9fb58 commit 6ac6bc4
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import dagster

from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day, fetch_cams_eu_forecast_for_day
from nwp.assets.cams import CAMSConfig, fetch_cams_eu_forecast_for_day, fetch_cams_forecast_for_day
from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import (
NWPConsumerConfig,
Expand All @@ -16,6 +16,9 @@
jobs: list[dagster.JobDefinition] = []
schedules: list[dagster.ScheduleDefinition] = []

# TODO: I would like to be able to use something like this at some point:
NWP_FOLDER = "/mnt/storage_b/archives/NWP"

# --- DWD ICON jobs and schedules --------------------------------------

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"
Expand Down Expand Up @@ -104,6 +107,7 @@ def cams_daily_archive() -> None:

@dagster.daily_partitioned_config(start_date=dt.datetime(2020, 10, 27))
def CAMSEUDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]:
"""Create a config dict for the CAMS EU model."""
config = CAMSConfig(
date=start.strftime("%Y-%m-%d"),
raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS_EU/raw",
Expand All @@ -128,29 +132,40 @@ class NWPConsumerDagDefinition:
"""A class to define the NWPConsumerDagDefinition."""

def __init__(
self, source: str, storage_path: str | None = None, env_overrides: dict[str, str] | None = None
self,
source: str,
folder: str,
storage_path: str | None = None,
env_overrides: dict[str, str] | None = None
) -> "NWPConsumerDagDefinition":
"""Create a NWPConsumerDagDefinition."""
self.source = source
area = env_overrides.get("ECMWF_AREA", "no-area")
self.storage_path = \
storage_path or \
f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{area}'
f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/{folder}'
self.env_overrides = env_overrides

# Create a dict of job names to their descriptions
nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = {
"uk": NWPConsumerDagDefinition(
"ecmwf": NWPConsumerDagDefinition(
source="ecmwf-mars",
folder="ECMWF/uk",
env_overrides={"ECMWF_AREA": "uk"}
),
"india": NWPConsumerDagDefinition(
"ecmwf_india": NWPConsumerDagDefinition(
source="ecmwf-mars",
folder="ECMWF/india",
env_overrides={"ECMWF_AREA": "nw-india", "ECMWF_HOURS": "84"}
),
"malta": NWPConsumerDagDefinition(
"ecmwf_malta": NWPConsumerDagDefinition(
source="ecmwf-mars",
folder="ECMWF/malta",
env_overrides={"ECMWF_AREA": "malta"}
),
"ceda_uk": NWPConsumerDagDefinition(
source="ceda",
folder="CEDA/uk",
),
}


Expand All @@ -177,19 +192,19 @@ def partitioned_config_func(partition_key: str) -> dict[str, Any]:
return partitioned_config_func

# Define the jobs and schedules from the above dict
for loc, dagdef in nwp_consumer_jobs.items():
for dagname, dagdef in nwp_consumer_jobs.items():

partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2020, 1, 1))
partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2017, 1, 1))

config = dagster.PartitionedConfig(
partitions_def=partitions_def,
run_config_for_partition_key_fn=gen_partitioned_config_func(dagdef),
)

@dagster.job(
name="ecmwf_daily_local_archive" if loc=="uk" else f"ecmwf_{loc}_daily_archive",
name=f"{dagname}_daily_local_archive",
config=config,
tags={"source": "ecmwf", "area": loc},
tags={"source": dagdef.source},
)
def ecmwf_daily_partitioned_archive() -> None:
"""Download and convert NWP data using the consumer according to input config."""
Expand Down

0 comments on commit 6ac6bc4

Please sign in to comment.