Skip to content

Commit

Permalink
split gamma diffuse in train/test with new child configs (#483)
Browse files Browse the repository at this point in the history
* split gamma diffuse in train/test with new child configs

* introduces new configs to split gamma diffuse into train/test and run the workflow accordingly

* rename the FullSplitDiffuse config

* fix full to include train_test split stage

* fix issues with script generating nsb configs and use config with split as default one

* remove unused dec_list and make consistent naming for NSB

* add test on diffuse gammas in FullSplitDiffuse config

---------

Co-authored-by: vuillaut <vuillaume@lapp.in2p3.fr>
  • Loading branch information
vuillaut and vuillaut committed Sep 9, 2024
1 parent fd95605 commit b1a2571
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 49 deletions.
170 changes: 157 additions & 13 deletions lstmcpipe/config/paths_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,15 @@ def dl1ab(self):

class PathConfigAllSkyBase(PathConfig):
"""
Standard paths configuration for a prod5_trans_80 MC production
Standard paths configuration for a LSTProd2 MC production
dataset_type: 'Training' or 'Testing'
"""

def __init__(self, prod_id, dec):
super().__init__(prod_id)
self.prod_id = prod_id
self.dec = dec
self.base_dir = "/fefs/aswg/data/mc/{data_level}/AllSky/{prod_id}/{dataset_type}/{dec}/{particle}/{pointing}/"
self.base_dir = "/fefs/aswg/data/mc/{data_level}/AllSky/{prod_id}/{dataset_type}/{particle}/{dec}/{pointing}/"

self.paths = {}
self.stages = []
Expand Down Expand Up @@ -623,7 +623,7 @@ def r0_to_dl1(self):
return paths

def training_merged_dl1(self, particle):
return os.path.join(self.dl1_dir(particle, ''), f'dl1_{self.prod_id}_{self.dec}_{particle}_merged.h5')
return os.path.join(self.dl1_dir(particle, ''), f'dl1_{self.prod_id}_train_{self.dec}_{particle}_merged.h5')

@property
def merge_dl1(self):
Expand All @@ -637,7 +637,7 @@ def merge_dl1(self):
'input': dl1,
'output': merged_dl1,
'options': '--pattern */*.h5 --no-image',
'extra_slurm_options': {'partition': 'long'},
'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
}
)
return paths
Expand All @@ -651,23 +651,65 @@ def train_pipe(self):
'proton': self.training_merged_dl1('Protons'),
},
'output': self.models_dir(),
'extra_slurm_options': {'partition': 'xxl', 'mem': '160G', 'cpus-per-task': 16}
'extra_slurm_options': {'partition': 'xxl', 'mem': '160G', 'cpus-per-task': 16, 'time': '03-00:00:00'}
if self.dec == _crab_dec
else {'partition': 'xxl', 'mem': '100G', 'cpus-per-task': 16},
else {'partition': 'xxl', 'mem': '100G', 'cpus-per-task': 16, 'time': '03-00:00:00'},
}
]
return paths



class PathConfigAllSkyTrainingWithSplit(PathConfigAllSkyTraining):
    """
    Training paths configuration with an extra `train_test_split` stage.

    For every GammaDiffuse pointing, the DL1 directory is split into a `train`
    sub-directory (kept next to the original DL1 files) and a `test`
    sub-directory (same path with 'TrainingDataset' replaced by
    'TestingDataset'). The merging stage is overridden to cope with the
    additional directory level this introduces for GammaDiffuse.
    """

    def __init__(self, prod_id, dec):
        super().__init__(prod_id, dec)
        # Insert the split stage at index 1 of the stage list built by the parent.
        self.stages.insert(1, 'train_test_split')

    def dl1_diffuse_test_dir(self, pointing):
        """Return the directory receiving the GammaDiffuse test files for `pointing`."""
        training_dl1_dir = self.dl1_dir('GammaDiffuse', pointing)
        testing_dl1_dir = training_dl1_dir.replace('TrainingDataset', 'TestingDataset')
        return testing_dl1_dir + '/test'

    def dl1_diffuse_train_dir(self, pointing):
        """Return the directory receiving the GammaDiffuse train files for `pointing`."""
        training_dl1_dir = self.dl1_dir('GammaDiffuse', pointing)
        return training_dl1_dir + '/train'

    @property
    def train_test_split(self):
        """
        Build the list of path dictionaries for the `train_test_split` stage.

        One entry is produced per GammaDiffuse pointing, with the DL1 directory
        as input, the train/test directories as output and a `test_size` of 0.5.
        """
        return [
            {
                'input': self.dl1_dir('GammaDiffuse', pointing),
                'output': {
                    'train': self.dl1_diffuse_train_dir(pointing),
                    'test': self.dl1_diffuse_test_dir(pointing),
                },
                'options': {'test_size': 0.5},
            }
            for pointing in self.pointing_dirs('GammaDiffuse')
        ]

    @property
    def merge_dl1(self):
        """
        Build the list of path dictionaries for the `merge_dl1` stage.

        For the training particles, all the nodes get merged into a single file.
        """
        entries = []
        for particle in self.training_particles:
            input_dir = self.dl1_dir(particle, '')
            output_file = self.training_merged_dl1(particle)
            # GammaDiffuse needs one more wildcard level than the other particles.
            # This is needed because the file search is not recursive in lstchain;
            # it can be changed after https://github.com/cta-observatory/cta-lstchain/pull/1286
            if particle == 'GammaDiffuse':
                pattern = '*/*/*.h5'
            else:
                pattern = '*/*.h5'
            entries.append(
                {
                    'input': input_dir,
                    'output': output_file,
                    'options': f'--pattern {pattern} --no-image',
                    'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
                }
            )
        return entries


class PathConfigAllSkyTesting(PathConfigAllSkyBase):
def __init__(self, prod_id, dec):
super().__init__(prod_id, dec)
self.testing_dir = "/fefs/aswg/data/mc/DL0/LSTProd2/TestDataset/sim_telarray/{pointing}/output_v1.4/"
self.dataset_type = 'TestingDataset'
self.particle = 'Gamma'
self.stages = ['r0_to_dl1', 'merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']

def pointing_dirs(self):
return self.pointings['dirname']
return self.pointings[f'dirname_{self.particle}']

def r0_dir(self, pointing):
    """Return the R0 directory for `pointing`, filled into the `testing_dir` template."""
    template = self.testing_dir
    return template.format(pointing=pointing)
Expand Down Expand Up @@ -698,7 +740,7 @@ def load_pointings(self):
alt, az = (90.0 - float(pt.groups()[0])) * u.deg, (float(pt.groups()[1])) * u.deg
data.append([Angle(alt).wrap_at('180d'), Angle(az).wrap_at('360d'), d])
reshaped_data = [[dd[0] for dd in data], [dd[1] for dd in data], [dd[2] for dd in data]]
self._testing_pointings = QTable(data=reshaped_data, names=['alt', 'az', 'dirname'])
self._testing_pointings = QTable(data=reshaped_data, names=['alt', 'az', f'dirname_{self.particle}'])

@property
def pointings(self):
Expand Down Expand Up @@ -745,9 +787,9 @@ def plot_pointings(self, ax=None, projection='polar', add_grid3d=False, **kwargs
)
return ax

def dl1_dir(self, pointing):
def dl1_dir(self, pointing, dec=''):
# no declination for DL1 for TestingDataset
return super().dl1_dir(particle='', pointing=pointing, dataset_type=self.dataset_type, dec='')
return super().dl1_dir(particle=self.particle, pointing=pointing, dataset_type=self.dataset_type, dec=dec)

def dl2_dir(self, pointing):
return self._data_level_dir(
Expand Down Expand Up @@ -779,7 +821,8 @@ def r0_to_dl1(self):
return paths

def testing_merged_dl1(self, pointing):
return os.path.join(self.dl1_dir(''), f'dl1_{self.prod_id}_{pointing}_merged.h5')
particle = self.particle
return os.path.join(self.dl1_dir(''), f'dl1_{self.prod_id}_{particle}_test_{pointing}_merged.h5')

@property
def merge_dl1(self):
Expand Down Expand Up @@ -811,6 +854,10 @@ def dl2_output_file(self, pointing):
filename = os.path.basename(self.testing_merged_dl1(pointing).replace('dl1_', 'dl2_'))
return os.path.join(self.dl2_dir(pointing), filename)

def irf_output_file(self, pointing):
    """
    Return the full path of the IRF file produced for `pointing`.

    The file lives in ``self.irf_dir(pointing)`` and its name encodes the
    production id, the particle, the declination and the pointing.

    Parameters
    ----------
    pointing: str
        name of the pointing node

    Returns
    -------
    str: path to the gzipped FITS IRF file
    """
    filename = f'irf_{self.prod_id}_{self.particle}_{self.dec}_{pointing}.fits.gz'
    # Join with the IRF directory exactly once: joining twice duplicates the
    # directory prefix whenever `irf_dir` returns a relative path.
    return os.path.join(self.irf_dir(pointing), filename)

@property
def dl2_to_irfs(self):
paths = []
Expand All @@ -822,15 +869,74 @@ def dl2_to_irfs(self):
'proton_file': None,
'electron_file': None,
},
'output': os.path.join(self.irf_dir(pointing), f'irf_{self.prod_id}_{pointing}.fits.gz'),
'options': '--point-like --gh-efficiency 0.7 --theta-containment 0.7 --energy-dependent-gh --energy-dependent-theta ',
'output': self.irf_output_file(pointing),
'options': '--gh-efficiency 0.7 --theta-containment 0.7 --energy-dependent-gh --energy-dependent-theta ',
'extra_slurm_options': {'mem': '6GB'},
}
if self.particle == 'Gamma':
pp['options'] += ' --point-like'
paths.append(pp)

return paths


class PathConfigAllSkyTestingGammaDiffuse(PathConfigAllSkyTesting):
    """
    Testing paths configuration for the GammaDiffuse test dataset.

    The stages are limited to `merge_dl1`, `dl1_to_dl2` and `dl2_to_irfs`, and
    the pointings are taken from an internal PathConfigAllSkyTrainingWithSplit.
    """

    def __init__(self, prod_id, dec):
        """
        This config must be used after a PathConfigAllSkyTrainingWithSplit has been generated and run.
        It uses the test dataset of GammaDiffuse created by the train_test_split stage of PathConfigAllSkyTrainingWithSplit,
        merges the nodes and runs the dl1_to_dl2 and dl2_to_irfs stages.
        """
        super().__init__(prod_id, dec)
        self.stages = ['merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
        # Training counterpart, used as the source of the pointings.
        self.train_config = PathConfigAllSkyTrainingWithSplit(prod_id, dec)
        self.particle = 'GammaDiffuse'

    def testing_merged_dl1(self, pointing, dec):
        """Return the path of the merged DL1 test file for `pointing` and `dec`."""
        merged_name = f'dl1_{self.prod_id}_{self.particle}_test_{dec}_{pointing}_merged.h5'
        return os.path.join(self.dl1_dir('', dec=dec), merged_name)

    def load_pointings(self):
        """Load the pointings through the training config and reuse its table."""
        training = self.train_config
        training.load_pointings()
        self._testing_pointings = training._training_pointings

    @property
    def merge_dl1(self):
        """Build the `merge_dl1` path dictionaries, one per training pointing of the particle."""
        return [
            {
                'input': self.dl1_dir(pointing, dec=self.dec),
                'output': self.testing_merged_dl1(pointing, dec=self.dec),
                'options': '--pattern */*.h5 --no-image',
                'extra_slurm_options': {'partition': 'long', 'time': '06:00:00'},
            }
            for pointing in self.train_config.pointing_dirs(self.particle)
        ]

    @property
    def dl1_to_dl2(self):
        """Build the `dl1_to_dl2` path dictionaries, one per pointing of this config."""
        return [
            {
                'input': self.testing_merged_dl1(pointing, dec=self.dec),
                'path_model': self.models_dir(),
                'output': self.dl2_dir(pointing),
                # a larger memory request is made for the `_crab_dec` declination
                'extra_slurm_options': {'mem': '80GB' if self.dec == _crab_dec else '60GB'},
            }
            for pointing in self.pointing_dirs()
        ]

    def dl2_output_file(self, pointing):
        """Return the DL2 file path matching the merged DL1 test file of `pointing`."""
        merged_dl1 = self.testing_merged_dl1(pointing, dec=self.dec)
        dl2_name = os.path.basename(merged_dl1.replace('dl1_', 'dl2_'))
        return os.path.join(self.dl2_dir(pointing), dl2_name)


class PathConfigAllSkyFull(PathConfig):
def __init__(self, prod_id, dec_list):
"""
Expand Down Expand Up @@ -1110,3 +1216,41 @@ def check_source_prod(self):
dec_to_remove.append(dec)

self.dec_list = list(set(self.dec_list) - set(dec_to_remove))


class PathConfigAllSkyFullSplitDiffuse(PathConfigAllSkyFull):
    """
    Full AllSky paths configuration that includes the GammaDiffuse train/test split.

    On top of the parent stages, a `train_test_split` stage is run after
    `r0_to_dl1`, and the `merge_dl1`, `dl1_to_dl2` and `dl2_to_irfs` stages are
    extended with the paths of the GammaDiffuse test configs of each declination.
    """

    def __init__(self, prod_id, dec_list):
        super().__init__(prod_id, dec_list)
        self.stages = ['r0_to_dl1', 'train_test_split', 'merge_dl1', 'train_pipe', 'dl1_to_dl2', 'dl2_to_irfs']

        # One child config of each kind per requested declination.
        self.train_configs = {}
        self.test_configs = {}
        self.test_diffuse_config = {}
        for dec in dec_list:
            self.train_configs[dec] = PathConfigAllSkyTrainingWithSplit(prod_id, dec)
            self.test_configs[dec] = PathConfigAllSkyTesting(prod_id, dec)
            self.test_diffuse_config[dec] = PathConfigAllSkyTestingGammaDiffuse(prod_id, dec)

    def _extend_with_diffuse_test(self, paths, stage):
        """Extend `paths` in place with the `stage` paths of every GammaDiffuse test config."""
        for dec in self.dec_list:
            paths.extend(getattr(self.test_diffuse_config[dec], stage))
        return paths

    @property
    def train_test_split(self):
        """Collect the `train_test_split` paths of the training config of each declination."""
        return [
            entry
            for dec in self.dec_list
            for entry in self.train_configs[dec].train_test_split
        ]

    @property
    def merge_dl1(self):
        """Parent `merge_dl1` paths followed by those of the GammaDiffuse test configs."""
        return self._extend_with_diffuse_test(super().merge_dl1, 'merge_dl1')

    @property
    def dl1_to_dl2(self):
        """Parent `dl1_to_dl2` paths followed by those of the GammaDiffuse test configs."""
        return self._extend_with_diffuse_test(super().dl1_to_dl2, 'dl1_to_dl2')

    @property
    def dl2_to_irfs(self):
        """Parent `dl2_to_irfs` paths followed by those of the GammaDiffuse test configs."""
        return self._extend_with_diffuse_test(super().dl2_to_irfs, 'dl2_to_irfs')
Loading

0 comments on commit b1a2571

Please sign in to comment.