Skip to content

Commit

Permalink
Merge branch 'epic-open-source:main' into gbowlin/add_general_binary_…
Browse files Browse the repository at this point in the history
…metric_explorer
  • Loading branch information
gbowlin authored Sep 9, 2024
2 parents f9da900 + 0839799 commit 3baeead
Show file tree
Hide file tree
Showing 11 changed files with 811 additions and 78 deletions.
2 changes: 2 additions & 0 deletions changelog/76.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added additional "merge_strategy" event configuration option.
Viable merge strategies are "first", "last", "nearest", "forward", and "count".
1 change: 1 addition & 0 deletions docs/reference/seismogram.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Properties and Accessors
Seismogram.events
Seismogram.events_columns
Seismogram.event_aggregation_method
Seismogram.event_merge_strategy
Seismogram.target
Seismogram.target_cols
Seismogram.target_event
Expand Down
7 changes: 6 additions & 1 deletion src/seismometer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import pandas as pd

from seismometer._version import __version__
from seismometer.configuration import ConfigProvider
from seismometer.core.logger import add_log_formatter, set_default_logger_config


def run_startup(
*,
config_path: str | Path = None,
output_path: str | Path = None,
config_provider: Optional[ConfigProvider] = None,
predictions_frame: Optional[pd.DataFrame] = None,
events_frame: Optional[pd.DataFrame] = None,
definitions: Optional[dict] = None,
Expand All @@ -25,9 +27,12 @@ def run_startup(
----------
config_path : Optional[str | Path], optional
The path containing the config.yml and other resources, by default None.
Optional if config_provider is provided.
output_path : Optional[str | Path], optional
An output path to write data to, overwriting the default path specified by info_dir in config.yml,
by default None.
config_provider : Optional[ConfigProvider], optional
An optional ConfigProvider instance to use instead of loading configuration from config_path, by default None.
predictions_frame : Optional[pd.DataFrame], optional
An optional DataFrame containing the fully loaded predictions data, by default None.
By default, when not specified here, these data will be loaded based on configuration.
Expand Down Expand Up @@ -59,7 +64,7 @@ def run_startup(
if reset:
Seismogram.kill()

config = ConfigProvider(config_path, output_path=output_path, definitions=definitions)
config = config_provider or ConfigProvider(config_path, output_path=output_path, definitions=definitions)
loader = loader_factory(config)
sg = Seismogram(config, loader)
sg.load_data(predictions=predictions_frame, events=events_frame)
Expand Down
2 changes: 1 addition & 1 deletion src/seismometer/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .config import ConfigProvider
from .model import AggregationStrategies
from .model import AggregationStrategies, MergeStrategies
from .options import Option, TemplateOptions, template_options
17 changes: 17 additions & 0 deletions src/seismometer/configuration/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
FileLike = str | Path
DirLike = str | Path
AggregationStrategies = Literal["min", "max", "first", "last"]
MergeStrategies = Literal["first", "last", "nearest", "forward", "count"]


class OtherInfo(BaseModel):
Expand Down Expand Up @@ -171,6 +172,22 @@ class Event(BaseModel):
The strategy for aggregating (or selecting) scores for an event.
Supports min, max, first, and last; defaulting to max.
"""
merge_strategy: Optional[MergeStrategies] = "forward"
"""
The strategy for merging events with predictions.
Supports first, last, nearest, forward, and count; defaulting to forward.
| - first: The first event within the window is selected. If no window, the first event is selected.
| - last: The last event within the window is selected. If no window, the last event is selected.
| - nearest: The event closest to the prediction time within the window is selected.
If no window, the nearest event is selected.
| - forward: The first event at or after the prediction time within the window is selected.
If no window, the first event at or after the prediction time is selected.
| - count: Creates a count column for each event value for the specified event label.
This column tracks the occurrences of each event value for each prediction.
Counts respect the window.
"""

@field_validator("source", mode="before")
@classmethod
Expand Down
29 changes: 23 additions & 6 deletions src/seismometer/data/loader/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ def merge_onto_predictions(config: ConfigProvider, event_frame: pd.DataFrame, da
The merged dataframe of predictions plus new event columns.
"""
dataframe = (
dataframe.sort_values(config.predict_time)
dataframe.sort_values(config.predict_time, kind="mergesort")
.drop_duplicates(subset=config.entity_keys + [config.predict_time])
.dropna(subset=[config.predict_time])
)
event_frame = event_frame.sort_values("Time")
event_frame = event_frame.sort_values("Time", kind="mergesort")

for one_event in config.events.values():
# Merge
Expand All @@ -103,28 +103,43 @@ def merge_onto_predictions(config: ConfigProvider, event_frame: pd.DataFrame, da
offset_hrs=one_event.offset_hr,
display=one_event.display_name,
sort=False,
impute_val=one_event.impute_val,
)
else: # No lookback
logger.debug(f"Merging event {one_event.display_name}")
dataframe = _merge_event(
config, one_event.source, dataframe, event_frame, display=one_event.display_name, sort=False
config,
one_event.source,
dataframe,
event_frame,
display=one_event.display_name,
sort=False,
impute_val=one_event.impute_val,
)

# Impute no event
if one_event.impute_val and one_event.impute_val != 0:
logger.warning(
f"Event {one_event.display_name} specified impute; "
+ "currently missing event value is inferred based on timestamp existence."
+ "currently missing event value is being inferred based on timestamp existence."
)
event_val = pdh.event_value(one_event.display_name)
impute = one_event.impute_val or 0 # Enforce binary type target
impute = one_event.impute_val
dataframe[event_val] = dataframe[event_val].fillna(impute)

return dataframe


def _merge_event(
config, event_cols, dataframe, event_frame, offset_hrs=0, window_hrs=None, display="", sort=True
config,
event_cols,
dataframe,
event_frame,
offset_hrs=0,
window_hrs=None,
display="",
sort=True,
impute_val=None,
) -> pd.DataFrame:
"""Wrapper for calling merge_windowed_event with the correct event column names."""
disp_event = display if display else event_cols[0]
Expand All @@ -139,4 +154,6 @@ def _merge_event(
window_hrs=window_hrs,
event_base_time_col="Time",
sort=sort,
merge_strategy=config.events[disp_event].merge_strategy,
impute_val=impute_val,
)
Loading

0 comments on commit 3baeead

Please sign in to comment.