Skip to content

Commit

Permalink
Merge pull request #1166 from metno/mos_cams2_83_cli
Browse files Browse the repository at this point in the history
code to make the cams2_83 mos evaluation work
  • Loading branch information
charlienegri authored May 14, 2024
2 parents 82c70d1 + f4697fc commit 015d473
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 131 deletions.
8 changes: 6 additions & 2 deletions pyaerocom/aeroval/experiment_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _run_single_entry(self, model_name, obs_name, var_list):
)
else:
preprocessed_coldata_dir = ocfg["coldata_dir"]
mask = f"{preprocessed_coldata_dir}/*.nc"
mask = f"{preprocessed_coldata_dir}/{model_name}/*.nc"
files_to_convert = glob.glob(mask)
engine = ColdataToJsonEngine(self.cfg)
engine.run(files_to_convert)
Expand Down Expand Up @@ -122,7 +122,11 @@ def run(self, model_name=None, obs_name=None, var_list=None, update_interface=Tr
if not self.cfg.model_cfg:
logger.info("No model found, will make dummy model data")
self.cfg.webdisp_opts.hide_charts = ["scatterplot"]
self.cfg.webdisp_opts.hide_pages = ["maps.php", "intercomp.php", "overall.php"]
self.cfg.webdisp_opts.hide_pages = [
"maps.php",
"intercomp.php",
"overall.php",
]
model_id = make_dummy_model(obs_list, self.cfg)
self.cfg.processing_opts.obs_only = True
use_dummy_model = True
Expand Down
134 changes: 12 additions & 122 deletions pyaerocom/scripts/cams2_83/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,25 @@

import logging
import multiprocessing as mp
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from copy import deepcopy
from datetime import date, datetime, timedelta
from multiprocessing import cpu_count
from pathlib import Path
from pprint import pformat
from typing import List, Optional

import typer

from pyaerocom import change_verbosity, const
from pyaerocom.aeroval import EvalSetup, ExperimentProcessor
from pyaerocom.io import ReadUngridded
from pyaerocom.io.cachehandler_ungridded import list_cache_files
from pyaerocom.io.cams2_83.models import ModelName, RunType
from pyaerocom.io.cams2_83.read_obs import DATA_FOLDER_PATH as DEFAULT_OBS_PATH
from pyaerocom.io.cams2_83.read_obs import obs_paths
from pyaerocom.io.cams2_83.reader import DATA_FOLDER_PATH as DEFAULT_MODEL_PATH
from pyaerocom.scripts.cams2_83.config import CFG, species_list
from pyaerocom.scripts.cams2_83.evaluation import EvalType, date_range
from pyaerocom.scripts.cams2_83.processer import CAMS2_83_Processer

"""
TODO:
- Add option for species
- Add option for periodes [Done]
- Add option for running only som observations/models/species
- Add options with defaults for the different folders (data/coldata/cache)
"""

from pyaerocom.scripts.cams2_83.config import CFG
from pyaerocom.scripts.cams2_83.evaluation import EvalType, date_range, runner, runnermedianscores

app = typer.Typer(add_completion=False, no_args_is_help=True)
logger = logging.getLogger(__name__)


def clear_cache():
"""Delete cached data objects"""
for path in list_cache_files():
path.unlink()


def make_model_entry(
start_date: datetime,
end_date: datetime,
Expand Down Expand Up @@ -130,100 +107,6 @@ def make_config(
return cfg


def read_observations(specie: str, *, files: List, cache: str | Path | None) -> None:
logger.info(f"Running {specie}")

if cache is not None:
const.CACHEDIR = str(cache)

reader = ReadUngridded()

reader.read(
data_ids="CAMS2_83.NRT",
vars_to_retrieve=specie,
files=files,
force_caching=True,
)

logger.info(f"Finished {specie}")


def run_forecast(specie: str, *, stp: EvalSetup, analysis: bool) -> None:
ana_cams2_83 = CAMS2_83_Processer(stp)
ana_cams2_83.run(analysis=analysis, var_list=specie)


def runner(
cfg: dict,
cache: str | Path | None,
dry_run: bool = False,
pool: int = 1,
):
logger.info(f"Running the evaluation for the config\n{pformat(cfg)}")
if dry_run:
return

if cache is not None:
const.CACHEDIR = str(cache)

stp = EvalSetup(**cfg)

logger.info(f"Clearing cache at {const.CACHEDIR}")
clear_cache()

if pool > 1:
logger.info(f"Running observation reading with pool {pool}")
files = cfg["obs_cfg"]["EEA"]["read_opts_ungridded"]["files"]
with ProcessPoolExecutor(max_workers=pool) as executor:
futures = [
executor.submit(read_observations, specie, files=files, cache=cache)
for specie in species_list
]
for future in as_completed(futures):
future.result()

logger.info("Running Statistics")
ExperimentProcessor(stp).run()
print("Done Running Statistics")


def runnermedianscores(
cfg: dict,
cache: str | Path | None,
*,
analysis: bool = False,
dry_run: bool = False,
pool: int = 1,
):
if dry_run:
return

if cache is not None:
const.CACHEDIR = str(cache)

stp = EvalSetup(**cfg)

start = time.time()

logger.info(
"Running CAMS2_83 Specific Statistics, cache is not cleared, colocated data is assumed in place, regular statistics are assumed to have been run"
)
if pool > 1:
logger.info(f"Making median scores plot with pool {pool} and analysis {analysis}")
with ProcessPoolExecutor(max_workers=pool) as executor:
futures = [
executor.submit(run_forecast, specie, stp=stp, analysis=analysis)
for specie in species_list
]
for future in as_completed(futures):
future.result()
else:
logger.info(f"Making median scores plot with pool {pool} and analysis {analysis}")
CAMS2_83_Processer(stp).run(analysis=analysis)

print(f"Long run: {time.time() - start} sec")


@app.command()
def main(
run_type: RunType = typer.Argument(...),
Expand Down Expand Up @@ -286,17 +169,20 @@ def main(
help="Will only make and print the config without running the evaluation",
),
pool: int = typer.Option(
1, "--pool", "-p", min=1, max=cpu_count(), help="CPUs for reading OBS"
1,
"--pool",
"-p",
min=1,
help="CPUs for reading OBS and running median scores",
),
):
if dry_run:
change_verbosity(logging.INFO)

if pool > mp.cpu_count():
logger.warning(
f"The given pool {pool} is larger than the maximum CPU count {mp.cpu_count()}. Using that instead"
f"The given pool {pool} is larger than the maximum CPU count {mp.cpu_count()}."
)
pool = mp.cpu_count()

cfg = make_config(
start_date,
Expand Down Expand Up @@ -338,3 +224,7 @@ def main(
else:
logger.info("Standard run")
runner(cfg, cache, dry_run=dry_run, pool=pool)


if __name__ == "__main__":
main()
128 changes: 128 additions & 0 deletions pyaerocom/scripts/cams2_83/cli_mos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from __future__ import annotations

import logging
import multiprocessing as mp
from copy import deepcopy
from datetime import date, datetime
from pathlib import Path
from typing import Optional

import typer

from pyaerocom import const
from pyaerocom.scripts.cams2_83.config import CFG
from pyaerocom.scripts.cams2_83.evaluation import EvalType, runnermedianscores, runnermos

app = typer.Typer(add_completion=False, no_args_is_help=True)
logger = logging.getLogger(__name__)


def make_config_mos(
start_date: date,
end_date: date,
data_path: Path,
coldata_path: Path,
eval_type: EvalType,
id: str,
name: str,
description: str,
) -> dict:
logger.info("Making the configuration")

models = ["ENS", "MOS"]

cfg = deepcopy(CFG)
cfg.update(
model_cfg={
f"{model}": dict(
model_id=f"CAMS2-83.{model}.day0.FC",
model_kwargs=dict(
daterange=[f"{start_date}", f"{end_date}"],
),
)
for model in models
},
periods=eval_type.periods(start_date, end_date),
json_basedir=str(data_path),
coldata_basedir=str(coldata_path),
)

if eval_type is not None:
eval_type.check_dates(start_date, end_date)
cfg.update(eval_type.freqs_config())

cfg.update(only_json=True)

cfg.update(exp_id=id, exp_name=name, exp_descr=description)

cfg.update(use_fairmode=True)

return cfg


@app.command()
def main(
eval_type: EvalType = typer.Argument(...),
start_date: datetime = typer.Argument(
..., formats=["%Y-%m-%d", "%Y%m%d"], help="evaluation start date"
),
end_date: datetime = typer.Argument(
..., formats=["%Y-%m-%d", "%Y%m%d"], help="evaluation end date"
),
data_path: Path = typer.Option(
Path("../../data").resolve(),
exists=True,
readable=True,
writable=True,
help="where results are stored",
),
coldata_path: Path = typer.Option(
Path("../../coldata").resolve(),
exists=True,
readable=True,
writable=True,
help="where colocated data are stored, this is useless here but this path needs to exist",
),
cache: Optional[Path] = typer.Option(
None,
help="Optional path to cache. If nothing is given, the default pyaerocom cache is used",
),
id: str = typer.Option(CFG["exp_id"], help="experiment ID"),
name: str = typer.Option(CFG["exp_name"], help="experiment name"),
description: str = typer.Option(CFG["exp_descr"], help="experiment description"),
pool: int = typer.Option(
1,
"--pool",
"-p",
min=1,
help="CPUs for running the median scores",
),
):
if pool > mp.cpu_count():
logger.warning(
f"The given pool {pool} is larger than the maximum CPU count {mp.cpu_count()}. Using that instead"
)
pool = 1

cfg = make_config_mos(
start_date,
end_date,
data_path,
coldata_path,
eval_type,
id,
name,
description,
)

# we do not want the cache produced in previous runs to be silently cleared
const.RM_CACHE_OUTDATED = False

logger.info("Standard run")
runnermos(cfg, cache, dry_run=False)
logger.info("Special run for median scores only")
runnermedianscores(cfg, cache, dry_run=False, analysis=False, pool=pool)


if __name__ == "__main__":
main()
18 changes: 15 additions & 3 deletions pyaerocom/scripts/cams2_83/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,20 @@ def process_coldata(self, coldata: list[ColocatedData]) -> None:
else:
obs_var = model_var = "UNDEFINED"

model = ModelName[coldata[0].model_name.split("-")[2]]
# for the MOS/ENS evaluation experiment the models are just strings
# we do not want them added to the ModelName class
# so we need a bunch of ugly special cases here
modelname = coldata[0].model_name.split("-")[2]
if modelname == "ENS" or modelname == "MOS":
model = modelname
else:
model = ModelName[modelname]
vert_code = coldata[0].get_meta_item("vert_code")
obs_name = coldata[0].obs_name
mcfg = self.cfg.model_cfg.get_entry(model.name)
if modelname == "ENS" or modelname == "MOS": # MOS/ENS evaluation special case
mcfg = self.cfg.model_cfg.get_entry(modelname)
else:
mcfg = self.cfg.model_cfg.get_entry(model.name)
var_name_web = mcfg.get_varname_web(model_var, obs_var)
seasons = self.cfg.time_cfg.get_seasons()

Expand Down Expand Up @@ -120,7 +130,9 @@ def process_coldata(self, coldata: list[ColocatedData]) -> None:
obs_name,
var_name_web,
vert_code,
model.name,
(
modelname if (modelname == "ENS" or modelname == "MOS") else model.name
), # MOS/ENS evaluation special case
model_var,
)

Expand Down
Loading

0 comments on commit 015d473

Please sign in to comment.