From ba9cab3a6412480c75fd50779c425b9429d51650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Piel?= Date: Fri, 10 Jan 2025 18:31:05 +0100 Subject: [PATCH] [fix] driver pwrmccdaq: simulate better connection of DIO ports A <-> B The simulator simulates port A being connected to port B. So writting to a bit on port A causes the equivalent bit on port B to read the same value (and inversely too). This worked, but it broke the possibility to read what value is currently set. => write both to port A & B. Also *document* this behaviour. Also, as the test cases relied on such behaviour on the real hardware (by physically connecting some pins together). The tests also work on the simulator. So no need to disable it... However that test was broken, so fix it too. --- src/odemis/driver/pwrmccdaq.py | 88 +++++++++++------------- src/odemis/driver/test/pwrmccdaq_test.py | 20 +++--- 2 files changed, 52 insertions(+), 56 deletions(-) diff --git a/src/odemis/driver/pwrmccdaq.py b/src/odemis/driver/pwrmccdaq.py index 6a9839a71c..84cf87861f 100644 --- a/src/odemis/driver/pwrmccdaq.py +++ b/src/odemis/driver/pwrmccdaq.py @@ -69,13 +69,13 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str], the device is considered a USB-powered MCC DAQ 1208LS device. Note that the serial number should contain an added 0 to the beginning of the s/n. If "fake" is passed the simulator is used. - :param di_channels (dict -> int, list(str, bool)): + :param di_channels (dict: int -> list(str, bool)): the DIO channel used for TTL signal status change through the MCC device. - for example di_channels: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}. - key is the channel number(int), value is a list with VA name(str) and a flag stating - if ttl high is True(bool), this means that the VA will be considered True or False - when the TTL signal is high. - with this approach it is actually possible to keep track of customized status changes. + Example: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}. + key is the DI channel number. + value is a list with VA name (str) and a "TTL high" flag, stating the value to set when + reading high on the DI channel. So passing True means that the VA will report True when + the port is high, and False when the port is low. Passing False means the opposite. """ super().__init__(name, role, **kwargs) self._name = name @@ -121,7 +121,7 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str], ch_va_obj = DIChannelInfo(channel, False, ttl_high, va_name, va) self._channel_vas.append(ch_va_obj) setattr(self, va_name, va) # set the class VA variable name - logging.info(f"{va_name} status activated for component {self._name} on channel {channel}") + logging.info(f"{va_name} status registered for component {self._name} on channel {channel}") for port, val in dconfig.items(): self.device.DConfig(port, val) @@ -192,8 +192,7 @@ def run(self): logging.info(f"{chan.va_name} changed to {new_val}") chan.va._set_value(new_val, force_write=True) - logging.info("DI Status thread suspended.") - + logging.info("DI Status thread terminated.") except Exception as ex: logging.error(f"An Exception occurred while polling DI port status ({ex}) " f"status changes are not longer tracked.") @@ -217,7 +216,9 @@ class MCCDeviceLight(Emitter, MCCDevice): def __init__(self, name: str, role: str, mcc_device: Optional[str], ao_channels: List[int], do_channels: List[int], spectra, pwr_curve, di_channels: Dict[int, Tuple[str, bool]] = {}, **kwargs): """ - :param mcc_device (str or None): refer to parent. + :param mcc_device (str or None): refer to parent. When using the simulator ("fake"), the + first 8 DIO channels are connected to the next 8 DIO channels. For instance, the value of + do_channel 0 is connected to the value read on di_channel 8, etc. :param ao_channels: (list of (0<=int<=3)): The analogue output channel for each source, used to control the power level of the laser output. :param do_channels: (list of (0<=int<=15)): @@ -372,16 +373,20 @@ def __init__(self): # initialize values self.productID = 0x0007a # USB-1208LS # to keep track of the individual bits and values - self.port_a_bit_status = [False, False, False, False, False, False, False, False] - self.port_b_bit_status = [False, False, False, False, False, False, False, False] - self.port_a_bit_config = [False, False, False, False, False, False, False, False] - self.port_b_bit_config = [False, False, False, False, False, False, False, False] + self.port_status = { + usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False], + usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False], + } + self.port_config = { + usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False], + usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False], + } # port 1 (A) or 2 (B) write on A is 2 write to B is 1 self.AO_channels = [0, 0] # value (uint16) in counts to output [10-bits 0-5V] # set default configuration self.DConfig(usb_1208LS.DIO_PORTA, 0x00) # Port A output (all LOW) - self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B input (all LOW) + self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B output (all LOW) self.AOut(0, 0x0) self.AOut(1, 0x0) @@ -403,19 +408,16 @@ def DConfig(self, port_number, bit_mask): if bit_mask < 0x00 or bit_mask > 0xff: raise ValueError("Bit mask to set is not between 0 and 255") - if port_number == usb_1208LS.DIO_PORTA: - self.port_a_bit_config = [bool(bit_mask & 1 << i) for i in range(8)] - elif port_number == usb_1208LS.DIO_PORTB: - self.port_b_bit_config = [bool(bit_mask & 1 << i) for i in range(8)] - else: - raise ValueError() + if port_number not in self.port_config: + raise ValueError("Port number is not valid") + self.port_config[port_number] = [bool(bit_mask & 1 << i) for i in range(8)] def DIn(self, port_number): """ :param port_number: AUXPORT = 0x10 | Port A = 0x01 | Port B = 0x04 :return: the value seen at the port pins """ - DIO_port = self._return_port_status(port_number, read=True) + DIO_port = self.port_status[port_number] return sum(v << i for i, v in enumerate(DIO_port)) def DOut(self, port_number, value): @@ -427,16 +429,13 @@ def DOut(self, port_number, value): if value < 0 or value > 255: raise ValueError("Value to set is not between 0 and 255") - DIO_port = self._return_port_status(port_number, read=False) - - if port_number == usb_1208LS.DIO_PORTA: - bit_config = self.port_a_bit_config - elif port_number == usb_1208LS.DIO_PORTB: - bit_config = self.port_b_bit_config - else: - raise ValueError() + DIO_port = self.port_status[port_number] + bit_config = self.port_config[port_number] + DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)] - # when port A is requested, writing to port B is simulated and the other way around + # Simulate connection to the other port + other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB + DIO_port = self.port_status[other_port] DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)] def DBitIn(self, port_number, bit): @@ -450,9 +449,8 @@ def DBitIn(self, port_number, bit): if bit < 0 or bit > 7: raise ValueError("Bit value is not between 0 and 7") - DIO_port = self._return_port_status(port_number, read=True) - - return int(DIO_port[bit]) + # Note: it's allowed to read from an output bit. It will return the latest value set. + return int(self.port_status[port_number][bit]) def DBitOut(self, port_number, bit, value): """ @@ -466,8 +464,14 @@ def DBitOut(self, port_number, bit, value): if value < 0 or value > 1: raise ValueError("Value to set should be either 0 or 1") - DIO_port = self._return_port_status(port_number, read=False) - DIO_port[bit] = bool(value) + if self.port_config[port_number][bit]: + logging.warning("Writing to port %s, bit %s that is not configured as output", port_number, bit) + + self.port_status[port_number][bit] = bool(value) + + # Simulate connection to the other port + other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB + self.port_status[other_port][bit] = bool(value) def AOut(self, channel, value): """ @@ -475,17 +479,9 @@ def AOut(self, channel, value): :param channel: selects output channel (0 or 1) :param value: value (uint16) in counts to output [10-bits 0-5V] """ - if channel > 1 or channel < 0: - channel = 0 + if channel < 0 or channel > 1: + raise ValueError(f"Channel should be either 0 or 1 but got {channel}") # force automatic clipping value = min(max(0, value), 0x3ff) self.AO_channels[channel] = value - - def _return_port_status(self, port_number, read: bool = True) -> List[bool]: - if port_number == usb_1208LS.DIO_PORTA: - return self.port_a_bit_status if read else self.port_b_bit_status - elif port_number == usb_1208LS.DIO_PORTB: - return self.port_b_bit_status if read else self.port_a_bit_status - else: - raise ValueError() diff --git a/src/odemis/driver/test/pwrmccdaq_test.py b/src/odemis/driver/test/pwrmccdaq_test.py index f7811756da..5acc7bc75f 100644 --- a/src/odemis/driver/test/pwrmccdaq_test.py +++ b/src/odemis/driver/test/pwrmccdaq_test.py @@ -36,7 +36,7 @@ "name": "Laser Hub", "role": "light", "mcc_device": None, "ao_channels": [0, 1], - "do_channels": [5, 6], + "do_channels": [5, 6], # pins 26 & 27 "spectra": [ [592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9], [592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9]], @@ -49,7 +49,11 @@ 0: 0, 5: 0.06, # 60mW light emitter }], - "di_channels": {10: ["interlockTriggered", False], 11: ["mirrorParked", False]}, + # Simulator connects to the first 8 channels <-> last 8 channels. + # Setting the power to 0.0 will change the DO ports 5 or 6, which will affect + # the DI ports 13 or 14. + # For hardware testing, connect pins 26 and 27 with 37 and 38 respectively on the board. + "di_channels": {13: ["interlockTriggered", False], 14: ["mirrorParked", True]}, } @@ -97,12 +101,9 @@ def test_interlock(self): Test for the instantiation of a basic MCC device with a DI status activated. Checks for registered VA's, polling status thread and channel selection. Tests trigger of VA status by using a TTL signal from a DO to a specific DI. - For this test to work with real HW, connect pin 26 and 27 with 34 and 35 + For this test to work with real HW, connect pins 26 and 27 with pins 37 and 38 respectively on the board. """ - if TEST_NOHW: - self.skipTest("This test case does not make sense without the real HW.") - self._create_device() # check if the interlockTriggered VA is registered properly @@ -127,19 +128,18 @@ def test_interlock(self): # check the status of the DI channels before triggering self.assertFalse(self.mcc_device.interlockTriggered.value) - self.assertFalse(self.mcc_device.mirrorParked.value) + self.assertTrue(self.mcc_device.mirrorParked.value) # check the old bit status old_bit_status = self.mcc_device.device.DIn(0x04) # set the power back to zero, this should trigger both of the DI channels - self.mcc_device.power.value[0] = self.mcc_device.power.range[0][0] - self.mcc_device.power.value[1] = self.mcc_device.power.range[0][1] + self.mcc_device.power.value = self.mcc_device.power.range[0] time.sleep(0.15) # wait a little longer than the tread interval # check the status of the DI channels after triggering self.assertTrue(self.mcc_device.interlockTriggered.value) - self.assertTrue(self.mcc_device.interlockTriggered.value) + self.assertFalse(self.mcc_device.mirrorParked.value) # check if the bit status is now the new bit status new_bit_status = self.mcc_device.device.DIn(0x04)