Small fixes #291

Merged
merged 4 commits on Sep 15, 2023
62 changes: 51 additions & 11 deletions aurora/pipelines/time_series_helpers.py
@@ -60,7 +60,9 @@
        run_xrds = run_xrds_input.differentiate("time")

    else:
-        print(f"{decimation_obj.prewhitening_type} pre-whitening not implemented")
+        print(
f"{decimation_obj.prewhitening_type} pre-whitening not implemented"
)
raise NotImplementedError
return run_xrds

@@ -85,11 +87,12 @@
        return stft_obj
    if not decimation_obj.recoloring:
        return stft_obj

    if decimation_obj.prewhitening_type == "first difference":
        # replace below with decimation_obj.get_fft_harmonics() ?
        freqs = get_fft_harmonics(
-            decimation_obj.window.num_samples, decimation_obj.sample_rate_decimation
+            decimation_obj.window.num_samples,
+            decimation_obj.sample_rate_decimation,
        )
        prewhitening_correction = 1.0j * 2 * np.pi * freqs  # jw
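A note on the `1.0j * 2 * np.pi * freqs` term: differentiating a signal in time multiplies each Fourier harmonic by jω = j2πf, so first-difference prewhitening is undone by dividing the STFT by that same factor. A minimal NumPy sketch of the idea (toy signal and parameters, illustrative only, not aurora's API):

```python
import numpy as np

# Toy parameters, assumed for illustration.
sample_rate = 1.0
num_samples = 128
t = np.arange(num_samples) / sample_rate
x = np.sin(2 * np.pi * 0.05 * t)

# Prewhitening: a first difference approximates d/dt, i.e. multiplication by jw.
x_white = np.gradient(x, t)

# Recoloring: divide the spectrum by jw = j*2*pi*f to undo the differentiation.
freqs = np.fft.rfftfreq(num_samples, d=1.0 / sample_rate)
jw = 1.0j * 2 * np.pi * freqs
spectrum = np.fft.rfft(x_white)
spectrum[1:] /= jw[1:]  # skip DC, where jw == 0 (the DC harmonic is unused anyway)
x_recolored = np.fft.irfft(spectrum)
```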

@@ -105,7 +108,9 @@
        # MA = 4 # add this to processing config

    else:
-        print(f"{decimation_obj.prewhitening_type} recoloring not yet implemented")
+        print(
f"{decimation_obj.prewhitening_type} recoloring not yet implemented"
)
raise NotImplementedError

return stft_obj
@@ -194,7 +199,9 @@
        pass  # time series start is already clock zero
    else:
        windowing_scheme = window_scheme_from_decimation(decimation_obj)
-        number_of_steps = delta_t_seconds / windowing_scheme.duration_advance
+        number_of_steps = (
+            delta_t_seconds / windowing_scheme.duration_advance
+        )
        n_partial_steps = number_of_steps - np.floor(number_of_steps)
        n_clip = n_partial_steps * windowing_scheme.num_samples_advance
        n_clip = int(np.round(n_clip))
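To make the clipping arithmetic concrete: if the data start sits a fractional number of window advances past clock zero, only the fractional part is clipped, so subsequent windows land on clock-zero boundaries. A worked example with made-up numbers (the attribute values are hypothetical, not from any real windowing scheme):

```python
import numpy as np

duration_advance = 32.0    # seconds per window advance (hypothetical)
num_samples_advance = 128  # samples per window advance (hypothetical)
delta_t_seconds = 329.6    # data start is 10.3 advances past clock zero

number_of_steps = delta_t_seconds / duration_advance           # 10.3
n_partial_steps = number_of_steps - np.floor(number_of_steps)  # ~0.3
n_clip = int(np.round(n_partial_steps * num_samples_advance))  # 38 samples
```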
@@ -208,6 +215,24 @@
    return run_xrds


+def nan_to_mean(xrds):
+    """
+    Set NaN values to the channel mean.
+
+    :param xrds: run time series dataset, possibly containing NaN values
+    :type xrds: xarray.Dataset
+    :return: the same dataset with NaNs replaced by per-channel means
+    :rtype: xarray.Dataset
+
+    """
+    for ch in xrds.keys():
+        if not (xrds[ch].isnull() == False).all().data:
+            print("Null values detected in xrds -- this is not expected and should be examined")
+            value = np.nan_to_num(np.nanmean(xrds[ch].data))
+            xrds[ch] = xrds[ch].fillna(value)
+    return xrds
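A minimal usage sketch for the new helper (toy two-channel dataset; assumes only numpy and xarray):

```python
import numpy as np
import xarray as xr

ex = np.ones(10)
ex[3] = np.nan  # a single bad sample
ds = xr.Dataset(
    {"ex": ("time", ex), "hy": ("time", np.zeros(10))},
    coords={"time": np.arange(10.0)},
)

ds = nan_to_mean(ds)
assert not ds["ex"].isnull().any()  # the gap is now the channel mean (1.0)
```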


def run_ts_to_stft(decimation_obj, run_xrds_orig):
    """

@@ -225,6 +250,9 @@
    recoloring. This really doesn't matter since we don't use the DC harmonic for
    anything.
    """
+    # need to remove any nans before windowing, or else if there is a single
+    # nan then the whole channel becomes nan.
+    run_xrds = nan_to_mean(run_xrds_orig)
    run_xrds = apply_prewhitening(decimation_obj, run_xrds_orig)
    run_xrds = truncate_to_clock_zero(decimation_obj, run_xrds)
    windowing_scheme = window_scheme_from_decimation(decimation_obj)
@@ -234,7 +262,9 @@
    if not np.prod(windowed_obj.to_array().data.shape):
        raise ValueError

-    windowed_obj = WindowedTimeSeries.detrend(data=windowed_obj, detrend_type="linear")
+    windowed_obj = WindowedTimeSeries.detrend(
+        data=windowed_obj, detrend_type="linear"
+    )
    tapered_obj = windowed_obj * windowing_scheme.taper
    stft_obj = windowing_scheme.apply_fft(
        tapered_obj, detrend_type=decimation_obj.extra_pre_fft_detrend_type
@@ -244,7 +274,9 @@
    return stft_obj


-def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None):
+def calibrate_stft_obj(
+    stft_obj, run_obj, units="MT", channel_scale_factors=None
+):
    """

    Parameters
@@ -272,8 +304,12 @@
print("WARNING UNEXPECTED CHANNEL WITH NO FILTERS")
if channel_id == "hy":
print("Channel HY has no filters, try using filters from HX")
channel_filter = run_obj.get_channel("hx").channel_response_filter
calibration_response = channel_filter.complex_response(stft_obj.frequency.data)
channel_filter = run_obj.get_channel(

"hx"
).channel_response_filter
calibration_response = channel_filter.complex_response(
stft_obj.frequency.data
)
if channel_scale_factors:
try:
channel_scale_factor = channel_scale_factors[channel_id]
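The HY fallback above assumes the two horizontal induction coils at a station share a response curve, so HX's filters are borrowed when HY arrives with none. A condensed sketch of the pattern (method names mirror the mth5/mt_metadata calls in this diff; the empty-filter test is an assumption for illustration):

```python
def response_with_hy_fallback(run_obj, channel_id, frequencies):
    # Illustrative condensation of the calibration fallback above.
    channel_filter = run_obj.get_channel(channel_id).channel_response_filter
    if not channel_filter.filters_list and channel_id == "hy":
        # Assume HX's response applies to HY (orthogonal coils usually match).
        channel_filter = run_obj.get_channel("hx").channel_response_filter
    return channel_filter.complex_response(frequencies)
```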
@@ -314,7 +350,9 @@
    num_channels = len(channel_labels)
    new_data = np.full((num_observations, num_channels), np.nan)
    for i_ch, ch_label in enumerate(channel_labels):
-        new_data[:, i_ch] = ssig.decimate(run_xrds[ch_label], int(config.factor))
+        new_data[:, i_ch] = ssig.decimate(
+            run_xrds[ch_label], int(config.factor)
+        )

    xr_da = xr.DataArray(
        new_data,
Expand Down Expand Up @@ -348,7 +386,9 @@
    xr_ds: xr.Dataset
        Decimated version of the input run_xrds
    """
-    new_xr_ds = run_xrds.coarsen(time=int(config.factor), boundary="trim").mean()
+    new_xr_ds = run_xrds.coarsen(
+        time=int(config.factor), boundary="trim"
+    ).mean()
    attr_dict = run_xrds.attrs
    attr_dict["sample_rate"] = config.sample_rate
    new_xr_ds.attrs = attr_dict
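Two decimation flavors now appear in this file: `ssig.decimate` (an anti-alias IIR filter followed by downsampling) and the `coarsen(...).mean()` path (a simple block average). A toy comparison, assuming only numpy, scipy, and xarray (the factor of 4 is arbitrary):

```python
import numpy as np
import scipy.signal as ssig
import xarray as xr

factor = 4
t = np.arange(1024) / 64.0  # a 64 Hz toy series
run_xrds = xr.Dataset(
    {"ex": ("time", np.sin(2 * np.pi * t))}, coords={"time": t}
)

# scipy: zero-phase anti-alias filter, then keep every 4th sample
dec_filtered = ssig.decimate(run_xrds["ex"], factor)

# xarray: average non-overlapping blocks of 4 samples
dec_averaged = run_xrds.coarsen(time=factor, boundary="trim").mean()

print(len(dec_filtered), dec_averaged.sizes["time"])  # 256 256
```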
101 changes: 76 additions & 25 deletions aurora/pipelines/transfer_function_kernel.py
@@ -72,7 +72,9 @@
        remote station id: mth5.mth5.MTH5
        """

-        local_mth5_obj = initialize_mth5(self.config.stations.local.mth5_path, mode=mode)
+        local_mth5_obj = initialize_mth5(
+            self.config.stations.local.mth5_path, mode=mode
+        )
        if self.config.stations.remote:
            remote_path = self.config.stations.remote[0].mth5_path
            remote_mth5_obj = initialize_mth5(remote_path, mode="r")
@@ -85,7 +87,7 @@
        self._mth5_objs = mth5_objs
        return

-    def update_dataset_df(self,i_dec_level):
+    def update_dataset_df(self, i_dec_level):
        """
        This function has two different modes. The first mode initializes values in the
        array, and could be placed into TFKDataset.initialize_time_series_data()
@@ -132,7 +134,11 @@
                run_xrds = row["run_dataarray"].to_dataset("channel")
                decimation = self.config.decimations[i_dec_level].decimation
                decimated_xrds = prototype_decimate(decimation, run_xrds)
-                self.dataset_df["run_dataarray"].at[i] = decimated_xrds.to_array("channel")  # See Note 1 above
+                self.dataset_df["run_dataarray"].at[
+                    i
+                ] = decimated_xrds.to_array(
+                    "channel"
+                )  # See Note 1 above

        print("DATASET DF UPDATED")
        return
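On the `.at[i]` indexing (the "Note 1" the comment points to): `.at` addresses a single cell, which lets a whole `xarray.DataArray` be stored as an object in the dataframe without pandas trying to broadcast it element-wise. A toy sketch of the pattern (hypothetical dataframe, not aurora's dataset_df; the diff uses the equivalent chained form `df["col"].at[i]`):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Object-dtype column, so each cell can hold an arbitrary Python object.
df = pd.DataFrame({"run_dataarray": pd.Series([None, None], dtype=object)})

da = xr.DataArray(np.zeros((2, 3)), dims=("channel", "time"))
df.at[0, "run_dataarray"] = da  # single-cell assignment keeps the object whole

assert isinstance(df.at[0, "run_dataarray"], xr.DataArray)
```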
@@ -177,7 +183,11 @@
        Modifies self.dataset_df inplace, assigning bools to the "fc" column

        """
-        groupby = ['survey', 'station_id', 'run_id',]
+        groupby = [
+            "survey",
+            "station_id",
+            "run_id",
+        ]
        grouper = self.processing_summary.groupby(groupby)

        for (survey_id, station_id, run_id), df in grouper:
@@ -188,10 +198,12 @@

            if len(associated_run_sub_df) > 1:
                # See Note #4
-                print("Warning -- not all runs will processed as a continuous chunk -- in future may need to loop over runlets to check for FCs")
+                print(
"Warning -- not all runs will processed as a continuous chunk -- in future may need to loop over runlets to check for FCs"
)

dataset_df_indices = np.r_[associated_run_sub_df.index]
#dataset_df_indices = associated_run_sub_df.index.to_numpy()
# dataset_df_indices = associated_run_sub_df.index.to_numpy()
run_row = associated_run_sub_df.iloc[0]
row_ssr_str = f"survey: {run_row.survey}, station_id: {run_row.station_id}, run_id: {run_row.run_id}"

@@ -204,34 +216,48 @@
                print(msg)
                self.dataset_df.loc[dataset_df_indices, "fc"] = False
            else:
-                print("Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements...")
+                print(
+                    "Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements..."
+                )
                # Assume FC Groups are keyed by run_id, check if there is a relevant group
                try:
-                    fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
+                    fc_group = (
+                        station_obj.fourier_coefficients_group.get_fc_group(
+                            run_id
+                        )
+                    )
                except MTH5Error:
                    self.dataset_df.loc[dataset_df_indices, "fc"] = False
-                    print(f"Run ID {run_id} not found in FC Groups, -- will need to build them ")
+                    print(
f"Run ID {run_id} not found in FC Groups, -- will need to build them "
)
continue

if len(fc_group.groups_list) < self.processing_config.num_decimation_levels:
if (
len(fc_group.groups_list)
< self.processing_config.num_decimation_levels
):
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Not enough FC Groups available for {row_ssr_str} -- will need to build them ")
print(

f"Not enough FC Groups available for {row_ssr_str} -- will need to build them "
)
continue

# Can check time periods here if desired, but unique (survey, station, run) should make this unneeded
# processing_run = self.processing_config.stations.local.get_run(run_id)
# for tp in processing_run.time_periods:
# assert tp in fc_group time periods


# See note #2
fcs_already_there = fc_group.supports_aurora_processing_config(self.processing_config,
run_row.remote)
self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there
fcs_already_there = fc_group.supports_aurora_processing_config(
self.processing_config, run_row.remote
)
self.dataset_df.loc[
dataset_df_indices, "fc"
] = fcs_already_there

return
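The branch above boils down to three questions per (survey, station, run) group: does an FC group exist for the run, does it cover every decimation level, and do its stored parameters satisfy the processing config? A condensed, illustrative sketch of that decision ladder (names follow the mth5 calls in the diff; not a drop-in replacement):

```python
def run_has_usable_fcs(station_obj, run_id, processing_config, remote):
    # Illustrative condensation of the per-run logic above.
    try:
        fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
    except MTH5Error:
        return False  # no stored FC group -- FCs must be built
    if len(fc_group.groups_list) < processing_config.num_decimation_levels:
        return False  # some decimation levels missing -- FCs must be built
    # Stored FC parameters must match the processing config (see note #2).
    return fc_group.supports_aurora_processing_config(processing_config, remote)
```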


    def make_processing_summary(self):
        """
        Melt the decimation levels over the run summary. Add columns to estimate
@@ -250,11 +276,15 @@
        decimation_info = self.config.decimation_info()
        for i_dec, dec_factor in decimation_info.items():
            tmp[i_dec] = dec_factor
-        tmp = tmp.melt(id_vars=id_vars, value_name="dec_factor", var_name="dec_level")
+        tmp = tmp.melt(
+            id_vars=id_vars, value_name="dec_factor", var_name="dec_level"
+        )
        sortby = ["survey", "station_id", "run_id", "start", "dec_level"]
        tmp.sort_values(by=sortby, inplace=True)
        tmp.reset_index(drop=True, inplace=True)
-        tmp.drop("sample_rate", axis=1, inplace=True)  # not valid for decimated data
+        tmp.drop(
+            "sample_rate", axis=1, inplace=True
+        )  # not valid for decimated data

        # Add window info
        group_by = [
@@ -269,11 +299,21 @@
            print(group)
            print(df)
            try:
-                assert (df.dec_level.diff()[1:] == 1).all()  # dec levels increment by 1
+                try:
+                    assert (
+                        df.dec_level.diff()[1:] == 1
+                    ).all()  # dec levels increment by 1
+                except AssertionError:
+                    print(
f"Skipping {group} because decimation levels are messy."
)
continue

                assert df.dec_factor.iloc[0] == 1
                assert df.dec_level.iloc[0] == 0
            except AssertionError:
-                raise AssertionError("Decimation levels not structured as expected")
+                raise AssertionError(
"Decimation levels not structured as expected"
)
# df.sample_rate /= np.cumprod(df.dec_factor) # better to take from config
window_params_df = self.config.window_scheme(as_type="df")
df.reset_index(inplace=True, drop=True)
@@ -324,8 +364,14 @@

"""
if min_num_stft_windows is None:
min_stft_window_info = {x.decimation.level: x.min_num_stft_windows for x in self.processing_config.decimations}
min_stft_window_list = [min_stft_window_info[x] for x in self.processing_summary.dec_level]
min_stft_window_info = {
x.decimation.level: x.min_num_stft_windows
for x in self.processing_config.decimations
}
min_stft_window_list = [
min_stft_window_info[x]
for x in self.processing_summary.dec_level
]
min_num_stft_windows = pd.Series(min_stft_window_list)

self.processing_summary["valid"] = (
@@ -373,8 +419,12 @@
        valid_levels = tmp.dec_level.unique()

        dec_levels = [x for x in self.config.decimations]
-        dec_levels = [x for x in dec_levels if x.decimation.level in valid_levels]
-        print(f"After validation there are {len(dec_levels)} valid decimation levels")
+        dec_levels = [
+            x for x in dec_levels if x.decimation.level in valid_levels
+        ]
+        print(
+            f"After validation there are {len(dec_levels)} valid decimation levels"
+        )
        return dec_levels

    def is_valid_dataset(self, row, i_dec):
@@ -404,7 +454,8 @@

        cond = cond1 & cond2 & cond3 & cond4 & cond5
        processing_row = self.processing_summary[cond]
-        assert len(processing_row) == 1
+        if len(processing_row) != 1:
+            return False
        is_valid = processing_row.valid.iloc[0]
        return is_valid
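The last change replaces a hard `assert` with a graceful exit: if the five conditions do not isolate exactly one processing-summary row, the dataset is reported as invalid rather than crashing mid-pipeline. A toy sketch of the pattern (hypothetical dataframe and condition):

```python
import pandas as pd

summary = pd.DataFrame(
    {"dec_level": [0, 1], "station_id": ["mt01", "mt01"], "valid": [True, False]}
)
cond = (summary.dec_level == 2) & (summary.station_id == "mt01")
processing_row = summary[cond]  # empty: no dec_level 2 exists

# Zero (or multiple) matches now means "not valid" instead of an AssertionError.
is_valid = False if len(processing_row) != 1 else bool(processing_row.valid.iloc[0])
assert is_valid is False
```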
