Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIO performance improvement by making AIOHelper persistent #63

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build_pynng.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@

callbacks = """
// aio callback: https://nanomsg.github.io/nng/man/tip/nng_aio_alloc.3
extern "Python" void _async_complete(void *);
extern "Python" void _recv_complete(void *);
extern "Python" void _send_complete(void *);

// nng_pipe_notify callback:
// https://nanomsg.github.io/nng/man/tip/nng_pipe_notify.3
Expand Down
134 changes: 78 additions & 56 deletions pynng/_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,63 @@
from .exceptions import check_err


# global variable for mapping asynchronous operations with the Python data
# assocated with them. Key is id(obj), value is obj
_aio_map = {}
@ffi.def_extern()
def _send_complete(void_p):
"""
This is called when an async send is complete
"""

aio = ffi.from_handle(void_p)
if aio.send_callback:
aio.send_callback()

@ffi.def_extern()
def _async_complete(void_p):
def _recv_complete(void_p):
"""
This is the callback provided to nng_aio_* functions which completes the
Python future argument passed to it. It schedules _set_future_finished
to run to complete the future associated with the event.
This is called when an async receive is complete
"""
# this is not a public interface, so asserting invariants is good.
assert isinstance(void_p, ffi.CData)
id = int(ffi.cast('size_t', void_p))

rescheduler = _aio_map.pop(id)
rescheduler()
aio = ffi.from_handle(void_p)
if aio.recv_callback:
aio.recv_callback()


def asyncio_helper(aio):
def asyncio_helper(aio_p):
"""
Returns a callable that will be passed to _async_complete. The callable is
responsible for rescheduling the event loop

"""
loop = asyncio.get_event_loop()

loop = asyncio.get_running_loop()
fut = loop.create_future()

def _set_future_finished(fut):
if not fut.cancelled():
fut.set_result(None)

def callback():
loop.call_soon_threadsafe(_set_future_finished, fut)

async def wait_for_aio():
already_called_nng_aio_cancel = False
while True:
try:
await asyncio.shield(fut)
except asyncio.CancelledError:
if not already_called_nng_aio_cancel:
lib.nng_aio_cancel(aio.aio)
already_called_nng_aio_cancel = True
if not fut.done():
lib.nng_aio_cancel(aio_p)
else:
raise asyncio.CancelledError
else:
break
err = lib.nng_aio_result(aio.aio)

err = lib.nng_aio_result(aio_p)
if err == lib.NNG_ECANCELED:
raise asyncio.CancelledError
check_err(err)

def _set_future_finished(fut):
if not fut.done():
fut.set_result(None)

def rescheduler():
loop.call_soon_threadsafe(_set_future_finished, fut)

return wait_for_aio(), rescheduler
return wait_for_aio(), callback


def trio_helper(aio):
def trio_helper(aio_p):
# Record the info needed to get back into this task
import trio
token = trio.hazmat.current_trio_token()
Expand All @@ -81,7 +82,7 @@ async def wait_for_aio():
def abort_fn(raise_cancel_arg):
# This function is called if Trio wants to cancel the operation.
# First, ask nng to cancel the operation.
lib.nng_aio_cancel(aio.aio)
lib.nng_aio_cancel(aio_p)
# nng cancellation doesn't happen immediately, so we need to save the raise_cancel function
# into the enclosing scope to call it later, after we find out if the cancellation actually happened.
nonlocal raise_cancel_fn
Expand All @@ -93,7 +94,7 @@ def abort_fn(raise_cancel_arg):
# Put the Trio task to sleep.
await trio.hazmat.wait_task_rescheduled(abort_fn)

err = lib.nng_aio_result(aio.aio)
err = lib.nng_aio_result(aio_p)
if err == lib.NNG_ECANCELED:
# This operation was successfully cancelled.
# Call the function Trio gave us, which raises the proper Trio cancellation exception
Expand All @@ -102,7 +103,6 @@ def abort_fn(raise_cancel_arg):

return wait_for_aio(), resumer


class AIOHelper:
"""
Handles the nng_aio operations for the correct event loop. This class
Expand All @@ -121,14 +121,15 @@ class AIOHelper:
# asyncio_helper to get an idea of what the functions need to do.
_aio_helper_map = {
'asyncio': asyncio_helper,
'trio': trio_helper,
'trio': trio_helper
}

def __init__(self, obj, async_backend):
# set to None now so we can know if we need to free it later
# This should be at the top of __init__ so that __del__ doesn't raise
# an unexpected AttributeError if something funky happens
self.aio = None
self.send_aio = None
self.recv_aio = None
# this is not a public interface, let's make some assertions
assert isinstance(obj, (pynng.Socket, pynng.Context))
# we need to choose the correct nng lib functions based on the type of
Expand All @@ -142,59 +143,80 @@ def __init__(self, obj, async_backend):
self._lib_arecv = lib.nng_ctx_recv
self._lib_asend = lib.nng_ctx_send
self.obj = obj

if async_backend is None:
async_backend = sniffio.current_async_library()
try:
async_backend = sniffio.current_async_library()
except sniffio.AsyncLibraryNotFoundError:
return

if async_backend not in self._aio_helper_map:
raise ValueError(
'The async backend {} is not currently supported.'
.format(async_backend)
)
self.awaitable, self.cb_arg = self._aio_helper_map[async_backend](self)
aio_p = ffi.new('nng_aio **')
_aio_map[id(self.cb_arg)] = self.cb_arg
idarg = id(self.cb_arg)
as_void = ffi.cast('void *', idarg)
lib.nng_aio_alloc(aio_p, lib._async_complete, as_void)
self.aio = aio_p[0]

self.handle = ffi.new_handle(self)

send_aio_p = ffi.new('nng_aio **')
lib.nng_aio_alloc(send_aio_p, lib._send_complete, self.handle)
self.send_aio = send_aio_p[0]

recv_aio_p = ffi.new('nng_aio **')
lib.nng_aio_alloc(recv_aio_p, lib._recv_complete, self.handle)
self.recv_aio = recv_aio_p[0]

self.helper = self._aio_helper_map[async_backend]

self.send_callback = None
self.recv_callback = None

async def arecv(self):
msg = await self.arecv_msg()
return msg.bytes

async def arecv_msg(self):
check_err(self._lib_arecv(self._nng_obj, self.aio))
await self.awaitable
check_err(lib.nng_aio_result(self.aio))
msg = lib.nng_aio_get_msg(self.aio)
(awaitable, self.recv_callback) = self.helper(self.recv_aio)
check_err(self._lib_arecv(self._nng_obj, self.recv_aio))
await awaitable
check_err(lib.nng_aio_result(self.recv_aio))
msg = lib.nng_aio_get_msg(self.recv_aio)
return pynng.Message(msg)

async def asend(self, data):
(awaitable, self.send_callback) = self.helper(self.send_aio)

msg_p = ffi.new('nng_msg **')
check_err(lib.nng_msg_alloc(msg_p, 0))
msg = msg_p[0]
check_err(lib.nng_msg_append(msg, data, len(data)))
check_err(lib.nng_aio_set_msg(self.aio, msg))
check_err(self._lib_asend(self._nng_obj, self.aio))
return await self.awaitable
check_err(lib.nng_aio_set_msg(self.send_aio, msg))
check_err(self._lib_asend(self._nng_obj, self.send_aio))
return await awaitable

async def asend_msg(self, msg):
"""
Asynchronously send a Message

"""
lib.nng_aio_set_msg(self.aio, msg._nng_msg)
check_err(self._lib_asend(self._nng_obj, self.aio))
(awaitable, self.send_callback) = self.helper(self.send_aio)
lib.nng_aio_set_msg(self.send_aio, msg._nng_msg)
check_err(self._lib_asend(self._nng_obj, self.send_aio))
msg._mem_freed = True
return await self.awaitable
return await awaitable

def _free(self):
"""
Free resources allocated with nng
"""
# TODO: Do we need to check if self.awaitable is not finished?
if self.aio is not None:
lib.nng_aio_free(self.aio)
self.aio = None
if self.send_aio is not None:
lib.nng_aio_free(self.send_aio)
self.send_aio = None

if self.recv_aio is not None:
lib.nng_aio_free(self.recv_aio)
self.recv_aio = None

def __enter__(self):
return self
Expand Down
45 changes: 19 additions & 26 deletions pynng/nng.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
# during a callback to _nng_pipe_cb
# * Cleanup background queue threads used by NNG


def _pynng_atexit():
lib.nng_fini()


atexit.register(_pynng_atexit)


Expand Down Expand Up @@ -348,6 +346,8 @@ def __init__(self, *,
if dial is not None:
self.dial(dial, block=block_on_dial)

self._aio = _aio.AIOHelper(self, self._async_backend)

def dial(self, address, *, block=None):
"""Dial the specified address.

Expand Down Expand Up @@ -464,14 +464,12 @@ def send(self, data):

async def arecv(self):
"""The asynchronous version of :meth:`~Socket.recv`"""
with _aio.AIOHelper(self, self._async_backend) as aio:
return await aio.arecv()
return await self._aio.arecv()

async def asend(self, data):
"""Asynchronous version of :meth:`~Socket.send`."""
_ensure_can_send(data)
with _aio.AIOHelper(self, self._async_backend) as aio:
return await aio.asend(data)
return await self._aio.asend(data)

def __enter__(self):
return self
Expand Down Expand Up @@ -596,6 +594,7 @@ def _try_associate_msg_with_pipe(self, msg):
# Add the pipe to the socket
msg.pipe = self._add_pipe(lib_pipe)


def recv_msg(self, block=True):
"""Receive a :class:`Message` on the socket."""
flags = 0
Expand Down Expand Up @@ -632,18 +631,16 @@ async def asend_msg(self, msg):
"""
with msg._mem_freed_lock:
msg._ensure_can_send()
with _aio.AIOHelper(self, self._async_backend) as aio:
# Note: the aio helper sets the _mem_freed flag on the msg
return await aio.asend_msg(msg)
# Note: the aio helper sets the _mem_freed flag on the msg
return await self._aio.asend_msg(msg)

async def arecv_msg(self):
"""
Asynchronously receive the :class:`Message` ``msg`` on the socket.
"""
with _aio.AIOHelper(self, self._async_backend) as aio:
msg = await aio.arecv_msg()
self._try_associate_msg_with_pipe(msg)
return msg
msg = await self._aio.arecv_msg()
self._try_associate_msg_with_pipe(msg)
return msg


class Bus0(Socket):
Expand Down Expand Up @@ -1159,7 +1156,7 @@ def service_reqs(s):

"""

def __init__(self, socket):
def __init__(self, socket, async_backend=None):
# need to set attributes first, so that if anything goes wrong,
# __del__() doesn't throw an AttributeError
self._context = None
Expand All @@ -1168,18 +1165,18 @@ def __init__(self, socket):
self._context = ffi.new('nng_ctx *')
check_err(lib.nng_ctx_open(self._context, socket.socket))

self._aio = _aio.AIOHelper(self, async_backend)

assert lib.nng_ctx_id(self.context) != -1

async def arecv(self):
"""Asynchronously receive data using this context."""
with _aio.AIOHelper(self, self._socket._async_backend) as aio:
return await aio.arecv()
return await self._aio.arecv()

async def asend(self, data):
"""Asynchronously send data using this context."""
_ensure_can_send(data)
with _aio.AIOHelper(self, self._socket._async_backend) as aio:
return await aio.asend(data)
return await self._aio.asend(data)

def recv_msg(self):
"""Synchronously receive a :class:`Message` using this context."""
Expand Down Expand Up @@ -1258,18 +1255,15 @@ async def asend_msg(self, msg):
"""
with msg._mem_freed_lock:
msg._ensure_can_send()
with _aio.AIOHelper(self, self._socket._async_backend) as aio:
# Note: the aio helper sets the _mem_freed flag on the msg
return await aio.asend_msg(msg)
return self.socket.asend_msg(msg)

async def arecv_msg(self):
"""
Asynchronously receive a :class:`Message` on the context.
"""
with _aio.AIOHelper(self, self._socket._async_backend) as aio:
msg = await aio.arecv_msg()
self._socket._try_associate_msg_with_pipe(msg)
return msg
msg = await self.socket.arecv_msg()
self._socket._try_associate_msg_with_pipe(msg)
return msg


def _do_callbacks(pipe, callbacks):
Expand All @@ -1280,7 +1274,6 @@ def _do_callbacks(pipe, callbacks):
msg = 'Exception raised in pre pipe connect callback {!r}'
logger.exception(msg.format(cb))


@ffi.def_extern()
def _nng_pipe_cb(lib_pipe, event, arg):

Expand Down
2 changes: 1 addition & 1 deletion test/test_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
addr = 'tcp://127.0.0.1:31245'


@pytest.mark.trio
@pytest.mark.asyncio
async def test_arecv_asend_asyncio():
with pynng.Pair0(listen=addr, recv_timeout=1000) as listener, \
pynng.Pair0(dial=addr) as dialer:
Expand Down