Skip to content

Commit

Permalink
Merge tf_kernel
Browse files Browse the repository at this point in the history
- keep processing_type method, but also add fix to check_if_fc_levels_already_exist from widescale_tests
  • Loading branch information
kkappler committed Sep 14, 2023
1 parent c5ff9f4 commit 0a4e991
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions aurora/pipelines/transfer_function_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def check_if_fc_levels_already_exist(self):
(Could also iterate over kernel_dataset.dataframe, to get the groupby).
If all FC Levels for a given station-run are already built, mark the RunSummary with a True in
the "fc" column.
the "fc" column. Otherwise its False
Note 1: Because decimation is a cascading operation, we avoid the case where some (valid) decimation
levels exist in the mth5 FC archive and others do not. The maximum granularity tolerated will be at the
Expand All @@ -163,8 +163,18 @@ def check_if_fc_levels_already_exist(self):
of the dataset_df may refernece the same h5, and I don't know if updating one row will have unintended
consequences.
Note #4: associated_run_sub_df may have multiple rows, even though the run id is unique.
This could happen for example when you have a long run at the local station, but multiple (say two) shorter runs
at the reference station. In that case, the processing summary will have a separate row for the
intersection of the long run with each of the remote runs. We ignore this for now, selecting only the first
element of the associated_run_sub_df, under the assumption that FCs have been created for the entire run,
or not at all. This assumption can be relaxed in future by using the time_period attribute of the FC layer.
For now, we proceed with the all-or-none logic. That is, if a ['survey', 'station_id', 'run_id',] has FCs,
assume that the FCs are present for the entire run. We assign the "fc" column of dataset_df to have the same
boolean value for all rows of same ['survey', 'station_id', 'run_id',] .
Returns: None
Modifies self.dataset_df inplace, assigning bool to the "fc" column
Modifies self.dataset_df inplace, assigning bools to the "fc" column
"""
groupby = ['survey', 'station_id', 'run_id',]
Expand All @@ -175,31 +185,36 @@ def check_if_fc_levels_already_exist(self):
cond2 = self.dataset_df.station_id == station_id
cond3 = self.dataset_df.run_id == run_id
associated_run_sub_df = self.dataset_df[cond1 & cond2 & cond3]
assert len(associated_run_sub_df) == 1 # should be unique
dataset_df_index = associated_run_sub_df.index[0]

if len(associated_run_sub_df) > 1:
# See Note #4
print("Warning -- not all runs will processed as a continuous chunk -- in future may need to loop over runlets to check for FCs")

dataset_df_indices = np.r_[associated_run_sub_df.index]
#dataset_df_indices = associated_run_sub_df.index.to_numpy()
run_row = associated_run_sub_df.iloc[0]
row_ssr_str = f"survey: {run_row.survey}, station_id: {run_row.station_id}, run_id: {run_row.run_id}"

# See Note #3 above
# See Note #3 above relating to mixing multiple surveys in a processing scheme
mth5_obj = self.mth5_objs[station_id]
survey_obj = mth5_obj.get_survey(survey_id)
station_obj = survey_obj.stations_group.get_station(station_id)
if not station_obj.fourier_coefficients_group.groups_list:
msg = f"Prebuilt Fourier Coefficients not detected for {row_ssr_str} -- will need to build them "
print(msg)
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
else:
print("Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements...")
# Assume FC Groups are keyed by run_id, check if there is a relevant group
try:
fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
except MTH5Error:
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Run ID {run_id} not found in FC Groups, -- will need to build them ")
continue

if len(fc_group.groups_list) < self.processing_config.num_decimation_levels:
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Not enough FC Groups available for {row_ssr_str} -- will need to build them ")
continue

Expand All @@ -212,7 +227,7 @@ def check_if_fc_levels_already_exist(self):
# See note #2
fcs_already_there = fc_group.supports_aurora_processing_config(self.processing_config,
run_row.remote)
self.dataset_df["fc"].iat[dataset_df_index] = fcs_already_there
self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there

return

Expand Down

0 comments on commit 0a4e991

Please sign in to comment.