A Python package that combines asyncio and pySerial.
import asyncio
import aioserial
async def read_and_print(aioserial_instance: aioserial.AioSerial):
    """Echo everything received on the serial port to stdout, forever.

    Args:
        aioserial_instance: The serial port object to read from; only its
            ``read_async()`` coroutine is used.

    The loop has no exit condition of its own: it ends only when the task
    is cancelled or ``read_async()`` raises.
    """
    import codecs  # local import keeps the snippet self-contained

    # ``read_async()`` is called without a size, so one UTF-8 character may
    # arrive split across several reads. Decoding every chunk on its own
    # with ``errors='ignore'`` would silently drop such characters; an
    # incremental decoder buffers the partial sequence until it completes.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    while True:
        chunk: bytes = await aioserial_instance.read_async()
        print(decoder.decode(chunk), end='', flush=True)
# Open COM1 and stream whatever arrives on it to stdout.
serial_port = aioserial.AioSerial(port='COM1')
asyncio.run(read_and_print(serial_port))
The example usage from pyserial-asyncio
https://pyserial-asyncio.readthedocs.io/en/latest/shortintro.html
import asyncio
import serial_asyncio
class Output(asyncio.Protocol):
    """Protocol that sends a greeting and logs serial-port events."""

    def connection_made(self, transport):
        """Remember the transport, clear RTS and send the greeting."""
        self.transport = transport
        print('port opened', transport)
        # The underlying Serial object can be manipulated via the transport.
        serial_port = transport.serial
        serial_port.rts = False
        # Serial data is written via the transport as well.
        greeting = b'Hello, World!\n'
        transport.write(greeting)

    def data_received(self, data):
        """Log the received chunk; close the transport on a newline."""
        print('data received', repr(data))
        if b'\n' not in data:
            return
        self.transport.close()

    def connection_lost(self, exc):
        """Report the closed port and stop the transport's event loop."""
        print('port closed')
        event_loop = self.transport.loop
        event_loop.stop()

    def pause_writing(self):
        """Log the pause followed by the current write-buffer size."""
        print('pause writing')
        pending = self.transport.get_write_buffer_size()
        print(pending)

    def resume_writing(self):
        """Log the current write-buffer size followed by the resume."""
        pending = self.transport.get_write_buffer_size()
        print(pending)
        print('resume writing')
# ``asyncio.get_event_loop()`` is deprecated when no loop is running (and
# raises on recent Python versions), so create and install one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    coro = serial_asyncio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
    loop.run_until_complete(coro)
    # Runs until ``Output.connection_lost`` stops the loop.
    loop.run_forever()
finally:
    # Close the loop even if opening the port or the protocol raises.
    loop.close()
The aioserial equivalent
import asyncio
import aioserial
async def read_and_print(aioserial_instance: aioserial.AioSerial):
    """Echo received serial data to stdout until a newline arrives.

    Args:
        aioserial_instance: The serial port object to read from. It is
            closed by this coroutine once a chunk containing ``b'\\n'``
            has been received.
    """
    import codecs  # local import keeps the snippet self-contained

    # ``read_async()`` is called without a size, so one UTF-8 character may
    # arrive split across several reads. Decoding every chunk on its own
    # with ``errors='ignore'`` would silently drop such characters; an
    # incremental decoder buffers the partial sequence until it completes.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    while True:
        data: bytes = await aioserial_instance.read_async()
        print(decoder.decode(data), end='', flush=True)
        if b'\n' in data:
            aioserial_instance.close()
            break
aioserial_instance: aioserial.AioSerial = aioserial.AioSerial(port='/dev/ttyUSB0', baudrate=115200)


async def main() -> None:
    """Send the greeting while concurrently echoing the reply."""
    # ``asyncio.gather`` returns a Future, not a coroutine, so it cannot be
    # handed to ``asyncio.run`` directly (that raises ValueError); awaiting
    # it inside a coroutine is the supported form.
    await asyncio.gather(
        read_and_print(aioserial_instance),
        aioserial_instance.write_async(b'Hello, World!\n'),
    )


asyncio.run(main())
>>> import aioserial
>>> import serial
>>> isinstance(aioserial.AioSerial(), serial.Serial)
True
>>> issubclass(aioserial.AioSerial, serial.Serial)
True
>>> aioserial.Serial is serial.Serial
True
aioserial_instance: aioserial.AioSerial = aioserial.AioSerial(
# ... same with what can be passed to serial.Serial ...,
loop: Optional[asyncio.AbstractEventLoop] = None,
cancel_read_timeout: int = 1,
cancel_write_timeout: int = 1)
bytes_read: bytes = \
await aioserial_instance.read_async(size: int = 1)
at_most_certain_size_of_bytes_read: bytes = \
await aioserial_instance.read_until_async(
expected: bytes = aioserial.LF, size: Optional[int] = None)
number_of_byte_read: int = \
await aioserial_instance.readinto_async(b: Union[array.array, bytearray])
a_line_of_at_most_certain_size_of_bytes_read: bytes = \
await aioserial_instance.readline_async(size: int = -1)
lines_of_at_most_certain_size_of_bytes_read: List[bytes] = \
await aioserial_instance.readlines_async(hint: int = -1)
number_of_byte_like_data_written: int = \
await aioserial_instance.write_async(bytes_like_data)
number_of_byte_like_data_in_the_given_list_written: int = \
await aioserial_instance.writelines_async(list_of_bytes_like_data)
All the other APIs of the parent package, pySerial, are supported in aioserial as-is.
- Wanted an asyncio-based serial library rather than a (self-built) thread-based one.
- pySerial-asyncio does not support Windows.
- The APIs of all the other packages that target the same goal (pySerial-asyncio, asyncserial) are not designed at a high level.