From 65f7395d4aafdc9a1d090ec0257fffa165c7f7c8 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Fri, 6 Oct 2023 10:52:23 +0100 Subject: [PATCH] Add malta job, fix context manager (#18) --- dags_tests/compile_test.py | 2 +- nwp/assets/ecmwf/mars.py | 13 ++- nwp/jobs.py | 182 +++++++++++++++++++++++-------------- 3 files changed, 121 insertions(+), 76 deletions(-) diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 4c13a69..5aa6f04 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,4 +3,4 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert len(job_names) == 19 + assert len(job_names) == 20 diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index 0be1ba3..1992e0e 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -1,22 +1,25 @@ -import nwp_consumer.cmd.main as consumer import contextlib import os + +import nwp_consumer.cmd.main as consumer from dagster import Config, OpExecutionContext, op -from dagster_docker import execute_docker_container + @contextlib.contextmanager def modify_env(vars: dict[str, str]): """Temporarily modify the environment.""" + oldvars = os.environ.copy() for var in vars: - oldval = os.environ.get(var) newval = vars[var] os.environ[var] = newval - vars[var] = oldval try: yield finally: for var in vars: - os.environ[var] = oldval + if var in oldvars: + os.environ[var] = oldvars[var] + else: + del os.environ[var] class NWPConsumerConfig(Config): """Configuration for the NWP consumer.""" diff --git a/nwp/jobs.py b/nwp/jobs.py index 022fb10..c585889 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,109 +1,151 @@ import datetime as dt import json +from collections.abc import Callable +from typing import Any -from dagster import ( - AssetSelection, - ScheduleDefinition, - build_schedule_from_partitioned_job, - daily_partitioned_config, - define_asset_job, - job, -) +import dagster from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import ( NWPConsumerConfig, nwp_consumer_convert_op, - nwp_consumer_download_op, + nwp_consumer_download_op ) -schedules = [] +jobs: list[dagster.JobDefinition] = [] +schedules: list[dagster.ScheduleDefinition] = [] + +# --- DWD ICON jobs and schedules -------------------------------------- dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict: """Create a config dict for the DWD ICON model.""" - config = IconConfig(model=model, - run=run, - delay=delay, - folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", - zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") - config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, - "zarr_path": config.zarr_path} + config = IconConfig( + model=model, + run=run, + delay=delay, + folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", + zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip" + ) + config_dict = { + "delay": config.delay, + "folder": config.folder, + "model": config.model, + "run": config.run, + "zarr_path": config.zarr_path + } return config_dict -schedules = [] for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: for delay in [0, 1]: - asset_job = define_asset_job( + asset_job = dagster.define_asset_job( name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", - selection=AssetSelection.all(), + selection=dagster.AssetSelection.all(), config={'ops': { "download_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)}, + "process_model_files": { + "config": build_config_on_runtime(model, r, delay)}, + "upload_model_files_to_hf": { + "config": build_config_on_runtime(model, r, delay)}, }} ) match (delay, r): case (0, "00"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) case (0, "06"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) case (0, "12"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) case (0, "18"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) case (1, "00"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) case (1, "06"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) case (1, "12"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) case (1, "18"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) - - -@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1)) -def ecmwf_uk_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict: - """Create a config dict for the nwp-consumer for uk data from ECMWF.""" - config: NWPConsumerConfig = NWPConsumerConfig( - date_from=start.strftime("%Y-%m-%d"), - date_to=start.strftime("%Y-%m-%d"), - source="ecmwf-mars", - env_overrides={"ECMWF_AREA": "uk"}, - zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/zarr', - raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/raw', + schedules.append( + dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) + + +# --- NWP Consumer jobs and schedules -------------------------------------- + +class NWPConsumerDagDefinition: + """A class to define the NWPConsumerDagDefinition.""" + + def __init__( + self, area: str, source: str, storage_path: str | None = None + ) -> "NWPConsumerDagDefinition": + """Create a NWPConsumerDagDefinition.""" + self.area = area + self.source = source + self.storage_path = \ + storage_path or \ + f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{self.area}' + +nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = { + "uk": NWPConsumerDagDefinition(area="uk", source="ecmwf-mars"), + "india": NWPConsumerDagDefinition(area="nw-india", source="ecmwf-mars"), + "malta": NWPConsumerDagDefinition(area="malta", source="ecmwf-mars") +} + + +def gen_partitioned_config_func(dagdef: NWPConsumerDagDefinition) \ + -> Callable[[str], dict[str, Any]]: + """Create a config dict from a partition key.""" + + def partitioned_config_func(partition_key: str) -> dict[str, Any]: + time_window = partitions_def.time_window_for_partition_key(partition_key) + consumer_config = NWPConsumerConfig( + date_from=time_window.start.strftime("%Y-%m-%d"), + date_to=time_window.start.strftime("%Y-%m-%d"), + source=dagdef.source, + env_overrides={"ECMWF_AREA": dagdef.area}, + zarr_dir=f"{dagdef.storage_path}/zarr", + raw_dir=f"{dagdef.storage_path}/raw", + ) + return { + "ops": {"nwp_consumer_download_op": { + "config": json.loads(consumer_config.json()), + }} + } + + return partitioned_config_func + +# Define the jobs and schedules from the above dict +for loc, dagdef in nwp_consumer_jobs.items(): + + partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2020, 1, 1)) + + config = dagster.PartitionedConfig( + partitions_def=partitions_def, + run_config_for_partition_key_fn=gen_partitioned_config_func(dagdef), ) - return {"ops": { - "nwp_consumer_download_op": {"config": json.loads(config.json())}, - }} - -@job(config=ecmwf_uk_daily_partitioned_config, name="ecmwf_daily_local_archive") -def ecmwf_uk_daily_local_archive() -> None: - """Download and convert ECMWF data for the UK.""" - nwp_consumer_convert_op(nwp_consumer_download_op()) - -@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1)) -def ecmwf_india_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict: - """Create a config dict for the nwp-consumer for india data from ECMWF.""" - config: NWPConsumerConfig = NWPConsumerConfig( - date_from=start.strftime("%Y-%m-%d"), - date_to=start.strftime("%Y-%m-%d"), - source="ecmwf-mars", - env_overrides={"ECMWF_AREA": "nw-india"}, - zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/zarr', - raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/raw', + + @dagster.job( + name="ecmwf_daily_local_archive" if loc=="uk" else f"ecmwf_{loc}_daily_archive", + config=config, + tags={"area": loc}, ) - return {"ops": { - "nwp_consumer_download_op": {"config": json.loads(config.json())}, - }} + def ecmwf_daily_partitioned_archive() -> None: + """Download and convert NWP data using the consumer according to input config.""" + nwp_consumer_convert_op(nwp_consumer_download_op()) -@job(config=ecmwf_india_daily_partitioned_config) -def ecmwf_india_daily_local_archive() -> None: - """Download and convert ECMWF data for India.""" - nwp_consumer_convert_op(nwp_consumer_download_op()) + schedule = dagster.build_schedule_from_partitioned_job( + job=ecmwf_daily_partitioned_archive, + hour_of_day=20 + ) -schedules.append(build_schedule_from_partitioned_job(ecmwf_uk_daily_local_archive, hour_of_day=13)) -schedules.append(build_schedule_from_partitioned_job(ecmwf_india_daily_local_archive, hour_of_day=14)) + jobs.append(ecmwf_daily_partitioned_archive) + schedules.append(schedule)