Skip to content

Commit

Permalink
Merge pull request #3008 from pieleric/fix-driver-pwrmccdaq-simulate-…
Browse files Browse the repository at this point in the history
…better-connection-of-dio-ports-a-b

[fix] driver pwrmccdaq: simulate better connection of DIO ports A <-> B
  • Loading branch information
pieleric authored Feb 6, 2025
2 parents dfbd050 + ba9cab3 commit cdbe40f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 56 deletions.
88 changes: 42 additions & 46 deletions src/odemis/driver/pwrmccdaq.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str],
the device is considered a USB-powered MCC DAQ 1208LS device. Note that the serial number
should contain an added 0 to the beginning of the s/n. If "fake" is passed the simulator
is used.
:param di_channels (dict -> int, list(str, bool)):
:param di_channels (dict: int -> list(str, bool)):
the DIO channel used for TTL signal status change through the MCC device.
for example di_channels: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}.
key is the channel number(int), value is a list with VA name(str) and a flag stating
if ttl high is True(bool), this means that the VA will be considered True or False
when the TTL signal is high.
with this approach it is actually possible to keep track of customized status changes.
Example: {2: ["interlockTriggered", False], 7: ["leftSwitch", True]}.
key is the DI channel number.
value is a list with VA name (str) and a "TTL high" flag, stating the value to set when
reading high on the DI channel. So passing True means that the VA will report True when
the port is high, and False when the port is low. Passing False means the opposite.
"""
super().__init__(name, role, **kwargs)
self._name = name
Expand Down Expand Up @@ -121,7 +121,7 @@ def __init__(self, name: str, role: str, mcc_device: Optional[str],
ch_va_obj = DIChannelInfo(channel, False, ttl_high, va_name, va)
self._channel_vas.append(ch_va_obj)
setattr(self, va_name, va) # set the class VA variable name
logging.info(f"{va_name} status activated for component {self._name} on channel {channel}")
logging.info(f"{va_name} status registered for component {self._name} on channel {channel}")

for port, val in dconfig.items():
self.device.DConfig(port, val)
Expand Down Expand Up @@ -192,8 +192,7 @@ def run(self):
logging.info(f"{chan.va_name} changed to {new_val}")
chan.va._set_value(new_val, force_write=True)

logging.info("DI Status thread suspended.")

logging.info("DI Status thread terminated.")
except Exception as ex:
logging.error(f"An Exception occurred while polling DI port status ({ex}) "
f"status changes are not longer tracked.")
Expand All @@ -217,7 +216,9 @@ class MCCDeviceLight(Emitter, MCCDevice):
def __init__(self, name: str, role: str, mcc_device: Optional[str], ao_channels: List[int], do_channels: List[int],
spectra, pwr_curve, di_channels: Dict[int, Tuple[str, bool]] = {}, **kwargs):
"""
:param mcc_device (str or None): refer to parent.
:param mcc_device (str or None): refer to parent. When using the simulator ("fake"), the
first 8 DIO channels are connected to the next 8 DIO channels. For instance, the value of
do_channel 0 is connected to the value read on di_channel 8, etc.
:param ao_channels: (list of (0<=int<=3)):
The analogue output channel for each source, used to control the power level of the laser output.
:param do_channels: (list of (0<=int<=15)):
Expand Down Expand Up @@ -372,16 +373,20 @@ def __init__(self):
# initialize values
self.productID = 0x0007a # USB-1208LS
# to keep track of the individual bits and values
self.port_a_bit_status = [False, False, False, False, False, False, False, False]
self.port_b_bit_status = [False, False, False, False, False, False, False, False]
self.port_a_bit_config = [False, False, False, False, False, False, False, False]
self.port_b_bit_config = [False, False, False, False, False, False, False, False]
self.port_status = {
usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False],
usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False],
}
self.port_config = {
usb_1208LS.DIO_PORTA: [False, False, False, False, False, False, False, False],
usb_1208LS.DIO_PORTB: [False, False, False, False, False, False, False, False],
}
# port 1 (A) or 2 (B) write on A is 2 write to B is 1
self.AO_channels = [0, 0] # value (uint16) in counts to output [10-bits 0-5V]

# set default configuration
self.DConfig(usb_1208LS.DIO_PORTA, 0x00) # Port A output (all LOW)
self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B input (all LOW)
self.DConfig(usb_1208LS.DIO_PORTB, 0x00) # Port B output (all LOW)
self.AOut(0, 0x0)
self.AOut(1, 0x0)

Expand All @@ -403,19 +408,16 @@ def DConfig(self, port_number, bit_mask):
if bit_mask < 0x00 or bit_mask > 0xff:
raise ValueError("Bit mask to set is not between 0 and 255")

if port_number == usb_1208LS.DIO_PORTA:
self.port_a_bit_config = [bool(bit_mask & 1 << i) for i in range(8)]
elif port_number == usb_1208LS.DIO_PORTB:
self.port_b_bit_config = [bool(bit_mask & 1 << i) for i in range(8)]
else:
raise ValueError()
if port_number not in self.port_config:
raise ValueError("Port number is not valid")
self.port_config[port_number] = [bool(bit_mask & 1 << i) for i in range(8)]

def DIn(self, port_number):
"""
:param port_number: AUXPORT = 0x10 | Port A = 0x01 | Port B = 0x04
:return: the value seen at the port pins
"""
DIO_port = self._return_port_status(port_number, read=True)
DIO_port = self.port_status[port_number]
return sum(v << i for i, v in enumerate(DIO_port))

def DOut(self, port_number, value):
Expand All @@ -427,16 +429,13 @@ def DOut(self, port_number, value):
if value < 0 or value > 255:
raise ValueError("Value to set is not between 0 and 255")

DIO_port = self._return_port_status(port_number, read=False)

if port_number == usb_1208LS.DIO_PORTA:
bit_config = self.port_a_bit_config
elif port_number == usb_1208LS.DIO_PORTB:
bit_config = self.port_b_bit_config
else:
raise ValueError()
DIO_port = self.port_status[port_number]
bit_config = self.port_config[port_number]
DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)]

# when port A is requested, writing to port B is simulated and the other way around
# Simulate connection to the other port
other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB
DIO_port = self.port_status[other_port]
DIO_port[:] = [bool(value & 1 << i) if not bit_config[i] else DIO_port[i] for i in range(8)]

def DBitIn(self, port_number, bit):
Expand All @@ -450,9 +449,8 @@ def DBitIn(self, port_number, bit):
if bit < 0 or bit > 7:
raise ValueError("Bit value is not between 0 and 7")

DIO_port = self._return_port_status(port_number, read=True)

return int(DIO_port[bit])
# Note: it's allowed to read from an output bit. It will return the latest value set.
return int(self.port_status[port_number][bit])

def DBitOut(self, port_number, bit, value):
"""
Expand All @@ -466,26 +464,24 @@ def DBitOut(self, port_number, bit, value):
if value < 0 or value > 1:
raise ValueError("Value to set should be either 0 or 1")

DIO_port = self._return_port_status(port_number, read=False)
DIO_port[bit] = bool(value)
if self.port_config[port_number][bit]:
logging.warning("Writing to port %s, bit %s that is not configured as output", port_number, bit)

self.port_status[port_number][bit] = bool(value)

# Simulate connection to the other port
other_port = usb_1208LS.DIO_PORTA if port_number == usb_1208LS.DIO_PORTB else usb_1208LS.DIO_PORTB
self.port_status[other_port][bit] = bool(value)

def AOut(self, channel, value):
"""
This command sets the voltage output of the specified analog output channel
:param channel: selects output channel (0 or 1)
:param value: value (uint16) in counts to output [10-bits 0-5V]
"""
if channel > 1 or channel < 0:
channel = 0
if channel < 0 or channel > 1:
raise ValueError(f"Channel should be either 0 or 1 but got {channel}")
# force automatic clipping
value = min(max(0, value), 0x3ff)

self.AO_channels[channel] = value

def _return_port_status(self, port_number, read: bool = True) -> List[bool]:
if port_number == usb_1208LS.DIO_PORTA:
return self.port_a_bit_status if read else self.port_b_bit_status
elif port_number == usb_1208LS.DIO_PORTB:
return self.port_b_bit_status if read else self.port_a_bit_status
else:
raise ValueError()
20 changes: 10 additions & 10 deletions src/odemis/driver/test/pwrmccdaq_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"name": "Laser Hub", "role": "light",
"mcc_device": None,
"ao_channels": [0, 1],
"do_channels": [5, 6],
"do_channels": [5, 6], # pins 26 & 27
"spectra": [
[592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9],
[592.e-9, 593.e-9, 594.e-9, 595.e-9, 596.e-9]],
Expand All @@ -49,7 +49,11 @@
0: 0,
5: 0.06, # 60mW light emitter
}],
"di_channels": {10: ["interlockTriggered", False], 11: ["mirrorParked", False]},
# Simulator connects to the first 8 channels <-> last 8 channels.
# Setting the power to 0.0 will change the DO ports 5 or 6, which will affect
# the DI ports 13 or 14.
# For hardware testing, connect pins 26 and 27 with 37 and 38 respectively on the board.
"di_channels": {13: ["interlockTriggered", False], 14: ["mirrorParked", True]},
}


Expand Down Expand Up @@ -97,12 +101,9 @@ def test_interlock(self):
Test for the instantiation of a basic MCC device with a DI status activated.
Checks for registered VA's, polling status thread and channel selection.
Tests trigger of VA status by using a TTL signal from a DO to a specific DI.
For this test to work with real HW, connect pin 26 and 27 with 34 and 35
For this test to work with real HW, connect pins 26 and 27 with pins 37 and 38
respectively on the board.
"""
if TEST_NOHW:
self.skipTest("This test case does not make sense without the real HW.")

self._create_device()

# check if the interlockTriggered VA is registered properly
Expand All @@ -127,19 +128,18 @@ def test_interlock(self):

# check the status of the DI channels before triggering
self.assertFalse(self.mcc_device.interlockTriggered.value)
self.assertFalse(self.mcc_device.mirrorParked.value)
self.assertTrue(self.mcc_device.mirrorParked.value)

# check the old bit status
old_bit_status = self.mcc_device.device.DIn(0x04)

# set the power back to zero, this should trigger both of the DI channels
self.mcc_device.power.value[0] = self.mcc_device.power.range[0][0]
self.mcc_device.power.value[1] = self.mcc_device.power.range[0][1]
self.mcc_device.power.value = self.mcc_device.power.range[0]
time.sleep(0.15) # wait a little longer than the tread interval

# check the status of the DI channels after triggering
self.assertTrue(self.mcc_device.interlockTriggered.value)
self.assertTrue(self.mcc_device.interlockTriggered.value)
self.assertFalse(self.mcc_device.mirrorParked.value)

# check if the bit status is now the new bit status
new_bit_status = self.mcc_device.device.DIn(0x04)
Expand Down

0 comments on commit cdbe40f

Please sign in to comment.