Skip to content

Commit

Permalink
Publish all RBR data channels
Browse files Browse the repository at this point in the history
  • Loading branch information
rgov committed Apr 9, 2024
1 parent 6b0861f commit d160604
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 26 deletions.
12 changes: 12 additions & 0 deletions src/rbr_maestro3_ctd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ project(rbr_maestro3_ctd)

find_package(
catkin REQUIRED COMPONENTS
message_generation
std_msgs
ds_core_msgs
ds_sensor_msgs
)

add_message_files(
FILES
RbrMeasurement.msg
)

generate_messages(
DEPENDENCIES
std_msgs
ds_core_msgs
)

catkin_package()
16 changes: 16 additions & 0 deletions src/rbr_maestro3_ctd/msg/RbrMeasurement.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# RbrMeasurement

#####################################################################

# The standard 2-part DsHeader block
# This allows both a standard ROS header and DS-specific header blocks
# See HEADERS.md in ds_core_msgs for details
std_msgs/Header header
ds_core_msgs/DsHeader ds_header

#####################################################################

string channel

float32 value # assuming it is always a float?
string unit
123 changes: 97 additions & 26 deletions src/rbr_maestro3_ctd/src/rbr_maestro3_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,35 @@
import datetime
import functools
import math
import re

import rospy

from ds_core_msgs.msg import RawData
from ds_sensor_msgs.msg import Ctd, DepthPressure

from rbr_maestro3_ctd.msg import RbrMeasurement


# Converts an arbitrary string to a ROS-safe name per http://wiki.ros.org/Names.
# TODO: Copied from aml_ctd_node.py; move to a common library.
def ros_safe(s, keep_slashes=False):
# Spaces and optionally slashes to underscores
s = s.replace(' ', '_')
if not keep_slashes:
s = s.replace('/', '_')

# Split up camelCase, without breaking acronyms
s = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\g<1>_\g<2>', s)
s = re.sub(r'([a-z])([A-Z])', r'\g<1>_\g<2>', s)

# Drop other characters and reduce to lowercase
s = re.sub(r'[^a-z0-9_]', '', s.lower())
if not re.match(r'^[a-z]', s):
s = 'x' + s # must start with a letter
return s



# All messages received from ROS are placed into this queue. Note that
# asyncio.Queue is _not_ thread safe.
Expand Down Expand Up @@ -49,11 +72,42 @@ async def main():
rospy.init_node('rbr_maestro3_ctd', anonymous=True, disable_signals=True)
rospy.core.add_preshutdown_hook(functools.partial(on_shutdown, loop))

# Create a mapping of topics to publish on
publishers = {
Ctd: rospy.Publisher('~', Ctd, queue_size=5),
DepthPressure: rospy.Publisher('~depth', DepthPressure, queue_size=5),
}
# Load the list of channels to publish from the ROS parameter server. The
# channels list should correspond with the `outputformat channelslist`
# configuration of the instrument.
#
# The channels are parsed into a (channel, unit) tuple:
# O2_concentration(umol/L) -> ('O2_concentration', 'umol/L')
#
# TODO: Query the instrument for its outputformat configuration.
channels = rospy.get_param('~channels', [])
if isinstance(channels, str):
channels = channels.split('|')
channels = [
tuple(c.strip(')').split('(')) if '(' in c else (c, '')
for c in channels
]


# Check a few channel types we are inflexible about
has_depth = False
for c, u in channels:
if c.lower() == 'depth':
has_depth = True
assert u == 'm'
if c.lower() == 'pressure':
assert u == 'dbar'


# Create publishers for each channel
ctd_publisher = rospy.Publisher('~', Ctd, queue_size=5)
if has_depth:
depth_publisher = rospy.Publisher('~depth', DepthPressure, queue_size=5)
channel_publishers = [
rospy.Publisher(f'~{ros_safe(c)}', RbrMeasurement, queue_size=5)
for c, u in channels
]


# Subscribe to incoming comms messages
handler = lambda msg: asyncio.run_coroutine_threadsafe(ros_msg_q.put(msg),
Expand All @@ -66,31 +120,32 @@ async def main():

# Crummy parser
fields = [ x.strip() for x in msg.data.split(b',') ]
if len(fields) != 15:
rospy.logwarn(f'Discarding malformed RBR message - expected 15 fields but received {len(fields)}')
if len(fields) != len(channels) + 1:
rospy.logwarn('Discarding malformed RBR message - expected '
f'{len(channels)+1} fields but received '
f'{len(fields)}')
continue

try:
# outputformat channelslist = [timestamp|]conductivity(mS/cm)|temperature(C)|pressure(dbar)|chlorophyll(ug/L)|phycoerythrin(cells/mL)|temperature(C)|O2_concentration(umol/L)|PAR(umol/m2/s)|turbidity(NTU)|pressure(dbar)|depth(m)|salinity(PSU)|speed_of_sound(m/s)|specific_conductivity(uS/cm)|O2_air_saturation(%)
conductivity, temperature, pressure, chlorophyll, phycoerythrin, \
temperature_again, o2_concentration, par, turbidity, \
pressure_again, depth, salinity, speed_of_sound, \
specific_conductivity, o2_air_saturation = \
[ float(x) for x in fields[1:] ]
# Parse all channel values as floats
values = {
c.lower(): float(x) for (c, u), x in zip(channels, fields[1:])
}

# Parse the timestamp as UTC
timestamp = datetime.datetime.strptime(fields[0].decode('utf-8'),
'%Y-%m-%d %H:%M:%S.%f')
timestamp = datetime.datetime.strptime(fields[0].decode(),
'%Y-%m-%d %H:%M:%S.%f')
timestamp = timestamp.replace(tzinfo=datetime.timezone.utc)
except ValueError as e:
rospy.logwarn(f'Discarding RBR message due to error during parsing: {e}')
continue


# Construct the Ctd message
ctd = Ctd()
ctd.conductivity = conductivity
ctd.temperature = temperature
ctd.pressure = pressure
ctd.conductivity = c.get('conductivity', math.nan)
ctd.temperature = c.get('temperature', math.nan)
ctd.pressure = c.get('pressure', math.nan)

# Clear fields with no measurement
ctd.salinity = math.nan
Expand All @@ -100,25 +155,41 @@ async def main():
ctd.conductivity_covar = ctd.temperature_covar = ctd.pressure_covar = \
ctd.salinity_covar = ctd.sound_speed_covar = -1


# Construct the DepthPressure message
dp = DepthPressure()
dp.depth = depth
dp.depth = c.get('depth', math.nan)
dp.latitude = DepthPressure.DEPTH_PRESSURE_NO_DATA
dp.tare = DepthPressure.DEPTH_PRESSURE_NO_DATA
dp.pressure_raw = pressure
dp.pressure_raw = c.get('pressure', math.nan)
dp.pressure_raw_unit = DepthPressure.UNIT_PRESSURE_DBAR
dp.pressure = pressure
dp.pressure = c.get('pressure', math.nan)


# Construct the RbrMeasurement messages
rbr_msgs = []
for c, u in channels:
rbr_msg = RbrMeasurement()
rbr_msg.channel = c
rbr_msg.unit = u
rbr_msg.value = values[c.lower()]
rbr_msgs.append(rbr_msg)


# Assemble all messages and their publishers
all_msgs = [(ctd, ctd_publisher)]
all_msgs.extend(zip(rbr_msgs, channel_publishers))
if has_depth:
all_msgs.append((dp, depth_publisher))

# Set the appropriate message timestamps
for m in [ctd, dp]:
# Set the appropriate message timestamps and publish
for m, publisher in all_msgs:
# The standard ROS header timestamp reflects when the instrument
# says the sample was taken. The DsHeader reflects the I/O time.
m.header.stamp = rospy.Time.from_sec(timestamp.timestamp())
m.ds_header.io_time = msg.ds_header.io_time

# Publish the messages
for m in [ctd, dp]:
publishers[type(m)].publish(m)
publisher.publish(m)


if __name__ == '__main__':
Expand Down

0 comments on commit d160604

Please sign in to comment.