Skip to content

Commit

Permalink
Merge pull request #82 from uhh-cms/feature/versions
Browse files Browse the repository at this point in the history
Feature/versions
  • Loading branch information
mafrahm authored Aug 12, 2024
2 parents 5e8ad03 + 2ead6c8 commit 18324b3
Show file tree
Hide file tree
Showing 51 changed files with 1,386 additions and 1,179 deletions.
245 changes: 168 additions & 77 deletions hbw/analysis/create_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
Configuration of the HH -> bbWW analysis.
"""

from __future__ import annotations

import os
import importlib

import law
import order as od

from hbw.util import timeit_multiple

thisdir = os.path.dirname(os.path.abspath(__file__))

logger = law.logger.get_logger(__name__)


@timeit_multiple
def create_hbw_analysis(
name,
id,
Expand Down Expand Up @@ -46,7 +51,7 @@ def create_hbw_analysis(

# cmssw sandboxes that should be bundled for remote jobs in case they are needed
analysis_inst.set_aux("cmssw_sandboxes", [
"$CF_BASE/sandboxes/cmssw_default.sh",
# "$CF_BASE/sandboxes/cmssw_default.sh",
])

# clear the list when cmssw bundling is disabled
Expand All @@ -58,109 +63,139 @@ def create_hbw_analysis(
analysis_inst.set_aux("config_groups", {})

#
# import campaigns and load configs
# define configs
#

from hbw.config.config_run2 import add_config

import cmsdb.campaigns.run2_2017_nano_v9
import cmsdb.campaigns.run3_2022_preEE_nano_v12
import cmsdb.campaigns.run3_2022_postEE_nano_v12

campaign_run2_2017_nano_v9 = cmsdb.campaigns.run2_2017_nano_v9.campaign_run2_2017_nano_v9
campaign_run2_2017_nano_v9.x.run = 2
campaign_run2_2017_nano_v9.x.postfix = ""

campaign_run3_2022_preEE_nano_v12 = cmsdb.campaigns.run3_2022_preEE_nano_v12.campaign_run3_2022_preEE_nano_v12
campaign_run3_2022_preEE_nano_v12.x.EE = "pre"
campaign_run3_2022_preEE_nano_v12.x.run = 3
campaign_run3_2022_preEE_nano_v12.x.postfix = ""

campaign_run3_2022_postEE_nano_v12 = cmsdb.campaigns.run3_2022_postEE_nano_v12.campaign_run3_2022_postEE_nano_v12
campaign_run3_2022_postEE_nano_v12.x.EE = "post"
campaign_run3_2022_postEE_nano_v12.x.run = 3
campaign_run3_2022_postEE_nano_v12.x.postfix = "EE"
def add_lazy_config(
    campaigns: dict[str, str],
    config_name: str,
    config_id: int,
    **kwargs,
):
    """
    Helper to lazily register configs on the enclosing *analysis_inst*.

    Two configs are registered per call:
    - one with the given *config_name* and *config_id*;
    - a "limited" variant with id *config_id* + 1, whose name is *config_name*
      with every character "c" replaced by "l" (note: ``str.replace`` replaces
      all occurrences, not just a prefix), and which limits each dataset to
      2 files.

    :param campaigns: mapping of cmsdb campaign module path to the name of the
        campaign object inside that module; the first entry becomes the base
        campaign, datasets of subsequent entries are merged into it.
    :param config_name: name of the (non-limited) config to register.
    :param config_id: numeric id of the (non-limited) config.
    :param kwargs: forwarded unchanged to *add_config*.
    """
    def create_factory(
        config_id: int,
        final_config_name: str,
        limit_dataset_files: int | None = None,
    ):
        # build the factory callable that the config index invokes the first
        # time the config is actually accessed (lazy construction)
        @timeit_multiple
        def analysis_factory(configs: od.UniqueObjectIndex):
            hbw_campaign_inst = None

            for mod, campaign in campaigns.items():
                # import the campaign module by its dotted path
                mod = importlib.import_module(mod)
                if not hbw_campaign_inst:
                    # first entry: copy it to serve as the main campaign
                    hbw_campaign_inst = getattr(mod, campaign).copy()
                else:
                    # later entries: merge their datasets into the main campaign
                    campaign_inst = getattr(mod, campaign).copy()
                    for dataset in list(campaign_inst.datasets):
                        # remember the originating campaign name on the dataset
                        dataset.x.campaign = campaign
                        # skip datasets already present in the main campaign
                        if not hbw_campaign_inst.has_dataset(dataset.name):
                            hbw_campaign_inst.add_dataset(dataset)

            # delegate the actual config creation to add_config
            return add_config(
                analysis_inst,
                hbw_campaign_inst,
                config_name=final_config_name,
                config_id=config_id,
                limit_dataset_files=limit_dataset_files,
                **kwargs,
            )
        return analysis_factory

    # register the regular config ...
    analysis_inst.configs.add_lazy_factory(
        config_name,
        create_factory(config_id, config_name),
    )
    # ... and its "limited" twin (2 files per dataset) under the l-name
    limited_config_name = config_name.replace("c", "l")
    analysis_inst.configs.add_lazy_factory(
        limited_config_name,
        create_factory(config_id + 1, limited_config_name, 2),
    )

# 2017
c17 = add_config( # noqa
analysis_inst,
campaign_run2_2017_nano_v9.copy(),
config_name="c17",
config_id=1700,
add_dataset_extensions=False,
)
l17 = add_config( # noqa
analysis_inst,
campaign_run2_2017_nano_v9.copy(),
config_name="l17",
config_id=1701,
limit_dataset_files=2,
add_dataset_extensions=False,
add_lazy_config(
{
"cmsdb.campaigns.run2_2017_nano_v9": "campaign_run2_2017_nano_v9",
},
"c17",
1700,
)

# 2022 preEE
c22pre = add_config( # noqa
analysis_inst,
campaign_run3_2022_preEE_nano_v12.copy(),
config_name="c22pre",
config_id=2200,
add_dataset_extensions=False,
)
l22pre = add_config( # noqa
analysis_inst,
campaign_run3_2022_preEE_nano_v12.copy(),
config_name="l22pre",
config_id=2201,
limit_dataset_files=2,
add_dataset_extensions=False,
add_lazy_config(
{
"cmsdb.campaigns.run3_2022_preEE_nano_v12": "campaign_run3_2022_preEE_nano_v12",
"cmsdb.campaigns.run3_2022_preEE_nano_v13": "campaign_run3_2022_preEE_nano_v13",
},
"c22pre",
2200,
)

# 2022 postEE
c22post = add_config( # noqa
analysis_inst,
campaign_run3_2022_postEE_nano_v12.copy(),
config_name="c22post",
config_id=2210,
add_dataset_extensions=False,
)
l22post = add_config( # noqa
analysis_inst,
campaign_run3_2022_postEE_nano_v12.copy(),
config_name="l22post",
config_id=2211,
limit_dataset_files=2,
add_dataset_extensions=False,
add_lazy_config(
{
"cmsdb.campaigns.run3_2022_postEE_nano_v12": "campaign_run3_2022_postEE_nano_v12",
"cmsdb.campaigns.run3_2022_postEE_nano_v13": "campaign_run3_2022_postEE_nano_v13",
},
"c22post",
2210,
)

#
# modify store_parts
#

def merged_analysis_parts(task, store_parts):
software_tasks = ("cf.BundleBashSandbox", "cf.BundleCMSSWSandbox", "cf.BundleRepo", "cf.BundleSoftware")
if task.task_family in software_tasks:
store_parts["analysis"] = "software_bundles"
software_tasks = ("cf.BundleBashSandbox", "cf.BundleCMSSWSandbox", "cf.BundleSoftware")
shareable_analysis_tasks = ("cf.CalibrateEvents", "cf.GetDatasetLFNs")
limited_config_shared_tasks = ("cf.CalibrateEvents", "cf.GetDatasetLFNs", "cf.SelectEvents", "cf.ReduceEvents")
skip_new_version_schema = ("cf.CalibrateEvents", "cf.GetDatasetLFNs")
known_parts = (
# from cf
"analysis", "task_family", "config", "configs", "dataset", "shift", "version",
"calibrator", "calibrators", "selector", "producer", "producers",
"ml_model", "ml_data", "ml_models",
"weightprod", "inf_model",
"plot", "shift_sources", "shifts", "datasets",
# MLTraining
"calib", "sel", "prod",
# from hbw
"processes",
)

def software_tasks_parts(task, store_parts):
    # Store-parts modifier for software bundle tasks: redirects their outputs
    # into a shared "software_bundles" analysis location so bundles are reused
    # across analyses.
    if task.task_family not in software_tasks:
        logger.warning(f"task {task.task_family} is not a software task")
        return store_parts

    # NOTE(review): after the guard above, task_family is one of the bundle
    # tasks and can never be in this tuple, so the warning below fires for
    # every task reaching this point; this looks like leftover code from an
    # earlier version of this function -- confirm intent.
    shareable_tasks = ("cf.CalibrateEvents", "cf.GetDatasetLFNs")
    if task.task_family not in shareable_tasks:
        logger.warning(f"task {task.task_family} is not shareable")
    # collapse the analysis part so all analyses share one bundle location
    if "analysis" in store_parts:
        store_parts["analysis"] = "software_bundles"
    return store_parts

def merged_analysis_parts(task, store_parts):
    """
    Store-parts modifier that lets multiple analyses (and the limited /
    non-limited config pairs) share outputs of analysis-independent tasks
    by rewriting the "analysis" and "config" parts in place.
    """
    # only a small set of task families is safe to share across analyses
    if task.task_family not in shareable_analysis_tasks:
        logger.warning(f"task {task.task_family} is not shareable over multiple analyses")
        return store_parts

    # collapse the analysis part so every analysis writes to the same location
    if "analysis" in store_parts:
        store_parts["analysis"] = "hbw_merged"

    # map limited config names (l...) onto their non-limited (c...) twins so
    # both resolve to identical output paths
    if "config" in store_parts:
        config_part = store_parts["config"]
        store_parts["config"] = config_part.replace("l", "c")

    return store_parts

def limited_config_shared_parts(task, store_parts):
shareable_tasks = ("cf.CalibrateEvents", "cf.SelectEvents", "cf.ReduceEvents")

if task.task_family not in shareable_tasks:
if task.task_family not in limited_config_shared_tasks:
logger.warning(f"task {task.task_family} should not be shared between limited and non-limited config")
return store_parts

Expand All @@ -170,9 +205,65 @@ def limited_config_shared_parts(task, store_parts):

return store_parts

def reorganize_parts(task, store_parts):
    """
    Reorder the entries of *store_parts* into a canonical order and drop the
    redundant "version" part for tasks whose version is already encoded via
    the CSPMW TaskArrayFunctions. Returns the modified *store_parts*.

    Parts listed in *parts_order_start* are moved (in that order) to the
    front, "version" is moved to the end, and any part not in the file-level
    *known_parts* tuple triggers a warning but keeps its relative position.
    """
    # check for parts this function does not know about; they are kept, but
    # their position in the final order is unspecified
    unknown_parts = set(store_parts.keys()) - set(known_parts)
    if unknown_parts:
        logger.warning(f"task {task.task_family} has unknown parts: {unknown_parts}")

    # temporary anchor inserted in front of "analysis"; ordered parts are
    # re-inserted before it one by one, so they end up at the front in the
    # desired order (assumes "analysis" is always present -- TODO confirm)
    store_parts.insert_before("analysis", "PLACEHOLDER", "DUMMY")

    # define order of parts as we want them
    parts_order_start = [
        "analysis",
        "calibrator", "calibrators", "calib",
        "selector", "sel",
        "producer", "producers", "prod",
        "ml_data", "ml_model", "ml_models",
        "weightprod", "inf_model",
        "task_family",
        "config", "dataset", "shift",
    ]
    parts_order_end = ["version"]

    # move parts to the desired position
    for part in parts_order_start:
        if part in store_parts:
            store_parts.insert_before("PLACEHOLDER", part, store_parts.pop(part))
    store_parts.pop("PLACEHOLDER")
    for part in parts_order_end:
        if part in store_parts:
            store_parts.move_to_end(part)

    task_version_from_cspmw = (
        "cf.CalibrateEvents",
        "cf.SelectEvents", "cf.MergeSelectionStats", "cf.MergeSelectionMasks",
        "cf.ReduceEvents", "cf.MergeReducedEvents", "cf.MergeReductionStats",
        "cf.ProduceColumns",
        "cf.PrepareMLEvents", "cf.MergeMLEvents", "cf.MergeMLStats", "cf.MLTraining", "cf.MLEvaluation",
        "hbw.MLPreTraining", "hbw.MLEvaluationSingleFold", "hbw.PlotMLResultsSingleFold",
    )
    if task.task_family in task_version_from_cspmw:
        # skip version when it is already encoded from CSPMW TaskArrayFunction;
        # pop with a default so a missing "version" part cannot raise KeyError
        store_parts.pop("version", None)

    return store_parts

def hbw_parts(task, store_parts):
    """
    Combined store-parts modifier: dispatches to the individual modifiers
    depending on the task family and returns the resulting *store_parts*.
    """
    family = task.task_family

    # software bundle tasks get their own treatment and skip everything else
    if family in software_tasks:
        return software_tasks_parts(task, store_parts)

    # remaining modifiers are applied in sequence; each entry is
    # (task families, whether membership or non-membership triggers, modifier)
    pipeline = (
        (shareable_analysis_tasks, True, merged_analysis_parts),
        (limited_config_shared_tasks, True, limited_config_shared_parts),
        (skip_new_version_schema, False, reorganize_parts),
    )
    for families, apply_on_member, modifier in pipeline:
        if (family in families) == apply_on_member:
            store_parts = modifier(task, store_parts)

    return store_parts

analysis_inst.x.store_parts_modifiers = {
"merged_analysis": merged_analysis_parts,
"limited_config_shared": limited_config_shared_parts,
"hbw_parts": hbw_parts,
}

return analysis_inst
Loading

0 comments on commit 18324b3

Please sign in to comment.