
Commit

Merge pull request #184 from simpeg/fix_issue_80
Merged Runs in Frequency Domain
kkappler authored Jul 3, 2022
2 parents b9f535a + 9671db8 commit 27cdc59
Showing 40 changed files with 2,764 additions and 2,237 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
python-version: [3.8, 3.7, 3.6]
python-version: [3.8,]# 3.7, 3.6]

steps:
- uses: actions/checkout@v2
19 changes: 13 additions & 6 deletions aurora/config/config_creator.py
@@ -22,7 +22,8 @@ def __init__(self, **kwargs):

def create_run_processing_object(
self, station_id=None, run_id=None, mth5_path=None, sample_rate=1,
input_channels=["hx", "hy"], output_channels=["hz", "ex", "ey"],
input_channels=["hx", "hy"], output_channels=["hz", "ex", "ey"],
estimator=None,
emtf_band_file=BANDS_DEFAULT_FILE, **kwargs):
"""
Create a default processing object
@@ -60,11 +61,17 @@ def create_run_processing_object(
else:
d = 4
sr = sample_rate / (d ** int(key))
processing_obj.decimations_dict[key].decimation.factor = d
processing_obj.decimations_dict[key].decimation.sample_rate = sr
processing_obj.decimations_dict[key].input_channels = input_channels
processing_obj.decimations_dict[key].output_channels = output_channels

decimation_obj = processing_obj.decimations_dict[key]
decimation_obj.decimation.factor = d
decimation_obj.decimation.sample_rate = sr
decimation_obj.input_channels = input_channels
decimation_obj.output_channels = output_channels
#set estimator if provided as kwarg
if estimator:
try:
decimation_obj.estimator.engine = estimator["engine"]
except KeyError:
pass
return processing_obj

def to_json(self, processing_obj, path=None, nested=True, required=False):
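For reference, a minimal sketch of how the new estimator keyword could be used when building a config. The ConfigCreator class name and import path are assumed here (they are not shown in this diff), and the station/run ids are placeholders:

    from aurora.config.config_creator import ConfigCreator  # import path assumed

    cc = ConfigCreator()
    # Build a default processing object, forcing the single-station "RME" engine on
    # every decimation level via the new estimator kwarg. Only the "engine" key is
    # read; a dict without that key leaves the default engine in place (the KeyError
    # is swallowed).
    processing_obj = cc.create_run_processing_object(
        station_id="test1",           # placeholder station id
        run_id="001",                 # placeholder run id
        sample_rate=1.0,
        estimator={"engine": "RME"},
    )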
13 changes: 12 additions & 1 deletion aurora/config/metadata/decimation_level.py
@@ -131,7 +131,18 @@ def frequency_bands_obj(self):
self.window.num_samples)
return frequency_bands



@property
def windowing_scheme(self):
from aurora.time_series.windowing_scheme import WindowingScheme
windowing_scheme = WindowingScheme(
taper_family=self.window.type,
num_samples_window=self.window.num_samples,
num_samples_overlap=self.window.overlap,
taper_additional_args=self.window.additional_args,
sample_rate=self.decimation.sample_rate,
)
return windowing_scheme

# def to_stft_config_dict(self):
# """
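As a rough illustration, the new property might be consumed as below. The dec_level variable stands in for a populated DecimationLevel config object, and the attribute names on the returned WindowingScheme are assumed to mirror its constructor kwargs:

    # dec_level: an aurora DecimationLevel metadata object (construction elided)
    ws = dec_level.windowing_scheme
    # The scheme bundles the taper/window settings together with the decimated
    # sample rate, so downstream STFT code no longer assembles a WindowingScheme
    # by hand from the window metadata.
    print(ws.taper_family, ws.num_samples_window, ws.sample_rate)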
22 changes: 21 additions & 1 deletion aurora/config/metadata/processing.py
@@ -180,4 +180,24 @@ def num_decimation_levels(self):
def drop_reference_channels(self):
for decimation in self.decimations:
decimation.reference_channels = []
return
return

def validate(self):
"""
Placeholder. Some of the checks and methods here may be better placed in
TFKernel, which would validate the dataset against the processing config.
The validator exists because the default estimation engine from the json
file is "RME_RR", which is fine in general (we expect to do more RR
processing than SS), but if there is only one station (no remote) then
"RME_RR" should be replaced with "RME".
Returns
-------
"""
# Make sure a RR method is not being called for a SS config
if not self.stations.remote:
for decimation in self.decimations:
if decimation.estimator.engine == "RME_RR":
decimation.estimator.engine = "RME"
25 changes: 25 additions & 0 deletions aurora/config/metadata/standards/window.json
@@ -48,5 +48,30 @@
"alias": [],
"example": "hamming",
"default": "boxcar"
},
"clock_zero_type": {
"type": "string",
"required": true,
"units": null,
"style": "controlled vocabulary",
"description": "how the clock-zero is specified",
"options": [
"user specified",
"data start",
"ignore"],
"alias": [],
"example": "user specified",
"default": "ignore"
},
"clock_zero": {
"type": "string",
"required": false,
"units": null,
"style": "time",
"description": "Start date and time of the first data window",
"options": [],
"alias": [],
"example": "2020-02-01T09:23:45.453670+00:00",
"default": "1980-01-01T00:00:00+00:00"
}
}
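For context, a sketch of how the two new window attributes are meant to be used together. The window variable is a placeholder for a Window metadata object governed by this standard, and the "data start" branch mirrors the clock-zero handling added to process_mth5.py below:

    # window: a Window metadata object following this standard (construction elided)
    # Option 1: pin clock-zero to an explicit timestamp
    window.clock_zero_type = "user specified"
    window.clock_zero = "2020-02-01T09:23:45.453670+00:00"

    # Option 2: let the pipeline take clock-zero from the first data sample, as in
    # process_mth5.py:
    #   if dec_level_config.window.clock_zero_type == "data start":
    #       dec_level_config.window.clock_zero = str(dataset_df.start.min())
    window.clock_zero_type = "data start"

    # Option 3 (default): ignore clock-zero entirely
    window.clock_zero_type = "ignore"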
2 changes: 1 addition & 1 deletion aurora/config/metadata/stations.py
@@ -110,7 +110,7 @@ def from_dataset_dataframe(self, df):

station = df[df.remote==False].station_id.unique()[0]
rr_stations = df[df.remote==True].station_id.unique()

self.local.from_dataset_dataframe(df[df.station_id==station])

for rr_station in rr_stations:
102 changes: 50 additions & 52 deletions aurora/pipelines/process_mth5.py
@@ -107,13 +107,9 @@ def make_stft_objects(processing_config, i_dec_level, run_obj, run_xrts, units,
"""
Operates on a "per-run" basis
Note 1: CHECK DATA COVERAGE IS THE SAME IN BOTH LOCAL AND RR
This should be pushed into a previous validator before pipeline starts
# if config.reference_station_id:
# local_run_xrts = local_run_xrts.where(local_run_xrts.time <=
# remote_run_xrts.time[-1]).dropna(
# dim="time")
This method could be modified in a multiple-station code so that it doesn't care
whether the station is "local" or "remote", but rather uses scale factors keyed by
station_id
Parameters
----------
@@ -138,18 +134,15 @@
"""
stft_config = processing_config.get_decimation_level(i_dec_level)
stft_obj = run_ts_to_stft(stft_config, run_xrts)

print("fix this so that it gets from config based on station_id, without caring "
"if local or remote")
# stft_obj = run_ts_to_stft_scipy(stft_config, run_xrts)
run_id = run_obj.metadata.id
if station_id==processing_config.stations.local.id:
scale_factors = processing_config.stations.local.run_dict[
run_id].channel_scale_factors
#Need to add logic here to look through list of remote ids
elif station_id==processing_config.stations.remote[0].id:
scale_factors = processing_config.stations.remote[0].run_dict[
run_id].channel_scale_factors
# local_stft_obj = run_ts_to_stft_scipy(config, local_run_xrts)

stft_obj = calibrate_stft_obj(
stft_obj,
run_obj,
@@ -204,9 +197,17 @@ def export_tf(tf_collection, station_metadata_dict={}, survey_dict={}):
This method may wind up being embedded in the TF class
Assign transfer_function, residual_covariance, inverse_signal_power, station, survey
Parameters
----------
tf_collection: aurora.transfer_function.transfer_function_collection
.TransferFunctionCollection
station_metadata_dict: dict
survey_dict: dict
Returns
-------
tf_cls: mt_metadata.transfer_functions.core.TF
Transfer function container
"""
merged_tf_dict = tf_collection.get_merged_dict()
tf_cls = TF()
@@ -233,7 +234,7 @@ def export_tf(tf_collection, station_metadata_dict={}, survey_dict={}):

def populate_dataset_df(i_dec_level, config, dataset_df):
"""
Move this into a method of DatasetDefinition, self.populate_with_data()
Move this into a method of TFKDataset, self.populate_with_data()
Notes:
1. When iterating over dataframe, (i)ndex must run from 0 to len(df), otherwise
@@ -313,24 +314,29 @@ def close_mths_objs(df):

def process_mth5(
config,
dataset_definition=None,
tfk_dataset=None,
units="MT",
show_plot=False,
z_file_path=None,
return_collection=True,
):
"""
1. Read in the config and figure out how many decimation levels there are
2. ToDo: Based on the run durations and sampling rates, determine which runs
2. ToDo TFK: Based on the run durations and sampling rates, determine which runs
are valid for which decimation levels, or for which effective sample rates. This
action should be taken before we get here. The dataset_definition should already
action should be taken before we get here. The tfk_dataset should already
be trimmed to exactly what will be processed.
3. ToDo TFK Check that data coverage is the same in both local and RR data
# if config.remote_station_id:
# local_run_xrts = local_run_xrts.where(local_run_xrts.time <=
# remote_run_xrts.time[-1]).dropna(
# dim="time")
Parameters
----------
config: aurora.config.metadata.processing.Processing or path to json
All processing parameters
dataset_definition: aurora.tf_kernel.dataset.DatasetDefinition or None
tfk_dataset: aurora.tf_kernel.dataset.Dataset or None
Specifies what datasets to process according to config
units: string
"MT" or "SI". To be deprecated once data have units embedded
@@ -348,13 +354,16 @@
"""

processing_config, mth5_objs = initialize_pipeline(config)
dataset_df = dataset_definition.df
dataset_df = tfk_dataset.df

# Here is where any checks that would be done by TF Kernel would be applied
#see notes labelled with ToDo TFK above

#Assign additional columns to dataset_df, populate with mth5_objs
all_mth5_objs = len(dataset_df) * [None]
mth5_obj_column = len(dataset_df) * [None]
for i, station_id in enumerate(dataset_df["station_id"]):
all_mth5_objs[i] = mth5_objs[station_id]
dataset_df["mth5_obj"] = all_mth5_objs
mth5_obj_column[i] = mth5_objs[station_id]
dataset_df["mth5_obj"] = mth5_obj_column
dataset_df["run"] = None
dataset_df["run_dataarray"] = None
dataset_df["stft"] = None
@@ -369,15 +378,22 @@
dataset_df = populate_dataset_df(i_dec_level, dec_level_config, dataset_df)
#ANY MERGING OF RUNS IN TIME DOMAIN WOULD GO HERE

#TFK 1: get clock-zero from data if needed
if dec_level_config.window.clock_zero_type == "data start":
dec_level_config.window.clock_zero = str(dataset_df.start.min())

# Apply STFT to all runs
local_stfts = []
remote_stfts = []
for i,row in dataset_df.iterrows():
run_xrts = row["run_dataarray"].to_dataset("channel")
run_obj = row["run"]
station_id = row.station_id
stft_obj = make_stft_objects(processing_config, i_dec_level, run_obj,
run_xrts, units, station_id)
stft_obj = make_stft_objects(processing_config,
i_dec_level,
run_obj,
run_xrts,
units,
row.station_id)

if row.station_id == processing_config.stations.local.id:
local_stfts.append(stft_obj)
@@ -390,7 +406,7 @@
# Could mute bad FCs here - Not implemented yet.
# RETURN FC_OBJECT

if processing_config.stations.remote:#reference_station_id:
if processing_config.stations.remote:
remote_merged_stft_obj = xr.concat(remote_stfts, "time")
else:
remote_merged_stft_obj = None
@@ -422,39 +438,21 @@
close_mths_objs(dataset_df)
return tf_collection
else:
# intended to be the default in future

#See ISSUE #181: Uncomment this once we have a mature multi-run test
# #dataset_definition.get_station_metadata_for_tf_archive()
# #get a list of local runs:
# cond1 = dataset_df["station_id"]==processing_config.stations.local.id
# sub_df = dataset_df[cond1]
# #sanity check:
# run_ids = sub_df.run_id.unique()
# assert(len(run_ids) == len(sub_df))
# # iterate over these runs, packing metadata into
# station_metadata = None
# for i,row in sub_df.iterrows():
# local_run_obj = row.run
# if station_metadata is None:
# station_metadata = local_run_obj.station_group.metadata
# station_metadata._runs = []
# run_metadata = local_run_obj.metadata
# station_metadata.add_run(run_metadata)

station_metadata = local_run_obj.station_group.metadata
station_metadata._runs = []
run_metadata = local_run_obj.metadata
station_metadata.add_run(run_metadata)
# intended to be the default in future (return tf_cls, not tf_collection)

local_station_id = processing_config.stations.local.id
station_metadata = tfk_dataset.get_station_metadata(local_station_id)

# Need to create an issue for this as well
if len(mth5_objs) == 1:
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
else:
print("We do not currently handle multiple mth5 objs for "
print("WARNING We do not currently handle multiple mth5 objs for "
"non-tf_collection output")
raise Exception
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
#raise Exception

print(station_metadata.run_list)
tf_cls = export_tf(
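To tie the renamed arguments together, a hedged end-to-end sketch of calling the pipeline after this change; building processing_config and tfk_dataset is elided and the output path is a placeholder:

    from aurora.pipelines.process_mth5 import process_mth5

    # processing_config: aurora.config.metadata.processing.Processing (or path to its JSON)
    # tfk_dataset: aurora.tf_kernel.dataset.Dataset describing the runs to process
    tf_collection = process_mth5(
        processing_config,
        tfk_dataset=tfk_dataset,         # formerly "dataset_definition"
        units="MT",
        z_file_path="fix_issue_80.zss",  # placeholder output path
        return_collection=True,          # set False to get an mt_metadata TF object instead
    )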
(Diffs for the remaining 33 changed files are not shown.)
