
buffer description api in neorawio and xarray reference API bridge #1513

Merged
merged 31 commits on Oct 25, 2024
Changes from 24 commits
Commits
31 commits
3083513
Proof of concept of "buffer_description_api" and xarray reference API…
samuelgarcia Jul 26, 2024
e70c014
Implement get_analogsignal_chunk() generically when a rawio class ha…
samuelgarcia Aug 27, 2024
24fe98e
wip
samuelgarcia Aug 29, 2024
22c6698
test on micromed
samuelgarcia Aug 30, 2024
38a84f6
rebase on buffer_id
samuelgarcia Sep 9, 2024
4d31b75
Implement get_analogsignal_chunk() generically when a rawio class ha…
samuelgarcia Aug 27, 2024
5506b43
wip
samuelgarcia Aug 29, 2024
7a0de15
test on micromed
samuelgarcia Aug 30, 2024
a53d147
some fix
samuelgarcia Sep 9, 2024
907fa25
Merge
samuelgarcia Sep 9, 2024
f6ea345
make strema a slice of buffer and xarray api use buffer_id
samuelgarcia Sep 13, 2024
81c7121
json api : winedr + winwcp
samuelgarcia Sep 16, 2024
2d21e48
buffer api : RawBinarySignalRawIO + RawMCSRawIO
samuelgarcia Sep 16, 2024
a64b054
json api : neuroscope + openephysraw
samuelgarcia Sep 18, 2024
bc5a122
More reader with buffer description
samuelgarcia Sep 25, 2024
356b281
merge with master
samuelgarcia Oct 11, 2024
cf612df
update with buffer api branch
samuelgarcia Oct 11, 2024
cb6ba8d
Merge branch 'add_signal_buffer_id' of github.com:samuelgarcia/python…
samuelgarcia Oct 11, 2024
4714535
wip
samuelgarcia Oct 11, 2024
e9836d3
Merge branch 'add_signal_buffer_id' of github.com:samuelgarcia/python…
samuelgarcia Oct 11, 2024
334a882
json api start hdf5 on maxwell
samuelgarcia Oct 11, 2024
383ada7
doc for signal_stream signal_buffer
samuelgarcia Oct 11, 2024
9c35752
Fix conflicts.
samuelgarcia Oct 16, 2024
672e8fe
Merci Zach
samuelgarcia Oct 16, 2024
a540840
Use class approach for buffer api : BaseRawWithBufferApiIO
samuelgarcia Oct 16, 2024
282951a
feedback
samuelgarcia Oct 21, 2024
401956a
Apply suggestions from code review
samuelgarcia Oct 21, 2024
2fa19fb
clean
samuelgarcia Oct 21, 2024
c729989
oups
samuelgarcia Oct 23, 2024
5084fba
Merge branch 'master' of github.com:NeuralEnsemble/python-neo into js…
samuelgarcia Oct 23, 2024
b001c79
more clean
samuelgarcia Oct 23, 2024
26 changes: 26 additions & 0 deletions doc/source/rawio.rst
@@ -281,6 +281,32 @@ Read event timestamps and times
In [42]: print(ev_times)
[ 0.0317]

Signal streams and signal buffers
---------------------------------

For reading analog signals **neo.rawio** has two important concepts:

1. The **signal_stream**: a group of channels that can be read together using :func:`get_analogsignal_chunk()`.
This group of channels is guaranteed to have the same sampling rate and the same duration per segment.
Contributor:

Now that we have logical channels, these should have the same units as well, right?

Contributor (author):

At the moment, not yet. The API does not enforce or check this.
See https://github.com/NeuralEnsemble/python-neo/blob/master/neo/rawio/baserawio.py#L118
Ideally units should be added, but some IOs may mix units within the same stream.

Contributor:

Can we add it as an ideal? Or would you rather first make the changes and then update it here?

I could close this, for example:

#1133

Contributor (author):

There are some tricky cases where the units are set manually by the user for some formats;
I am not sure, but spike2 may be one of them.
What should we do? Split all groups with the same units into separate streams? That could be a good idea, but we first need a deeper check.

Contributor:

Agree. I think the concept of a stream should include the same unit across the stream, but we will need to solve this on a case-by-case basis (the deeper check that you mention).

Most of the time, this group of channels is a "logical" group of channels; in short, they come from the same headstage
or from the same auxiliary board.
Optionally, depending on the format, a **signal_stream** can be a slice of a **signal_buffer**, or an entire one.

2. The **signal_buffer**: a group of channels that share the same data layout in a file. The simplest example
is channels that can be read by a single :func:`signals = np.memmap(file, shape=..., dtype=..., offset=...)` call.
A **signal_buffer** can contain one or several **signal_stream**'s (very often only one).
There are two kinds of formats that handle this concept:

* those which use :func:`np.memmap()` internally
* formats based on hdf5

There are many formats that do not handle this concept:

* the ones that use an external python package for reading data (edf, ced, plexon2, ...)
* the ones with a complicated data layout (e.g. those where the data blocks are split without structure)

To check whether a format makes use of the buffer API, check the class attribute flag `has_buffer_description_api` on the
rawio class.
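As a sketch of how the binary flavour of a buffer description can be consumed, the snippet below builds a small fake file and memory-maps it from a description dict. The dict keys mirror those introduced in this PR; the helper name `read_binary_buffer` and the temporary `demo.raw` file are illustrative assumptions, not part of the API:

```python
import os
import tempfile

import numpy as np

def read_binary_buffer(desc):
    # Memory-map the file described by a "binary" buffer description
    # as a 2D (time x channels) array.
    return np.memmap(
        desc["file_path"],
        dtype=np.dtype(desc["dtype"]),
        mode="r",
        order=desc.get("order", "C"),
        shape=tuple(desc["shape"]),
        offset=desc["file_offset"],
    )

# Build a small fake binary file: 100 samples x 4 channels of int16
data = np.arange(400, dtype="int16").reshape(100, 4)
path = os.path.join(tempfile.mkdtemp(), "demo.raw")
data.tofile(path)

desc = {
    "type": "binary",
    "file_path": path,
    "dtype": "int16",
    "order": "C",
    "file_offset": 0,
    "shape": (100, 4),
}
sigs = read_binary_buffer(desc)
print(sigs.shape)  # (100, 4)
```

Because the description is plain data (strings, numbers, a shape), it can be serialized to JSON and consumed by external tools such as an xarray-backed reader.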



58 changes: 38 additions & 20 deletions neo/rawio/axonrawio.py
Expand Up @@ -90,6 +90,7 @@ class AxonRawIO(BaseRawIO):

extensions = ["abf"]
rawmode = "one-file"
has_buffer_description_api = True

def __init__(self, filename=""):
BaseRawIO.__init__(self)
@@ -115,7 +116,7 @@ def _parse_header(self):
head_offset = info["sections"]["DataSection"]["uBlockIndex"] * BLOCKSIZE
totalsize = info["sections"]["DataSection"]["llNumEntries"]

self._raw_data = np.memmap(self.filename, dtype=sig_dtype, mode="r", shape=(totalsize,), offset=head_offset)
# self._raw_data = np.memmap(self.filename, dtype=sig_dtype, mode="r", shape=(totalsize,), offset=head_offset)

# 3 possible modes
if version < 2.0:
@@ -142,7 +143,7 @@ def _parse_header(self):
)
else:
episode_array = np.empty(1, [("offset", "i4"), ("len", "i4")])
episode_array[0]["len"] = self._raw_data.size
episode_array[0]["len"] = totalsize
episode_array[0]["offset"] = 0

# sampling_rate
@@ -154,9 +155,14 @@ def _parse_header(self):
# one sweep = one segment
nb_segment = episode_array.size

stream_id = "0"
buffer_id = "0"

# Get raw data by segment
self._raw_signals = {}
# self._raw_signals = {}
self._t_starts = {}
self._buffer_descriptions = {0 :{}}
self._stream_buffer_slice = {stream_id : None}
pos = 0
for seg_index in range(nb_segment):
length = episode_array[seg_index]["len"]
@@ -169,7 +175,17 @@ def _parse_header(self):
if (fSynchTimeUnit != 0) and (mode == 1):
length /= fSynchTimeUnit

self._raw_signals[seg_index] = self._raw_data[pos : pos + length].reshape(-1, nbchannel)
# self._raw_signals[seg_index] = self._raw_data[pos : pos + length].reshape(-1, nbchannel)

self._buffer_descriptions[0][seg_index] = {}
self._buffer_descriptions[0][seg_index][buffer_id] = {
"type" : "binary",
"file_path" : str(self.filename),
"dtype" : str(sig_dtype),
"order": "C",
"file_offset" : head_offset + pos * sig_dtype.itemsize,
"shape" : (int(length // nbchannel), int(nbchannel)),
}
pos += length

t_start = float(episode_array[seg_index]["offset"])
@@ -227,17 +243,14 @@ def _parse_header(self):
offset -= info["listADCInfo"][chan_id]["fSignalOffset"]
else:
gain, offset = 1.0, 0.0
stream_id = "0"
buffer_id = "0"
signal_channels.append(
(name, str(chan_id), self._sampling_rate, sig_dtype, units, gain, offset, stream_id, buffer_id)
)

signal_channels.append((name, str(chan_id), self._sampling_rate, sig_dtype, units, gain, offset, stream_id, buffer_id))

signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)

# one unique signal stream and buffer
signal_buffers = np.array([("Signals", "0")], dtype=_signal_buffer_dtype)
signal_streams = np.array([("Signals", "0", "0")], dtype=_signal_stream_dtype)
signal_buffers = np.array([("Signals", buffer_id)], dtype=_signal_buffer_dtype)
signal_streams = np.array([("Signals", stream_id, buffer_id)], dtype=_signal_stream_dtype)

# only one event channel: tag
# In ABF, timestamps are not attached to any particular segment
@@ -295,21 +308,26 @@ def _segment_t_start(self, block_index, seg_index):
return self._t_starts[seg_index]

def _segment_t_stop(self, block_index, seg_index):
t_stop = self._t_starts[seg_index] + self._raw_signals[seg_index].shape[0] / self._sampling_rate
sig_size = self.get_signal_size(block_index, seg_index, 0)
t_stop = self._t_starts[seg_index] + sig_size / self._sampling_rate
return t_stop

def _get_signal_size(self, block_index, seg_index, stream_index):
shape = self._raw_signals[seg_index].shape
return shape[0]
# def _get_signal_size(self, block_index, seg_index, stream_index):
# shape = self._raw_signals[seg_index].shape
# return shape[0]

def _get_signal_t_start(self, block_index, seg_index, stream_index):
return self._t_starts[seg_index]

def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes):
if channel_indexes is None:
channel_indexes = slice(None)
raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes]
return raw_signals
# def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes):
# if channel_indexes is None:
# channel_indexes = slice(None)
# raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes]
# return raw_signals

def _get_analogsignal_buffer_description(self, block_index, seg_index, buffer_id):
return self._buffer_descriptions[block_index][seg_index][buffer_id]


def _event_count(self, block_index, seg_index, event_channel_index):
return self._raw_ev_timestamps.size
132 changes: 129 additions & 3 deletions neo/rawio/baserawio.py
@@ -77,6 +77,8 @@

from neo import logging_handler

from .utils import get_memmap_chunk_from_opened_file


possible_raw_modes = [
"one-file",
@@ -149,6 +151,9 @@ class BaseRawIO:

rawmode = None # one key from possible_raw_modes

# If True, the reader implements the buffer description API used to read signals generically
has_buffer_description_api = False

# TODO Why multi-file would have a single filename is confusing here - shouldn't
# the name of this argument be filenames_list or filenames_base or similar?
#
@@ -663,7 +668,12 @@ def get_signal_size(self, block_index: int, seg_index: int, stream_index: int |

"""
stream_index = self._get_stream_index_from_arg(stream_index)
return self._get_signal_size(block_index, seg_index, stream_index)

if not self.has_buffer_description_api:
return self._get_signal_size(block_index, seg_index, stream_index)
else:
# use the buffer description
return self._get_signal_size_generic(block_index, seg_index, stream_index)

def get_signal_t_start(self, block_index: int, seg_index: int, stream_index: int | None = None):
"""
@@ -812,7 +822,11 @@ def get_analogsignal_chunk(
if np.all(np.diff(channel_indexes) == 1):
channel_indexes = slice(channel_indexes[0], channel_indexes[-1] + 1)

raw_chunk = self._get_analogsignal_chunk(block_index, seg_index, i_start, i_stop, stream_index, channel_indexes)
if not self.has_buffer_description_api:
raw_chunk = self._get_analogsignal_chunk(block_index, seg_index, i_start, i_stop, stream_index, channel_indexes)
else:
# use the buffer description
raw_chunk = self._get_analogsignal_chunk_generic(block_index, seg_index, i_start, i_stop, stream_index, channel_indexes)

return raw_chunk

@@ -1284,6 +1298,7 @@ def _get_signal_size(self, block_index: int, seg_index: int, stream_index: int):

All channels indexed must have the same size and t_start.
"""
# must NOT be implemented if has_buffer_description_api=True
raise (NotImplementedError)

def _get_signal_t_start(self, block_index: int, seg_index: int, stream_index: int):
@@ -1311,7 +1326,7 @@ def _get_analogsignal_chunk(
-------
array of samples, with each requested channel in a column
"""

# must NOT be implemented if has_buffer_description_api=True
raise (NotImplementedError)

###
@@ -1350,6 +1365,117 @@ def _rescale_event_timestamp(self, event_timestamps: np.ndarray, dtype: np.dtype
def _rescale_epoch_duration(self, raw_duration: np.ndarray, dtype: np.dtype):
raise (NotImplementedError)

###
# json buffer api zone
# must be implemented if has_buffer_description_api=True
def get_analogsignal_buffer_description(self, block_index: int = 0, seg_index: int = 0, buffer_id: str = None):
if not self.has_buffer_description_api:
raise ValueError("This reader does not support the buffer_description API")
descr = self._get_analogsignal_buffer_description(block_index, seg_index, buffer_id)
return descr

def _get_analogsignal_buffer_description(self, block_index, seg_index, buffer_id):
raise (NotImplementedError)

def _get_signal_size_generic(self, block_index, seg_index, stream_index):
# When has_buffer_description_api=True this is used to avoid writing _get_signal_size()

buffer_id = self.header["signal_streams"][stream_index]["buffer_id"]
buffer_desc = self._get_analogsignal_buffer_description(block_index, seg_index, buffer_id)

# some HDF5 buffers store the time axis transposed
time_axis = buffer_desc.get("time_axis", 0)
return buffer_desc['shape'][time_axis]
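The shape/time_axis lookup above is small enough to sketch standalone; the description keys are the ones assumed by this PR, and the helper name is illustrative:

```python
def signal_size_from_description(buffer_desc):
    # Number of samples along the time axis; some HDF5 buffers store the
    # array transposed, which is signalled by "time_axis" in the description.
    time_axis = buffer_desc.get("time_axis", 0)
    return buffer_desc["shape"][time_axis]

print(signal_size_from_description({"shape": (100, 4)}))                  # 100
print(signal_size_from_description({"shape": (4, 100), "time_axis": 1}))  # 100
```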

def _get_analogsignal_chunk_generic(
self,
block_index: int,
seg_index: int,
i_start: int | None,
i_stop: int | None,
stream_index: int,
channel_indexes: list[int] | None,
):

stream_id = self.header["signal_streams"][stream_index]["id"]
buffer_id = self.header["signal_streams"][stream_index]["buffer_id"]

buffer_slice = self._stream_buffer_slice[stream_id]


# When has_buffer_description_api=True this is used to avoid writing _get_analogsignal_chunk()
buffer_desc = self._get_analogsignal_buffer_description(block_index, seg_index, buffer_id)

i_start = i_start or 0
i_stop = i_stop or buffer_desc['shape'][0]

if buffer_desc['type'] == 'binary':

# open files on demand and keep reference to opened file
if not hasattr(self, '_memmap_analogsignal_buffers'):
self._memmap_analogsignal_buffers = {}
if block_index not in self._memmap_analogsignal_buffers:
self._memmap_analogsignal_buffers[block_index] = {}
if seg_index not in self._memmap_analogsignal_buffers[block_index]:
self._memmap_analogsignal_buffers[block_index][seg_index] = {}
if buffer_id not in self._memmap_analogsignal_buffers[block_index][seg_index]:
fid = open(buffer_desc['file_path'], mode='rb')
self._memmap_analogsignal_buffers[block_index][seg_index][buffer_id] = fid
else:
fid = self._memmap_analogsignal_buffers[block_index][seg_index][buffer_id]

num_channels = buffer_desc['shape'][1]

raw_sigs = get_memmap_chunk_from_opened_file(fid, num_channels, i_start, i_stop, np.dtype(buffer_desc['dtype']), file_offset=buffer_desc['file_offset'])


elif buffer_desc['type'] == 'hdf5':

# open files on demand and keep reference to opened file
if not hasattr(self, '_hdf5_analogsignal_buffers'):
self._hdf5_analogsignal_buffers = {}
if block_index not in self._hdf5_analogsignal_buffers:
self._hdf5_analogsignal_buffers[block_index] = {}
if seg_index not in self._hdf5_analogsignal_buffers[block_index]:
self._hdf5_analogsignal_buffers[block_index][seg_index] = {}
if buffer_id not in self._hdf5_analogsignal_buffers[block_index][seg_index]:
import h5py
h5file = h5py.File(buffer_desc['file_path'], mode="r")
self._hdf5_analogsignal_buffers[block_index][seg_index][buffer_id] = h5file
else:
h5file = self._hdf5_analogsignal_buffers[block_index][seg_index][buffer_id]

hdf5_path = buffer_desc["hdf5_path"]
full_raw_sigs = h5file[hdf5_path]

time_axis = buffer_desc.get("time_axis", 0)
if time_axis == 0:
raw_sigs = full_raw_sigs[i_start:i_stop, :]
elif time_axis == 1:
raw_sigs = full_raw_sigs[:, i_start:i_stop].T
else:
raise RuntimeError("Should never happen")




else:
raise NotImplementedError()

# pre-slice when the stream does not contain all channels (for instance spikeglx when load_sync_channel=False)
if buffer_slice is not None:
raw_sigs = raw_sigs[:, buffer_slice]

# channel slice requested
if channel_indexes is not None:
raw_sigs = raw_sigs[:, channel_indexes]


return raw_sigs
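The binary branch above delegates the windowed read to `get_memmap_chunk_from_opened_file` from `neo.rawio.utils`; its exact signature is not shown in this diff, so as an assumption, a helper of that kind can be sketched as a seek plus a bounded read of an interleaved (time x channels) buffer:

```python
import io

import numpy as np

def memmap_chunk_from_opened_file(fid, num_channels, i_start, i_stop, dtype, file_offset=0):
    # Read samples i_start:i_stop of an interleaved (time x channels)
    # binary buffer from an already opened file object.
    bytes_per_frame = num_channels * dtype.itemsize
    fid.seek(file_offset + i_start * bytes_per_frame)
    raw = fid.read((i_stop - i_start) * bytes_per_frame)
    return np.frombuffer(raw, dtype=dtype).reshape(-1, num_channels)

# Usage on an in-memory fake buffer of 8 samples x 3 channels
data = np.arange(24, dtype="int16").reshape(8, 3)
fid = io.BytesIO(data.tobytes())
chunk = memmap_chunk_from_opened_file(fid, 3, 2, 5, np.dtype("int16"))
print(chunk.shape)  # (3, 3)
```

Keeping the file handle open and cached (as `_memmap_analogsignal_buffers` does above) avoids reopening the file on every chunk request.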


Show resolved Hide resolved

def pprint_vector(vector, lim: int = 8):
vector = np.asarray(vector)
13 changes: 8 additions & 5 deletions neo/rawio/bci2000rawio.py
@@ -1,6 +1,9 @@
"""
BCI2000RawIO is a class to read BCI2000 .dat files.
https://www.bci2000.org/mediawiki/index.php/Technical_Reference:BCI2000_File_Format

Note: BCI2000RawIO cannot be implemented using has_buffer_description_api because the buffer
of signals is not compact (it has some interleaved state uints between the channels).
"""

import numpy as np
@@ -50,9 +53,11 @@ def _parse_header(self):
self.header["nb_block"] = 1
self.header["nb_segment"] = [1]

# one unique stream and buffer
signal_buffers = np.array(("Signals", "0"), dtype=_signal_buffer_dtype)
signal_streams = np.array([("Signals", "0", "0")], dtype=_signal_stream_dtype)
# one unique stream but no buffer because channels are not compact
stream_id = "0"
buffer_id = ""
signal_buffers = np.array([], dtype=_signal_buffer_dtype)
signal_streams = np.array([("Signals", stream_id, buffer_id)], dtype=_signal_stream_dtype)
self.header["signal_buffers"] = signal_buffers
self.header["signal_streams"] = signal_streams

@@ -80,8 +85,6 @@ def _parse_header(self):
if isinstance(offset, str):
offset = float(offset)

stream_id = "0"
buffer_id = "0"
sig_channels.append((ch_name, chan_id, sr, dtype, units, gain, offset, stream_id, buffer_id))
self.header["signal_channels"] = np.array(sig_channels, dtype=_signal_channel_dtype)
