Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause error #321

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions bcipy/acquisition/protocols/lsl/connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Utility functions for connecting to an LSL Stream"""
from typing import Optional

from pylsl import StreamInfo, resolve_stream

from bcipy.acquisition.devices import DEFAULT_DEVICE_TYPE, DeviceSpec
from bcipy.acquisition.protocols.lsl.lsl_connector import channel_names


def resolve_device_stream(
device_spec: Optional[DeviceSpec] = None) -> StreamInfo:
"""Get the LSL stream for the given device."""
content_type = device_spec.content_type if device_spec else DEFAULT_DEVICE_TYPE
streams = resolve_stream('type', content_type)
if not streams:
raise Exception(
f'LSL Stream not found for content type {content_type}')
return streams[0]


def device_from_metadata(metadata: StreamInfo) -> DeviceSpec:
"""Create a device_spec from the data stream metadata."""
return DeviceSpec(name=metadata.name(),
channels=channel_names(metadata),
sample_rate=metadata.nominal_srate(),
content_type=metadata.type())
82 changes: 49 additions & 33 deletions bcipy/acquisition/protocols/lsl/lsl_client.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
"""DataAcquisitionClient for LabStreamingLayer data sources."""
import logging
from multiprocessing import Queue
from typing import Dict, List, Optional

import pandas as pd
from pylsl import (StreamInfo, StreamInlet, local_clock, resolve_byprop,
resolve_stream)
from pylsl import StreamInlet, local_clock, resolve_byprop

from bcipy.acquisition.devices import (DEFAULT_DEVICE_TYPE, IRREGULAR_RATE,
DeviceSpec)
from bcipy.acquisition.devices import IRREGULAR_RATE, DeviceSpec
from bcipy.acquisition.exceptions import InvalidClockError
from bcipy.acquisition.protocols.lsl.lsl_connector import (channel_names,
check_device)
from bcipy.acquisition.protocols.lsl.connect import (device_from_metadata,
resolve_device_stream)
from bcipy.acquisition.protocols.lsl.lsl_connector import check_device
from bcipy.acquisition.protocols.lsl.lsl_recorder import LslRecordingThread
from bcipy.acquisition.record import Record
from bcipy.config import MAX_PAUSE_SECONDS
from bcipy.gui.viewer.ring_buffer import RingBuffer
from bcipy.helpers.clock import Clock

log = logging.getLogger(__name__)

LSL_TIMEOUT = 5.0 # seconds


def time_range(stamps: List[float],
precision: int = 3,
sep: str = " to ") -> str:
"""Utility for printing a range of timestamps"""
if stamps:
return "".join([
str(round(stamps[0], precision)), sep,
str(round(stamps[-1], precision))
])
return ""


def request_desc(start: Optional[float], end: Optional[float],
limit: Optional[int]):
"""Returns a description of the request which can be logged."""
start_str = round(start, 3) if start else "None"
end_str = round(end, 3) if end else "None"
return f"Requesting data from: {start_str} to: {end_str} limit: {limit}"


class LslAcquisitionClient:
"""Data Acquisition Client for devices streaming data using Lab Streaming
Expand Down Expand Up @@ -61,8 +84,6 @@ def __init__(self,
def first_sample_time(self) -> float:
"""Timestamp returned by the first sample. If the data is being
recorded this value reflects the timestamp of the first recorded sample"""
if self.recorder:
return self.recorder.first_sample_time
return self._first_sample_time

@property
Expand All @@ -86,34 +107,37 @@ def start_acquisition(self) -> bool:
if self.inlet:
return False

content_type = self.device_spec.content_type if self.device_spec else DEFAULT_DEVICE_TYPE
streams = resolve_stream('type', content_type)
if not streams:
raise Exception(
f'LSL Stream not found for content type {content_type}')
stream_info = streams[0]

stream_info = resolve_device_stream(self.device_spec)
self.inlet = StreamInlet(
stream_info,
max_buflen=4 * self.max_buffer_len, # TODO: revisit this value
max_buflen=MAX_PAUSE_SECONDS,
max_chunklen=1)
log.info("Acquiring data from data stream:")
log.info(self.inlet.info().as_xml())

if self.device_spec:
check_device(self.device_spec, self.inlet.info())
else:
self.device_spec = device_from_metadata(self.inlet.info())

if self.save_directory:
msg_queue = Queue()
self.recorder = LslRecordingThread(
stream_info,
self.save_directory,
self.raw_data_file_name,
self.device_spec)
directory=self.save_directory,
filename=self.raw_data_file_name,
device_spec=self.device_spec,
queue=msg_queue)
self.recorder.start()
log.info("Waiting for first sample from lsl_recorder")
self._first_sample_time = msg_queue.get(block=True,
timeout=LSL_TIMEOUT)
log.info(f"First sample time: {self.first_sample_time}")

self.inlet.open_stream(timeout=LSL_TIMEOUT)
if self.max_buffer_len and self.max_buffer_len > 0:
self.buffer = RingBuffer(size_max=self.max_samples)
_, self._first_sample_time = self.inlet.pull_sample()
if not self._first_sample_time:
_, self._first_sample_time = self.inlet.pull_sample()
return True

def stop_acquisition(self) -> None:
Expand Down Expand Up @@ -178,7 +202,7 @@ def get_data(self,
-------
List of Records
"""
log.info(f"Requesting data from: {start} to: {end} limit: {limit}")
log.info(request_desc(start, end, limit))

data = self.get_latest_data()
if not data:
Expand Down Expand Up @@ -223,12 +247,12 @@ def _pull_chunk(self) -> int:
"""Pull a chunk of samples from LSL and record in the buffer.
Returns the count of samples pulled.
"""
log.debug(f"Pulling from LSL (max_samples: {self.max_samples})")
log.debug(f"\tPulling chunk (max_samples: {self.max_samples})")
# A timeout of 0.0 gets currently available samples without blocking.
samples, timestamps = self.inlet.pull_chunk(
timeout=0.0, max_samples=self.max_samples)
count = len(samples)
log.debug(f"\tReceived {count} samples")
log.debug(f"\t-> received {count} samples: {time_range(timestamps)}")
for sample, stamp in zip(samples, timestamps):
self.buffer.append(Record(sample, stamp))
return count
Expand Down Expand Up @@ -348,7 +372,7 @@ def discover_device_spec(content_type: str) -> DeviceSpec:
"""Finds the first LSL stream with the given content type and creates a
device spec from the stream's metadata."""
log.info(f"Waiting for {content_type} data to be streamed over LSL.")
streams = resolve_byprop('type', content_type, timeout=5.0)
streams = resolve_byprop('type', content_type, timeout=LSL_TIMEOUT)
if not streams:
raise Exception(
f'LSL Stream not found for content type {content_type}')
Expand All @@ -357,11 +381,3 @@ def discover_device_spec(content_type: str) -> DeviceSpec:
spec = device_from_metadata(inlet.info())
inlet.close_stream()
return spec


def device_from_metadata(metadata: StreamInfo) -> DeviceSpec:
"""Create a device_spec from the data stream metadata."""
return DeviceSpec(name=metadata.name(),
channels=channel_names(metadata),
sample_rate=metadata.nominal_srate(),
content_type=metadata.type())
49 changes: 28 additions & 21 deletions bcipy/acquisition/protocols/lsl/lsl_recorder.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Records LSL data streams to a data store."""
import logging
import time
from multiprocessing import Queue
from pathlib import Path
from typing import Optional, List
from typing import List, Optional

from pylsl import StreamInfo, StreamInlet, resolve_streams

from bcipy.acquisition.devices import DeviceSpec
from bcipy.acquisition.protocols.lsl.connect import (device_from_metadata,
resolve_device_stream)
from bcipy.acquisition.protocols.lsl.lsl_connector import (channel_names,
check_device)
from bcipy.acquisition.util import StoppableThread
from bcipy.acquisition.util import StoppableProcess
from bcipy.helpers.raw_data import RawDataWriter

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,8 +42,9 @@ def start(self) -> None:
log.info("Recording data")
# create a thread for each.
self.streams = [
LslRecordingThread(stream, self.path,
self.filenames.get(stream.type(), None))
LslRecordingThread(device_spec=device_from_metadata(StreamInlet(stream).info()),
directory=self.path,
filename=self.filenames.get(stream.type(), None))
for stream in resolve_streams()
]

Expand All @@ -65,29 +69,30 @@ def stop(self, wait: bool = False) -> None:
self.streams = None


class LslRecordingThread(StoppableThread):
class LslRecordingThread(StoppableProcess):
"""Records data for the given LabStreamingLayer (LSL) data stream.

Parameters:
----------
- stream : information about the stream of interest
- device_spec : DeviceSpec ; specifies the device from which to record.
- directory : location to store the recording
- filename : optional, name of the data file.
- device_spec : optional DeviceSpec ; if provided channel labels will come
from here.
- queue : optional multiprocessing queue; if provided the first_sample_time
will be written here when available.
"""

writer: RawDataWriter = None

def __init__(self,
stream_info: StreamInfo,
directory: str,
device_spec: DeviceSpec,
directory: Optional[str] = '.',
filename: Optional[str] = None,
device_spec: Optional[DeviceSpec] = None) -> None:
queue: Optional[Queue] = None) -> None:
super().__init__()
self.stream_info = stream_info

self.directory = directory
self.device_spec = device_spec
self.queue = queue

self.sample_count = 0
# see: https://labstreaminglayer.readthedocs.io/info/faqs.html#chunk-sizes
Expand All @@ -102,8 +107,8 @@ def __init__(self,

def default_filename(self):
"""Default filename to use if a name is not provided."""
content_type = '_'.join(self.stream_info.type().split()).lower()
name = '_'.join(self.stream_info.name().split()).lower()
content_type = '_'.join(self.device_spec.content_type.split()).lower()
name = '_'.join(self.device_spec.name.split()).lower()
return f"{content_type}_data_{name}.csv"

@property
Expand Down Expand Up @@ -132,8 +137,8 @@ def _init_data_writer(self, stream_info: StreamInfo) -> None:
log.info(f"Writing data to {path}")
self.writer = RawDataWriter(
path,
daq_type=self.stream_info.name(),
sample_rate=self.stream_info.nominal_srate(),
daq_type=stream_info.name(),
sample_rate=stream_info.nominal_srate(),
columns=['timestamp'] + channels + ['lsl_timestamp'])
self.writer.__enter__()

Expand All @@ -152,7 +157,6 @@ def _write_chunk(self, data: List, timestamps: List) -> None:
timestamps : list of timestamps
"""
assert self.writer, "Writer not initialized"

chunk = []
for i, sample in enumerate(data):
self.sample_count += 1
Expand All @@ -178,6 +182,8 @@ def _pull_chunk(self, inlet: StreamInlet) -> int:
if timestamps and data:
if not self.first_sample_time:
self.first_sample_time = timestamps[0]
if self.queue:
self.queue.put(self.first_sample_time, timeout=2.0)
self.last_sample_time = timestamps[-1]
self._write_chunk(data, timestamps)
return len(timestamps)
Expand All @@ -194,11 +200,12 @@ def run(self):
given interval, and persists the results. This happens continuously
until the `stop()` method is called.
"""
# Note that self.stream_info does not have the channel names.
inlet = StreamInlet(self.stream_info, max_chunklen=1)
# Note that stream_info does not have the channel names.
stream_info = resolve_device_stream(self.device_spec)
inlet = StreamInlet(stream_info, max_chunklen=1)
full_metadata = inlet.info()

log.info("Acquiring data from data stream:")
log.info("Recording data from data stream:")
log.info(full_metadata.as_xml())

self._reset()
Expand All @@ -219,7 +226,7 @@ def run(self):
while record_count == self.max_chunk_size:
record_count = self._pull_chunk(inlet)

log.info(f"Ending data stream recording for {self.stream_info.name()}")
log.info(f"Ending data stream recording for {stream_info.name()}")
log.info(f"Total recorded seconds: {self.recorded_seconds}")
log.info(f"Total recorded samples: {self.sample_count}")
inlet.close_stream()
Expand Down
6 changes: 4 additions & 2 deletions bcipy/acquisition/tests/protocols/lsl/test_lsl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,14 @@ def test_with_recording(self):
"""Test that recording works."""

temp_dir = tempfile.mkdtemp()
path = Path(temp_dir, f'eeg_data_{DEVICE_NAME.lower()}.csv')
filename = f'eeg_data_{DEVICE_NAME.lower()}.csv'
path = Path(temp_dir, filename)

self.assertFalse(path.exists())

client = LslAcquisitionClient(max_buffer_len=1,
save_directory=temp_dir)
save_directory=temp_dir,
raw_data_file_name=filename)
client.start_acquisition()
time.sleep(0.1)
client.stop_acquisition()
Expand Down
12 changes: 7 additions & 5 deletions bcipy/acquisition/tests/protocols/lsl/test_lsl_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import pytest

from bcipy.acquisition.datastream.lsl_server import LslDataServer
from bcipy.acquisition.datastream.mock.eye_tracker_server import eye_tracker_server
from bcipy.acquisition.datastream.mock.eye_tracker_server import \
eye_tracker_server
from bcipy.acquisition.devices import preconfigured_device
from bcipy.acquisition.protocols.lsl.lsl_recorder import LslRecorder
from bcipy.helpers.raw_data import TIMESTAMP_COLUMN, load
Expand All @@ -16,6 +17,7 @@
DEVICE = preconfigured_device(DEVICE_NAME)


@pytest.mark.slow
class TestLslRecorder(unittest.TestCase):
"""Main Test class for LslRecorder code."""

Expand All @@ -39,13 +41,14 @@ def setUp(self):

def test_recorder(self):
"""Test basic recording functionality"""
path = Path(self.temp_dir, f'eeg_data_{DEVICE_NAME.lower()}.csv')
recorder = LslRecorder(path=self.temp_dir)
filename = f'eeg_data_{DEVICE_NAME.lower()}.csv'
path = Path(self.temp_dir, filename)
recorder = LslRecorder(path=self.temp_dir, filenames={'EEG': filename})
self.assertFalse(path.exists())
recorder.start()
time.sleep(0.1)
self.assertTrue(path.exists())
recorder.stop(wait=True)
self.assertTrue(path.exists())

raw_data = load(path)
self.assertEqual(raw_data.daq_type, DEVICE_NAME)
Expand All @@ -54,7 +57,6 @@ def test_recorder(self):
self.assertEqual(raw_data.columns[0], TIMESTAMP_COLUMN)
self.assertEqual(raw_data.columns[1:-1], DEVICE.channels)
self.assertEqual(raw_data.columns[-1], 'lsl_timestamp')
self.assertTrue(len(raw_data.rows) > 0)

def test_multiple_sources(self):
"""Test that recorder works with multiple sources and can be customized
Expand Down
1 change: 1 addition & 0 deletions bcipy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@

# misc configuration
WAIT_SCREEN_MESSAGE = 'Press Space to start or Esc to exit'
MAX_PAUSE_SECONDS = 365
SESSION_COMPLETE_MESSAGE = 'Complete! Saving data...'
REMOTE_SERVER = "https://github.com/CAMBI-tech/BciPy/"
3 changes: 2 additions & 1 deletion bcipy/gui/viewer/data_source/lsl_data_source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Streams data from pylsl and puts it into a Queue."""
import pylsl
from bcipy.acquisition.protocols.lsl.lsl_client import device_from_metadata

from bcipy.acquisition.protocols.lsl.connect import device_from_metadata
from bcipy.gui.viewer.data_source.data_source import DataSource


Expand Down
Loading
Loading