diff --git a/src/odemis/driver/pwrmccdaq.py b/src/odemis/driver/pwrmccdaq.py index 6a9839a71c..84cf87861f 100644 --- a/src/odemis/driver/pwrmccdaq.py +++ b/src/odemis/driver/pwrmccdaq.py @@ -69,13 +69,13 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str], the device is considered a USB-powered MCC DAQ 1208LS device. Note that the serial number should contain an added 0 to the beginning of the s/n. If "fake" is passed the simulator is used. - :param di_channels (dict -> int, list(str, bool)): + :param di_channels (dict: int -> list(str, bool)): the DIO channel used for TTL signal status change through the MCC device. - for example di_channels: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}. - key is the channel number(int), value is a list with VA name(str) and a flag stating - if ttl high is True(bool), this means that the VA will be considered True or False - when the TTL signal is high. - with this approach it is actually possible to keep track of customized status changes. + Example: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}. + key is the DI channel number. + value is a list with VA name (str) and a "TTL high" flag, stating the value to set when + reading high on the DI channel. So passing True means that the VA will report True when + the port is high, and False when the port is low. Passing False means the opposite. """ super().__init__(name, role, **kwargs) self._name = name @@ -121,7 +121,7 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str], ch_va_obj = DIChannelInfo(channel, False, ttl_high, va_name, va) self._channel_vas.append(ch_va_obj) setattr(self, va_name, va) # set the class VA variable name - logging.info(f"{va_name} status activated for component {self._name} on channel {channel}") + logging.info(f"{va_name} status registered for component {self._name} on channel {channel}") for port, val in dconfig.items(): self.device.DConfig(port, val) @@ -192,8 +192,7 @@ def run(self): logging.info(f"{chan.va_name} changed to {new_val}") chan.va._set_value(new_val, force_write=True) - logging.info("DI Status thread suspended.") - + logging.info("DI Status thread terminated.") except Exception as ex: logging.error(f"An Exception occurred while polling DI port status ({ex}) " f"status changes are not longer tracked.") @@ -217,7 +216,9 @@ class MCCDeviceLight(Emitter, MCCDevice): def __init__(self, name: str, role: str, mcc_device: Optional[str], ao_channels: List[int], do_channels: List[int], spectra, pwr_curve, di_channels: Dict[int, Tuple[str, bool]] = {}, **kwargs): """ - :param mcc_device (str or None): refer to parent. + :param mcc_device (str or None): refer to parent. When using the simulator ("fake"), the + first 8 DIO channels are connected to the next 8 DIO channels. For instance, the value of + do_channel 0 is connected to the value read on di_channel 8, etc. :param ao_channels: (list of (0<=int<=3)): The analogue output channel for each source, used to control the power level of the laser output. :param do_channels: (list of (0<=int<=15)): @@ -372,16 +373,20 @@ def __init__(self): # initialize values self.productID = 0x0007a # USB-1208LS # to keep track of the individual bits and values - self.port_a_bit_status = [False, False, False, False, False, False, False, False] - self.port_b_bit_status = [False, False, False, False, False, False, False, False] - self.port_a_bit_config = [False, False, False, False, False, False, False, False] - self.port_b_bit_config = [False, False, False, False, False, False, False, False] + self.port_status = { + usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False], + usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False], + } + self.port_config = { + usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False], + usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False], + } # port 1 (A) or 2 (B) write on A is 2 write to B is 1 self.AO_channels = [0, 0] # value (uint16) in counts to output [10-bits 0-5V] # set default configuration self.DConfig(usb_1208LS.DIO_PORTA, 0x00) # Port A output (all LOW) - self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B input (all LOW) + self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B output (all LOW) self.AOut(0, 0x0) self.AOut(1, 0x0) @@ -403,19 +408,16 @@ def DConfig(self, port_number, bit_mask): if bit_mask < 0x00 or bit_mask > 0xff: raise ValueError("Bit mask to set is not between 0 and 255") - if port_number == usb_1208LS.DIO_PORTA: - self.port_a_bit_config = [bool(bit_mask & 1 << i) for i in range(8)] - elif port_number == usb_1208LS.DIO_PORTB: - self.port_b_bit_config = [bool(bit_mask & 1 << i) for i in range(8)] - else: - raise ValueError() + if port_number not in self.port_config: + raise ValueError("Port number is not valid") + self.port_config[port_number] = [bool(bit_mask & 1 << i) for i in range(8)] def DIn(self, port_number): """ :param port_number: AUXPORT = 0x10 | Port A = 0x01 | Port B = 0x04 :return: the value seen at the port pins """ - DIO_port = self._return_port_status(port_number, read=True) + DIO_port = self.port_status[port_number] return sum(v << i for i, v in enumerate(DIO_port)) def DOut(self, port_number, value): @@ -427,16 +429,13 @@ def DOut(self, port_number, value): if value < 0 or value > 255: raise ValueError("Value to set is not between 0 and 255") - DIO_port = self._return_port_status(port_number, read=False) - - if port_number == usb_1208LS.DIO_PORTA: - bit_config = self.port_a_bit_config - elif port_number == usb_1208LS.DIO_PORTB: - bit_config = self.port_b_bit_config - else: - raise ValueError() + DIO_port = self.port_status[port_number] + bit_config = self.port_config[port_number] + DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)] - # when port A is requested, writing to port B is simulated and the other way around + # Simulate connection to the other port + other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB + DIO_port = self.port_status[other_port] DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)] def DBitIn(self, port_number, bit): @@ -450,9 +449,8 @@ def DBitIn(self, port_number, bit): if bit < 0 or bit > 7: raise ValueError("Bit value is not between 0 and 7") - DIO_port = self._return_port_status(port_number, read=True) - - return int(DIO_port[bit]) + # Note: it's allowed to read from an output bit. It will return the latest value set. + return int(self.port_status[port_number][bit]) def DBitOut(self, port_number, bit, value): """ @@ -466,8 +464,14 @@ def DBitOut(self, port_number, bit, value): if value < 0 or value > 1: raise ValueError("Value to set should be either 0 or 1") - DIO_port = self._return_port_status(port_number, read=False) - DIO_port[bit] = bool(value) + if self.port_config[port_number][bit]: + logging.warning("Writing to port %s, bit %s that is not configured as output", port_number, bit) + + self.port_status[port_number][bit] = bool(value) + + # Simulate connection to the other port + other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB + self.port_status[other_port][bit] = bool(value) def AOut(self, channel, value): """ @@ -475,17 +479,9 @@ def AOut(self, channel, value): :param channel: selects output channel (0 or 1) :param value: value (uint16) in counts to output [10-bits 0-5V] """ - if channel > 1 or channel < 0: - channel = 0 + if channel < 0 or channel > 1: + raise ValueError(f"Channel should be either 0 or 1 but got {channel}") # force automatic clipping value = min(max(0, value), 0x3ff) self.AO_channels[channel] = value - - def _return_port_status(self, port_number, read: bool = True) -> List[bool]: - if port_number == usb_1208LS.DIO_PORTA: - return self.port_a_bit_status if read else self.port_b_bit_status - elif port_number == usb_1208LS.DIO_PORTB: - return self.port_b_bit_status if read else self.port_a_bit_status - else: - raise ValueError() diff --git a/src/odemis/driver/test/pwrmccdaq_test.py b/src/odemis/driver/test/pwrmccdaq_test.py index f7811756da..5acc7bc75f 100644 --- a/src/odemis/driver/test/pwrmccdaq_test.py +++ b/src/odemis/driver/test/pwrmccdaq_test.py @@ -36,7 +36,7 @@ "name": "Laser Hub", "role": "light", "mcc_device": None, "ao_channels": [0, 1], - "do_channels": [5, 6], + "do_channels": [5, 6], # pins 26 & 27 "spectra": [ [592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9], [592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9]], @@ -49,7 +49,11 @@ 0: 0, 5: 0.06, # 60mW light emitter }], - "di_channels": {10: ["interlockTriggered", False], 11: ["mirrorParked", False]}, + # Simulator connects to the first 8 channels <-> last 8 channels. + # Setting the power to 0.0 will change the DO ports 5 or 6, which will affect + # the DI ports 13 or 14. + # For hardware testing, connect pins 26 and 27 with 37 and 38 respectively on the board. + "di_channels": {13: ["interlockTriggered", False], 14: ["mirrorParked", True]}, } @@ -97,12 +101,9 @@ def test_interlock(self): Test for the instantiation of a basic MCC device with a DI status activated. Checks for registered VA's, polling status thread and channel selection. Tests trigger of VA status by using a TTL signal from a DO to a specific DI. - For this test to work with real HW, connect pin 26 and 27 with 34 and 35 + For this test to work with real HW, connect pins 26 and 27 with pins 37 and 38 respectively on the board. """ - if TEST_NOHW: - self.skipTest("This test case does not make sense without the real HW.") - self._create_device() # check if the interlockTriggered VA is registered properly @@ -127,19 +128,18 @@ def test_interlock(self): # check the status of the DI channels before triggering self.assertFalse(self.mcc_device.interlockTriggered.value) - self.assertFalse(self.mcc_device.mirrorParked.value) + self.assertTrue(self.mcc_device.mirrorParked.value) # check the old bit status old_bit_status = self.mcc_device.device.DIn(0x04) # set the power back to zero, this should trigger both of the DI channels - self.mcc_device.power.value[0] = self.mcc_device.power.range[0][0] - self.mcc_device.power.value[1] = self.mcc_device.power.range[0][1] + self.mcc_device.power.value = self.mcc_device.power.range[0] time.sleep(0.15) # wait a little longer than the tread interval # check the status of the DI channels after triggering self.assertTrue(self.mcc_device.interlockTriggered.value) - self.assertTrue(self.mcc_device.interlockTriggered.value) + self.assertFalse(self.mcc_device.mirrorParked.value) # check if the bit status is now the new bit status new_bit_status = self.mcc_device.device.DIn(0x04)