From b063c36b00008a0f70febb9e02b64609b5f7c97b Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 19 Feb 2023 12:39:17 +0100 Subject: [PATCH 01/13] Add command to enable or disable the status acknowledgment --- pyshimmer/bluetooth/bt_commands.py | 24 +++++++++++++++++++++++- pyshimmer/bluetooth/bt_const.py | 2 ++ test/bluetooth/test_bt_commands.py | 9 ++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pyshimmer/bluetooth/bt_commands.py b/pyshimmer/bluetooth/bt_commands.py index 0cacbdc..898d7ec 100644 --- a/pyshimmer/bluetooth/bt_commands.py +++ b/pyshimmer/bluetooth/bt_commands.py @@ -192,6 +192,7 @@ def receive(self, ser: BluetoothSerial) -> None: sr = dr2sr(sr_clock) return sr + class GetBatteryCommand(ResponseCommand): """Retrieve the battery state @@ -213,9 +214,10 @@ def receive(self, ser: BluetoothSerial) -> any: batt_voltage = calibrate_u12_adc_value(raw_values, 0, 3.0, 1.0) * 1.988 if (self._in_percent): return battery_voltage_to_percent(batt_voltage) - else: + else: return batt_voltage + class GetConfigTimeCommand(ResponseCommand): """Retrieve the config time that is stored in the Shimmer device configuration file @@ -455,6 +457,26 @@ def __init__(self, dev_name: str): super().__init__(SET_SHIMMERNAME_COMMAND, dev_name) +class EnableStatusAckCommand(ShimmerCommand): + + def __init__(self, enabled: bool): + """Command to enable/disable the ACK byte before status messages + + By default, the Shimmer firmware sends an acknowledgment byte before + sending unsolicited status messages to the host. This confuses the state + machine of the Python API but is always expected by the official Shimmer + software. This command is used by the Python API to automatically disable + the acknowledgment when connecting to a Shimmer. + + :param enabled: If set to True, the acknowledgment is sent. If set to False, + the acknowledgment is not sent. + """ + self._enabled = enabled + + def send(self, ser: BluetoothSerial) -> None: + ser.write_command(ENABLE_STATUS_ACK_COMMAND, " Date: Sun, 19 Feb 2023 13:58:38 +0100 Subject: [PATCH 02/13] Implement firmware version as comparable object --- pyshimmer/bluetooth/bt_api.py | 17 +++++----- pyshimmer/device.py | 45 ++++++++++++++++++++++++ test/bluetooth/test_bluetooth_api.py | 9 ++++- test/test_device.py | 51 +++++++++++++++++++++++++++- 4 files changed, 112 insertions(+), 10 deletions(-) diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index ac9a9eb..3fa8e6d 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -26,7 +26,8 @@ SetExperimentIDCommand, GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, GetBatteryCommand from pyshimmer.bluetooth.bt_const import ACK_COMMAND_PROCESSED, DATA_PACKET, FULL_STATUS_RESPONSE, INSTREAM_CMD_RESPONSE from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import EChannelType, ChDataTypeAssignment, ExGRegister, EFirmwareType, ChannelDataType +from pyshimmer.device import EChannelType, ChDataTypeAssignment, ExGRegister, EFirmwareType, ChannelDataType, \ + FirmwareVersion from pyshimmer.serial_base import ReadAbort from pyshimmer.util import fmt_hex, PeekQueue @@ -385,16 +386,16 @@ def get_status(self) -> List[bool]: """ return self._process_and_wait(GetStatusCommand()) - def get_firmware_version(self) -> Tuple[EFirmwareType, int, int, int]: + def get_firmware_version(self) -> Tuple[EFirmwareType, FirmwareVersion]: """Get the version of the running firmware - :return: A tuple of four values: - - The firmware type as enum, i.e. SDLog, LogAndStream, ... - - the major version as int - - the minor version as int - - the patch level as int + :return: The firmware type as enum, i.e. SDLog or LogAndStream + and the numeric firmware version """ - return self._process_and_wait(GetFirmwareVersionCommand()) + fw_type, major, minor, rel = self._process_and_wait(GetFirmwareVersionCommand()) + fw_version = FirmwareVersion(major, minor, rel) + + return fw_type, fw_version def get_exg_register(self, chip_id: int) -> ExGRegister: """Get the current configuration of one of the two ExG registers of the device diff --git a/pyshimmer/device.py b/pyshimmer/device.py index e5f922b..4a4c483 100644 --- a/pyshimmer/device.py +++ b/pyshimmer/device.py @@ -28,6 +28,51 @@ DEFAULT_BAUDRATE = 115200 +def ensure_firmware_version(func): + def wrapper(self, other): + if not isinstance(other, FirmwareVersion): + return False + + return func(self, other) + + return wrapper + + +class FirmwareVersion: + + def __init__(self, major: int, minor: int, rel: int): + """Represents the version of the Shimmer firmware + + :param major: Major version + :param minor: Minor version + :param rel: Patch level + """ + self.major = major + self.minor = minor + self.rel = rel + self._key = (major, minor, rel) + + @ensure_firmware_version + def __eq__(self, other: "FirmwareVersion") -> bool: + return self._key == other._key + + @ensure_firmware_version + def __gt__(self, other: "FirmwareVersion") -> bool: + return self._key > other._key + + @ensure_firmware_version + def __ge__(self, other: "FirmwareVersion") -> bool: + return self._key >= other._key + + @ensure_firmware_version + def __lt__(self, other: "FirmwareVersion") -> bool: + return self._key < other._key + + @ensure_firmware_version + def __le__(self, other: "FirmwareVersion") -> bool: + return self._key <= other._key + + class ChannelDataType: """Represents the binary data type and format of a Shimmer data channel diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index c14adb8..fab7641 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -21,7 +21,7 @@ from pyshimmer.bluetooth.bt_commands import GetDeviceNameCommand, SetDeviceNameCommand, DataPacket, GetStatusCommand, \ GetStringCommand, ResponseCommand from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import ChDataTypeAssignment, EChannelType +from pyshimmer.device import ChDataTypeAssignment, EChannelType, EFirmwareType, FirmwareVersion from pyshimmer.test_util import PTYSerialMockCreator @@ -401,3 +401,10 @@ def status_handler(new_pkt: List[bool]) -> None: pkt = pkts[0] self.assertEqual(pkt, [False, False, False, False, False, True, False, False]) + + def test_get_firmware_version(self): + self._submit_req_resp_handler(1, b'\xFF\x2F\x03\x00\x01\x00\x02\x03') + fwtype, fwver = self._sot.get_firmware_version() + + self.assertEqual(fwtype, EFirmwareType.LogAndStream) + self.assertEqual(fwver, FirmwareVersion(1, 2, 3)) diff --git a/test/test_device.py b/test/test_device.py index 87ba3f7..7a279fc 100644 --- a/test/test_device.py +++ b/test/test_device.py @@ -18,7 +18,7 @@ from pyshimmer.device import sr2dr, dr2sr, ChannelDataType, ChDataTypeAssignment, SensorChannelAssignment, \ SensorBitAssignments, sec2ticks, ticks2sec, get_ch_dtypes, EChannelType, ExGRegister, ExGMux, get_firmware_type, \ - EFirmwareType, ExGRLDLead, ERLDRef, get_exg_ch, is_exg_ch + EFirmwareType, ExGRLDLead, ERLDRef, get_exg_ch, is_exg_ch, FirmwareVersion def randbytes(k: int) -> bytes: @@ -267,3 +267,52 @@ def do_assert(a: bytes, b: bytes, result: bool) -> None: y = bytearray(x) y[i] = random.randrange(0, 256) do_assert(x, y, False) + + +class FirmwareVersionTest(TestCase): + + def test_version_equality(self): + a = FirmwareVersion(1, 2, 3) + b = FirmwareVersion(1, 2, 3) + c = FirmwareVersion(3, 2, 1) + + self.assertEqual(a, a) + self.assertEqual(a, b) + + self.assertNotEqual(a, None) + self.assertNotEqual(a, False) + self.assertNotEqual(a, 10) + self.assertNotEqual(a, c) + + def test_attributes(self): + ver = FirmwareVersion(1, 2, 3) + self.assertEqual(ver.major, 1) + self.assertEqual(ver.minor, 2) + self.assertEqual(ver.rel, 3) + + def test_greater_less(self): + a = FirmwareVersion(3, 2, 1) + + b = FirmwareVersion(3, 2, 1) + self.assertFalse(b > a) + self.assertTrue(b >= a) + self.assertFalse(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(2, 2, 1) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(3, 1, 1) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(3, 2, 0) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) From 5fb302816a0819fd363a15d2267708d7970b8a6a Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 10:29:01 +0100 Subject: [PATCH 03/13] Split device.py into multiple modules Split firmware version related code into separate submodule Move channel related code from device to dev.channels Move ExG related code from device to dev.exg Move device to dev.base Move methods and comments around --- pyshimmer/__init__.py | 10 +- pyshimmer/bluetooth/bt_api.py | 5 +- pyshimmer/bluetooth/bt_commands.py | 6 +- pyshimmer/bluetooth/bt_const.py | 2 +- pyshimmer/dev/__init__.py | 0 pyshimmer/dev/base.py | 92 +++++ pyshimmer/{device.py => dev/channels.py} | 447 +++-------------------- pyshimmer/dev/exg.py | 203 ++++++++++ pyshimmer/dev/fw_version.py | 66 ++++ pyshimmer/reader/binary_reader.py | 5 +- pyshimmer/reader/reader_const.py | 2 +- pyshimmer/reader/shimmer_reader.py | 5 +- pyshimmer/uart/dock_api.py | 4 +- test/bluetooth/test_bluetooth_api.py | 3 +- test/bluetooth/test_bt_commands.py | 3 +- test/reader/test_binary_reader.py | 3 +- test/reader/test_reader_const.py | 2 +- test/reader/test_shimmer_reader.py | 5 +- test/test_device.py | 16 +- test/uart/test_dock_api.py | 5 +- 20 files changed, 452 insertions(+), 432 deletions(-) create mode 100644 pyshimmer/dev/__init__.py create mode 100644 pyshimmer/dev/base.py rename pyshimmer/{device.py => dev/channels.py} (52%) create mode 100644 pyshimmer/dev/exg.py create mode 100644 pyshimmer/dev/fw_version.py diff --git a/pyshimmer/__init__.py b/pyshimmer/__init__.py index 473a9fe..91b208a 100644 --- a/pyshimmer/__init__.py +++ b/pyshimmer/__init__.py @@ -15,9 +15,11 @@ # along with this program. If not, see . from .bluetooth.bt_api import ShimmerBluetooth from .bluetooth.bt_commands import DataPacket -from .uart.dock_api import ShimmerDock -from .reader.shimmer_reader import ShimmerReader +from .dev.base import DEFAULT_BAUDRATE +from .dev.channels import ChannelDataType, EChannelType +from .dev.exg import ExGMux, ExGRLDLead, ERLDRef, ExGRegister +from .dev.fw_version import EFirmwareType from .reader.binary_reader import ShimmerBinaryReader +from .reader.shimmer_reader import ShimmerReader +from .uart.dock_api import ShimmerDock from .util import fmt_hex -from .device import EChannelType, DEFAULT_BAUDRATE, ChannelDataType, ExGRegister, EFirmwareType, ERLDRef, ExGRLDLead, \ - ExGMux diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index 3fa8e6d..aae3431 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -26,8 +26,9 @@ SetExperimentIDCommand, GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, GetBatteryCommand from pyshimmer.bluetooth.bt_const import ACK_COMMAND_PROCESSED, DATA_PACKET, FULL_STATUS_RESPONSE, INSTREAM_CMD_RESPONSE from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import EChannelType, ChDataTypeAssignment, ExGRegister, EFirmwareType, ChannelDataType, \ - FirmwareVersion +from pyshimmer.dev.channels import ChDataTypeAssignment, ChannelDataType, EChannelType +from pyshimmer.dev.exg import ExGRegister +from pyshimmer.dev.fw_version import EFirmwareType, FirmwareVersion from pyshimmer.serial_base import ReadAbort from pyshimmer.util import fmt_hex, PeekQueue diff --git a/pyshimmer/bluetooth/bt_commands.py b/pyshimmer/bluetooth/bt_commands.py index 898d7ec..c1ace05 100644 --- a/pyshimmer/bluetooth/bt_commands.py +++ b/pyshimmer/bluetooth/bt_commands.py @@ -19,8 +19,10 @@ from pyshimmer.bluetooth.bt_const import * from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import dr2sr, EChannelType, ChannelDataType, sec2ticks, ticks2sec, ExGRegister, \ - get_firmware_type +from pyshimmer.dev.base import dr2sr, sec2ticks, ticks2sec +from pyshimmer.dev.channels import ChannelDataType, EChannelType +from pyshimmer.dev.exg import ExGRegister +from pyshimmer.dev.fw_version import get_firmware_type from pyshimmer.util import bit_is_set, resp_code_to_bytes, calibrate_u12_adc_value, battery_voltage_to_percent diff --git a/pyshimmer/bluetooth/bt_const.py b/pyshimmer/bluetooth/bt_const.py index a9b95ee..9c438b6 100644 --- a/pyshimmer/bluetooth/bt_const.py +++ b/pyshimmer/bluetooth/bt_const.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from pyshimmer.device import EChannelType +from pyshimmer.dev.channels import EChannelType ACK_COMMAND_PROCESSED = 0xFF INSTREAM_CMD_RESPONSE = 0x8A diff --git a/pyshimmer/dev/__init__.py b/pyshimmer/dev/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyshimmer/dev/base.py b/pyshimmer/dev/base.py new file mode 100644 index 0000000..e2991ac --- /dev/null +++ b/pyshimmer/dev/base.py @@ -0,0 +1,92 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from typing import Union, overload + +import numpy as np + +# Device clock rate in ticks per second +DEV_CLOCK_RATE: float = 32768.0 + +DEFAULT_BAUDRATE = 115200 + + +def sr2dr(sr: float) -> int: + """Calculate equivalent device-specific rate for a sample rate in Hz + + Device-specific sample rates are given in absolute clock ticks per unit of time. This function can be used to + calculate such a rate for the Shimmer3. + + Args: + sr(float): The sampling rate in Hz + + Returns: + An integer which represents the equivalent device-specific sampling rate + """ + dr_dec = DEV_CLOCK_RATE / sr + return round(dr_dec) + + +def dr2sr(dr: int): + """Calculate equivalent sampling rate for a given device-specific rate + + Device-specific sample rates are given in absolute clock ticks per unit of time. This function can be used to + calculate a regular sampling rate in Hz from such a rate. + + Args: + dr(int): The absolute device rate as int + + Returns: + A floating-point number that represents the sampling rate in Hz + """ + return DEV_CLOCK_RATE / dr + + +@overload +def sec2ticks(t_sec: float) -> int: ... + + +@overload +def sec2ticks(t_sec: np.ndarray) -> np.ndarray: ... + + +def sec2ticks(t_sec: Union[float, np.ndarray]) -> Union[int, np.ndarray]: + """Calculate equivalent device clock ticks for a time in seconds + + Args: + t_sec: A time in seconds + Returns: + An integer which represents the equivalent number of clock ticks + """ + return round(t_sec * DEV_CLOCK_RATE) + + +@overload +def ticks2sec(t_ticks: int) -> float: ... + + +@overload +def ticks2sec(t_ticks: np.ndarray) -> np.ndarray: ... + + +def ticks2sec(t_ticks: Union[int, np.ndarray]) -> Union[float, np.ndarray]: + """Calculate the time in seconds equivalent to a device clock ticks count + + Args: + t_ticks: A clock tick counter for which to calculate the time in seconds + Returns: + A floating point time in seconds that is equivalent to the number of clock ticks + """ + return t_ticks / DEV_CLOCK_RATE diff --git a/pyshimmer/device.py b/pyshimmer/dev/channels.py similarity index 52% rename from pyshimmer/device.py rename to pyshimmer/dev/channels.py index 4a4c483..58e81c5 100644 --- a/pyshimmer/device.py +++ b/pyshimmer/dev/channels.py @@ -1,76 +1,8 @@ -# pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -import re import struct -from enum import Enum, auto, unique -from typing import Dict, List, Union, overload, Tuple +from enum import unique, Enum, auto +from typing import Dict, List -import numpy as np - -from pyshimmer.util import raise_to_next_pow, unpack, flatten_list, bit_is_set, fmt_hex - -# Device clock rate in ticks per second -DEV_CLOCK_RATE: float = 32768.0 - -DEFAULT_BAUDRATE = 115200 - - -def ensure_firmware_version(func): - def wrapper(self, other): - if not isinstance(other, FirmwareVersion): - return False - - return func(self, other) - - return wrapper - - -class FirmwareVersion: - - def __init__(self, major: int, minor: int, rel: int): - """Represents the version of the Shimmer firmware - - :param major: Major version - :param minor: Minor version - :param rel: Patch level - """ - self.major = major - self.minor = minor - self.rel = rel - self._key = (major, minor, rel) - - @ensure_firmware_version - def __eq__(self, other: "FirmwareVersion") -> bool: - return self._key == other._key - - @ensure_firmware_version - def __gt__(self, other: "FirmwareVersion") -> bool: - return self._key > other._key - - @ensure_firmware_version - def __ge__(self, other: "FirmwareVersion") -> bool: - return self._key >= other._key - - @ensure_firmware_version - def __lt__(self, other: "FirmwareVersion") -> bool: - return self._key < other._key - - @ensure_firmware_version - def __le__(self, other: "FirmwareVersion") -> bool: - return self._key <= other._key +from pyshimmer.util import raise_to_next_pow, unpack, flatten_list class ChannelDataType: @@ -155,227 +87,6 @@ def decode(self, val_bin: bytes) -> any: return unpack(r_tpl) -class ExGMux(Enum): - NORMAL = 0x00 - SHORTED = 0x01 - RLD_MEASURE = 0x02 - MVDD = 0x03 - TEMP_SENSOR = 0x04 - TEST_SIGNAL = 0x05 - RLD_DRP = 0x06 - RLD_DRM = 0x07 - RLD_DRPM = 0x08 - INPUT3 = 0x09 - RESERVED = 0x0A - - -class ExGRLDLead(Enum): - RLD1P = 0x01 << 0 - RLD1N = 0x01 << 1 - RLD2P = 0x01 << 2 - RLD2N = 0x01 << 3 - - -class ERLDRef(Enum): - EXTERNAL = 0x00 - INTERNAL = 0x01 - - -class ExGRegister: - GAIN_MAP = { - 0: 6, 1: 1, 2: 2, 3: 3, 4: 4, 5: 8, 6: 12, - } - DATA_RATE_MAP = { - 0: 125, 1: 250, 2: 500, 3: 1000, 4: 2000, 5: 4000, 6: 8000, 7: -1 - } - PD_BIT = 0x01 << 7 - RLD_PD_BIT = 0x01 << 5 - - def __init__(self, reg_bin: bytes): - if len(reg_bin) < 10: - raise ValueError('Binary register content must have length 10') - - self._reg_bin = reg_bin - - def __str__(self) -> str: - def print_ch(ch_id: int) -> str: - return f'Channel {ch_id + 1:2d}\n' + \ - f'\tPowerdown: {self.is_ch_powerdown(ch_id)}\n' + \ - f'\tGain: {self.get_ch_gain(ch_id):2d}\n' + \ - f'\tMultiplexer: {self.get_ch_mux(ch_id).name} ({self.get_ch_mux_bin(ch_id):#06b})\n' - - def fmt_rld_channels(ch_names) -> str: - ch_names = [ch.name for ch in ch_names] if len(ch_names) > 0 else ['None'] - return ', '.join(ch_names) - - reg_bin_str = fmt_hex(self._reg_bin) - obj_str = f'ExGRegister:\n' + \ - f'Data Rate: {self.data_rate:4d}\n' + \ - f'RLD Powerdown: {self.rld_powerdown}\n' + \ - f'RLD Channels: {fmt_rld_channels(self.rld_channels)}\n' + \ - f'RLD Reference: {self.rld_ref.name}\n' + \ - f'Binary: {reg_bin_str}\n' - obj_str += print_ch(0) - obj_str += print_ch(1) - return obj_str - - def __eq__(self, other: "ExGRegister") -> bool: - return self._reg_bin == other._reg_bin - - @staticmethod - def check_ch_id(ch_id: int) -> None: - if not 0 <= ch_id <= 1: - raise ValueError('Channel ID must be 0 or 1') - - def _get_ch_byte(self, ch_id: int) -> int: - ch_offset = 3 + ch_id - return self._reg_bin[ch_offset] - - def _get_rld_byte(self) -> int: - return self._reg_bin[0x05] - - def get_ch_gain(self, ch_id: int) -> int: - self.check_ch_id(ch_id) - - ch_byte = self._get_ch_byte(ch_id) - gain_bin = (ch_byte & 0x70) >> 4 - - return self.GAIN_MAP[gain_bin] - - def get_ch_mux_bin(self, ch_id: int) -> int: - self.check_ch_id(ch_id) - - ch_byte = self._get_ch_byte(ch_id) - return ch_byte & 0x0F - - def get_ch_mux(self, ch_id: int) -> ExGMux: - return ExGMux(self.get_ch_mux_bin(ch_id)) - - def is_ch_powerdown(self, ch_id: int) -> bool: - self.check_ch_id(ch_id) - - ch_byte = self._get_ch_byte(ch_id) - return bit_is_set(ch_byte, self.PD_BIT) - - @property - def binary(self): - return self._reg_bin - - @property - def ch1_gain(self) -> int: - return self.get_ch_gain(0) - - @property - def ch2_gain(self) -> int: - return self.get_ch_gain(1) - - @property - def ch1_mux(self) -> ExGMux: - return self.get_ch_mux(0) - - @property - def ch2_mux(self) -> ExGMux: - return self.get_ch_mux(1) - - @property - def ch1_powerdown(self) -> bool: - return self.is_ch_powerdown(0) - - @property - def ch2_powerdown(self) -> bool: - return self.is_ch_powerdown(1) - - @property - def data_rate(self) -> int: - dr_bin = self._reg_bin[0] & 0x07 - return self.DATA_RATE_MAP[dr_bin] - - @property - def rld_powerdown(self) -> bool: - rld_byte = self._get_rld_byte() - return not bit_is_set(rld_byte, self.RLD_PD_BIT) - - @property - def rld_channels(self) -> List[ExGRLDLead]: - rld_byte = self._get_rld_byte() - return [ch for ch in ExGRLDLead if bit_is_set(rld_byte, ch.value)] - - @property - def rld_ref(self) -> ERLDRef: - ref_byte = self._reg_bin[9] - rld_ref = (ref_byte >> 1) & 0x01 - return ERLDRef(rld_ref) - - -def sr2dr(sr: float) -> int: - """Calculate equivalent device-specific rate for a sample rate in Hz - - Device-specific sample rates are given in absolute clock ticks per unit of time. This function can be used to - calculate such a rate for the Shimmer3. - - Args: - sr(float): The sampling rate in Hz - - Returns: - An integer which represents the equivalent device-specific sampling rate - """ - dr_dec = DEV_CLOCK_RATE / sr - return round(dr_dec) - - -def dr2sr(dr: int): - """Calculate equivalent sampling rate for a given device-specific rate - - Device-specific sample rates are given in absolute clock ticks per unit of time. This function can be used to - calculate a regular sampling rate in Hz from such a rate. - - Args: - dr(int): The absolute device rate as int - - Returns: - A floating-point number that represents the sampling rate in Hz - """ - return DEV_CLOCK_RATE / dr - - -@overload -def sec2ticks(t_sec: float) -> int: ... - - -@overload -def sec2ticks(t_sec: np.ndarray) -> np.ndarray: ... - - -def sec2ticks(t_sec: Union[float, np.ndarray]) -> Union[int, np.ndarray]: - """Calculate equivalent device clock ticks for a time in seconds - - Args: - t_sec: A time in seconds - Returns: - An integer which represents the equivalent number of clock ticks - """ - return round(t_sec * DEV_CLOCK_RATE) - - -@overload -def ticks2sec(t_ticks: int) -> float: ... - - -@overload -def ticks2sec(t_ticks: np.ndarray) -> np.ndarray: ... - - -def ticks2sec(t_ticks: Union[int, np.ndarray]) -> Union[float, np.ndarray]: - """Calculate the time in seconds equivalent to a device clock ticks count - - Args: - t_ticks: A clock tick counter for which to calculate the time in seconds - Returns: - A floating point time in seconds that is equivalent to the number of clock ticks - """ - return t_ticks / DEV_CLOCK_RATE - - @unique class EChannelType(Enum): """ @@ -426,12 +137,6 @@ class EChannelType(Enum): TIMESTAMP = auto() -class EFirmwareType(Enum): - BtStream = auto() - SDLog = auto() - LogAndStream = auto() - - @unique class ESensorGroup(Enum): """ @@ -484,51 +189,6 @@ class ESensorGroup(Enum): EXG2_16BIT = auto() -ExG_ChType_Chip_Assignment: Dict[EChannelType, Tuple[int, int]] = { - EChannelType.EXG_ADS1292R_1_CH1_24BIT: (0, 0), - EChannelType.EXG_ADS1292R_1_CH1_16BIT: (0, 0), - - EChannelType.EXG_ADS1292R_1_CH2_24BIT: (0, 1), - EChannelType.EXG_ADS1292R_1_CH2_16BIT: (0, 1), - - EChannelType.EXG_ADS1292R_2_CH1_24BIT: (1, 0), - EChannelType.EXG_ADS1292R_2_CH1_16BIT: (1, 0), - - EChannelType.EXG_ADS1292R_2_CH2_24BIT: (1, 1), - EChannelType.EXG_ADS1292R_2_CH2_16BIT: (1, 1), -} - - -def is_exg_ch(ch_type: EChannelType) -> bool: - """ - Returns true if the signal that this channel type describes was recorded by a ExG chip - - Args: - ch_type: The EChannelType of the signal - - Returns: - True if the channel type belongs to the ExG chips, otherwise False - """ - # This is hacky but protected by unit tests - regex = re.compile(r'EXG_ADS1292R_\d_CH\d_\d{2}BIT') - return regex.match(ch_type.name) is not None - - -def get_exg_ch(ch_type: EChannelType) -> Tuple[int, int]: - """ - Each ExG Chip EChannelType originates from a specific ExG chip and channel. This function returns a tuple that - specifices which chip and channel a certain signal EChannelType was recorded with. - - Args: - ch_type: The EChannelType of the signal - - Returns: - A tuple of ints which represents the chip id {0, 1} and the channel id {0, 1}. - - """ - return ExG_ChType_Chip_Assignment[ch_type] - - """ Assigns each channel type its appropriate data type. """ @@ -577,32 +237,6 @@ def get_exg_ch(ch_type: EChannelType) -> Tuple[int, int]: EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True) } -FirmwareTypeValueAssignment = { - 0x01: EFirmwareType.BtStream, - 0x02: EFirmwareType.SDLog, - 0x03: EFirmwareType.LogAndStream, -} - - -def get_firmware_type(f_type: int) -> EFirmwareType: - if f_type not in FirmwareTypeValueAssignment: - raise ValueError(f'Unknown firmware type: 0x{f_type:x}') - - return FirmwareTypeValueAssignment[f_type] - - -def get_ch_dtypes(channels: List[EChannelType]) -> List[ChannelDataType]: - """Return the channel data types for a set of channels - - Args: - channels: A list of channels - Returns: - A list of channel data types with the same order - """ - dtypes = [ChDataTypeAssignment[ch] for ch in channels] - return dtypes - - """ This dictionary contains the mapping from sensor to data channels. Since one sensor can record on multiple channels, the mapping is one-to-many. @@ -639,6 +273,38 @@ def get_ch_dtypes(channels: List[EChannelType]) -> List[ChannelDataType]: ESensorGroup.TEMP: [], } +""" +The sensors are enabled via a multi-byte bitfield that currently stretches a total of three bytes. This dictionary +contains the bitfield position for every sensor in this bitfield. +""" +SensorBitAssignments: Dict[ESensorGroup, int] = { + ESensorGroup.ACCEL_LN: 0x80 << 0 * 8, + ESensorGroup.GYRO: 0x40 << 0 * 8, + ESensorGroup.MAG: 0x20 << 0 * 8, + ESensorGroup.EXG1_24BIT: 0x10 << 0 * 8, + ESensorGroup.EXG2_24BIT: 0x08 << 0 * 8, + ESensorGroup.GSR: 0x04 << 0 * 8, + ESensorGroup.CH_A7: 0x02 << 0 * 8, + ESensorGroup.CH_A6: 0x01 << 0 * 8, + + ESensorGroup.STRAIN: 0x80 << 1 * 8, + # No assignment 0x40 << 1 * 8, + ESensorGroup.BATTERY: 0x20 << 1 * 8, + ESensorGroup.ACCEL_WR: 0x10 << 1 * 8, + ESensorGroup.CH_A15: 0x08 << 1 * 8, + ESensorGroup.CH_A1: 0x04 << 1 * 8, + ESensorGroup.CH_A12: 0x02 << 1 * 8, + ESensorGroup.CH_A13: 0x01 << 1 * 8, + + ESensorGroup.CH_A14: 0x80 << 2 * 8, + ESensorGroup.ACCEL_MPU: 0x40 << 2 * 8, + ESensorGroup.MAG_MPU: 0x20 << 2 * 8, + ESensorGroup.EXG1_16BIT: 0x10 << 2 * 8, + ESensorGroup.EXG2_16BIT: 0x08 << 2 * 8, + ESensorGroup.PRESSURE: 0x04 << 2 * 8, + ESensorGroup.TEMP: 0x02 << 2 * 8, +} + def get_enabled_channels(sensors: List[ESensorGroup]) -> List[EChannelType]: """Determine the set of data channels for a set of enabled sensors @@ -657,36 +323,13 @@ def get_enabled_channels(sensors: List[ESensorGroup]) -> List[EChannelType]: return flatten_list(channels) -""" -The sensors are enabled via a multi-byte bitfield that currently stretches a total of three bytes. This dictionary -contains the bitfield position for every sensor in this bitfield. -""" -# @formatter:off -SensorBitAssignments: Dict[ESensorGroup, int] = { - ESensorGroup.ACCEL_LN: 0x80 << 0 * 8, - ESensorGroup.GYRO: 0x40 << 0 * 8, - ESensorGroup.MAG: 0x20 << 0 * 8, - ESensorGroup.EXG1_24BIT: 0x10 << 0 * 8, - ESensorGroup.EXG2_24BIT: 0x08 << 0 * 8, - ESensorGroup.GSR: 0x04 << 0 * 8, - ESensorGroup.CH_A7: 0x02 << 0 * 8, - ESensorGroup.CH_A6: 0x01 << 0 * 8, - - ESensorGroup.STRAIN: 0x80 << 1 * 8, - # No assignment 0x40 << 1 * 8, - ESensorGroup.BATTERY: 0x20 << 1 * 8, - ESensorGroup.ACCEL_WR: 0x10 << 1 * 8, - ESensorGroup.CH_A15: 0x08 << 1 * 8, - ESensorGroup.CH_A1: 0x04 << 1 * 8, - ESensorGroup.CH_A12: 0x02 << 1 * 8, - ESensorGroup.CH_A13: 0x01 << 1 * 8, - - ESensorGroup.CH_A14: 0x80 << 2 * 8, - ESensorGroup.ACCEL_MPU: 0x40 << 2 * 8, - ESensorGroup.MAG_MPU: 0x20 << 2 * 8, - ESensorGroup.EXG1_16BIT: 0x10 << 2 * 8, - ESensorGroup.EXG2_16BIT: 0x08 << 2 * 8, - ESensorGroup.PRESSURE: 0x04 << 2 * 8, - ESensorGroup.TEMP: 0x02 << 2 * 8, -} -# @formatter:on +def get_ch_dtypes(channels: List[EChannelType]) -> List[ChannelDataType]: + """Return the channel data types for a set of channels + + Args: + channels: A list of channels + Returns: + A list of channel data types with the same order + """ + dtypes = [ChDataTypeAssignment[ch] for ch in channels] + return dtypes diff --git a/pyshimmer/dev/exg.py b/pyshimmer/dev/exg.py new file mode 100644 index 0000000..e36caa7 --- /dev/null +++ b/pyshimmer/dev/exg.py @@ -0,0 +1,203 @@ +import re +from enum import Enum +from typing import List, Dict, Tuple + +from pyshimmer.util import bit_is_set, fmt_hex +from .channels import EChannelType + + +class ExGMux(Enum): + NORMAL = 0x00 + SHORTED = 0x01 + RLD_MEASURE = 0x02 + MVDD = 0x03 + TEMP_SENSOR = 0x04 + TEST_SIGNAL = 0x05 + RLD_DRP = 0x06 + RLD_DRM = 0x07 + RLD_DRPM = 0x08 + INPUT3 = 0x09 + RESERVED = 0x0A + + +class ExGRLDLead(Enum): + RLD1P = 0x01 << 0 + RLD1N = 0x01 << 1 + RLD2P = 0x01 << 2 + RLD2N = 0x01 << 3 + + +class ERLDRef(Enum): + EXTERNAL = 0x00 + INTERNAL = 0x01 + + +class ExGRegister: + GAIN_MAP = { + 0: 6, 1: 1, 2: 2, 3: 3, 4: 4, 5: 8, 6: 12, + } + DATA_RATE_MAP = { + 0: 125, 1: 250, 2: 500, 3: 1000, 4: 2000, 5: 4000, 6: 8000, 7: -1 + } + PD_BIT = 0x01 << 7 + RLD_PD_BIT = 0x01 << 5 + + def __init__(self, reg_bin: bytes): + if len(reg_bin) < 10: + raise ValueError('Binary register content must have length 10') + + self._reg_bin = reg_bin + + def __str__(self) -> str: + def print_ch(ch_id: int) -> str: + return f'Channel {ch_id + 1:2d}\n' + \ + f'\tPowerdown: {self.is_ch_powerdown(ch_id)}\n' + \ + f'\tGain: {self.get_ch_gain(ch_id):2d}\n' + \ + f'\tMultiplexer: {self.get_ch_mux(ch_id).name} ({self.get_ch_mux_bin(ch_id):#06b})\n' + + def fmt_rld_channels(ch_names) -> str: + ch_names = [ch.name for ch in ch_names] if len(ch_names) > 0 else ['None'] + return ', '.join(ch_names) + + reg_bin_str = fmt_hex(self._reg_bin) + obj_str = f'ExGRegister:\n' + \ + f'Data Rate: {self.data_rate:4d}\n' + \ + f'RLD Powerdown: {self.rld_powerdown}\n' + \ + f'RLD Channels: {fmt_rld_channels(self.rld_channels)}\n' + \ + f'RLD Reference: {self.rld_ref.name}\n' + \ + f'Binary: {reg_bin_str}\n' + obj_str += print_ch(0) + obj_str += print_ch(1) + return obj_str + + def __eq__(self, other: "ExGRegister") -> bool: + return self._reg_bin == other._reg_bin + + @staticmethod + def check_ch_id(ch_id: int) -> None: + if not 0 <= ch_id <= 1: + raise ValueError('Channel ID must be 0 or 1') + + def _get_ch_byte(self, ch_id: int) -> int: + ch_offset = 3 + ch_id + return self._reg_bin[ch_offset] + + def _get_rld_byte(self) -> int: + return self._reg_bin[0x05] + + def get_ch_gain(self, ch_id: int) -> int: + self.check_ch_id(ch_id) + + ch_byte = self._get_ch_byte(ch_id) + gain_bin = (ch_byte & 0x70) >> 4 + + return self.GAIN_MAP[gain_bin] + + def get_ch_mux_bin(self, ch_id: int) -> int: + self.check_ch_id(ch_id) + + ch_byte = self._get_ch_byte(ch_id) + return ch_byte & 0x0F + + def get_ch_mux(self, ch_id: int) -> ExGMux: + return ExGMux(self.get_ch_mux_bin(ch_id)) + + def is_ch_powerdown(self, ch_id: int) -> bool: + self.check_ch_id(ch_id) + + ch_byte = self._get_ch_byte(ch_id) + return bit_is_set(ch_byte, self.PD_BIT) + + @property + def binary(self): + return self._reg_bin + + @property + def ch1_gain(self) -> int: + return self.get_ch_gain(0) + + @property + def ch2_gain(self) -> int: + return self.get_ch_gain(1) + + @property + def ch1_mux(self) -> ExGMux: + return self.get_ch_mux(0) + + @property + def ch2_mux(self) -> ExGMux: + return self.get_ch_mux(1) + + @property + def ch1_powerdown(self) -> bool: + return self.is_ch_powerdown(0) + + @property + def ch2_powerdown(self) -> bool: + return self.is_ch_powerdown(1) + + @property + def data_rate(self) -> int: + dr_bin = self._reg_bin[0] & 0x07 + return self.DATA_RATE_MAP[dr_bin] + + @property + def rld_powerdown(self) -> bool: + rld_byte = self._get_rld_byte() + return not bit_is_set(rld_byte, self.RLD_PD_BIT) + + @property + def rld_channels(self) -> List[ExGRLDLead]: + rld_byte = self._get_rld_byte() + return [ch for ch in ExGRLDLead if bit_is_set(rld_byte, ch.value)] + + @property + def rld_ref(self) -> ERLDRef: + ref_byte = self._reg_bin[9] + rld_ref = (ref_byte >> 1) & 0x01 + return ERLDRef(rld_ref) + + +ExG_ChType_Chip_Assignment: Dict[EChannelType, Tuple[int, int]] = { + EChannelType.EXG_ADS1292R_1_CH1_24BIT: (0, 0), + EChannelType.EXG_ADS1292R_1_CH1_16BIT: (0, 0), + + EChannelType.EXG_ADS1292R_1_CH2_24BIT: (0, 1), + EChannelType.EXG_ADS1292R_1_CH2_16BIT: (0, 1), + + EChannelType.EXG_ADS1292R_2_CH1_24BIT: (1, 0), + EChannelType.EXG_ADS1292R_2_CH1_16BIT: (1, 0), + + EChannelType.EXG_ADS1292R_2_CH2_24BIT: (1, 1), + EChannelType.EXG_ADS1292R_2_CH2_16BIT: (1, 1), +} + + +def is_exg_ch(ch_type: EChannelType) -> bool: + """ + Returns true if the signal that this channel type describes was recorded by a ExG chip + + Args: + ch_type: The EChannelType of the signal + + Returns: + True if the channel type belongs to the ExG chips, otherwise False + """ + # This is hacky but protected by unit tests + regex = re.compile(r'EXG_ADS1292R_\d_CH\d_\d{2}BIT') + return regex.match(ch_type.name) is not None + + +def get_exg_ch(ch_type: EChannelType) -> Tuple[int, int]: + """ + Each ExG Chip EChannelType originates from a specific ExG chip and channel. This function returns a tuple that + specifices which chip and channel a certain signal EChannelType was recorded with. + + Args: + ch_type: The EChannelType of the signal + + Returns: + A tuple of ints which represents the chip id {0, 1} and the channel id {0, 1}. + + """ + return ExG_ChType_Chip_Assignment[ch_type] diff --git a/pyshimmer/dev/fw_version.py b/pyshimmer/dev/fw_version.py new file mode 100644 index 0000000..31105fe --- /dev/null +++ b/pyshimmer/dev/fw_version.py @@ -0,0 +1,66 @@ +from enum import Enum, auto + + +def ensure_firmware_version(func): + def wrapper(self, other): + if not isinstance(other, FirmwareVersion): + return False + + return func(self, other) + + return wrapper + + +class FirmwareVersion: + + def __init__(self, major: int, minor: int, rel: int): + """Represents the version of the Shimmer firmware + + :param major: Major version + :param minor: Minor version + :param rel: Patch level + """ + self.major = major + self.minor = minor + self.rel = rel + self._key = (major, minor, rel) + + @ensure_firmware_version + def __eq__(self, other: "FirmwareVersion") -> bool: + return self._key == other._key + + @ensure_firmware_version + def __gt__(self, other: "FirmwareVersion") -> bool: + return self._key > other._key + + @ensure_firmware_version + def __ge__(self, other: "FirmwareVersion") -> bool: + return self._key >= other._key + + @ensure_firmware_version + def __lt__(self, other: "FirmwareVersion") -> bool: + return self._key < other._key + + @ensure_firmware_version + def __le__(self, other: "FirmwareVersion") -> bool: + return self._key <= other._key + + +class EFirmwareType(Enum): + BtStream = auto() + SDLog = auto() + LogAndStream = auto() + + +FirmwareTypeValueAssignment = { + 0x01: EFirmwareType.BtStream, + 0x02: EFirmwareType.SDLog, + 0x03: EFirmwareType.LogAndStream, +} + + +def get_firmware_type(f_type: int) -> EFirmwareType: + if f_type not in FirmwareTypeValueAssignment: + raise ValueError(f'Unknown firmware type: 0x{f_type:x}') + + return FirmwareTypeValueAssignment[f_type] diff --git a/pyshimmer/reader/binary_reader.py b/pyshimmer/reader/binary_reader.py index 3b21453..88069a3 100644 --- a/pyshimmer/reader/binary_reader.py +++ b/pyshimmer/reader/binary_reader.py @@ -18,8 +18,9 @@ import numpy as np -from pyshimmer.device import ESensorGroup, EChannelType, ChannelDataType, SensorBitAssignments, \ - get_enabled_channels, get_ch_dtypes, ExGRegister +from pyshimmer.dev.channels import ESensorGroup, get_ch_dtypes, get_enabled_channels, SensorBitAssignments, \ + ChannelDataType, EChannelType +from pyshimmer.dev.exg import ExGRegister from pyshimmer.util import FileIOBase, unpack, bit_is_set from .reader_const import RTC_CLOCK_DIFF_OFFSET, ENABLED_SENSORS_OFFSET, ENABLED_SENSORS_LEN, SR_OFFSET, \ START_TS_OFFSET, START_TS_LEN, TRIAL_CONFIG_OFFSET, TRIAL_CONFIG_MASTER, TRIAL_CONFIG_SYNC, BLOCK_LEN, \ diff --git a/pyshimmer/reader/reader_const.py b/pyshimmer/reader/reader_const.py index b38fe58..d7781a3 100644 --- a/pyshimmer/reader/reader_const.py +++ b/pyshimmer/reader/reader_const.py @@ -15,7 +15,7 @@ # along with this program. If not, see . from typing import Dict, List -from pyshimmer.device import ESensorGroup +from pyshimmer.dev.channels import ESensorGroup SR_OFFSET = 0x00 diff --git a/pyshimmer/reader/shimmer_reader.py b/pyshimmer/reader/shimmer_reader.py index 4b826c6..6ef451c 100644 --- a/pyshimmer/reader/shimmer_reader.py +++ b/pyshimmer/reader/shimmer_reader.py @@ -18,8 +18,9 @@ import numpy as np -from pyshimmer.device import EChannelType, ticks2sec, dr2sr, ExGRegister, get_exg_ch, ChDataTypeAssignment, is_exg_ch, \ - get_enabled_channels +from pyshimmer.dev.base import ticks2sec, dr2sr +from pyshimmer.dev.channels import ChDataTypeAssignment, get_enabled_channels, EChannelType +from pyshimmer.dev.exg import is_exg_ch, get_exg_ch, ExGRegister from pyshimmer.reader.binary_reader import ShimmerBinaryReader from pyshimmer.reader.reader_const import EXG_ADC_REF_VOLT, EXG_ADC_OFFSET, TRIAXCAL_SENSORS from pyshimmer.util import unwrap diff --git a/pyshimmer/uart/dock_api.py b/pyshimmer/uart/dock_api.py index 64f0416..c1b8c12 100644 --- a/pyshimmer/uart/dock_api.py +++ b/pyshimmer/uart/dock_api.py @@ -18,7 +18,9 @@ from serial import Serial -from pyshimmer.device import sec2ticks, ticks2sec, EFirmwareType, get_firmware_type, ExGRegister +from pyshimmer.dev.base import sec2ticks, ticks2sec +from pyshimmer.dev.exg import ExGRegister +from pyshimmer.dev.fw_version import get_firmware_type, EFirmwareType from pyshimmer.uart.dock_const import * from pyshimmer.uart.dock_serial import DockSerial from pyshimmer.util import unpack diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index fab7641..78821c8 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -21,7 +21,8 @@ from pyshimmer.bluetooth.bt_commands import GetDeviceNameCommand, SetDeviceNameCommand, DataPacket, GetStatusCommand, \ GetStringCommand, ResponseCommand from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import ChDataTypeAssignment, EChannelType, EFirmwareType, FirmwareVersion +from pyshimmer.dev.channels import ChDataTypeAssignment, EChannelType +from pyshimmer.dev.fw_version import FirmwareVersion, EFirmwareType from pyshimmer.test_util import PTYSerialMockCreator diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 66a9b84..74126af 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -22,7 +22,8 @@ StopLoggingCommand, GetEXGRegsCommand, SetEXGRegsCommand, GetExperimentIDCommand, SetExperimentIDCommand, \ GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, DataPacket, ResponseCommand, EnableStatusAckCommand from pyshimmer.bluetooth.bt_serial import BluetoothSerial -from pyshimmer.device import EFirmwareType, EChannelType, ChDataTypeAssignment +from pyshimmer.dev.channels import ChDataTypeAssignment, EChannelType +from pyshimmer.dev.fw_version import EFirmwareType from pyshimmer.test_util import MockSerial diff --git a/test/reader/test_binary_reader.py b/test/reader/test_binary_reader.py index 82563d2..8c37e7b 100644 --- a/test/reader/test_binary_reader.py +++ b/test/reader/test_binary_reader.py @@ -17,7 +17,8 @@ import numpy as np -from pyshimmer.device import ESensorGroup, EChannelType, get_ch_dtypes, ExGRegister +from pyshimmer.dev.channels import ESensorGroup, get_ch_dtypes +from pyshimmer import EChannelType, ExGRegister from pyshimmer.reader.shimmer_reader import ShimmerBinaryReader from .reader_test_util import get_binary_sample_fpath, get_synced_bin_vs_consensys_pair_fpath, get_ecg_sample, \ get_triaxcal_sample diff --git a/test/reader/test_reader_const.py b/test/reader/test_reader_const.py index 0d2ce44..b0791a8 100644 --- a/test/reader/test_reader_const.py +++ b/test/reader/test_reader_const.py @@ -15,7 +15,7 @@ # along with this program. If not, see . from unittest import TestCase -from pyshimmer.device import ESensorGroup +from pyshimmer.dev.channels import ESensorGroup from pyshimmer.reader.reader_const import sort_sensors diff --git a/test/reader/test_shimmer_reader.py b/test/reader/test_shimmer_reader.py index 6a4e444..085c9ea 100644 --- a/test/reader/test_shimmer_reader.py +++ b/test/reader/test_shimmer_reader.py @@ -20,7 +20,10 @@ import numpy as np import pandas as pd -from pyshimmer.device import EChannelType, ticks2sec, get_exg_ch, ExGRegister, ESensorGroup, get_enabled_channels +from pyshimmer import EChannelType, ExGRegister +from pyshimmer.dev.base import ticks2sec +from pyshimmer.dev.channels import ESensorGroup, get_enabled_channels +from pyshimmer.dev.exg import get_exg_ch from pyshimmer.reader.binary_reader import ShimmerBinaryReader from pyshimmer.reader.shimmer_reader import ShimmerReader, SingleChannelProcessor, PPGProcessor, TriAxCalProcessor from .reader_test_util import get_bin_vs_consensys_pair_fpath, get_synced_bin_vs_consensys_pair_fpath, get_ecg_sample, \ diff --git a/test/test_device.py b/test/test_device.py index 7a279fc..87bd948 100644 --- a/test/test_device.py +++ b/test/test_device.py @@ -16,9 +16,11 @@ # along with this program. If not, see . from unittest import TestCase -from pyshimmer.device import sr2dr, dr2sr, ChannelDataType, ChDataTypeAssignment, SensorChannelAssignment, \ - SensorBitAssignments, sec2ticks, ticks2sec, get_ch_dtypes, EChannelType, ExGRegister, ExGMux, get_firmware_type, \ - EFirmwareType, ExGRLDLead, ERLDRef, get_exg_ch, is_exg_ch, FirmwareVersion +from pyshimmer.dev.base import sr2dr, dr2sr, sec2ticks, ticks2sec +from pyshimmer.dev.channels import ChDataTypeAssignment, get_ch_dtypes, SensorChannelAssignment, SensorBitAssignments, \ + ChannelDataType, EChannelType +from pyshimmer.dev.exg import is_exg_ch, get_exg_ch, ExGMux, ExGRLDLead, ERLDRef, ExGRegister +from pyshimmer.dev.fw_version import FirmwareVersion, get_firmware_type, EFirmwareType def randbytes(k: int) -> bytes: @@ -35,7 +37,7 @@ def setUp(self) -> None: def test_channel_enum_uniqueness(self): try: # The exception will trigger upon import if the enum values are not unique - from pyshimmer.device import EChannelType + from pyshimmer.dev.channels import EChannelType except ValueError as e: self.fail(f'Enum not unique: {e}') @@ -130,18 +132,18 @@ def test_get_ch_dtypes(self): def test_sensor_group_uniqueness(self): try: # The exception will trigger upon import if the enum values are not unique - from pyshimmer.device import ESensorGroup + from pyshimmer.dev.channels import ESensorGroup except ValueError as e: self.fail(f'Enum not unique: {e}') def test_datatype_assignments(self): - from pyshimmer.device import EChannelType + from pyshimmer.dev.channels import EChannelType for ch_type in EChannelType: if ch_type not in ChDataTypeAssignment: self.fail(f'No data type assigned to channel type: {ch_type}') def test_sensor_channel_assignments(self): - from pyshimmer.device import ESensorGroup + from pyshimmer.dev.channels import ESensorGroup for sensor in ESensorGroup: if sensor not in SensorChannelAssignment: self.fail(f'No channels assigned to sensor type: {sensor}') diff --git a/test/uart/test_dock_api.py b/test/uart/test_dock_api.py index 7a90d1b..47ff1d3 100644 --- a/test/uart/test_dock_api.py +++ b/test/uart/test_dock_api.py @@ -1,9 +1,8 @@ -from unittest import TestCase from typing import Tuple +from unittest import TestCase -from pyshimmer.uart.dock_api import ShimmerDock +from pyshimmer import EFirmwareType, ShimmerDock from pyshimmer.test_util import MockSerial -from pyshimmer.device import EFirmwareType class DockAPITest(TestCase): From 33b64df1918428b6a95f5978ea06900296f514b7 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 12:05:55 +0100 Subject: [PATCH 04/13] Split test_device.py into multiple modules Move device.base tests into separate module Move dev.channels tests into separate module Move device.exg tests into separate module Move test_device.py to dev/test_device_fw_version.py --- test/dev/__init__.py | 0 test/dev/test_device_base.py | 49 +++++ test/dev/test_device_channels.py | 114 ++++++++++ test/dev/test_device_exg.py | 138 +++++++++++++ test/dev/test_device_fw_version.py | 80 ++++++++ test/test_device.py | 320 ----------------------------- 6 files changed, 381 insertions(+), 320 deletions(-) create mode 100644 test/dev/__init__.py create mode 100644 test/dev/test_device_base.py create mode 100644 test/dev/test_device_channels.py create mode 100644 test/dev/test_device_exg.py create mode 100644 test/dev/test_device_fw_version.py delete mode 100644 test/test_device.py diff --git a/test/dev/__init__.py b/test/dev/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/dev/test_device_base.py b/test/dev/test_device_base.py new file mode 100644 index 0000000..583d042 --- /dev/null +++ b/test/dev/test_device_base.py @@ -0,0 +1,49 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from unittest import TestCase + +from pyshimmer.dev.base import sr2dr, dr2sr, sec2ticks, ticks2sec + + +class DeviceBaseTest(TestCase): + + def test_sr2dr(self): + r = sr2dr(1024.0) + self.assertEqual(r, 32) + + r = sr2dr(500.0) + self.assertEqual(r, 66) + + def test_dr2sr(self): + r = dr2sr(65) + self.assertEqual(round(r), 504) + + r = dr2sr(32) + self.assertEqual(r, 1024.0) + + r = dr2sr(64) + self.assertEqual(r, 512.0) + + def test_sec2ticks(self): + r = sec2ticks(1.0) + self.assertEqual(r, 32768) + + def test_ticks2sec(self): + r = ticks2sec(32768) + self.assertEqual(r, 1.0) + + r = ticks2sec(65536) + self.assertEqual(r, 2.0) diff --git a/test/dev/test_device_channels.py b/test/dev/test_device_channels.py new file mode 100644 index 0000000..d76ddfe --- /dev/null +++ b/test/dev/test_device_channels.py @@ -0,0 +1,114 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from unittest import TestCase + +from pyshimmer.dev.channels import ChDataTypeAssignment, get_ch_dtypes, SensorChannelAssignment, SensorBitAssignments, \ + ChannelDataType, EChannelType + + +class DeviceChannelsTest(TestCase): + + def test_channel_enum_uniqueness(self): + try: + # The exception will trigger upon import if the enum values are not unique + from pyshimmer.dev.channels import EChannelType + except ValueError as e: + self.fail(f'Enum not unique: {e}') + + def test_channel_data_type(self): + def test_both_endianess(byte_val_le: bytes, expected: int, signed: bool): + blen = len(byte_val_le) + dt_le = ChannelDataType(blen, signed=signed, le=True) + dt_be = ChannelDataType(blen, signed=signed, le=False) + + self.assertEqual(expected, dt_le.decode(byte_val_le)) + self.assertEqual(expected, dt_be.decode(byte_val_le[::-1])) + + # Test the property getters + dt = ChannelDataType(3, signed=False, le=True) + self.assertEqual(dt.little_endian, True) + self.assertEqual(dt.big_endian, False) + self.assertEqual(dt.signed, False) + self.assertEqual(dt.size, 3) + + # Test the property getters + dt = ChannelDataType(3, signed=False, le=False) + self.assertEqual(dt.little_endian, False) + self.assertEqual(dt.big_endian, True) + + # Test unsigned decodation for 3 byte data + test_both_endianess(b'\x00\x00\x00', 0x000000, signed=False) + test_both_endianess(b'\x10\x00\x00', 0x000010, signed=False) + test_both_endianess(b'\x00\x00\xFF', 0xFF0000, signed=False) + test_both_endianess(b'\xFF\xFF\xFF', 0xFFFFFF, signed=False) + + # Test signed decodation for 3 byte data + test_both_endianess(b'\xFF\xFF\xFF', -1, signed=True) + test_both_endianess(b'\x00\x00\x80', -2 ** 23, signed=True) + test_both_endianess(b'\xFF\xFF\x7F', 2 ** 23 - 1, signed=True) + test_both_endianess(b'\xFF\x00\x00', 255, signed=True) + + # Test unsigned decodation for 2 byte data + test_both_endianess(b'\x00\x00', 0x0000, signed=False) + test_both_endianess(b'\x10\x00', 0x0010, signed=False) + test_both_endianess(b'\x00\xFF', 0xFF00, signed=False) + test_both_endianess(b'\xFF\xFF', 0xFFFF, signed=False) + + # Test signed decodation for 2 byte data + test_both_endianess(b'\xFF\xFF', -1, signed=True) + test_both_endianess(b'\x00\x80', -2 ** 15, signed=True) + test_both_endianess(b'\xFF\x7F', 2 ** 15 - 1, signed=True) + test_both_endianess(b'\xFF\x00', 255, signed=True) + + def test_get_ch_dtypes(self): + channels = [EChannelType.INTERNAL_ADC_13, EChannelType.GYRO_MPU9150_Y] + r = get_ch_dtypes(channels) + + self.assertEqual(len(r), 2) + first, second = r + + self.assertEqual(first.size, 2) + self.assertEqual(first.little_endian, True) + self.assertEqual(first.signed, False) + + self.assertEqual(second.size, 2) + self.assertEqual(second.little_endian, False) + self.assertEqual(second.signed, True) + + def test_sensor_group_uniqueness(self): + try: + # The exception will trigger upon import if the enum values are not unique + from pyshimmer.dev.channels import ESensorGroup + except ValueError as e: + self.fail(f'Enum not unique: {e}') + + def test_datatype_assignments(self): + from pyshimmer.dev.channels import EChannelType + for ch_type in EChannelType: + if ch_type not in ChDataTypeAssignment: + self.fail(f'No data type assigned to channel type: {ch_type}') + + def test_sensor_channel_assignments(self): + from pyshimmer.dev.channels import ESensorGroup + for sensor in ESensorGroup: + if sensor not in SensorChannelAssignment: + self.fail(f'No channels assigned to sensor type: {sensor}') + + def test_sensor_bit_assignments_uniqueness(self): + for s1 in SensorBitAssignments.keys(): + for s2 in SensorBitAssignments.keys(): + if s1 != s2 and SensorBitAssignments[s1] == SensorBitAssignments[s2]: + self.fail(f'Colliding bitfield assignments for sensor {s1} and {s2}') diff --git a/test/dev/test_device_exg.py b/test/dev/test_device_exg.py new file mode 100644 index 0000000..739b982 --- /dev/null +++ b/test/dev/test_device_exg.py @@ -0,0 +1,138 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import random +from unittest import TestCase + +from pyshimmer.dev.channels import EChannelType +from pyshimmer.dev.exg import is_exg_ch, get_exg_ch, ExGMux, ExGRLDLead, ERLDRef, ExGRegister + + +def randbytes(k: int) -> bytes: + population = list(range(256)) + seq = random.choices(population, k=k) + return bytes(seq) + + +class DeviceExGTest(TestCase): + + def test_get_exg_ch(self): + self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_1_CH1_24BIT), (0, 0)) + self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_1_CH2_24BIT), (0, 1)) + self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_2_CH1_24BIT), (1, 0)) + self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_2_CH2_24BIT), (1, 1)) + + def test_is_exg_ch(self): + from itertools import product + valid_ch = [EChannelType[f'EXG_ADS1292R_{i}_CH{j}_{k}BIT'] for i, j, k in product([1, 2], [1, 2], [16, 24])] + + for ch in EChannelType: + self.assertEqual(is_exg_ch(ch), ch in valid_ch) + + +class ExGRegisterTest(TestCase): + + def setUp(self) -> None: + random.seed(0x42) + + def test_exg_register_fail(self): + self.assertRaises(ValueError, ExGRegister, bytes()) + + def test_exg_register(self): + reg1 = bytes([3, 160, 16, 64, 71, 0, 0, 0, 2, 1]) + reg2 = bytes([0, 171, 16, 21, 21, 0, 0, 0, 2, 1]) + + exg_reg1 = ExGRegister(reg1) + exg_reg2 = ExGRegister(reg2) + + self.assertEqual(exg_reg1.ch1_gain, 4) + self.assertEqual(exg_reg1.ch2_gain, 4) + self.assertEqual(exg_reg1.ch1_mux, ExGMux.NORMAL) + self.assertEqual(exg_reg1.get_ch_mux_bin(0), 0b0000) + self.assertEqual(exg_reg1.ch2_mux, ExGMux.RLD_DRM) + self.assertEqual(exg_reg1.get_ch_mux_bin(1), 0b0111) + self.assertEqual(exg_reg1.ch1_powerdown, False) + self.assertEqual(exg_reg1.ch2_powerdown, False) + self.assertEqual(exg_reg1.data_rate, 1000) + self.assertEqual(exg_reg1.binary, reg1) + + self.assertEqual(exg_reg2.ch1_gain, 1) + self.assertEqual(exg_reg2.ch2_gain, 1) + self.assertEqual(exg_reg2.ch1_mux, ExGMux.TEST_SIGNAL) + self.assertEqual(exg_reg2.ch2_mux, ExGMux.TEST_SIGNAL) + self.assertEqual(exg_reg2.ch1_powerdown, False) + self.assertEqual(exg_reg2.ch2_powerdown, False) + self.assertEqual(exg_reg2.data_rate, 125) + self.assertEqual(exg_reg2.binary, reg2) + + self.assertRaises(ValueError, exg_reg1.get_ch_mux, 2) + self.assertRaises(ValueError, exg_reg1.get_ch_mux, -1) + + def test_exg_register_powerdown(self): + pd = 0x1 << 7 + reg_bin = bytes([3, 160, 16, pd, pd, 0, 0, 0, 2, 1]) + reg = ExGRegister(reg_bin) + + self.assertEqual(reg.ch1_powerdown, True) + self.assertEqual(reg.ch2_powerdown, True) + + def test_exg_register_rld_powerdown(self): + pd = 0x01 << 5 + reg_bin = bytes([0, 0, 0, 0, 0, pd, 0, 0, 0, 0]) + reg = ExGRegister(reg_bin) + + self.assertEqual(reg.rld_powerdown, False) + + def test_exg_register_rld_channels(self): + reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) + reg = ExGRegister(reg_bin) + self.assertEqual(reg.rld_channels, [ExGRLDLead.RLD1P, ExGRLDLead.RLD2P, ExGRLDLead.RLD2N]) + + reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x00, 0x00, 0x00, 0x02, 0x03]) + reg = ExGRegister(reg_bin) + self.assertEqual(reg.rld_channels, []) + + def test_exg_register_rld_ref(self): + reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) + reg = ExGRegister(reg_bin) + self.assertEqual(reg.rld_ref, ERLDRef.INTERNAL) + + reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x01]) + reg = ExGRegister(reg_bin) + self.assertEqual(reg.rld_ref, ERLDRef.EXTERNAL) + + def test_exg_register_print(self): + reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) + reg = ExGRegister(reg_bin) + + str_repr = str(reg) + self.assertTrue('Data Rate: 1000' in str_repr) + self.assertTrue('RLD Powerdown: False' in str_repr) + + def test_equality_operator(self): + def do_assert(a: bytes, b: bytes, result: bool) -> None: + self.assertEqual(ExGRegister(a) == ExGRegister(b), result) + + x = randbytes(10) + y = randbytes(10) + + do_assert(x, y, False) + do_assert(x, x, True) + do_assert(y, y, True) + + for i in range(len(x)): + y = bytearray(x) + y[i] = random.randrange(0, 256) + do_assert(x, y, False) diff --git a/test/dev/test_device_fw_version.py b/test/dev/test_device_fw_version.py new file mode 100644 index 0000000..6311002 --- /dev/null +++ b/test/dev/test_device_fw_version.py @@ -0,0 +1,80 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from unittest import TestCase + +from pyshimmer.dev.fw_version import FirmwareVersion, get_firmware_type, EFirmwareType + + +class DeviceFirmwareVersionTest(TestCase): + + def test_get_firmware_type(self): + r = get_firmware_type(0x01) + self.assertEqual(r, EFirmwareType.BtStream) + r = get_firmware_type(0x02) + self.assertEqual(r, EFirmwareType.SDLog) + r = get_firmware_type(0x03) + self.assertEqual(r, EFirmwareType.LogAndStream) + + self.assertRaises(ValueError, get_firmware_type, 0xFF) + + +class FirmwareVersionTest(TestCase): + + def test_version_equality(self): + a = FirmwareVersion(1, 2, 3) + b = FirmwareVersion(1, 2, 3) + c = FirmwareVersion(3, 2, 1) + + self.assertEqual(a, a) + self.assertEqual(a, b) + + self.assertNotEqual(a, None) + self.assertNotEqual(a, False) + self.assertNotEqual(a, 10) + self.assertNotEqual(a, c) + + def test_attributes(self): + ver = FirmwareVersion(1, 2, 3) + self.assertEqual(ver.major, 1) + self.assertEqual(ver.minor, 2) + self.assertEqual(ver.rel, 3) + + def test_greater_less(self): + a = FirmwareVersion(3, 2, 1) + + b = FirmwareVersion(3, 2, 1) + self.assertFalse(b > a) + self.assertTrue(b >= a) + self.assertFalse(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(2, 2, 1) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(3, 1, 1) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) + + b = FirmwareVersion(3, 2, 0) + self.assertFalse(b > a) + self.assertFalse(b >= a) + self.assertTrue(b < a) + self.assertTrue(b <= a) diff --git a/test/test_device.py b/test/test_device.py deleted file mode 100644 index 87bd948..0000000 --- a/test/test_device.py +++ /dev/null @@ -1,320 +0,0 @@ -# pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -import random -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -from unittest import TestCase - -from pyshimmer.dev.base import sr2dr, dr2sr, sec2ticks, ticks2sec -from pyshimmer.dev.channels import ChDataTypeAssignment, get_ch_dtypes, SensorChannelAssignment, SensorBitAssignments, \ - ChannelDataType, EChannelType -from pyshimmer.dev.exg import is_exg_ch, get_exg_ch, ExGMux, ExGRLDLead, ERLDRef, ExGRegister -from pyshimmer.dev.fw_version import FirmwareVersion, get_firmware_type, EFirmwareType - - -def randbytes(k: int) -> bytes: - population = list(range(256)) - seq = random.choices(population, k=k) - return bytes(seq) - - -class DeviceTest(TestCase): - - def setUp(self) -> None: - random.seed(0x42) - - def test_channel_enum_uniqueness(self): - try: - # The exception will trigger upon import if the enum values are not unique - from pyshimmer.dev.channels import EChannelType - except ValueError as e: - self.fail(f'Enum not unique: {e}') - - def test_sr2dr(self): - r = sr2dr(1024.0) - self.assertEqual(r, 32) - - r = sr2dr(500.0) - self.assertEqual(r, 66) - - def test_dr2sr(self): - r = dr2sr(65) - self.assertEqual(round(r), 504) - - r = dr2sr(32) - self.assertEqual(r, 1024.0) - - r = dr2sr(64) - self.assertEqual(r, 512.0) - - def test_sec2ticks(self): - r = sec2ticks(1.0) - self.assertEqual(r, 32768) - - def test_ticks2sec(self): - r = ticks2sec(32768) - self.assertEqual(r, 1.0) - - r = ticks2sec(65536) - self.assertEqual(r, 2.0) - - def test_channel_data_type(self): - def test_both_endianess(byte_val_le: bytes, expected: int, signed: bool): - blen = len(byte_val_le) - dt_le = ChannelDataType(blen, signed=signed, le=True) - dt_be = ChannelDataType(blen, signed=signed, le=False) - - self.assertEqual(expected, dt_le.decode(byte_val_le)) - self.assertEqual(expected, dt_be.decode(byte_val_le[::-1])) - - # Test the property getters - dt = ChannelDataType(3, signed=False, le=True) - self.assertEqual(dt.little_endian, True) - self.assertEqual(dt.big_endian, False) - self.assertEqual(dt.signed, False) - self.assertEqual(dt.size, 3) - - # Test the property getters - dt = ChannelDataType(3, signed=False, le=False) - self.assertEqual(dt.little_endian, False) - self.assertEqual(dt.big_endian, True) - - # Test unsigned decodation for 3 byte data - test_both_endianess(b'\x00\x00\x00', 0x000000, signed=False) - test_both_endianess(b'\x10\x00\x00', 0x000010, signed=False) - test_both_endianess(b'\x00\x00\xFF', 0xFF0000, signed=False) - test_both_endianess(b'\xFF\xFF\xFF', 0xFFFFFF, signed=False) - - # Test signed decodation for 3 byte data - test_both_endianess(b'\xFF\xFF\xFF', -1, signed=True) - test_both_endianess(b'\x00\x00\x80', -2 ** 23, signed=True) - test_both_endianess(b'\xFF\xFF\x7F', 2 ** 23 - 1, signed=True) - test_both_endianess(b'\xFF\x00\x00', 255, signed=True) - - # Test unsigned decodation for 2 byte data - test_both_endianess(b'\x00\x00', 0x0000, signed=False) - test_both_endianess(b'\x10\x00', 0x0010, signed=False) - test_both_endianess(b'\x00\xFF', 0xFF00, signed=False) - test_both_endianess(b'\xFF\xFF', 0xFFFF, signed=False) - - # Test signed decodation for 2 byte data - test_both_endianess(b'\xFF\xFF', -1, signed=True) - test_both_endianess(b'\x00\x80', -2 ** 15, signed=True) - test_both_endianess(b'\xFF\x7F', 2 ** 15 - 1, signed=True) - test_both_endianess(b'\xFF\x00', 255, signed=True) - - def test_get_ch_dtypes(self): - channels = [EChannelType.INTERNAL_ADC_13, EChannelType.GYRO_MPU9150_Y] - r = get_ch_dtypes(channels) - - self.assertEqual(len(r), 2) - first, second = r - - self.assertEqual(first.size, 2) - self.assertEqual(first.little_endian, True) - self.assertEqual(first.signed, False) - - self.assertEqual(second.size, 2) - self.assertEqual(second.little_endian, False) - self.assertEqual(second.signed, True) - - def test_sensor_group_uniqueness(self): - try: - # The exception will trigger upon import if the enum values are not unique - from pyshimmer.dev.channels import ESensorGroup - except ValueError as e: - self.fail(f'Enum not unique: {e}') - - def test_datatype_assignments(self): - from pyshimmer.dev.channels import EChannelType - for ch_type in EChannelType: - if ch_type not in ChDataTypeAssignment: - self.fail(f'No data type assigned to channel type: {ch_type}') - - def test_sensor_channel_assignments(self): - from pyshimmer.dev.channels import ESensorGroup - for sensor in ESensorGroup: - if sensor not in SensorChannelAssignment: - self.fail(f'No channels assigned to sensor type: {sensor}') - - def test_sensor_bit_assignments_uniqueness(self): - for s1 in SensorBitAssignments.keys(): - for s2 in SensorBitAssignments.keys(): - if s1 != s2 and SensorBitAssignments[s1] == SensorBitAssignments[s2]: - self.fail(f'Colliding bitfield assignments for sensor {s1} and {s2}') - - def test_get_firmware_type(self): - r = get_firmware_type(0x01) - self.assertEqual(r, EFirmwareType.BtStream) - r = get_firmware_type(0x02) - self.assertEqual(r, EFirmwareType.SDLog) - r = get_firmware_type(0x03) - self.assertEqual(r, EFirmwareType.LogAndStream) - - self.assertRaises(ValueError, get_firmware_type, 0xFF) - - def test_get_exg_ch(self): - self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_1_CH1_24BIT), (0, 0)) - self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_1_CH2_24BIT), (0, 1)) - self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_2_CH1_24BIT), (1, 0)) - self.assertEqual(get_exg_ch(EChannelType.EXG_ADS1292R_2_CH2_24BIT), (1, 1)) - - def test_is_exg_ch(self): - from itertools import product - valid_ch = [EChannelType[f'EXG_ADS1292R_{i}_CH{j}_{k}BIT'] for i, j, k in product([1, 2], [1, 2], [16, 24])] - - for ch in EChannelType: - self.assertEqual(is_exg_ch(ch), ch in valid_ch) - - -class ExGRegisterTest(TestCase): - - def test_exg_register_fail(self): - self.assertRaises(ValueError, ExGRegister, bytes()) - - def test_exg_register(self): - reg1 = bytes([3, 160, 16, 64, 71, 0, 0, 0, 2, 1]) - reg2 = bytes([0, 171, 16, 21, 21, 0, 0, 0, 2, 1]) - - exg_reg1 = ExGRegister(reg1) - exg_reg2 = ExGRegister(reg2) - - self.assertEqual(exg_reg1.ch1_gain, 4) - self.assertEqual(exg_reg1.ch2_gain, 4) - self.assertEqual(exg_reg1.ch1_mux, ExGMux.NORMAL) - self.assertEqual(exg_reg1.get_ch_mux_bin(0), 0b0000) - self.assertEqual(exg_reg1.ch2_mux, ExGMux.RLD_DRM) - self.assertEqual(exg_reg1.get_ch_mux_bin(1), 0b0111) - self.assertEqual(exg_reg1.ch1_powerdown, False) - self.assertEqual(exg_reg1.ch2_powerdown, False) - self.assertEqual(exg_reg1.data_rate, 1000) - self.assertEqual(exg_reg1.binary, reg1) - - self.assertEqual(exg_reg2.ch1_gain, 1) - self.assertEqual(exg_reg2.ch2_gain, 1) - self.assertEqual(exg_reg2.ch1_mux, ExGMux.TEST_SIGNAL) - self.assertEqual(exg_reg2.ch2_mux, ExGMux.TEST_SIGNAL) - self.assertEqual(exg_reg2.ch1_powerdown, False) - self.assertEqual(exg_reg2.ch2_powerdown, False) - self.assertEqual(exg_reg2.data_rate, 125) - self.assertEqual(exg_reg2.binary, reg2) - - self.assertRaises(ValueError, exg_reg1.get_ch_mux, 2) - self.assertRaises(ValueError, exg_reg1.get_ch_mux, -1) - - def test_exg_register_powerdown(self): - pd = 0x1 << 7 - reg_bin = bytes([3, 160, 16, pd, pd, 0, 0, 0, 2, 1]) - reg = ExGRegister(reg_bin) - - self.assertEqual(reg.ch1_powerdown, True) - self.assertEqual(reg.ch2_powerdown, True) - - def test_exg_register_rld_powerdown(self): - pd = 0x01 << 5 - reg_bin = bytes([0, 0, 0, 0, 0, pd, 0, 0, 0, 0]) - reg = ExGRegister(reg_bin) - - self.assertEqual(reg.rld_powerdown, False) - - def test_exg_register_rld_channels(self): - reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) - reg = ExGRegister(reg_bin) - self.assertEqual(reg.rld_channels, [ExGRLDLead.RLD1P, ExGRLDLead.RLD2P, ExGRLDLead.RLD2N]) - - reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x00, 0x00, 0x00, 0x02, 0x03]) - reg = ExGRegister(reg_bin) - self.assertEqual(reg.rld_channels, []) - - def test_exg_register_rld_ref(self): - reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) - reg = ExGRegister(reg_bin) - self.assertEqual(reg.rld_ref, ERLDRef.INTERNAL) - - reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x01]) - reg = ExGRegister(reg_bin) - self.assertEqual(reg.rld_ref, ERLDRef.EXTERNAL) - - def test_exg_register_print(self): - reg_bin = bytes([0x03, 0xA8, 0x10, 0x40, 0x40, 0x2D, 0x00, 0x00, 0x02, 0x03]) - reg = ExGRegister(reg_bin) - - str_repr = str(reg) - self.assertTrue('Data Rate: 1000' in str_repr) - self.assertTrue('RLD Powerdown: False' in str_repr) - - def test_equality_operator(self): - def do_assert(a: bytes, b: bytes, result: bool) -> None: - self.assertEqual(ExGRegister(a) == ExGRegister(b), result) - - x = randbytes(10) - y = randbytes(10) - - do_assert(x, y, False) - do_assert(x, x, True) - do_assert(y, y, True) - - for i in range(len(x)): - y = bytearray(x) - y[i] = random.randrange(0, 256) - do_assert(x, y, False) - - -class FirmwareVersionTest(TestCase): - - def test_version_equality(self): - a = FirmwareVersion(1, 2, 3) - b = FirmwareVersion(1, 2, 3) - c = FirmwareVersion(3, 2, 1) - - self.assertEqual(a, a) - self.assertEqual(a, b) - - self.assertNotEqual(a, None) - self.assertNotEqual(a, False) - self.assertNotEqual(a, 10) - self.assertNotEqual(a, c) - - def test_attributes(self): - ver = FirmwareVersion(1, 2, 3) - self.assertEqual(ver.major, 1) - self.assertEqual(ver.minor, 2) - self.assertEqual(ver.rel, 3) - - def test_greater_less(self): - a = FirmwareVersion(3, 2, 1) - - b = FirmwareVersion(3, 2, 1) - self.assertFalse(b > a) - self.assertTrue(b >= a) - self.assertFalse(b < a) - self.assertTrue(b <= a) - - b = FirmwareVersion(2, 2, 1) - self.assertFalse(b > a) - self.assertFalse(b >= a) - self.assertTrue(b < a) - self.assertTrue(b <= a) - - b = FirmwareVersion(3, 1, 1) - self.assertFalse(b > a) - self.assertFalse(b >= a) - self.assertTrue(b < a) - self.assertTrue(b <= a) - - b = FirmwareVersion(3, 2, 0) - self.assertFalse(b > a) - self.assertFalse(b >= a) - self.assertTrue(b < a) - self.assertTrue(b <= a) From 9d6ad0853c47847d6231a81669c2f33ba7e39878 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 12:20:18 +0100 Subject: [PATCH 05/13] Implement class storing firmware capabilities This allows to differentiate between the capabilities of different firmware versions, such as the support for disabling status Acks. --- pyshimmer/dev/fw_version.py | 28 ++++++++++++++++++++++++---- test/dev/test_device_fw_version.py | 11 ++++++++++- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/pyshimmer/dev/fw_version.py b/pyshimmer/dev/fw_version.py index 31105fe..f979701 100644 --- a/pyshimmer/dev/fw_version.py +++ b/pyshimmer/dev/fw_version.py @@ -11,6 +11,12 @@ def wrapper(self, other): return wrapper +class EFirmwareType(Enum): + BtStream = auto() + SDLog = auto() + LogAndStream = auto() + + class FirmwareVersion: def __init__(self, major: int, minor: int, rel: int): @@ -46,10 +52,24 @@ def __le__(self, other: "FirmwareVersion") -> bool: return self._key <= other._key -class EFirmwareType(Enum): - BtStream = auto() - SDLog = auto() - LogAndStream = auto() +class FirmwareCapabilities: + + def __init__(self, fw_type: EFirmwareType, version: FirmwareVersion): + self._fw_type = fw_type + self._version = version + + @property + def fw_type(self) -> EFirmwareType: + return self._fw_type + + @property + def version(self) -> FirmwareVersion: + return self._version + + @property + def supports_ack_disable(self) -> bool: + return self._fw_type == EFirmwareType.LogAndStream and \ + self._version >= FirmwareVersion(major=0, minor=15, rel=4) FirmwareTypeValueAssignment = { diff --git a/test/dev/test_device_fw_version.py b/test/dev/test_device_fw_version.py index 6311002..b4074cf 100644 --- a/test/dev/test_device_fw_version.py +++ b/test/dev/test_device_fw_version.py @@ -15,7 +15,7 @@ # along with this program. If not, see . from unittest import TestCase -from pyshimmer.dev.fw_version import FirmwareVersion, get_firmware_type, EFirmwareType +from pyshimmer.dev.fw_version import FirmwareVersion, get_firmware_type, EFirmwareType, FirmwareCapabilities class DeviceFirmwareVersionTest(TestCase): @@ -31,6 +31,15 @@ def test_get_firmware_type(self): self.assertRaises(ValueError, get_firmware_type, 0xFF) +class FirmwareCapabilitiesTest(TestCase): + + def test_capabilities(self): + cap = FirmwareCapabilities(EFirmwareType.LogAndStream, version=FirmwareVersion(1, 2, 3)) + self.assertTrue(cap.supports_ack_disable) + self.assertEqual(cap.version, FirmwareVersion(1, 2, 3)) + self.assertEqual(cap.fw_type, EFirmwareType.LogAndStream) + + class FirmwareVersionTest(TestCase): def test_version_equality(self): From 0b9041d7e66c566ec429aec0710256493e135a66 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 17:15:58 +0100 Subject: [PATCH 06/13] Make Bluetooth Integration setup manual to pass parameters --- test/bluetooth/test_bluetooth_api.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index 78821c8..4f2a435 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -324,12 +324,14 @@ def master_fn(master: BinaryIO, _) -> bytes: return self._submit_handler_fn(master_fn) - def setUp(self) -> None: + def do_setup(self, initialize: bool = True) -> None: self._mock_creator = PTYSerialMockCreator() serial, self._master = self._mock_creator.create_mock() self._sot = ShimmerBluetooth(serial) - self._sot.initialize() + + if initialize: + self._sot.initialize() def tearDown(self) -> None: self._sot.shutdown() @@ -337,16 +339,14 @@ def tearDown(self) -> None: # noinspection PyMethodMayBeStatic def test_context_manager(self): - mock_creator = PTYSerialMockCreator() - serial, master = mock_creator.create_mock() + self.do_setup(initialize=False) - sot = ShimmerBluetooth(serial) - with sot: + with self._sot: pass - mock_creator.close() - def test_get_sampling_rate(self): + self.do_setup() + ftr = self._submit_req_resp_handler(1, b'\xff\x04\x40\x00') r = self._sot.get_sampling_rate() @@ -354,6 +354,8 @@ def test_get_sampling_rate(self): self.assertEqual(r, 512.0) def test_get_data_types(self): + self.do_setup() + ftr = self._submit_req_resp_handler(1, b'\xff\x02\x40\x00\x01\xff\x01\x09\x01\x01\x12') r = self._sot.get_data_types() @@ -361,6 +363,8 @@ def test_get_data_types(self): self.assertEqual(r, [EChannelType.TIMESTAMP, EChannelType.INTERNAL_ADC_13]) def test_streaming(self): + self.do_setup() + pkts = [] def pkt_handler(new_pkt: DataPacket) -> None: @@ -387,6 +391,8 @@ def pkt_handler(new_pkt: DataPacket) -> None: self.assertEqual(pkt[EChannelType.INTERNAL_ADC_13], 1866) def test_status_update(self): + self.do_setup() + pkts = [] def status_handler(new_pkt: List[bool]) -> None: @@ -404,6 +410,8 @@ def status_handler(new_pkt: List[bool]) -> None: self.assertEqual(pkt, [False, False, False, False, False, True, False, False]) def test_get_firmware_version(self): + self.do_setup() + self._submit_req_resp_handler(1, b'\xFF\x2F\x03\x00\x01\x00\x02\x03') fwtype, fwver = self._sot.get_firmware_version() From bd9c6156df6af5078e5ebac3fee7f8284bba0641 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 23:41:41 +0100 Subject: [PATCH 07/13] Implement automatic FW version retrieval in Bluetooth API --- pyshimmer/bluetooth/bt_api.py | 35 ++++++++++++++++++++++++++-- test/bluetooth/test_bluetooth_api.py | 18 ++++++++++++-- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index 0d89954..c71bca6 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -15,7 +15,7 @@ # along with this program. If not, see . from queue import Queue, Empty from threading import Event, Thread -from typing import List, Tuple, Callable, Iterable +from typing import List, Tuple, Callable, Iterable, Optional from serial import Serial @@ -29,7 +29,7 @@ from pyshimmer.bluetooth.bt_serial import BluetoothSerial from pyshimmer.dev.channels import ChDataTypeAssignment, ChannelDataType, EChannelType, ESensorGroup from pyshimmer.dev.exg import ExGRegister -from pyshimmer.dev.fw_version import EFirmwareType, FirmwareVersion +from pyshimmer.dev.fw_version import EFirmwareType, FirmwareVersion, FirmwareCapabilities from pyshimmer.serial_base import ReadAbort from pyshimmer.util import fmt_hex, PeekQueue @@ -267,6 +267,30 @@ def __init__(self, serial: Serial): self._thread = Thread(target=self._run_readloop, daemon=True) + self._initialized = False + self._fw_version: Optional[FirmwareVersion] = None + self._fw_caps: Optional[FirmwareCapabilities] = None + + @property + def initialized(self) -> bool: + """Specifies if the connection was initialized + + This property helps to determine if the capabilities property will return a valid value. + + :return: True if initialize() was called, otherwise False + """ + return self._initialized + + @property + def capabilities(self) -> FirmwareCapabilities: + """Return the capabilities of the device firmware + + This property shall only be accessed after invoking initialize(). + + :return: A FirmwareCapabilities instance representing the version and capabilities of the firmware + """ + return self._fw_caps + def __enter__(self): self.initialize() return self @@ -274,6 +298,10 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_traceback): self.shutdown() + def _set_fw_capabilities(self) -> None: + fw_type, fw_ver = self.get_firmware_version() + self._fw_caps = FirmwareCapabilities(fw_type, fw_ver) + def initialize(self) -> None: """Initialize the reading loop of the API @@ -281,6 +309,9 @@ def initialize(self) -> None: """ self._thread.start() + self._set_fw_capabilities() + self._initialized = True + def shutdown(self) -> None: """Shutdown the read loop diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index 4f2a435..18ed4da 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -331,18 +331,32 @@ def do_setup(self, initialize: bool = True) -> None: self._sot = ShimmerBluetooth(serial) if initialize: + future = self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') self._sot.initialize() + result = future.result() + assert result == b'\x2E' + def tearDown(self) -> None: self._sot.shutdown() self._mock_creator.close() - # noinspection PyMethodMayBeStatic def test_context_manager(self): self.do_setup(initialize=False) + # We prepare the response for the GetFirmwareVersion command issued + # at initialization. + self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') with self._sot: - pass + self.assertTrue(self._sot.initialized) + + def test_version_and_capabilities(self): + self.do_setup() + + self.assertTrue(self._sot.initialized) + self.assertIsNotNone(self._sot.capabilities) + self.assertEqual(self._sot.capabilities.fw_type, EFirmwareType.LogAndStream) + self.assertEqual(self._sot.capabilities.version, FirmwareVersion(0, 11, 0)) def test_get_sampling_rate(self): self.do_setup() From 1805b321288ce7724290424ecdecaf06f488304b Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 25 Feb 2023 23:44:19 +0100 Subject: [PATCH 08/13] Rename the Enable Status Ack Command to Set Status Ack State Command --- pyshimmer/bluetooth/bt_commands.py | 2 +- test/bluetooth/test_bt_commands.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyshimmer/bluetooth/bt_commands.py b/pyshimmer/bluetooth/bt_commands.py index 01f4b0f..7186e61 100644 --- a/pyshimmer/bluetooth/bt_commands.py +++ b/pyshimmer/bluetooth/bt_commands.py @@ -481,7 +481,7 @@ def __init__(self, dev_name: str): super().__init__(SET_SHIMMERNAME_COMMAND, dev_name) -class EnableStatusAckCommand(ShimmerCommand): +class SetStatusAckCommand(ShimmerCommand): def __init__(self, enabled: bool): """Command to enable/disable the ACK byte before status messages diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 2db9e1e..a9e1484 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -20,7 +20,7 @@ GetConfigTimeCommand, SetConfigTimeCommand, GetRealTimeClockCommand, SetRealTimeClockCommand, GetStatusCommand, \ GetFirmwareVersionCommand, InquiryCommand, StartStreamingCommand, StopStreamingCommand, StartLoggingCommand, \ StopLoggingCommand, GetEXGRegsCommand, SetEXGRegsCommand, GetExperimentIDCommand, SetExperimentIDCommand, \ - GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, DataPacket, ResponseCommand, EnableStatusAckCommand, \ + GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, DataPacket, ResponseCommand, SetStatusAckCommand, \ SetSensorsCommand, SetSamplingRateCommand from pyshimmer.bluetooth.bt_serial import BluetoothSerial from pyshimmer.dev.channels import ChDataTypeAssignment, EChannelType, ESensorGroup @@ -195,11 +195,11 @@ def test_set_device_name_command(self): cmd = SetDeviceNameCommand('S_PPG') self.assert_cmd(cmd, b'\x79\x05S_PPG') - def test_enable_status_ack_command(self): - cmd = EnableStatusAckCommand(enabled=True) + def test_set_status_ack_command(self): + cmd = SetStatusAckCommand(enabled=True) self.assert_cmd(cmd, b'\xA3\x01') - cmd = EnableStatusAckCommand(enabled=False) + cmd = SetStatusAckCommand(enabled=False) self.assert_cmd(cmd, b'\xA3\x00') def test_dummy_command(self): From b81a23496f7ca92231bdc8bf12933984f3242dbc Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 26 Feb 2023 00:02:54 +0100 Subject: [PATCH 09/13] Implement automatic disabling of status acknowledgment if supported --- pyshimmer/bluetooth/bt_api.py | 46 +++++++++++++++++++++++----- test/bluetooth/test_bluetooth_api.py | 27 ++++++++++++++-- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index c71bca6..5b22afd 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -24,7 +24,7 @@ GetFirmwareVersionCommand, InquiryCommand, StartStreamingCommand, StopStreamingCommand, DataPacket, \ GetEXGRegsCommand, SetEXGRegsCommand, StartLoggingCommand, StopLoggingCommand, GetExperimentIDCommand, \ SetExperimentIDCommand, GetDeviceNameCommand, SetDeviceNameCommand, DummyCommand, GetBatteryCommand, \ - SetSamplingRateCommand, SetSensorsCommand + SetSamplingRateCommand, SetSensorsCommand, SetStatusAckCommand from pyshimmer.bluetooth.bt_const import ACK_COMMAND_PROCESSED, DATA_PACKET, FULL_STATUS_RESPONSE, INSTREAM_CMD_RESPONSE from pyshimmer.bluetooth.bt_serial import BluetoothSerial from pyshimmer.dev.channels import ChDataTypeAssignment, ChannelDataType, EChannelType, ESensorGroup @@ -256,18 +256,31 @@ def clear_queues(self) -> None: class ShimmerBluetooth: - """Main API for communicating with the Shimmer via Bluetooth - :arg serial: The serial interface to use for communication - """ + def __init__(self, serial: Serial, disable_status_ack: bool = True): + """API for communicating with the Shimmer via Bluetooth + + This class implements support for talking to the Shimmer LogAndStream firmware via Bluetooth. + Each command is encapsulated as a method that can be called to invoke the corresponding command. + All commands are executed synchronously. This means that the method call will block until the + Shimmer has processed the request and responded. - def __init__(self, serial: Serial): + :param serial: The serial channel that encapsulates the rfcomm Bluetooth connection to the Shimmer + :param disable_status_ack: Starting with LogAndStream firmware version 0.15.4, the vanilla firmware + supports disabling the acknowledgment byte before status messages. This removes the need for + running a custom firmware version on the Shimmer. If this flag is set to True, the API will + query the firmware version of the Shimmer and automatically send a command to disable the status + acknowledgment byte at startup. You can set it to True if you don't want this or if it causes + trouble with your firmware version. + """ self._serial = BluetoothSerial(serial) self._bluetooth = BluetoothRequestHandler(self._serial) self._thread = Thread(target=self._run_readloop, daemon=True) self._initialized = False + self._disable_ack = disable_status_ack + self._fw_version: Optional[FirmwareVersion] = None self._fw_caps: Optional[FirmwareCapabilities] = None @@ -303,13 +316,18 @@ def _set_fw_capabilities(self) -> None: self._fw_caps = FirmwareCapabilities(fw_type, fw_ver) def initialize(self) -> None: - """Initialize the reading loop of the API + """Initialize the Bluetooth connection - Initialize the reading loop by starting a new thread to handle all reads asynchronously + This method must be invoked before sending commands to the Shimmer. It queries the Shimmer version, + optionally disables the status acknowledgment and starts the read loop. """ self._thread.start() self._set_fw_capabilities() + + if self.capabilities.supports_ack_disable and self._disable_ack: + self.set_status_ack(enabled=False) + self._initialized = True def shutdown(self) -> None: @@ -383,6 +401,7 @@ def set_sampling_rate(self, sr: float) -> None: def get_battery_state(self, in_percent: bool) -> float: """Retrieve the battery state of the device + :param in_percent: True: calculate battery state in percent; False: calculate battery state in Volt :return: The battery state in percent / Volt """ @@ -554,3 +573,16 @@ def send_ping(self) -> None: The command can be used to test the connection. It does not return anything. """ self._process_and_wait(DummyCommand()) + + def set_status_ack(self, enabled: bool) -> None: + """Send a command to enable or disable the status acknowledgment + + This command should normally not be called directly. If enabled in the constructor, the command + will automatically be sent to the Shimmer if the firmware supports it. It can be used to make + vanilla firmware versions compatible with the state machine of the Python API. + + :param enabled: If set to True, enable status acknowledgment byte. This will make the + firmware incompatible to the Python API. If set to False, disable sending the status ack. + In this state, the firmware is compatible to the Python API. + """ + self._process_and_wait(SetStatusAckCommand(enabled)) diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index 18ed4da..1624c18 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -324,11 +324,11 @@ def master_fn(master: BinaryIO, _) -> bytes: return self._submit_handler_fn(master_fn) - def do_setup(self, initialize: bool = True) -> None: + def do_setup(self, initialize: bool = True, **kwargs) -> None: self._mock_creator = PTYSerialMockCreator() serial, self._master = self._mock_creator.create_mock() - self._sot = ShimmerBluetooth(serial) + self._sot = ShimmerBluetooth(serial, **kwargs) if initialize: future = self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') @@ -346,8 +346,10 @@ def test_context_manager(self): # We prepare the response for the GetFirmwareVersion command issued # at initialization. - self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') + req_future = self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') with self._sot: + req_data = req_future.result() + self.assertEqual(req_data, b'\x2e') self.assertTrue(self._sot.initialized) def test_version_and_capabilities(self): @@ -431,3 +433,22 @@ def test_get_firmware_version(self): self.assertEqual(fwtype, EFirmwareType.LogAndStream) self.assertEqual(fwver, FirmwareVersion(1, 2, 3)) + + def test_status_ack_disable(self): + self.do_setup(initialize=False) + + # Queue response for version command + self._submit_req_resp_handler(1, b'\xFF\x2F\x03\x00\x00\x00\x0F\x04') + # Queue response for disabling the status acknowledgment + req_future = self._submit_req_resp_handler(2, b'\xFF') + + self._sot.initialize() + req_data = req_future.result() + self.assertEqual(req_data, b'\xA3\x00') + + def test_status_ack_not_disable(self): + self.do_setup(initialize=False, disable_status_ack=False) + + # Queue response for version command + self._submit_req_resp_handler(1, b'\xFF\x2F\x03\x00\x00\x00\x0F\x04') + self._sot.initialize() From 46d52278a4fd8362f39b326a48659bbae608c3e4 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 26 Feb 2023 00:06:39 +0100 Subject: [PATCH 10/13] Add license notices where missing --- pyshimmer/dev/__init__.py | 15 +++++++++++++++ pyshimmer/dev/base.py | 2 +- pyshimmer/dev/channels.py | 15 +++++++++++++++ pyshimmer/dev/exg.py | 15 +++++++++++++++ pyshimmer/dev/fw_version.py | 15 +++++++++++++++ test/dev/__init__.py | 15 +++++++++++++++ test/dev/test_device_base.py | 2 +- test/dev/test_device_channels.py | 2 +- test/dev/test_device_exg.py | 2 +- test/dev/test_device_fw_version.py | 2 +- 10 files changed, 80 insertions(+), 5 deletions(-) diff --git a/pyshimmer/dev/__init__.py b/pyshimmer/dev/__init__.py index e69de29..4aed427 100644 --- a/pyshimmer/dev/__init__.py +++ b/pyshimmer/dev/__init__.py @@ -0,0 +1,15 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2023 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/pyshimmer/dev/base.py b/pyshimmer/dev/base.py index e2991ac..1ceee48 100644 --- a/pyshimmer/dev/base.py +++ b/pyshimmer/dev/base.py @@ -1,5 +1,5 @@ # pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel +# Copyright (C) 2023 Lukas Magel # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/pyshimmer/dev/channels.py b/pyshimmer/dev/channels.py index 56a8c5e..7668822 100644 --- a/pyshimmer/dev/channels.py +++ b/pyshimmer/dev/channels.py @@ -1,3 +1,18 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2023 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import struct from enum import Enum, auto, unique from typing import Dict, List, Iterable diff --git a/pyshimmer/dev/exg.py b/pyshimmer/dev/exg.py index e36caa7..9c25f08 100644 --- a/pyshimmer/dev/exg.py +++ b/pyshimmer/dev/exg.py @@ -1,3 +1,18 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2023 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import re from enum import Enum from typing import List, Dict, Tuple diff --git a/pyshimmer/dev/fw_version.py b/pyshimmer/dev/fw_version.py index f979701..3c3ddb9 100644 --- a/pyshimmer/dev/fw_version.py +++ b/pyshimmer/dev/fw_version.py @@ -1,3 +1,18 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2023 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . from enum import Enum, auto diff --git a/test/dev/__init__.py b/test/dev/__init__.py index e69de29..9cd6828 100644 --- a/test/dev/__init__.py +++ b/test/dev/__init__.py @@ -0,0 +1,15 @@ +# pyshimmer - API for Shimmer sensor devices +# Copyright (C) 2020 Lukas Magel + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/test/dev/test_device_base.py b/test/dev/test_device_base.py index 583d042..8e9803c 100644 --- a/test/dev/test_device_base.py +++ b/test/dev/test_device_base.py @@ -1,5 +1,5 @@ # pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel +# Copyright (C) 2023 Lukas Magel # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/test/dev/test_device_channels.py b/test/dev/test_device_channels.py index 3be07b2..7e19001 100644 --- a/test/dev/test_device_channels.py +++ b/test/dev/test_device_channels.py @@ -1,5 +1,5 @@ # pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel +# Copyright (C) 2023 Lukas Magel # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/test/dev/test_device_exg.py b/test/dev/test_device_exg.py index 739b982..a0f8de4 100644 --- a/test/dev/test_device_exg.py +++ b/test/dev/test_device_exg.py @@ -1,5 +1,5 @@ # pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel +# Copyright (C) 2023 Lukas Magel # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/test/dev/test_device_fw_version.py b/test/dev/test_device_fw_version.py index b4074cf..07fae98 100644 --- a/test/dev/test_device_fw_version.py +++ b/test/dev/test_device_fw_version.py @@ -1,5 +1,5 @@ # pyshimmer - API for Shimmer sensor devices -# Copyright (C) 2020 Lukas Magel +# Copyright (C) 2023 Lukas Magel # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by From 1c602b00a82dfc25d63ffea286efa4ac4574fc6c Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 26 Feb 2023 12:02:06 +0100 Subject: [PATCH 11/13] Update the README to reflect the changes --- README.rst | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/README.rst b/README.rst index debb8ff..0f2aec4 100644 --- a/README.rst +++ b/README.rst @@ -4,13 +4,6 @@ pyshimmer: Unofficial Python API for Shimmer Sensor devices .. image:: https://github.com/seemoo-lab/pyshimmer/actions/workflows/build.yml/badge.svg :target: https://github.com/seemoo-lab/pyshimmer -.. image:: https://www.codefactor.io/repository/github/seemoo-lab/pyshimmer/badge/master - :target: https://www.codefactor.io/repository/github/seemoo-lab/pyshimmer/overview/master - :alt: CodeFactor - -.. image:: https://codecov.io/gh/seemoo-lab/pyshimmer/branch/master/graph/badge.svg?token=EHK1ISJH7Z - :target: https://codecov.io/gh/seemoo-lab/pyshimmer - .. contents:: General Information @@ -75,23 +68,40 @@ You can then run the tests from the repository root by simply issuing: Shimmer Firmware ^^^^^^^^^^^^^^^^ -The vanilla version of the `Shimmer3 firmware `_ exhibits several -unfixed bugs (see the `issues page `_ for more information). -Depending on the firmware you intend to use, you will need to compile and run a custom patched version of the firmware. -In the following table, we list the tested firmware versions and their compatibility. +As of version v0.15.4 of the `Shimmer3 firmware `_ the Python API is +fully compatible to the firmware. Older versions of the vanilla firmware exhibit several bugs and are incompatible. +If you intend to use a firmware version older than 0.15.4, you will need to compile and run a custom patched version of +the firmware. In the following table, the firmware versions and their compatibility are listed. + +Compatibility Table +""""""""""""""""""" ============= ========= ============= ====================================================================== Firmware Type Version Compatibility Issues ============= ========= ============= ====================================================================== +LogAndStream v0.15.4 Compatible You will need to use pyshimmer v0.4 or newer + to v0.4.0 LogAndStream v0.11.0 Incompatible - `Issue 7 `_ - `Issue 10 `_ +SDLog v0.21.0 Compatible Untested SDLog v0.19.0 Compatible ============= ========= ============= ====================================================================== -If you want to use the *LogAndStream* firmware with the pyshimmer library, you will need to compile and program a -patched version of the firmware. We provide a forked repository which features the necessary fixes -`here `_. It also contains instructions on how to compile and program the -firmware. +It is recommended to use the newest *LogAndStream* firmware that is compatible to the API. If you want to use an older +version with the pyshimmer library, you will need to compile and program a patched version of the firmware. We provide +a forked repository which features the necessary fixes `here `_. +It also contains instructions on how to compile and program the firmware. + +Notes on Firmware Version 0.15.4 +"""""""""""""""""""""""""""""""" +Starting with Firmware version v0.15.4, +`the race condition issue `_ in the Bluetooth stack has been +fixed. Additionally, the Shimmer3 now supports an additional command to disable the unsolicited status acknowledgment +byte (see `Issue 10 `_). The pyshimmer Bluetooth API tries to +automatically detect if the Shimmer3 runs a firmware newer or equal to v0.15.4 and automatically issues the command +to disable the unsolicited status acknowledgment at startup. You can optionally disable this feature in the constructor. +With this new command, the state machine in the Bluetooth API of pyshimmer is compatible to the vanilla firmware +version. Creating udev rules for persistent device filenames ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ From b822acd19503afbf81c23d73943ce7499b4f8514 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 26 Feb 2023 12:11:50 +0100 Subject: [PATCH 12/13] Readd test that got lost during merging --- test/dev/test_device_channels.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/dev/test_device_channels.py b/test/dev/test_device_channels.py index 7e19001..cc3c329 100644 --- a/test/dev/test_device_channels.py +++ b/test/dev/test_device_channels.py @@ -28,7 +28,7 @@ def test_channel_enum_uniqueness(self): except ValueError as e: self.fail(f'Enum not unique: {e}') - def test_channel_data_type(self): + def test_channel_data_type_decoding(self): def test_both_endianess(byte_val_le: bytes, expected: int, signed: bool): blen = len(byte_val_le) dt_le = ChannelDataType(blen, signed=signed, le=True) @@ -73,6 +73,20 @@ def test_both_endianess(byte_val_le: bytes, expected: int, signed: bool): test_both_endianess(b'\xFF\x7F', 2 ** 15 - 1, signed=True) test_both_endianess(b'\xFF\x00', 255, signed=True) + def test_channel_data_type_encoding(self): + def test_both_endianess(val: int, val_len: int, expected: bytes, signed: bool): + dt_le = ChannelDataType(val_len, signed=signed, le=True) + dt_be = ChannelDataType(val_len, signed=signed, le=False) + + self.assertEqual(expected, dt_le.encode(val)) + self.assertEqual(expected[::-1], dt_be.encode(val)) + + test_both_endianess(0x1234, 2, b'\x34\x12', signed=False) + test_both_endianess(-0x10, 2, b'\xF0\xFF', signed=True) + + test_both_endianess(0x12345, 3, b'\x45\x23\x01', signed=False) + test_both_endianess(-0x12345, 3, b'\xbb\xdc\xfe', signed=True) + def test_get_ch_dtypes(self): channels = [EChannelType.INTERNAL_ADC_13, EChannelType.GYRO_MPU9150_Y] r = get_ch_dtypes(channels) From 7b8faed6dc61de080c78faa1121b44e5d0bbe7b2 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sun, 26 Feb 2023 12:34:03 +0100 Subject: [PATCH 13/13] Add some comments to the Bluetooth API tests --- test/bluetooth/test_bluetooth_api.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/bluetooth/test_bluetooth_api.py b/test/bluetooth/test_bluetooth_api.py index 1624c18..dde8aa3 100644 --- a/test/bluetooth/test_bluetooth_api.py +++ b/test/bluetooth/test_bluetooth_api.py @@ -331,9 +331,12 @@ def do_setup(self, initialize: bool = True, **kwargs) -> None: self._sot = ShimmerBluetooth(serial, **kwargs) if initialize: + # The Bluetooth API automatically requests the firmware version upon initialization. + # We must prepare a proper response beforehand. future = self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') self._sot.initialize() + # Check that it properly asked for the firmware version result = future.result() assert result == b'\x2E' @@ -344,16 +347,19 @@ def tearDown(self) -> None: def test_context_manager(self): self.do_setup(initialize=False) - # We prepare the response for the GetFirmwareVersion command issued - # at initialization. + # The Bluetooth API automatically requests the firmware version upon initialization. + # We must prepare a proper response beforehand. req_future = self._submit_req_resp_handler(req_len=1, resp=b'\xff\x2f\x03\x00\x00\x00\x0b\x00') with self._sot: + # We check that the API properly asked for the firmware version req_data = req_future.result() self.assertEqual(req_data, b'\x2e') + + # It should now be in an initialized state self.assertTrue(self._sot.initialized) def test_version_and_capabilities(self): - self.do_setup() + self.do_setup(initialize=True) self.assertTrue(self._sot.initialized) self.assertIsNotNone(self._sot.capabilities)