diff --git a/lstmcpipe/config/paths_config.py b/lstmcpipe/config/paths_config.py
index 9e0d521c..86c2a93e 100644
--- a/lstmcpipe/config/paths_config.py
+++ b/lstmcpipe/config/paths_config.py
@@ -384,7 +384,7 @@ def dl1ab(self):
 
 class PathConfigAllSkyBase(PathConfig):
     """
-    Standard paths configuration for a prod5_trans_80 MC production
+    Standard paths configuration for an LSTProd2 MC production
     dataset_type: 'Training' or 'Testing'
     """
 
@@ -392,7 +392,7 @@ def __init__(self, prod_id, dec):
         super().__init__(prod_id)
         self.prod_id = prod_id
         self.dec = dec
-        self.base_dir = "/fefs/aswg/data/mc/{data_level}/AllSky/{prod_id}/{dataset_type}/{dec}/{particle}/{pointing}/"
+        self.base_dir = "/fefs/aswg/data/mc/{data_level}/AllSky/{prod_id}/{dataset_type}/{particle}/{dec}/{pointing}/"
         self.paths = {}
         self.stages = []
 
@@ -623,7 +623,7 @@ def r0_to_dl1(self):
         return paths
 
     def training_merged_dl1(self, particle):
-        return os.path.join(self.dl1_dir(particle, ''), f'dl1_{self.prod_id}_{self.dec}_{particle}_merged.h5')
+        return os.path.join(self.dl1_dir(particle, ''), f'dl1_{self.prod_id}_train_{self.dec}_{particle}_merged.h5')
 
     @property
     def merge_dl1(self):
@@ -637,7 +637,7 @@ def merge_dl1(self):
                     'input': dl1,
                     'output': merged_dl1,
                     'options': '--pattern */*.h5 --no-image',
-                    'extra_slurm_options': {'partition': 'long'},
+                    'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
                 }
             )
         return paths
@@ -651,23 +651,65 @@ def train_pipe(self):
                     'proton': self.training_merged_dl1('Protons'),
                 },
                 'output': self.models_dir(),
-                'extra_slurm_options': {'partition': 'xxl', 'mem': '160G', 'cpus-per-task': 16}
+                'extra_slurm_options': {'partition': 'xxl', 'mem': '160G', 'cpus-per-task': 16, 'time': '03-00:00:00'}
                 if self.dec == _crab_dec
-                else {'partition': 'xxl', 'mem': '100G', 'cpus-per-task': 16},
+                else {'partition': 'xxl', 'mem': '100G', 'cpus-per-task': 16, 'time': '03-00:00:00'},
             }
         ]
         return paths
 
 
+
+class PathConfigAllSkyTrainingWithSplit(PathConfigAllSkyTraining):
+    def __init__(self, prod_id, dec):
+        super().__init__(prod_id, dec)
+        self.stages.insert(1, 'train_test_split')
+
+    def dl1_diffuse_test_dir(self, pointing):
+        return self.dl1_dir('GammaDiffuse', pointing).replace('TrainingDataset', 'TestingDataset') + '/test'
+
+    def dl1_diffuse_train_dir(self, pointing):
+        return self.dl1_dir('GammaDiffuse', pointing) + '/train'
+
+    @property
+    def train_test_split(self):
+        paths = []
+        for pointing in self.pointing_dirs('GammaDiffuse'):
+            dl1 = self.dl1_dir('GammaDiffuse', pointing)
+            train = self.dl1_diffuse_train_dir(pointing)
+            test = self.dl1_diffuse_test_dir(pointing)
+            paths.append({'input': dl1, 'output': {'train': train, 'test': test}, 'options': {'test_size': 0.5}})
+        return paths
+
+    @property
+    def merge_dl1(self):
+        # for the training particles, all the nodes get merged
+        paths = []
+        for particle in self.training_particles:
+            dl1 = self.dl1_dir(particle, '')
+            merged_dl1 = self.training_merged_dl1(particle)
+            pattern = '*/*/*.h5' if particle == 'GammaDiffuse' else '*/*.h5'  # needed because the search is not recursive in lstchain; can be changed after https://github.com/cta-observatory/cta-lstchain/pull/1286
+            paths.append(
+                {
+                    'input': dl1,
+                    'output': merged_dl1,
+                    'options': f'--pattern {pattern} --no-image',
+                    'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
+                }
+            )
+        return paths
+
+
 class PathConfigAllSkyTesting(PathConfigAllSkyBase):
     def __init__(self, prod_id, dec):
         super().__init__(prod_id, dec)
         self.testing_dir = "/fefs/aswg/data/mc/DL0/LSTProd2/TestDataset/sim_telarray/{pointing}/output_v1.4/"
         self.dataset_type = 'TestingDataset'
+        self.particle = 'Gamma'
         self.stages = ['r0_to_dl1', 'merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
 
     def pointing_dirs(self):
-        return self.pointings['dirname']
+        return self.pointings[f'dirname_{self.particle}']
 
     def r0_dir(self, pointing):
         return self.testing_dir.format(pointing=pointing)
@@ -698,7 +740,7 @@ def load_pointings(self):
             alt, az = (90.0 - float(pt.groups()[0])) * u.deg, (float(pt.groups()[1])) * u.deg
             data.append([Angle(alt).wrap_at('180d'), Angle(az).wrap_at('360d'), d])
         reshaped_data = [[dd[0] for dd in data], [dd[1] for dd in data], [dd[2] for dd in data]]
-        self._testing_pointings = QTable(data=reshaped_data, names=['alt', 'az', 'dirname'])
+        self._testing_pointings = QTable(data=reshaped_data, names=['alt', 'az', f'dirname_{self.particle}'])
 
     @property
     def pointings(self):
@@ -745,9 +787,9 @@ def plot_pointings(self, ax=None, projection='polar', add_grid3d=False, **kwargs
         )
         return ax
 
-    def dl1_dir(self, pointing):
+    def dl1_dir(self, pointing, dec=''):
         # no declination for DL1 for TestingDataset
-        return super().dl1_dir(particle='', pointing=pointing, dataset_type=self.dataset_type, dec='')
+        return super().dl1_dir(particle=self.particle, pointing=pointing, dataset_type=self.dataset_type, dec=dec)
 
     def dl2_dir(self, pointing):
         return self._data_level_dir(
@@ -779,7 +821,8 @@ def r0_to_dl1(self):
         return paths
 
     def testing_merged_dl1(self, pointing):
-        return os.path.join(self.dl1_dir(''), f'dl1_{self.prod_id}_{pointing}_merged.h5')
+        particle = self.particle
+        return os.path.join(self.dl1_dir(''), f'dl1_{self.prod_id}_{particle}_test_{pointing}_merged.h5')
 
     @property
     def merge_dl1(self):
@@ -811,6 +854,10 @@ def dl2_output_file(self, pointing):
         filename = os.path.basename(self.testing_merged_dl1(pointing).replace('dl1_', 'dl2_'))
         return os.path.join(self.dl2_dir(pointing), filename)
 
+    def irf_output_file(self, pointing):
+        filename = f'irf_{self.prod_id}_{self.particle}_{self.dec}_{pointing}.fits.gz'
+        return os.path.join(self.irf_dir(pointing), filename)
+
     @property
     def dl2_to_irfs(self):
         paths = []
@@ -822,15 +869,74 @@ def dl2_to_irfs(self):
                     'proton_file': None,
                     'electron_file': None,
                 },
-                'output': os.path.join(self.irf_dir(pointing), f'irf_{self.prod_id}_{pointing}.fits.gz'),
-                'options': '--point-like --gh-efficiency 0.7 --theta-containment 0.7 --energy-dependent-gh --energy-dependent-theta ',
+                'output': self.irf_output_file(pointing),
+                'options': '--gh-efficiency 0.7 --theta-containment 0.7 --energy-dependent-gh --energy-dependent-theta ',
                 'extra_slurm_options': {'mem': '6GB'},
             }
+            if self.particle == 'Gamma':
+                pp['options'] += ' --point-like'
             paths.append(pp)
 
         return paths
 
 
+class PathConfigAllSkyTestingGammaDiffuse(PathConfigAllSkyTesting):
+    def __init__(self, prod_id, dec):
+        """
+        This config must be used after a PathConfigAllSkyTrainingWithSplit has been generated and run.
+        It uses the test dataset of GammaDiffuse created by the train_test_split stage of PathConfigAllSkyTrainingWithSplit,
+        merges the nodes and runs the dl1_to_dl2 and dl2_to_irfs stages.
+        """
+        super().__init__(prod_id, dec)
+        self.stages = ['merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
+        self.train_config = PathConfigAllSkyTrainingWithSplit(prod_id, dec)
+        # self.pointings = self.train_config.pointings
+        self.particle = 'GammaDiffuse'
+
+    def testing_merged_dl1(self, pointing, dec):
+        particle = self.particle
+        return os.path.join(self.dl1_dir('', dec=dec), f'dl1_{self.prod_id}_{particle}_test_{dec}_{pointing}_merged.h5')
+
+    def load_pointings(self):
+        self.train_config.load_pointings()
+        self._testing_pointings = self.train_config._training_pointings
+
+
+    @property
+    def merge_dl1(self):
+        paths = []
+        for pointing in self.train_config.pointing_dirs(self.particle):
+            dl1 = self.dl1_dir(pointing, dec=self.dec)
+            merged_dl1 = self.testing_merged_dl1(pointing, dec=self.dec)
+            paths.append(
+                {
+                    'input': dl1,
+                    'output': merged_dl1,
+                    'options': '--pattern */*.h5 --no-image',
+                    'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
+                }
+            )
+        return paths
+
+    @property
+    def dl1_to_dl2(self):
+        paths = []
+        for pointing in self.pointing_dirs():
+            paths.append(
+                {
+                    'input': self.testing_merged_dl1(pointing, dec=self.dec),
+                    'path_model': self.models_dir(),
+                    'output': self.dl2_dir(pointing),
+                    'extra_slurm_options': {'mem': '80GB' if self.dec == _crab_dec else '60GB'},
+                }
+            )
+        return paths
+
+    def dl2_output_file(self, pointing):
+        filename = os.path.basename(self.testing_merged_dl1(pointing, dec=self.dec).replace('dl1_', 'dl2_'))
+        return os.path.join(self.dl2_dir(pointing), filename)
+
+
 class PathConfigAllSkyFull(PathConfig):
     def __init__(self, prod_id, dec_list):
         """
@@ -1110,3 +1216,41 @@ def check_source_prod(self):
                 dec_to_remove.append(dec)
 
         self.dec_list = list(set(self.dec_list) - set(dec_to_remove))
+
+
+class PathConfigAllSkyFullSplitDiffuse(PathConfigAllSkyFull):
+    def __init__(self, prod_id, dec_list):
+        super().__init__(prod_id, dec_list)
+        self.stages = ['r0_to_dl1', 'train_test_split', 'merge_dl1', 'train_pipe', 'dl1_to_dl2', 'dl2_to_irfs']
+
+        self.train_configs = {dec: PathConfigAllSkyTrainingWithSplit(prod_id, dec) for dec in dec_list}
+        self.test_configs = {dec: PathConfigAllSkyTesting(prod_id, dec) for dec in dec_list}
+        self.test_diffuse_config = {dec: PathConfigAllSkyTestingGammaDiffuse(prod_id, dec) for dec in dec_list}
+
+    @property
+    def train_test_split(self):
+        paths = []
+        for dec in self.dec_list:
+            paths.extend(self.train_configs[dec].train_test_split)
+        return paths
+
+    @property
+    def merge_dl1(self):
+        paths = super().merge_dl1
+        for dec in self.dec_list:
+            paths.extend(self.test_diffuse_config[dec].merge_dl1)
+        return paths
+
+    @property
+    def dl1_to_dl2(self):
+        paths = super().dl1_to_dl2
+        for dec in self.dec_list:
+            paths.extend(self.test_diffuse_config[dec].dl1_to_dl2)
+        return paths
+
+    @property
+    def dl2_to_irfs(self):
+        paths = super().dl2_to_irfs
+        for dec in self.dec_list:
+            paths.extend(self.test_diffuse_config[dec].dl2_to_irfs)
+        return paths
\ No newline at end of file
diff --git a/lstmcpipe/scripts/generate_nsb_levels_configs.py b/lstmcpipe/scripts/generate_nsb_levels_configs.py
index b68cc199..8b9d06e2 100644
--- a/lstmcpipe/scripts/generate_nsb_levels_configs.py
+++ b/lstmcpipe/scripts/generate_nsb_levels_configs.py
@@ -1,8 +1,9 @@
 import subprocess
 import json
 import logging
-from lstchain.io.config import get_mc_config
 import argparse
+from lstchain.io.config import get_mc_config
+from pathlib import Path
 from datetime import date
 
 BASE_LSTCHAIN_MC_CONFIG = get_mc_config()
@@ -20,9 +21,12 @@ def build_argparser():
         argparse.ArgumentParser: The argument parser object.
     """
     parser = argparse.ArgumentParser(description="Generate a set of lstchain and lstmcpipe configuration files for different nsb tuning ratios.")
-    parser.add_argument("--nsb_ratios", "-nsb",
-                        nargs="+", type=float, default=None,
-                        help="List of nsb tuning ratios. If not provided, no NSB tuning is applied.")
+    parser.add_argument("--config_class", "-c",
+                        type=str, default="PathConfigAllSkyFullSplitDiffuse",
+                        help="The class of the configuration to generate.")
+    parser.add_argument("--nsb",
+                        nargs="+", type=float, default=[0],
+                        help="List of nsb tuning values in p.e. If not provided, no NSB tuning is applied.")
 
     parser.add_argument(
         "--dec_list",
@@ -39,13 +43,15 @@
             "dec_min_1802",
             "dec_min_2924",
             "dec_min_413"
-        ],
+        ]
     )
 
+    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing files.")
+
     return parser
 
 
-def lstchain_config_name(nsb_tuning_ratio):
+def lstchain_config_name(nsb_tuning):
     """
     Generate the name of the lstchain configuration file based on the given nsb_tuning_ratio.
 
@@ -55,13 +61,10 @@
     Returns:
         str: The name of the lstchain configuration file.
     """
-    if nsb_tuning_ratio == 0 or nsb_tuning_ratio is None:
-        return "lstchain_config.json"
-    else:
-        return f"lstchain_config_nsb{nsb_tuning_ratio}.json"
+    return f"lstchain_config_nsb{nsb_tuning:.2f}.json"
 
 
-def dump_lstchain_nsb_config(nsb_tuning_ratio):
+def dump_lstchain_nsb_config(nsb_tuning, outdir="."):
     """
     Dump the lstchain configuration file with the given nsb_tuning_ratio.
 
@@ -69,18 +72,18 @@
         nsb_tuning_ratio (float): The nsb tuning ratio.
     """
     new_config = BASE_LSTCHAIN_MC_CONFIG.copy()
-    if nsb_tuning_ratio == 0 or nsb_tuning_ratio is None:
+    if nsb_tuning == 0 or nsb_tuning is None:
         new_config["waveform_nsb_tuning"]["nsb_tuning"] = False
     else:
         new_config["waveform_nsb_tuning"]["nsb_tuning"] = True
-        new_config["waveform_nsb_tuning"]["nsb_tuning_ratio"] = nsb_tuning_ratio
-    json_filename = lstchain_config_name(nsb_tuning_ratio)
+        new_config["waveform_nsb_tuning"]["nsb_tuning_ratio"] = nsb_tuning
+    json_filename = Path(outdir) / lstchain_config_name(nsb_tuning)
     with open(json_filename, 'w') as f:
         json.dump(new_config, f, indent=4)
     logger.info(f"Dumped lstchain configuration file: {json_filename}")
 
 
-def prod_id(nsb_tuning_ratio):
+def prod_id(nsb_tuning):
     """
     Generate the prod ID based on the given nsb_tuning_ratio.
 
@@ -90,10 +93,10 @@
     Returns:
         str: The product ID.
     """
-    return f"{date.today()}_allsky_nsb_tuning_{nsb_tuning_ratio}"
+    return f"{date.today()}_allsky_nsb_tuning_{nsb_tuning:.2f}"
 
 
-def lstmcpipe_config_filename(nsb_tuning_ratio):
+def lstmcpipe_config_filename(nsb_tuning, outdir="."):
     """
     Generate the name of the lstmcpipe configuration file based on the given nsb_tuning_ratio.
 
@@ -103,10 +106,7 @@
     Returns:
         str: The name of the lstmcpipe configuration file.
""" - if nsb_tuning_ratio == 0 or nsb_tuning_ratio is None: - return "lstmcpipe_config.json" - else: - return f"lstmcpipe_config_nsb{nsb_tuning_ratio}.json" + return Path(outdir) / f"lstmcpipe_config_nsb{nsb_tuning:.2f}.yml" def main(): @@ -116,31 +116,33 @@ def main(): parser = build_argparser() args = parser.parse_args() - dec_list = " ".join(args.dec_list) - nsb_tuning_ratios = args.nsb_ratios - - if nsb_tuning_ratios is None: - nsb_tuning_ratios = [None] - for nsb_tuning_ratio in nsb_tuning_ratios: - logger.info(f"Working on ratio {nsb_tuning_ratio}") - dump_lstchain_nsb_config(nsb_tuning_ratio) + nsb_tuning_values = args.nsb + config_class = args.config_class + + for nsb_tuning in nsb_tuning_values: + logger.info(f"Working on NSB {nsb_tuning}") + outdir = Path(f"NSB-{nsb_tuning:.2f}") + outdir.mkdir(parents=True, exist_ok=True) + dump_lstchain_nsb_config(nsb_tuning, outdir) tmp_lstchain_config = "tmp_lstchain_config.json" command = [ "lstmcpipe_generate_config", - "PathConfigAllSkyFull", + config_class, "--prod_id", - prod_id(nsb_tuning_ratio), + prod_id(nsb_tuning), "-o", - lstmcpipe_config_filename(nsb_tuning_ratio), - "--dec_list", - dec_list, + lstmcpipe_config_filename(nsb_tuning, outdir), "--lstchain_conf", - tmp_lstchain_config + tmp_lstchain_config, + "--dec_list", ] + command.extend(args.dec_list) + if args.overwrite: + command.append("--overwrite") subprocess.run(command, check=True) # Delete tmp_lstchain_config (the lstchain configs with nsb tuning are already dumped) subprocess.run(["rm", tmp_lstchain_config], check=True) - logger.info(f"Generated lstmcpipe configuration file: {lstmcpipe_config_filename(nsb_tuning_ratio)}") + logger.info(f"Generated lstmcpipe configuration file: {lstmcpipe_config_filename(nsb_tuning)}") if __name__ == "__main__":