-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #622 from VisLab/develop
Started on an event manager
- Loading branch information
Showing
13 changed files
with
460 additions
and
189 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
""" Manages context and events of temporal extent. """ | ||
|
||
from hed.schema import HedSchema, HedSchemaGroup | ||
from hed.tools.analysis.temporal_event import TemporalEvent | ||
from hed.models.model_constants import DefTagNames | ||
|
||
|
||
class EventManager: | ||
|
||
def __init__(self, data, hed_schema): | ||
""" Create an event manager for an events file. | ||
Parameters: | ||
data (TabularInput): A tabular input file. | ||
hed_schema (HedSchema): A HED schema | ||
Raises: | ||
HedFileError: if there are any unmatched offsets. | ||
""" | ||
|
||
if not isinstance(hed_schema, HedSchema) and not isinstance(hed_schema, HedSchemaGroup): | ||
raise ValueError("ContextRequiresSchema", f"Context manager must have a valid HedSchema of HedSchemaGroup") | ||
self.hed_schema = hed_schema | ||
self.data = data | ||
self.event_list = [[] for _ in range(len(self.data.dataframe))] | ||
self.hed_strings = [None for _ in range(len(self.data.dataframe))] | ||
self.onset_count = 0 | ||
self.offset_count = 0 | ||
self.contexts = [] | ||
self._create_event_list() | ||
|
||
def iter_context(self): | ||
""" Iterate rows of context. | ||
Yields: | ||
int: position in the dataFrame | ||
HedStringGroup: Context | ||
""" | ||
|
||
for index in range(len(self.contexts)): | ||
yield index, self.contexts[index] | ||
|
||
def _create_event_list(self): | ||
""" Create a list of events of extended duration. | ||
Raises: | ||
HedFileError: If the hed_strings contain unmatched offsets. | ||
""" | ||
|
||
# self.hed_strings = [HedString(str(hed), hed_schema=hed_schema) for hed in hed_strings] | ||
# hed_list = list(self.data.iter_dataframe(hed_ops=[self.hed_schema], return_string_only=False, | ||
# expand_defs=False, remove_definitions=True)) | ||
|
||
onset_dict = {} | ||
event_index = 0 | ||
for hed in self.data.iter_dataframe(hed_ops=[self.hed_schema], return_string_only=True, | ||
expand_defs=False, remove_definitions=True): | ||
# to_remove = [] # tag_tuples = hed.find_tags(['Onset'], recursive=False, include_groups=1) | ||
self.hed_strings[event_index] = hed | ||
group_tuples = hed.find_top_level_tags(anchor_tags={DefTagNames.ONSET_KEY, DefTagNames.OFFSET_KEY}, | ||
include_groups=2) | ||
for tup in group_tuples: | ||
group = tup[1] | ||
anchor_tag = group.find_def_tags(recursive=False, include_groups=0)[0] | ||
anchor = anchor_tag.extension_or_value_portion.lower() | ||
if anchor in onset_dict or tup[0].short_base_tag.lower() == "offset": | ||
temporal_event = onset_dict.pop(anchor) | ||
temporal_event.set_end(event_index, self.data.dataframe.loc[event_index, "onset"]) | ||
if tup[0] == DefTagNames.ONSET_KEY: | ||
new_event = TemporalEvent(tup[1], event_index, self.data.dataframe.loc[event_index, "onset"]) | ||
self.event_list[event_index].append(new_event) | ||
onset_dict[anchor] = new_event | ||
# to_remove.append(tup[1]) | ||
# hed.remove(to_remove) | ||
event_index = event_index + 1 | ||
|
||
# Now handle the events that extend to end of list | ||
for item in onset_dict.values(): | ||
item.set_end(len(self.data.dataframe), None) | ||
|
||
def _set_event_contexts(self): | ||
""" Creates an event context for each hed string. | ||
Notes: | ||
The event context would be placed in a event context group, but is kept in a separate array without the | ||
event context group or tag. | ||
""" | ||
# contexts = [[] for _ in range(len(self.hed_strings))] | ||
# for onset in self.onset_list: | ||
# for i in range(onset.start_index+1, onset.end_index): | ||
# contexts[i].append(onset.contents) | ||
# for i in range(len(self.hed_strings)): | ||
# contexts[i] = HedString(",".join(contexts[i]), hed_schema=self.hed_schema) | ||
# self.contexts = contexts | ||
print("_set_event_contexts not implemented yet") | ||
|
||
def _update_onset_list(self, group, onset_dict, event_index): | ||
""" Process one onset or offset group to create onset_list. | ||
Parameters: | ||
group (HedGroup): The HedGroup containing the onset or offset. | ||
onset_dict (dict): A dictionary of OnsetGroup objects that keep track of span of an event. | ||
event_index (int): The event number in the list. | ||
Raises: | ||
HedFileError if an unmatched offset is encountered. | ||
Notes: | ||
- Modifies onset_dict and onset_list. | ||
""" | ||
# def_tags = group.find_def_tags(recursive=False, include_groups=0) | ||
# name = def_tags[0].extension_or_value_portion | ||
# onset_element = onset_dict.pop(name, None) | ||
# if onset_element: | ||
# onset_element.end_index = event_index | ||
# self.onset_list.append(onset_element) | ||
# elif is_offset: | ||
# raise HedFileError("UnmatchedOffset", f"Unmatched {name} offset at event {event_index}", " ") | ||
# if not is_offset: | ||
# onset_element = TemporalEvent(name, group, event_index) | ||
# onset_dict[name] = onset_element |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from hed.models import HedTag, HedGroup, HedString | ||
|
||
|
||
class TemporalEvent: | ||
def __init__(self, event_group, start_index, start_time): | ||
self.event_group = event_group | ||
self.start_index = start_index | ||
self.start_time = start_time | ||
self.duration = None | ||
self.end_index = None | ||
self.end_time = None | ||
self.anchor = None | ||
self.internal_group = None | ||
self._split_group() | ||
|
||
def set_end(self, end_index, end_time): | ||
self.end_index = end_index | ||
self.end_time = end_time | ||
|
||
def _split_group(self): | ||
for item in self.event_group.children: | ||
if isinstance(item, HedTag) and (item.short_tag.lower() != "onset"): | ||
self.anchor = item.extension_or_value_portion.lower() | ||
elif isinstance(item, HedTag): | ||
continue | ||
elif isinstance(item, HedGroup): | ||
self.internal_group = item | ||
|
||
def __str__(self): | ||
return f"{self.name}:[event markers {self.start_index}:{self.end_index} contents:{self.contents}]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import os | ||
import unittest | ||
from hed.errors.exceptions import HedFileError | ||
from hed.models.hed_group import HedGroup | ||
from hed.models.hed_string import HedString | ||
from hed.models.sidecar import Sidecar | ||
from hed.models.tabular_input import TabularInput | ||
from hed.schema.hed_schema_io import load_schema_version | ||
from hed.tools.analysis.hed_context_manager import HedContextManager, OnsetGroup | ||
from hed.tools.analysis.analysis_util import get_assembled_strings | ||
from hed.tools.analysis.event_manager import EventManager | ||
from hed.tools.analysis.temporal_event import TemporalEvent | ||
|
||
|
||
class Test(unittest.TestCase): | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
schema = load_schema_version(xml_version="8.1.0") | ||
bids_root_path = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(__file__)), | ||
'../../data/bids_tests/eeg_ds003645s_hed')) | ||
events_path = os.path.realpath(os.path.join(bids_root_path, | ||
'sub-002/eeg/sub-002_task-FacePerception_run-1_events.tsv')) | ||
sidecar_path = os.path.realpath(os.path.join(bids_root_path, 'task-FacePerception_events.json')) | ||
sidecar1 = Sidecar(sidecar_path, name='face_sub1_json') | ||
cls.input_data = TabularInput(events_path, sidecar=sidecar1, hed_schema=schema, name="face_sub1_events") | ||
cls.schema = schema | ||
|
||
def test_constructor(self): | ||
manager1 = EventManager(self.input_data, self.schema) | ||
self.assertEqual(len(manager1.event_list), len(self.input_data.dataframe)) | ||
event_count = 0 | ||
for index, item in enumerate(manager1.event_list): | ||
for event in item: | ||
event_count = event_count + 1 | ||
self.assertFalse(event.duration) | ||
self.assertTrue(event.end_index) | ||
self.assertEqual(event.start_index, index) | ||
self.assertEqual(event.start_index, index) | ||
self.assertEqual(event.start_time, manager1.data.dataframe.loc[index, "onset"]) | ||
if not event.end_time: | ||
self.assertEqual(event.end_index, len(manager1.data.dataframe)) | ||
|
||
print("to here") | ||
|
||
# def test_constructor(self): | ||
# with self.assertRaises(ValueError) as cont: | ||
# HedContextManager(self.test_strings1, None) | ||
# self.assertEqual(cont.exception.args[0], "ContextRequiresSchema") | ||
|
||
# def test_iter(self): | ||
# hed_strings = get_assembled_strings(self.input_data, hed_schema=self.schema, expand_defs=False) | ||
# manager1 = HedContextManager(hed_strings, self.schema) | ||
# i = 0 | ||
# for hed, context in manager1.iter_context(): | ||
# self.assertEqual(hed, manager1.hed_strings[i]) | ||
# self.assertEqual(context, manager1.contexts[i]) | ||
# i = i + 1 | ||
# | ||
# def test_constructor_from_assembled(self): | ||
# hed_strings = get_assembled_strings(self.input_data, hed_schema=self.schema, expand_defs=False) | ||
# manager1 = HedContextManager(hed_strings, self.schema) | ||
# self.assertEqual(len(manager1.hed_strings), 200, | ||
# "The constructor for assembled strings has expected # of strings") | ||
# self.assertEqual(len(manager1.onset_list), 261, | ||
# "The constructor for assembled strings has onset_list of correct length") | ||
# | ||
# def test_constructor_unmatched(self): | ||
# with self.assertRaises(HedFileError) as context: | ||
# HedContextManager(self.test_strings2, self.schema) | ||
# self.assertEqual(context.exception.args[0], 'UnmatchedOffset') | ||
# | ||
# def test_constructor_multiple_values(self): | ||
# manager = HedContextManager(self.test_strings3, self.schema) | ||
# self.assertEqual(len(manager.onset_list), 3, "Constructor should have right number of onsets") | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.