diff --git a/pyaer/device.py b/pyaer/device.py
index 64b1fc0..c7b0df2 100644
--- a/pyaer/device.py
+++ b/pyaer/device.py
@@ -1,31 +1,50 @@
from __future__ import annotations
-import abc
+from abc import abstractmethod
from typing import Any
+from typing import Callable
+from typing import TYPE_CHECKING
+
+import numpy as np
from pyaer import libcaer
from pyaer.constants import DEFAULT_MAX_PACKET_INTERVAL
from pyaer.constants import DISABLE_MAX_CONTAINER_SIZE
+from pyaer.dtypes import BiasObjectType
+from pyaer.dtypes import DeviceType
+from pyaer.dtypes import EventType
+from pyaer.dtypes import ModuleAddressType
+from pyaer.dtypes import ParameterAddressType
+from pyaer.utils import load_json
+from pyaer.utils import write_json
-class USBDevice:
- """Base class for all USB devices.
+if TYPE_CHECKING:
+ from pyaer.filters import DVSNoise
- This class is the base of DVS128, DAVIS240, DAVIS346 and DYNAPSE.
- """
+
+class Device:
+ """Generic device."""
def __init__(self) -> None:
self.handle = None
- # functions for get events number and packet functions
- self.get_event_number_funcs = {
+ self.filter_noise = False
+ self.noise_filter: "DVSNoise" | None = None
+
+ self.configs_list: list[
+ tuple[str, ModuleAddressType, ParameterAddressType]
+ ] = []
+
+ # Functions for get events number and packet functions
+ self.get_event_number_funcs: dict[EventType, Callable] = {
libcaer.POLARITY_EVENT: libcaer.caerEventPacketHeaderGetEventNumber,
libcaer.SPECIAL_EVENT: libcaer.caerEventPacketHeaderGetEventNumber,
libcaer.IMU6_EVENT: libcaer.caerEventPacketHeaderGetEventNumber,
libcaer.IMU9_EVENT: libcaer.caerEventPacketHeaderGetEventNumber,
libcaer.SPIKE_EVENT: libcaer.caerEventPacketHeaderGetEventNumber,
}
- self.get_event_packet_funcs = {
+ self.get_event_packet_funcs: dict[EventType, Callable] = {
libcaer.POLARITY_EVENT: libcaer.caerPolarityEventPacketFromPacketHeader,
libcaer.SPECIAL_EVENT: libcaer.caerSpecialEventPacketFromPacketHeader,
libcaer.FRAME_EVENT: libcaer.caerFrameEventPacketFromPacketHeader,
@@ -34,7 +53,7 @@ def __init__(self) -> None:
libcaer.SPIKE_EVENT: libcaer.caerSpikeEventPacketFromPacketHeader,
}
- @abc.abstractmethod
+ @abstractmethod
def obtain_device_info(self, handle: Any) -> None:
"""Obtains device handle.
@@ -46,9 +65,9 @@ def obtain_device_info(self, handle: Any) -> None:
handle: a valid device handle that can be used with the other `libcaer`
functions, or `None` on error.
"""
- raise NotADirectoryError()
+ raise NotImplementedError()
- @abc.abstractmethod
+ @abstractmethod
def get_event(self) -> None:
"""Gets Event.
@@ -57,78 +76,28 @@ def get_event(self) -> None:
"""
raise NotImplementedError()
- def open(
- self,
- device_type,
- device_id=1,
- bus_number_restrict=0,
- dev_address_restrict=0,
- serial_number="",
- ):
- """Open USB deivce.
-
- # Arguments
- device_type: `int`
- Device type:
- `libcaer.CAER_DEVICE_DVS128`,
- `libcaer.CAER_DEVICE_EDVS`,
- `libcaer.CAER_DEVICE_DAVIS`,
- `libcaer.CAER_DEVICE_DAVIS_FX2`,
- `libcaer.CAER_DEVICE_DAVIS_FX3`,
- `libcaer.CAER_DEVICE_DAVIS_RPI`,
- `libcaer.CAER_DEVICE_DYNAPSE`.
- device_id: `int`
- a unique ID to identify the device from others.
- Will be used as the source for EventPackets being
- generate from its data.
- `default is 1`.
- bus_number_restrict: `int`
- restrict the search for viable devices to only this USB
- bus number.
- `default is 0`.
- dev_address_restrict: `int`
- restrict the search for viable devices to only this USB
- device address.
- `default is 0`.
- serial_number: `str`
- restrict the search for viable devices to only devices which do
- possess the given Serial Number in their USB
- SerialNumber descriptor.
- `default is ""`
+ @abstractmethod
+ def open(self) -> None:
+ """Opens a device.
+
+ The acutal implementation depends on the subclass.
"""
- self.handle = libcaer.caerDeviceOpen(
- device_id,
- device_type,
- bus_number_restrict,
- dev_address_restrict,
- serial_number,
- )
- if self.handle is None:
- raise ValueError("The device is failed to open.")
+ raise NotImplementedError()
def close(self) -> None:
"""Closes USB device.
- This method closes an opened USB device if the respective handle is not None.
+ This method closes an opened device if the respective handle is not None.
"""
if self.handle is not None:
libcaer.caerDeviceClose(self.handle)
- def shutdown(self) -> None:
- """Shutdown device.
-
- This method is a combination of `data_stop` and `close`. This is a preferred way
- of shutting down a device.
- """
- self.data_stop()
- self.close()
-
def data_start(self) -> bool:
"""Starts data transmission.
Returns:
- flag: Return `True` if the data transmission is initialized successfully.
- Otherwise `False`.
+ `True` if the data transmission is initialized successfully.
+ Otherwise `False`.
"""
# TODO figure out the parameter meaning
if self.handle is not None:
@@ -147,21 +116,27 @@ def data_stop(self) -> None:
"""
libcaer.caerDeviceDataStop(self.handle)
- def send_default_config(self) -> bool:
- """Send default configuration.
+ def shutdown(self) -> None:
+ """Shuts down device.
- Each type of devices has a set of default configurations (e.g., bias)
- that are pre-defined in the `libcaer` library.
- Note that the default configuration might not be suitable for your needs.
+ This method is a combination of `data_stop` and `close`. This is a preferred way
+ of shutting down a device.
+ """
+ self.data_stop()
+ self.close()
+
+ def send_default_config(self) -> bool:
+ """Sends default configuration.
+ Each type of devices has a set of default configurations (e.g. bias) that are
+ pre-defined in the `libcaer` library. Note that the default configuration might
+ not be suitable for your needs.
Returns:
- flag: Return `True` if the default config is set successfully,
- `False` otherwise.
+ `True` if the default config is set successfully, `False` otherwise.
"""
if self.handle is not None:
- send_success = libcaer.caerDeviceSendDefaultConfig(self.handle)
- return send_success
+ return libcaer.caerDeviceSendDefaultConfig(self.handle)
else:
return False
@@ -170,7 +145,7 @@ def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool:
The main function of setting configurations (e.g., bias).
- # Args:
+ Args:
mod_addr: a module address, used to specify which configuration module one
wants to update. Negative addresses are used for host-side
configuration, while positive addresses (including zero) are used for
@@ -180,26 +155,23 @@ def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool:
(including zero) are allowed.
param: a configuration parameter's new value.
- # Returns:
+ Returns:
`True` if the config is set successfully, `False` otherwise.
"""
if self.handle is not None:
- set_success = libcaer.caerDeviceConfigSet(
- self.handle, mod_addr, param_addr, param
- )
- return set_success
+ return libcaer.caerDeviceConfigSet(self.handle, mod_addr, param_addr, param)
else:
return False
def set_max_container_packet_size(
self, max_packet_size: int = DISABLE_MAX_CONTAINER_SIZE
) -> bool:
- """Set max container packet size.
+ """Sets max container packet size.
Args:
- max_packet_size: set the maximum number of events any of a packet
+ max_packet_size: Set the maximum number of events any of a packet
container's packets may hold before it's made available to the user.
- Set to `pyaer.constants.DISABLE_MAX_CONTAINER_SIZE` to disable.
+ Set to zero to disable. The default is `0`.
"""
return self.set_config(
libcaer.CAER_HOST_CONFIG_PACKETS,
@@ -213,7 +185,7 @@ def set_max_container_interval(
"""Sets max packet interval.
Args:
- max_packet_interval: set the time interval between subsequent packet
+ max_packet_interval: Set the time interval between subsequent packet
containers. Must be at least 1 microsecond. The value is in
microseconds, and is checked across all types of events contained in
the EventPacketContainer. The default is `10000` (10ms or 100 packets/s)
@@ -241,6 +213,36 @@ def set_data_exchange_blocking(self, exchange_blocking: bool = True) -> bool:
exchange_blocking,
)
+ def start_data_stream(
+ self,
+ send_default_config: bool = True,
+ max_packet_size: int | None = None,
+ max_packet_interval: int | None = None,
+ ) -> None:
+ """Starts streaming data.
+
+ Args:
+ send_default_config: Send default config to the device before starting
+ the data streaming. `default is True`
+ max_packet_size: Set the maximum number of events any of a packet
+ container's packets may hold before it's made available to the user.
+ Set to zero to disable. The default is `None` (use default setting: 0).
+ max_packet_interval: Set the time interval between subsequent packet
+ containers. Must be at least 1 microsecond. The value is in
+ microseconds, and is checked across all types of events contained in the
+ EventPacketContainer. The default is `None` (use default setting: 10ms)
+ """
+ if send_default_config:
+ self.send_default_config()
+
+ if max_packet_size is not None:
+ self.set_max_container_packet_size(max_packet_size)
+ if max_packet_interval is not None:
+ self.set_max_container_interval(max_packet_interval)
+
+ self.data_start()
+ self.set_data_exchange_blocking()
+
def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None:
"""Gets Configuration.
@@ -255,7 +257,7 @@ def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None:
Returns:
A configuration parameter's new value. Returns None if the handle is not
- valid.
+ valid.
"""
if self.handle is not None:
return libcaer.caerDeviceConfigGet(self.handle, mod_addr, param_addr)
@@ -266,7 +268,7 @@ def get_packet_container(self) -> tuple[Any | None, int | None]:
"""Gets event packet container.
Returns:
- A tuple of `(packet_container, num_event_packets)`.
+ A tuple of ``(packet_container, num_event_packets)``.
"""
packet_container = libcaer.caerDeviceDataGet(self.handle)
if packet_container is not None:
@@ -287,36 +289,33 @@ def get_packet_header(
idx: the index of the packet header
Returns:
- A tuple of `(packet_header, packet_type)`.
+ A tuple of ``(packet_header, packet_type)``.
"""
packet_header = libcaer.caerEventPacketContainerGetEventPacket(
packet_container, idx
)
- if packet_header is None:
- return (None, None)
- else:
+ if packet_header is not None:
packet_type = libcaer.caerEventPacketHeaderGetEventType(packet_header)
return packet_header, packet_type
+ else:
+ return None, None
+
+ def get_event_packet(
+ self, packet_header: Any, packet_type: EventType
+ ) -> tuple[int, Any]:
+ """Gets event packet from packet header.
- def get_event_packet(self, packet_header, packet_type):
- """Get event packet from packet header.
-
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet.
- packet_type: `caerEventPacketType`
- the type of the event packet, can be one of the following:
- `libcaer.POLARITY_EVENT`,
- `libcaer.SPECIAL_EVENT`,
- `libcaer.FRAME_EVENT`,
- `libcaer.IMU6_EVENT`,
- `libcaer.SPIKE_EVENT`
-
- # Returns
- num_events: `int`
- number of events, return None if there is no events.
- event_packet: `caerEventPacket`
- a packet of events that are ready to be read.
+ Arguments:
+ packet_header: the header that represents a event packet.
+ packet_type: the type of the event packet, can be one of the following:
+ - `libcaer.POLARITY_EVENT`,
+ - `libcaer.SPECIAL_EVENT`,
+ - `libcaer.FRAME_EVENT`,
+ - `libcaer.IMU6_EVENT`,
+ - `libcaer.SPIKE_EVENT`
+
+ Returns:
+ A tuple of ``(num_events, event_packet)``.
"""
num_events = (
self.get_event_number_funcs[packet_type](packet_header)
@@ -332,40 +331,152 @@ def get_event_packet(self, packet_header, packet_type):
return num_events, event_packet
- def get_polarity_event(self, packet_header):
- """Get a packet of polarity event.
-
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 4) where N
- is the number of events in the event packet.
- Each row in the array represents a single polarity event.
- The first number is the timestamp.
- The second number is the X position of the event.
- The third number is the Y position of the event.
- The fourth number represents the polarity of the event
- (positive or negative).
- num_events: `int`
- number of the polarity events available in the packet.
+ def set_bias(self, bias_obj: BiasObjectType) -> None:
+ """Sets bias from bias dictionary.
+
+ Args:
+ bias_obj: Dictionary that contains biases.
+
+ Returns:
+ True if set successful, False otherwise.
+ """
+ for bias_name, module_address, parameter_address in self.configs_list:
+ self.set_config(module_address, parameter_address, bias_obj[bias_name])
+
+ # setting for noise filter
+ if self.noise_filter is not None and self.filter_noise is True:
+ self.noise_filter.set_bias(bias_obj["noise_filter_configs"])
+
+ def get_bias(self) -> BiasObjectType:
+ """Gets bias settings.
+
+ Returns:
+ Dictionary that contains DVS128 current bias settings.
+ """
+ bias_obj: BiasObjectType = {}
+
+ for bias_name, module_address, parameter_address in self.configs_list:
+ bias_obj[bias_name] = self.get_config(module_address, parameter_address)
+
+ # get noise filter configs
+ if self.noise_filter is not None:
+ bias_obj["noise_filter_configs"] = self.noise_filter.get_bias()
+
+ return bias_obj
+
+ def set_bias_from_json(self, file_path: str) -> None:
+ """Set bias from loading JSON configuration file.
+
+ Args:
+ file_path: absolute path of the JSON bias file.
+ """
+ bias_obj = load_json(file_path)
+ self.set_bias(bias_obj)
+
+ def save_bias_to_json(self, file_path: str) -> bool:
+ """Save bias to JSON.
+
+ Args:
+ file_path: the absolute path to the destiation.
+
+ Returns:
+ returns True if success in writing, False otherwise.
+ """
+ bias_obj = self.get_bias()
+ return write_json(file_path, bias_obj)
+
+ def get_polarity_event(self, packet_header: Any) -> tuple[np.ndarray, int]:
+ """Gets a packet of polarity event.
+
+ Args:
+ packet_header: the header that represents a event packet
+ noise_filter: the background activity filter is applied if True.
"""
num_events, polarity = self.get_event_packet(
packet_header, libcaer.POLARITY_EVENT
)
- # TODO: to implement a noise filtering process
- # or reimplement this function into specific classes
+ if self.filter_noise is True and self.noise_filter is not None:
+ polarity = self.noise_filter.apply(polarity)
- events = libcaer.get_polarity_event(polarity, num_events * 4).reshape(
- num_events, 4
+ events = libcaer.get_filtered_polarity_event(
+ polarity, num_events * 5
+ ).reshape(num_events, 5)
+ else:
+ events = libcaer.get_polarity_event(polarity, num_events * 4).reshape(
+ num_events, 4
+ )
+
+ return events, num_events
+
+ def get_special_event(self, packet_header: Any) -> tuple[np.ndarray, int]:
+ """Get a packet of special event.
+
+ Args:
+ packet_header: the header that represents a event packet.
+ """
+ num_events, special = self.get_event_packet(
+ packet_header, libcaer.SPECIAL_EVENT
+ )
+
+ events = libcaer.get_special_event(special, num_events * 2).reshape(
+ num_events, 2
)
return events, num_events
- def get_polarity_hist(self, packet_header, device_type=None):
+
+class USBDevice(Device):
+ """Base class for all USB devices.
+
+ This class is the base of DVS128, DAVIS240, DAVIS346 and DYNAPSE.
+ """
+
+ def __init__(self) -> None:
+ super().__init__()
+
+ def open( # type: ignore
+ self,
+ device_type: DeviceType,
+ device_id: int = 1,
+ bus_number_restrict: int = 0,
+ dev_address_restrict: int = 0,
+ serial_number: str = "",
+ ) -> None:
+ """Open an USB deivce.
+
+ Args:
+ device_type:
+ - `libcaer.CAER_DEVICE_DVS128`,
+ - `libcaer.CAER_DEVICE_DAVIS`,
+ - `libcaer.CAER_DEVICE_DAVIS_FX2`,
+ - `libcaer.CAER_DEVICE_DAVIS_FX3`,
+ - `libcaer.CAER_DEVICE_DAVIS_RPI`,
+ - `libcaer.CAER_DEVICE_DYNAPSE`.
+ device_id: An unique ID to identify the device from others. Will be used as
+ the source for EventPackets being generate from its data.
+ `default is 1`.
+ bus_number_restrict: restrict the search for viable devices to only this USB
+ bus number. `default is 0`.
+ dev_address_restrict: restrict the search for viable devices to only this
+ USB device address. `default is 0`.
+ serial_number: restrict the search for viable devices to only devices which
+ do possess the given Serial Number in their USB SerialNumber descriptor.
+ `default is ""`
+ """
+ self.handle = libcaer.caerDeviceOpen(
+ device_id,
+ device_type,
+ bus_number_restrict,
+ dev_address_restrict,
+ serial_number,
+ )
+ if self.handle is None:
+ raise ValueError("The device is failed to open.")
+
+ def get_polarity_hist(
+ self, packet_header: Any, device_type: str | int | None = None
+ ) -> tuple[np.ndarray, int]:
"""Get the positive and negative histogram for a packet."""
num_events, polarity = self.get_event_packet(
packet_header, libcaer.POLARITY_EVENT
@@ -383,74 +494,26 @@ def get_polarity_hist(self, packet_header, device_type=None):
hist = libcaer.get_polarity_event_histogram_dvxplorer_lite(
polarity, num_events
)
- else:
- return None, 0
return hist, num_events
- def get_counter_neuron_event(self, packet_header, device_type=None):
- """Get the positive and negative histogram for a packet."""
- num_events, polarity = self.get_event_packet(
- packet_header, libcaer.POLARITY_EVENT
- )
-
- if device_type == libcaer.DAVIS_CHIP_DAVIS240C:
- hist = libcaer.get_counter_neuron_frame_240(polarity, num_events)
- elif device_type == libcaer.DAVIS_CHIP_DAVIS346B:
- hist = libcaer.get_polarity_event_histogram_346(polarity, num_events)
- elif device_type == "DVS128":
- hist = libcaer.get_polarity_event_histogram_128(polarity, num_events)
- else:
- return None, 0
-
- return hist, num_events
-
- def get_special_event(self, packet_header):
- """Get a packet of special event.
-
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 2) where N
- is the number of events in the event packet.
- Each row in the array represents a single special event.
- The first value is the timestamp of the event.
- The second value is the special event data.
- num_events: `int`
- number of the special events in the packet.
- """
- num_events, special = self.get_event_packet(
- packet_header, libcaer.SPECIAL_EVENT
- )
-
- events = libcaer.get_special_event(special, num_events * 2).reshape(
- num_events, 2
- )
-
- return events, num_events
-
def get_frame_event(
- self, packet_header, device_type=None, aps_filter_type=libcaer.MONO
- ):
+ self,
+ packet_header: Any,
+ device_type: int | None = None,
+ aps_filter_type: int = libcaer.MONO,
+ ) -> tuple[np.ndarray, int]:
"""Get a packet of frame event.
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- frame_mat: `numpy.ndarray`
- a 2-D array that has the shape of (height, width).
- The height and width of the APS frame is determined by
- the specific DAVIS device (e.g., DAVIS240 will have
- a 180x240 APS frame.
- For DAVIS346Red that has RGB outputs, the output array
- has the shape of (height, width, 3)
- frame_ts: `int`
- the APS frame timestamp.
+ Args:
+ packet_header: the header that represents a event packet
+
+ Returns:
+ frame_mat: a 2-D array that has the shape of (height, width). The height and
+ width of the APS frame is determined by the specific DAVIS device (e.g.,
+ DAVIS240 will have a 180x240 APS frame. For DAVIS346Red that has RGB
+ outputs, the output array has the shape of (height, width, 3)
+ frame_ts: the APS frame timestamp.
"""
_, frame = self.get_event_packet(packet_header, libcaer.FRAME_EVENT)
first_event = libcaer.caerFrameEventPacketGetEventConst(frame, 0)
@@ -479,25 +542,20 @@ def get_frame_event(
return frame_mat, frame_ts
- def get_imu6_event(self, packet_header):
+ def get_imu6_event(self, packet_header: Any) -> tuple[np.ndarray, int]:
"""Get IMU6 event.
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 8) where N
- is the number of IMU6 events in the packet.
- Each row of the array consists a single IMU6 event.
- The first value is the timestamp of the event.
- The next three values are accelerations on the X, Y, and Z
- axes. The next three values are angular velocity
- on the X, Y and Z axes.
- The last value is the temperature in Celsius scale.
- num_events: `int`
- number of the IMU6 events.
+ # Args:
+ packet_header: the header that represents a event packet
+
+ # Returns:
+ events: a 2-D array that has the shape of (N, 8) where N is the number of
+ IMU6 events in the packet. Each row of the array consists a single IMU6
+ event. The first value is the timestamp of the event. The next three
+ values are accelerations on the X, Y, and Z axes. The next three values
+ are angular velocity on the X, Y and Z axes. The last value is the
+ temperature in Celsius scale.
+ num_events: number of the IMU6 events.
"""
num_events, imu = self.get_event_packet(packet_header, libcaer.IMU6_EVENT)
@@ -505,26 +563,21 @@ def get_imu6_event(self, packet_header):
return events, num_events
- def get_imu9_event(self, packet_header):
+ def get_imu9_event(self, packet_header: Any) -> tuple[np.ndarray, int]:
"""Get IMU9 event.
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 11) where N
- is the number of IMU9 events in the packet.
- Each row of the array consists a single IMU9 event.
- The first value is the timestamp of the event.
- The next three values are accelerations on the X, Y, and Z
- axes. The next three values are angular velocity
- on the X, Y and Z axes. The next three values are
- X, Y, Z axis compass heading.
- The last value is the temperature in Celsius scale.
- num_events: `int`
- number of the IMU9 events.
+ # Args:
+ packet_header: the header that represents a event packet
+
+ # Returns:
+ events: a 2-D array that has the shape of (N, 11) where N is the number of
+ IMU9 events in the packet. Each row of the array consists a single IMU9
+ event. The first value is the timestamp of the event. The next three
+ values are accelerations on the X, Y, and Z axes. The next three values
+ are angular velocity on the X, Y and Z axes. The next three values are
+ X, Y, Z axis compass heading. The last value is the temperature in
+ Celsius scale.
+ num_events: number of the IMU9 events.
"""
num_events, imu = self.get_event_packet(packet_header, libcaer.IMU9_EVENT)
@@ -532,97 +585,54 @@ def get_imu9_event(self, packet_header):
return events, num_events
- def get_spike_event(self, packet_header):
+ def get_spike_event(self, packet_header: Any) -> tuple[np.ndarray, int]:
"""Get Spike Event.
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 4) where N
- is the number of spike events in the packet.
- Each row of the array has a single spike event.
- The first value is the timestamp of the event.
- The second value is the neuron ID.
- The third value is the chip ID.
- The last value is the source core ID.
- num_events: `int`
- the number of the spike events.
+ # Args:
+ packet_header: the header that represents a event packet
+
+ # Returns:
+ events: a 2-D array that has the shape of (N, 4) where N is the number of
+ spike events in the packet. Each row of the array has a single spike
+ event. The first value is the timestamp of the event. The second value
+ is the neuron ID. The third value is the chip ID. The last value is the
+ source core ID.
+ num_events: the number of the spike events.
"""
- num_events, spike = self.get_event_packet(packet_header, self.SPIKE_EVENT)
+ num_events, spike = self.get_event_packet(packet_header, libcaer.SPIKE_EVENT)
events = libcaer.get_spike_event(spike, num_events * 4).reshape(num_events, 4)
return events, num_events
-class SerialDevice(object):
+class SerialDevice(Device):
"""Base class for serial devices.
The base class for devices that use the serial port. eDVS is the only current
supported device in this family.
"""
- def __init__(self):
- """Device."""
- self.handle = None
-
- @abc.abstractmethod
- def obtain_device_info(self, handle):
- """Obtain device handle.
-
- This abstract method should be implemented in all derived classes.
- This method collects the general information about the USB device
- such as the width and height of the camera or the serial number
- of the device.
-
- # Arguments
- handle: `caerDeviceHandle`
- a valid device handle that can be used with the other
- `libcaer` functions, or `None` on error.
- """
- return
-
- @abc.abstractmethod
- def get_event(self):
- """Get Event.
-
- This abstract method should be implemented in all derived classes. This method
- returns a packet of events according to the type of the sensor.
- """
- return
+ def __init__(self) -> None:
+ super().__init__()
- def open(
+ def open( # type: ignore
self,
- device_type,
- device_id=1,
- serial_port_name="/dev/ttyUSB0",
- serial_baud_rate=libcaer.CAER_HOST_CONFIG_SERIAL_BAUD_RATE_12M,
- ):
- """Open USB deivce.
-
- # Arguments
- device_type: `int`
- Device type:
- `libcaer.CAER_DEVICE_DVS128`,
- `libcaer.CAER_DEVICE_EDVS`,
- `libcaer.CAER_DEVICE_DAVIS`,
- `libcaer.CAER_DEVICE_DAVIS_FX2`,
- `libcaer.CAER_DEVICE_DAVIS_FX3`,
- `libcaer.CAER_DEVICE_DAVIS_RPI`,
- `libcaer.CAER_DEVICE_DYNAPSE`.
- device_id: `int`
- a unique ID to identify the device from others.
- Will be used as the source for EventPackets being
- generate from its data.
+ device_type: DeviceType,
+ device_id: int = 1,
+ serial_port_name: str = "/dev/ttyUSB0",
+ serial_baud_rate: int = libcaer.CAER_HOST_CONFIG_SERIAL_BAUD_RATE_12M,
+ ) -> None:
+ """Open a serial deivce.
+
+ # Args:
+ device_type: `libcaer.CAER_DEVICE_EDVS`,
+ device_id: a unique ID to identify the device from others. Will be used as
+ the source for EventPackets being generate from its data.
`default is 1`.
- serial_port_name: `str`
- name of the serial port device to open.
+ serial_port_name: name of the serial port device to open.
`default is /dev/ttyUSB0`
- serial_baud_rate: `uint32_t`
- baud-rate for serial port communication.
+ serial_baud_rate: baud-rate for serial port communication.
`default is 12M`
"""
self.handle = libcaer.caerDeviceOpenSerial(
@@ -630,211 +640,3 @@ def open(
)
if self.handle is None:
raise ValueError("The device is failed to open.")
-
- def close(self):
- """Close USB device.
-
- This method closes an opened USB device if the respective handle is not None.
- """
- if self.handle is not None:
- libcaer.caerDeviceClose(self.handle)
-
- def shutdown(self):
- """Shutdown device.
-
- This method is a combination of `data_stop` and `close`. This is a preferred way
- of shutting down a device.
- """
- self.data_stop()
- self.close()
-
- def data_start(self):
- """Start data transmission.
-
- # Returns
- flag: `bool`
- Return `True` if the data transmission is
- initialized successfully. Otherwise `False`.
- """
- # TODO figure out the parameter meaning
- if self.handle is not None:
- data_start_success = libcaer.caerDeviceDataStart(
- self.handle, None, None, None, None, None
- )
- return data_start_success
- else:
- return False
-
- def data_stop(self):
- """Stop data transmission.
-
- This method stops the data transmission only. Note that this method does not
- destroy the respective device `handle`.
- """
- libcaer.caerDeviceDataStop(self.handle)
-
- def send_default_config(self):
- """Send default configuration.
-
- Each type of devices has a set of default configurations (e.g. bias)
- that are pre-defined in the `libcaer` library.
- Note that the default configuration might not be suitable for your
- needs.
-
- # Returns
- flag: `bool`
- return `True` if the default config is set successfully,
- `False` otherwise.
- """
- if self.handle is not None:
- send_success = libcaer.caerDeviceSendDefaultConfig(self.handle)
- return send_success
- else:
- return False
-
- def set_data_exchange_blocking(self, exchange_blocking=True):
- """Set data exchange blocking.
-
- # Arguments
- exchange_blocking: `bool`
- whether to start all the data producer modules on the device
- (DVS, APS, Mux, ...) automatically when starting the
- data transfer thread with `caerDeviceDataStart()` or not.
- If disabled, be aware you will have to start the right modules
- manually, which can be useful if you need precise control
- over which ones are running at any time.
- The default is `True`.
- """
- return self.set_config(
- libcaer.CAER_HOST_CONFIG_DATAEXCHANGE,
- libcaer.CAER_HOST_CONFIG_DATAEXCHANGE_BLOCKING,
- exchange_blocking,
- )
-
- def set_config(self, mod_addr, param_addr, param):
- """Set configuration.
-
- The main function of setting configurations (e.g., bias).
-
- # Arguments
- mod_addr: `int`
- a module address, used to specify which configuration module
- one wants to update. Negative addresses are used for host-side
- configuration, while positive addresses (including zero) are
- used for device-side configuration.
- param_addr: `int`
- a parameter address, to select a specific parameter to update
- from this particular configuration module.
- Only positive numbers
- (including zero) are allowed.
- param: `int` or `bool`
- a configuration parameter's new value.
-
- # Returns
- flag: `bool`
- returns `True` if the config is set successfully,
- `False` otherwise.
- """
- if self.handle is not None:
- set_success = libcaer.caerDeviceConfigSet(
- self.handle, mod_addr, param_addr, param
- )
- return set_success
- else:
- return False
-
- def get_config(self, mod_addr, param_addr):
- """Get Configuration.
-
- # Arguments
- mod_addr: `int`
- a module address, used to specify which configuration module
- one wants to update. Negative addresses are used for host-side
- configuration, while positive addresses (including zero) are
- used for device-side configuration.
- param_addr: `int`
- a parameter address, to select a specific parameter to update
- from this particular configuration module.
- Only positive numbers
- (including zero) are allowed.
-
- # Returns
- param: `int` or `bool`
- a configuration parameter's new value. Returns None
- if the handle is not valid.
- """
- if self.handle is not None:
- return libcaer.caerDeviceConfigGet(self.handle, mod_addr, param_addr)
- else:
- return None
-
- def get_packet_container(self):
- """Get event packet container.
-
- # Returns
- packet_container: `caerEventPacketContainer`
- a container that consists of event packets.
- packet_number: `int`
- number of event packet in the container.
- """
- packet_container = libcaer.caerDeviceDataGet(self.handle)
- if packet_container is not None:
- packet_number = libcaer.caerEventPacketContainerGetEventPacketsNumber(
- packet_container
- )
- return packet_container, packet_number
- else:
- return None, None
-
- def get_packet_header(self, packet_container, idx):
- """Get a single packet header.
-
- # Arguments
- packet_container: `caerEventPacketContainer`
- the event packet container
- idx: `int`
- the index of the packet header
-
- # Returns
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
- packet_type: `caerEventPacketType`
- the type of the event packet
- """
- packet_header = libcaer.caerEventPacketContainerGetEventPacket(
- packet_container, idx
- )
- if packet_header is None:
- return (None, None)
- else:
- packet_type = libcaer.caerEventPacketHeaderGetEventType(packet_header)
- return packet_header, packet_type
-
- def get_polarity_event(self, packet_header):
- """Get a packet of polarity event.
-
- # Arguments
- packet_header: `caerEventPacketHeader`
- the header that represents a event packet
-
- # Returns
- events: `numpy.ndarray`
- a 2-D array that has the shape of (N, 4) where N
- is the number of events in the event packet.
- Each row in the array represents a single polarity event.
- The first number is the timestamp.
- The second number is the X position of the event.
- The third number is the Y position of the event.
- The fourth number represents the polarity of the event
- (positive or negative).
- num_events: `int`
- number of the polarity events available in the packet.
- """
- num_events = libcaer.caerEventPacketHeaderGetEventNumber(packet_header)
- polarity = libcaer.caerPolarityEventPacketFromPacketHeader(packet_header)
-
- events = libcaer.get_polarity_event(polarity, num_events * 4).reshape(
- num_events, 4
- )
-
- return events, num_events
diff --git a/pyaer/dtypes.py b/pyaer/dtypes.py
new file mode 100644
index 0000000..19e2087
--- /dev/null
+++ b/pyaer/dtypes.py
@@ -0,0 +1,10 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+EventType = int
+DeviceType = int
+ModuleAddressType = int
+ParameterAddressType = int
+BiasObjectType = dict[str, Any]