Commit

Merge pull request #291 from simpeg/remove_nan_in_run

Small fixes

kkappler authored Sep 15, 2023
2 parents d8b0c62 + 1f9af7c commit 7d999ea
Showing 2 changed files with 127 additions and 36 deletions.
62 changes: 51 additions & 11 deletions aurora/pipelines/time_series_helpers.py
@@ -60,7 +60,9 @@ def apply_prewhitening(decimation_obj, run_xrds_input):
run_xrds = run_xrds_input.differentiate("time")

else:
print(f"{decimation_obj.prewhitening_type} pre-whitening not implemented")
print(
f"{decimation_obj.prewhitening_type} pre-whitening not implemented"
)
raise NotImplementedError
return run_xrds
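
Aside: the "first difference" branch above prewhitens by differentiating along the time coordinate. A minimal sketch of that call on a toy dataset (the data and channel name here are illustrative, not aurora's API):

    import numpy as np
    import xarray as xr

    # Toy single-channel run; aurora's run_xrds is an xarray Dataset with
    # a "time" coordinate, which is all differentiate() needs.
    t = np.arange(0.0, 10.0, 0.1)
    ds = xr.Dataset({"hx": ("time", np.sin(2 * np.pi * 0.5 * t))}, coords={"time": t})
    prewhitened = ds.differentiate("time")  # same call as the branch above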

@@ -85,11 +87,12 @@ def apply_recoloring(decimation_obj, stft_obj):
return stft_obj
if not decimation_obj.recoloring:
return stft_obj

if decimation_obj.prewhitening_type == "first difference":
# replace below with decimation_obj.get_fft_harmonics() ?
freqs = get_fft_harmonics(
decimation_obj.window.num_samples, decimation_obj.sample_rate_decimation
decimation_obj.window.num_samples,
decimation_obj.sample_rate_decimation,
)
prewhitening_correction = 1.0j * 2 * np.pi * freqs # jw

@@ -105,7 +108,9 @@ def apply_recoloring(decimation_obj, stft_obj):
# MA = 4 # add this to processing config

else:
print(f"{decimation_obj.prewhitening_type} recoloring not yet implemented")
print(
f"{decimation_obj.prewhitening_type} recoloring not yet implemented"
)
raise NotImplementedError

return stft_obj
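
For intuition: differentiating in time multiplies each Fourier harmonic by jw, so recoloring divides the STFT by the same factor, skipping the DC bin where jw is zero (the note above says the DC harmonic is unused anyway). A sketch with made-up spectra, assuming only numpy:

    import numpy as np

    num_samples = 256
    sample_rate = 1.0
    freqs = np.fft.rfftfreq(num_samples, d=1.0 / sample_rate)
    jw = 1.0j * 2 * np.pi * freqs  # the prewhitening_correction above

    spectra = np.ones((10, freqs.size), dtype=complex)  # hypothetical STFT, frequency on last axis
    recolored = spectra.copy()
    recolored[:, 1:] = spectra[:, 1:] / jw[1:]  # skip DC bin (jw[0] == 0)
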
@@ -194,7 +199,9 @@ def truncate_to_clock_zero(decimation_obj, run_xrds):
pass # time series start is already clock zero
else:
windowing_scheme = window_scheme_from_decimation(decimation_obj)
number_of_steps = delta_t_seconds / windowing_scheme.duration_advance
number_of_steps = (
delta_t_seconds / windowing_scheme.duration_advance
)
n_partial_steps = number_of_steps - np.floor(number_of_steps)
n_clip = n_partial_steps * windowing_scheme.num_samples_advance
n_clip = int(np.round(n_clip))
@@ -208,6 +215,24 @@ def truncate_to_clock_zero(decimation_obj, run_xrds):
return run_xrds
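
A worked instance of the clip arithmetic above, with made-up numbers (the real values come from the windowing scheme):

    import numpy as np

    delta_t_seconds = 7.3        # offset from clock zero (hypothetical)
    duration_advance = 2.0       # seconds per window advance (hypothetical)
    num_samples_advance = 128    # samples per window advance (hypothetical)

    number_of_steps = delta_t_seconds / duration_advance           # 3.65
    n_partial_steps = number_of_steps - np.floor(number_of_steps)  # 0.65
    n_clip = int(np.round(n_partial_steps * num_samples_advance))  # 83 samples trimmed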


def nan_to_mean(xrds):
"""
Replace NaN values in each channel with that channel's mean.

:param xrds: a run of time series data
:type xrds: xarray.Dataset
:return: the dataset with NaNs filled by per-channel means
:rtype: xarray.Dataset
"""
for ch in xrds.keys():
if xrds[ch].isnull().any():
print("Null values detected in xrds -- this is not expected and should be examined")
value = np.nan_to_num(np.nanmean(xrds[ch].data))
xrds[ch] = xrds[ch].fillna(value)

return xrds
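
A quick check of nan_to_mean as defined above, on toy data:

    import numpy as np
    import xarray as xr

    ds = xr.Dataset({"ex": ("time", np.array([1.0, np.nan, 3.0]))}, coords={"time": [0, 1, 2]})
    filled = nan_to_mean(ds)
    print(filled["ex"].data)  # [1. 2. 3.] -- the NaN becomes the channel mean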


def run_ts_to_stft(decimation_obj, run_xrds_orig):
"""
@@ -225,6 +250,9 @@ def run_ts_to_stft(decimation_obj, run_xrds_orig):
recoloring. This really doesn't matter since we don't use the DC harmonic for
anything.
"""
# Remove any NaNs before windowing; otherwise a single NaN makes the
# whole channel NaN after detrending and the FFT.
run_xrds = nan_to_mean(run_xrds_orig)
run_xrds = apply_prewhitening(decimation_obj, run_xrds)
run_xrds = truncate_to_clock_zero(decimation_obj, run_xrds)
windowing_scheme = window_scheme_from_decimation(decimation_obj)
@@ -234,7 +262,9 @@ def run_ts_to_stft(decimation_obj, run_xrds_orig):
if not np.prod(windowed_obj.to_array().data.shape):
raise ValueError

windowed_obj = WindowedTimeSeries.detrend(data=windowed_obj, detrend_type="linear")
windowed_obj = WindowedTimeSeries.detrend(
data=windowed_obj, detrend_type="linear"
)
tapered_obj = windowed_obj * windowing_scheme.taper
stft_obj = windowing_scheme.apply_fft(
tapered_obj, detrend_type=decimation_obj.extra_pre_fft_detrend_type
@@ -244,7 +274,9 @@ def run_ts_to_stft(decimation_obj, run_xrds_orig):
return stft_obj
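
The comment at the top of run_ts_to_stft is easy to verify: a single NaN poisons every Fourier coefficient of the window containing it.

    import numpy as np

    x = np.ones(8)
    x[3] = np.nan
    print(np.fft.rfft(x))  # every harmonic is (nan+nanj)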


def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None):
def calibrate_stft_obj(
stft_obj, run_obj, units="MT", channel_scale_factors=None
):
"""
Parameters
@@ -272,8 +304,12 @@ def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None
print("WARNING UNEXPECTED CHANNEL WITH NO FILTERS")
if channel_id == "hy":
print("Channel HY has no filters, try using filters from HX")
channel_filter = run_obj.get_channel("hx").channel_response_filter
calibration_response = channel_filter.complex_response(stft_obj.frequency.data)
channel_filter = run_obj.get_channel(
"hx"
).channel_response_filter
calibration_response = channel_filter.complex_response(
stft_obj.frequency.data
)
if channel_scale_factors:
try:
channel_scale_factor = channel_scale_factors[channel_id]
@@ -314,7 +350,9 @@ def prototype_decimate(config, run_xrds):
num_channels = len(channel_labels)
new_data = np.full((num_observations, num_channels), np.nan)
for i_ch, ch_label in enumerate(channel_labels):
new_data[:, i_ch] = ssig.decimate(run_xrds[ch_label], int(config.factor))
new_data[:, i_ch] = ssig.decimate(
run_xrds[ch_label], int(config.factor)
)

xr_da = xr.DataArray(
new_data,
@@ -348,7 +386,9 @@ def prototype_decimate_2(config, run_xrds):
xr_ds: xr.Dataset
Decimated version of the input run_xrds
"""
new_xr_ds = run_xrds.coarsen(time=int(config.factor), boundary="trim").mean()
new_xr_ds = run_xrds.coarsen(
time=int(config.factor), boundary="trim"
).mean()
attr_dict = run_xrds.attrs
attr_dict["sample_rate"] = config.sample_rate
new_xr_ds.attrs = attr_dict
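
The two decimation prototypes differ in kind: ssig.decimate applies an anti-alias filter before downsampling, while coarsen(...).mean() is a plain block average. A sketch contrasting them on synthetic data (aurora passes a config object rather than a bare factor):

    import numpy as np
    import scipy.signal as ssig
    import xarray as xr

    t = np.arange(4096) / 64.0  # 64 Hz toy series
    da = xr.DataArray(np.sin(2 * np.pi * t), dims="time", coords={"time": t})

    factor = 4
    filtered = ssig.decimate(da.values, factor)               # filter, then downsample
    boxcar = da.coarsen(time=factor, boundary="trim").mean()  # block average
    print(filtered.shape, boxcar.sizes["time"])               # (1024,) 1024
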
101 changes: 76 additions & 25 deletions aurora/pipelines/transfer_function_kernel.py
@@ -72,7 +72,9 @@ def initialize_mth5s(self, mode="r"):
remote station id: mth5.mth5.MTH5
"""

local_mth5_obj = initialize_mth5(self.config.stations.local.mth5_path, mode=mode)
local_mth5_obj = initialize_mth5(
self.config.stations.local.mth5_path, mode=mode
)
if self.config.stations.remote:
remote_path = self.config.stations.remote[0].mth5_path
remote_mth5_obj = initialize_mth5(remote_path, mode="r")
Expand All @@ -85,7 +87,7 @@ def initialize_mth5s(self, mode="r"):
self._mth5_objs = mth5_objs
return

def update_dataset_df(self,i_dec_level):
def update_dataset_df(self, i_dec_level):
"""
This function has two different modes. The first mode initializes values in the
array, and could be placed into TFKDataset.initialize_time_series_data()
@@ -132,7 +134,11 @@ def update_dataset_df(self,i_dec_level):
run_xrds = row["run_dataarray"].to_dataset("channel")
decimation = self.config.decimations[i_dec_level].decimation
decimated_xrds = prototype_decimate(decimation, run_xrds)
self.dataset_df["run_dataarray"].at[i] = decimated_xrds.to_array("channel") # See Note 1 above
self.dataset_df["run_dataarray"].at[
i
] = decimated_xrds.to_array(
"channel"
) # See Note 1 above

print("DATASET DF UPDATED")
return
@@ -177,7 +183,11 @@ def check_if_fc_levels_already_exist(self):
Modifies self.dataset_df inplace, assigning bools to the "fc" column
"""
groupby = ['survey', 'station_id', 'run_id',]
groupby = [
"survey",
"station_id",
"run_id",
]
grouper = self.processing_summary.groupby(groupby)

for (survey_id, station_id, run_id), df in grouper:
@@ -188,10 +198,12 @@ def check_if_fc_levels_already_exist(self):

if len(associated_run_sub_df) > 1:
# See Note #4
print("Warning -- not all runs will processed as a continuous chunk -- in future may need to loop over runlets to check for FCs")
print(
"Warning -- not all runs will be processed as a continuous chunk -- in the future we may need to loop over runlets to check for FCs"
)

dataset_df_indices = np.r_[associated_run_sub_df.index]
#dataset_df_indices = associated_run_sub_df.index.to_numpy()
# dataset_df_indices = associated_run_sub_df.index.to_numpy()
run_row = associated_run_sub_df.iloc[0]
row_ssr_str = f"survey: {run_row.survey}, station_id: {run_row.station_id}, run_id: {run_row.run_id}"

@@ -204,34 +216,48 @@ def check_if_fc_levels_already_exist(self):
print(msg)
self.dataset_df.loc[dataset_df_indices, "fc"] = False
else:
print("Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements...")
print(
"Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements..."
)
# Assume FC Groups are keyed by run_id, check if there is a relevant group
try:
fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
fc_group = (
station_obj.fourier_coefficients_group.get_fc_group(
run_id
)
)
except MTH5Error:
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Run ID {run_id} not found in FC Groups, -- will need to build them ")
print(
f"Run ID {run_id} not found in FC Groups -- will need to build them"
)
continue

if len(fc_group.groups_list) < self.processing_config.num_decimation_levels:
if (
len(fc_group.groups_list)
< self.processing_config.num_decimation_levels
):
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Not enough FC Groups available for {row_ssr_str} -- will need to build them ")
print(
f"Not enough FC Groups available for {row_ssr_str} -- will need to build them"
)
continue

# Can check time periods here if desired, but unique (survey, station, run) should make this unneeded
# processing_run = self.processing_config.stations.local.get_run(run_id)
# for tp in processing_run.time_periods:
# assert tp in fc_group time periods


# See note #2
fcs_already_there = fc_group.supports_aurora_processing_config(self.processing_config,
run_row.remote)
self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there
fcs_already_there = fc_group.supports_aurora_processing_config(
self.processing_config, run_row.remote
)
self.dataset_df.loc[
dataset_df_indices, "fc"
] = fcs_already_there

return
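
On the commented-out alternative above: np.r_[sub_df.index] and sub_df.index.to_numpy() yield the same array, so either idiom works. A quick check on a toy frame:

    import numpy as np
    import pandas as pd

    sub_df = pd.DataFrame({"x": [10, 20]}, index=[3, 7])
    assert (np.r_[sub_df.index] == sub_df.index.to_numpy()).all()  # both array([3, 7])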


def make_processing_summary(self):
"""
Melt the decimation levels over the run summary. Add columns to estimate
@@ -250,11 +276,15 @@ def make_processing_summary(self):
decimation_info = self.config.decimation_info()
for i_dec, dec_factor in decimation_info.items():
tmp[i_dec] = dec_factor
tmp = tmp.melt(id_vars=id_vars, value_name="dec_factor", var_name="dec_level")
tmp = tmp.melt(
id_vars=id_vars, value_name="dec_factor", var_name="dec_level"
)
sortby = ["survey", "station_id", "run_id", "start", "dec_level"]
tmp.sort_values(by=sortby, inplace=True)
tmp.reset_index(drop=True, inplace=True)
tmp.drop("sample_rate", axis=1, inplace=True) # not valid for decimated data
tmp.drop(
"sample_rate", axis=1, inplace=True
) # not valid for decimated data

# Add window info
group_by = [
@@ -269,11 +299,21 @@ def make_processing_summary(self):
print(group)
print(df)
try:
assert (df.dec_level.diff()[1:] == 1).all() # dec levels increment by 1
try:
assert (
df.dec_level.diff()[1:] == 1
).all() # dec levels increment by 1
except AssertionError:
print(
f"Skipping {group} because decimation levels are messy."
)
continue

assert df.dec_factor.iloc[0] == 1
assert df.dec_level.iloc[0] == 0
except AssertionError:
raise AssertionError("Decimation levels not structured as expected")
raise AssertionError(
"Decimation levels not structured as expected"
)
# df.sample_rate /= np.cumprod(df.dec_factor) # better to take from config
window_params_df = self.config.window_scheme(as_type="df")
df.reset_index(inplace=True, drop=True)
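
How the melt above shapes the processing summary, on a hypothetical one-run summary: the per-level factor columns collapse into one row per (run, dec_level) pair.

    import pandas as pd

    tmp = pd.DataFrame({"station_id": ["mt01"], "run_id": ["001"]})
    decimation_info = {0: 1, 1: 4}  # dec_level -> dec_factor (hypothetical)
    for i_dec, dec_factor in decimation_info.items():
        tmp[i_dec] = dec_factor

    tmp = tmp.melt(id_vars=["station_id", "run_id"], value_name="dec_factor", var_name="dec_level")
    print(tmp)  # two rows: dec_level 0 (factor 1) and dec_level 1 (factor 4)
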
@@ -324,8 +364,14 @@ def validate_decimation_scheme_and_dataset_compatability(
"""
if min_num_stft_windows is None:
min_stft_window_info = {x.decimation.level: x.min_num_stft_windows for x in self.processing_config.decimations}
min_stft_window_list = [min_stft_window_info[x] for x in self.processing_summary.dec_level]
min_stft_window_info = {
x.decimation.level: x.min_num_stft_windows
for x in self.processing_config.decimations
}
min_stft_window_list = [
min_stft_window_info[x]
for x in self.processing_summary.dec_level
]
min_num_stft_windows = pd.Series(min_stft_window_list)

self.processing_summary["valid"] = (
@@ -373,8 +419,12 @@ def valid_decimations(self):
valid_levels = tmp.dec_level.unique()

dec_levels = [x for x in self.config.decimations]
dec_levels = [x for x in dec_levels if x.decimation.level in valid_levels]
print(f"After validation there are {len(dec_levels)} valid decimation levels")
dec_levels = [
x for x in dec_levels if x.decimation.level in valid_levels
]
print(
f"After validation there are {len(dec_levels)} valid decimation levels"
)
return dec_levels

def is_valid_dataset(self, row, i_dec):
@@ -404,7 +454,8 @@ def is_valid_dataset(self, row, i_dec):

cond = cond1 & cond2 & cond3 & cond4 & cond5
processing_row = self.processing_summary[cond]
assert len(processing_row) == 1
if len(processing_row) != 1:
return False

is_valid = processing_row.valid.iloc[0]
return is_valid
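
The final hunk replaces a hard assert with a graceful fallback: if the filter matches zero (or several) summary rows, the dataset is reported invalid for that decimation level instead of crashing. In miniature:

    import pandas as pd

    processing_summary = pd.DataFrame({"dec_level": [0, 1], "valid": [True, False]})
    processing_row = processing_summary[processing_summary.dec_level == 2]  # no match
    if len(processing_row) != 1:
        is_valid = False  # treat as invalid rather than raising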

