Skip to content

Commit

Permalink
first attempt at async driver (with errors!)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinPlaza committed Nov 18, 2024
1 parent c2d9929 commit 2f3e091
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 33 deletions.
2 changes: 2 additions & 0 deletions mil_common/drivers/electrical_protocol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ project(electrical_protocol)
find_package(catkin REQUIRED COMPONENTS
rospy
serial
axros
)

catkin_python_setup()
catkin_package()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Everything currently labelled async is unifinished
and not fully working.

The last issue I had was the simulated device not
being able to connect to the test, all async files are
commented out.

You can run the async test with the following command:
"rostest --text electrical_protocol async_simulated.test"

If you have any questions or confusion about the mess
I've made here feel free to reach out: marcinplaza@ufl.edu

ps: when starting you will most likely have to make
async_test_simulated_basic.py / async_driver.py executable
example: chmod +x async_test_simulated_basic.py (makes it executable)

some helpful sites:
axros examples
https://mil.ufl.edu/docs/reference/axros/examples.html

axros reference
https://mil.ufl.edu/docs/reference/axros/api.html#service

asyncio reference
https://docs.python.org/3/library/asyncio.html
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
class can be used, which supports packets built through this library.
"""

from .async_driver import AsyncSerialDevice
from .driver import ROSSerialDevice
from .packet import AckPacket, ChecksumException, NackPacket, Packet
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
'''
from __future__ import annotations
import abc
Expand All @@ -6,7 +7,8 @@
from typing import Any, Generic, TypeVar, Union, cast, get_args, get_origin
import axros
import rospy
# import rospy
import serial
import serial_asyncio
Expand All @@ -23,13 +25,17 @@ def __init__(self, device: AsyncSerialDevice):
self.byte_count = 0
self.buffer = b""
self.expected_count = None
print("in init?")
def connection_made(self, transport: asyncio.BaseTransport) -> None:
self.transport = transport
print("port opened", transport)
self.transport.serial.reset_input_buffer() # type: ignore
self.transport.serial.reset_output_buffer() # type: ignore
def data_received(self, data: bytes) -> None:
print("data received?")
# Either wait to hear the next start byte, or keep accumulating bytes
# for the next packet
if self.expected_count is None:
Expand Down Expand Up @@ -114,7 +120,7 @@ def __init_subclass__(cls) -> None:
cls._send_T = get_args(cls.__orig_bases__[0])[0] # type: ignore
cls._recv_T = get_args(cls.__orig_bases__[0])[1] # type: ignore
async def connect(self, port: str, baudrate: int) -> None:
async def connect(self, port: str, baudrate: int, loop) -> None:
"""
Connects to the port with the given baudrate. If the device is already
connected, the input and output buffers will be flushed.
Expand All @@ -123,34 +129,40 @@ async def connect(self, port: str, baudrate: int) -> None:
port (str): The serial port to connect to.
baudrate (int): The baudrate to connect with.
"""
print("3")
self.port = port
self.baudrate = baudrate
if self.transport:
raise RuntimeError("Device is already connected.")
# if self.transport:
# raise RuntimeError("Device is already connected.")
print("the serial connection runs after this?")
self.transport, self.protocol = await serial_asyncio.create_serial_connection(
asyncio.get_event_loop(),
loop,
DeviceProtocol,
port,
baudrate=baudrate,
)
def close(self) -> None:
print("the serial connection creator finished?")
async def close(self) -> None:
"""
Closes the serial device.
"""
rospy.loginfo("Closing serial device...")
print("Closing serial device...")
if not self.transport:
raise RuntimeError("Device is not connected.")
else:
with contextlib.suppress(OSError):
if self.transport.serial.in_waiting: # type: ignore
rospy.logwarn(
print(
"Shutting down device, but packets were left in buffer...",
)
self.transport.close()
self.transport = None
def write(self, data: bytes) -> None:
async def write(self, data: bytes) -> None:
"""
Writes a series of raw bytes to the device. This method should rarely be
used; using :meth:`~.send_packet` is preferred because of the guarantees
Expand All @@ -164,21 +176,22 @@ def write(self, data: bytes) -> None:
self.transport.write(data)
def send_packet(self, packet: SendPackets) -> None:
print("4")
"""
Sends a given packet to the device.
Arguments:
packet (:class:`~.Packet`): The packet to send.
"""
self.write(bytes(packet))
asyncio.create_task(self.write(bytes(packet)))
def _read_from_stream(self) -> bytes:
async def _read_from_stream(self) -> bytes:
# Read until SOF is encourntered in case buffer contains the end of a previous packet
if not self.device:
if not self.transport:
raise RuntimeError("Device is not connected.")
sof = None
for _ in range(10):
sof = self.device.read(1)
sof = self.transport.serial.read(1)
if not len(sof):
continue
sof_int = int.from_bytes(sof, byteorder="big")
Expand All @@ -188,13 +201,13 @@ def _read_from_stream(self) -> bytes:
raise TimeoutError("No SOF received in one second.")
sof_int = int.from_bytes(sof, byteorder="big")
if sof_int != SYNC_CHAR_1:
rospy.logerr("Where da start char at?")
print("Where da start char at?")
data = sof
# Read sync char 2, msg ID, subclass ID
data += self.device.read(3)
length = self.device.read(2) # read payload length
data += self.transport.serial.read(3)
length = self.transport.serial.read(2) # read payload length
data += length
data += self.device.read(
data += self.transport.serial.read(
int.from_bytes(length, byteorder="little") + 2,
) # read data and checksum
return data
Expand All @@ -208,14 +221,14 @@ def _correct_type(self, provided: Any) -> bool:
else:
return isinstance(provided, self._recv_T)
def _read_packet(self) -> bool:
async def _read_packet(self) -> bool:
if not self.device:
raise RuntimeError("Device is not connected.")
try:
if not self.is_open() or self.device.in_waiting == 0:
return False
if self.device.in_waiting > 200:
rospy.logwarn_throttle(
print(
0.5,
"Packets are coming in much quicker than expected, upping rate...",
)
Expand All @@ -224,33 +237,33 @@ def _read_packet(self) -> bool:
assert isinstance(packed_packet, bytes)
packet = Packet.from_bytes(packed_packet)
except serial.SerialException as e:
rospy.logerr(f"Error reading packet: {e}")
print(f"Error reading packet: {e}")
return False
except OSError:
rospy.logerr_throttle(1, "Cannot read from serial device.")
print(1, "Cannot read from serial device.")
return False
if not self._correct_type(packet):
rospy.logerr(
print(
f"Received unexpected packet: {packet} (expected: {self._recv_T})",
)
return False
packet = cast(RecvPackets, packet)
self.on_packet_received(packet)
await self.on_packet_received(packet)
return True
def _process_buffer(self, _: rospy.timer.TimerEvent) -> None:
async def _process_buffer(self, _: rospy.timer.TimerEvent) -> None:
if not self.is_open():
return
try:
self._read_packet()
await self._read_packet()
except Exception as e:
rospy.logerr(f"Error reading recent packet: {e}")
print(f"Error reading recent packet: {e}")
import traceback
traceback.print_exc()
@abc.abstractmethod
def on_packet_received(self, packet: RecvPackets) -> None:
async def on_packet_received(self, packet: RecvPackets) -> None:
"""
Abstract method to be implemented by subclasses for handling packets
sent by the physical electrical board.
Expand All @@ -259,3 +272,4 @@ def on_packet_received(self, packet: RecvPackets) -> None:
packet (:class:`~.Packet`): The packet that is received.
"""
pass
'''
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def connect(self, port: str, baudrate: int) -> None:
port (str): The serial port to connect to.
baudrate (int): The baudrate to connect with.
"""
rospy.loginfo("3")
self.port = port
self.baudrate = baudrate
self.device = serial.Serial(port, baudrate, timeout=0.1)
Expand Down Expand Up @@ -113,6 +114,7 @@ def write(self, data: bytes) -> None:
self.device.write(data)

def send_packet(self, packet: SendPackets) -> None:
rospy.loginfo("4")
"""
Sends a given packet to the device.
Expand Down
1 change: 1 addition & 0 deletions mil_common/drivers/electrical_protocol/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<maintainer email="cameron.brown@todo.todo">cameron.brown</maintainer>
<license>MIT</license>
<buildtool_depend>catkin</buildtool_depend>
<exec_depend>axros</exec_depend>
<exec_depend>rospy</exec_depend>
<exec_depend>serial</exec_depend>
</package>
104 changes: 104 additions & 0 deletions mil_common/drivers/electrical_protocol/test/async_calculator_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
# from threading import Event, Thread
from typing import Union
import uvloop
from axros import NodeHandle
from electrical_protocol import AsyncSerialDevice, Packet
from std_msgs.msg import Float32, String
from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
@dataclass
class RequestAddPacket(Packet, class_id=0x37, subclass_id=0x00, payload_format="<ff"):
number_one: float
number_two: float
@dataclass
class RequestSubPacket(Packet, class_id=0x37, subclass_id=0x01, payload_format="<ff"):
start: float
minus: float
@dataclass
class AnswerPacket(Packet, class_id=0x37, subclass_id=0x02, payload_format="<f"):
result: float
class AsyncCalculatorDevice(
AsyncSerialDevice[Union[RequestAddPacket, RequestSubPacket], AnswerPacket],
):
def __init__(self, nh: NodeHandle):
self.nh = nh
print("1")
# await asyncio.gather(self.port_topic.setup(), self.start_service.setup(), self.answer_topic.setup())
async def setup(self):
# Move the async setup logic to a separate method
print("Setting up AsyncCalculatorDevice...")
# Create subscriber, service, and publisher
self.port_topic = self.nh.subscribe("~port", String, self.port_callback)
await self.port_topic.setup()
print(f"Port topic: {self.port_topic.is_running}")
self.start_service = self.nh.advertise_service("~trigger", Empty, self.trigger)
await self.start_service.setup()
print(f"Start service: {self.start_service.is_running}")
self.answer_topic = self.nh.advertise("~answer", Float32, latching=True)
print(f"Answer topic: {self.start_service.is_running}")
await self.answer_topic.setup()
# Set up the subscriber, publisher, and service asynchronously
print("AsyncCalculatorDevice setup complete.")
self.next_packet = asyncio.Event()
self.i = 0
self.transport = None
self.protocol = None
def port_callback(self, msg: String):
print("2")
if self.loop.is_running:
print("why are you running, but good")
self.connect(msg.data, 115200, self.loop)
async def trigger(self, _: EmptyRequest) -> EmptyResponse:
if self.transport == None:
print("transport is not with us")
print("4")
self.num_one, self.num_two = self.i, 1000 - self.i
self.i += 1
self.send_packet(
RequestAddPacket(number_one=self.num_one, number_two=self.num_two),
)
return EmptyResponse()
def on_packet_received(self, packet) -> None:
self.answer_topic.publish(Float32(data=packet.result))
self.next_packet.set()
async def main():
nh, remaining_args = NodeHandle.from_argv_with_remaining(
"async_calculator_device", anonymous=True,
)
async with nh:
print("Starting AsyncCalculatorDevice...")
device = AsyncCalculatorDevice(nh)
await device.setup()
if __name__ == "__main__":
uvloop.install()
asyncio.run(main())
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

<!-- <?xml version="1.0" encoding="UTF-8"?>
<launch>
<param name="/is_simulation" value="True" />
<node pkg="electrical_protocol" type="async_calculator_device.py" name="async_calculator_device" output="screen" />
<test pkg="electrical_protocol" test-name="async_test_simulated_basic" type="async_test_simulated_basic.py" />
</launch> -->
Loading

0 comments on commit 2f3e091

Please sign in to comment.