Skip to content

Commit

Permalink
Refactor udp.sendto() and pyaddr->sockaddr conversion
Browse files Browse the repository at this point in the history
* sendto() now tries to use uv_udp_try_send() first

* __convert_pyaddr_to_sockaddr() was optimized to maintain an
  internal LRU cache of resolved addresses (quite an expensive
  operation)
  • Loading branch information
1st1 committed Apr 16, 2019
1 parent c2b65bc commit 46c5e9e
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 58 deletions.
46 changes: 36 additions & 10 deletions uvloop/dns.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ cdef __convert_sockaddr_to_pyaddr(const system.sockaddr* addr):
raise convert_error(err)

return (
(<bytes>buf).decode(),
PyUnicode_FromString(buf),
system.ntohs(addr4.sin_port)
)

Expand All @@ -54,7 +54,7 @@ cdef __convert_sockaddr_to_pyaddr(const system.sockaddr* addr):
raise convert_error(err)

return (
(<bytes>buf).decode(),
PyUnicode_FromString(buf),
system.ntohs(addr6.sin6_port),
system.ntohl(addr6.sin6_flowinfo),
addr6.sin6_scope_id
Expand All @@ -63,6 +63,17 @@ cdef __convert_sockaddr_to_pyaddr(const system.sockaddr* addr):
raise RuntimeError("cannot convert sockaddr into Python object")


@cython.freelist(DEFAULT_FREELIST_SIZE)
cdef class SockAddrHolder:
cdef:
int family
system.sockaddr_storage addr
Py_ssize_t addr_size


cdef LruCache sockaddrs = LruCache(maxsize=DNS_PYADDR_TO_SOCKADDR_CACHE_SIZE)


cdef __convert_pyaddr_to_sockaddr(int family, object addr,
system.sockaddr* res):
cdef:
Expand All @@ -72,7 +83,14 @@ cdef __convert_pyaddr_to_sockaddr(int family, object addr,
int flowinfo = 0
char *buf
Py_ssize_t buflen
SockAddrHolder ret

ret = sockaddrs.get(addr, None)
if ret is not None and ret.family == family:
memcpy(res, &ret.addr, ret.addr_size)
return

ret = SockAddrHolder.__new__(SockAddrHolder)
if family == uv.AF_INET:
if not isinstance(addr, tuple):
raise TypeError('AF_INET address must be tuple')
Expand All @@ -90,7 +108,8 @@ cdef __convert_pyaddr_to_sockaddr(int family, object addr,

port = __port_to_int(port, None)

err = uv.uv_ip4_addr(host, <int>port, <system.sockaddr_in*>res)
ret.addr_size = sizeof(system.sockaddr_in)
err = uv.uv_ip4_addr(host, <int>port, <system.sockaddr_in*>&ret.addr)
if err < 0:
raise convert_error(err)

Expand Down Expand Up @@ -121,12 +140,14 @@ cdef __convert_pyaddr_to_sockaddr(int family, object addr,
if addr_len > 3:
scope_id = addr[3]

err = uv.uv_ip6_addr(host, port, <system.sockaddr_in6*>res)
ret.addr_size = sizeof(system.sockaddr_in6)

err = uv.uv_ip6_addr(host, port, <system.sockaddr_in6*>&ret.addr)
if err < 0:
raise convert_error(err)

(<system.sockaddr_in6*>res).sin6_flowinfo = flowinfo
(<system.sockaddr_in6*>res).sin6_scope_id = scope_id
(<system.sockaddr_in6*>&ret.addr).sin6_flowinfo = flowinfo
(<system.sockaddr_in6*>&ret.addr).sin6_scope_id = scope_id

elif family == uv.AF_UNIX:
if isinstance(addr, str):
Expand All @@ -139,13 +160,18 @@ cdef __convert_pyaddr_to_sockaddr(int family, object addr,
raise ValueError(
f'unix socket path {addr!r} is longer than 107 characters')

memset(res, 0, sizeof(system.sockaddr_un))
(<system.sockaddr_un*>res).sun_family = uv.AF_UNIX
memcpy((<system.sockaddr_un*>res).sun_path, buf, buflen)
ret.addr_size = sizeof(system.sockaddr_un)
memset(&ret.addr, 0, sizeof(system.sockaddr_un))
(<system.sockaddr_un*>&ret.addr).sun_family = uv.AF_UNIX
memcpy((<system.sockaddr_un*>&ret.addr).sun_path, buf, buflen)

else:
raise ValueError(
f'epected AF_INET, AF_INET6, or AF_UNIX family, got {family}')
f'expected AF_INET, AF_INET6, or AF_UNIX family, got {family}')

ret.family = family
sockaddrs[addr] = ret
memcpy(res, &ret.addr, ret.addr_size)


cdef __static_getaddrinfo(object host, object port,
Expand Down
98 changes: 57 additions & 41 deletions uvloop/handles/udp.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ cdef class _UDPSendContext:
self.closed = 1
PyBuffer_Release(&self.py_buf) # void
self.req.data = NULL
self.uv_buf.base = NULL
Py_DECREF(self)
self.udp = None

Expand All @@ -34,7 +35,8 @@ cdef class _UDPSendContext:
Py_INCREF(ctx)

PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE)
ctx.uv_buf = uv.uv_buf_init(<char*>ctx.py_buf.buf, ctx.py_buf.len)
ctx.uv_buf.base = <char*>ctx.py_buf.buf
ctx.uv_buf.len = ctx.py_buf.len
ctx.udp = udp

ctx.closed = 0
Expand Down Expand Up @@ -193,37 +195,63 @@ cdef class UDPTransport(UVBaseTransport):
_UDPSendContext ctx
system.sockaddr_storage saddr_st
system.sockaddr *saddr
Py_buffer try_pybuf
uv.uv_buf_t try_uvbuf

self._ensure_alive()

if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
raise RuntimeError('UDPTransport.family is undefined; cannot send')

if addr is not None and self._family != uv.AF_UNIX:
validate_address(addr, self._family, uv.SOCK_DGRAM, 0)

ctx = _UDPSendContext.new(self, data)
try:
if addr is None:
saddr = NULL
else:
if addr is None:
saddr = NULL
else:
try:
__convert_pyaddr_to_sockaddr(self._family, addr,
<system.sockaddr*>&saddr_st)
saddr = <system.sockaddr*>(&saddr_st)
except Exception:
ctx.close()
raise

err = uv.uv_udp_send(&ctx.req,
<uv.uv_udp_t*>self._handle,
&ctx.uv_buf,
1,
saddr,
__uv_udp_on_send)

if err < 0:
ctx.close()
except (ValueError, TypeError):
raise
except Exception:
raise ValueError(
f'{addr!r}: socket family mismatch or '
f'a DNS lookup is required')
saddr = <system.sockaddr*>(&saddr_st)

if self._get_write_buffer_size() == 0:
PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE)
try_uvbuf.base = <char*>try_pybuf.buf
try_uvbuf.len = try_pybuf.len
err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle,
&try_uvbuf,
1,
saddr)
PyBuffer_Release(&try_pybuf)
else:
err = uv.UV_EAGAIN

if err == uv.UV_EAGAIN:
ctx = _UDPSendContext.new(self, data)
err = uv.uv_udp_send(&ctx.req,
<uv.uv_udp_t*>self._handle,
&ctx.uv_buf,
1,
saddr,
__uv_udp_on_send)

if err < 0:
ctx.close()

exc = convert_error(err)
self._fatal_error(exc, True)
else:
self._maybe_pause_protocol()

exc = convert_error(err)
self._fatal_error(exc, True)
else:
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
else:
self._on_sent(None)

cdef _on_receive(self, bytes data, object exc, object addr):
if exc is None:
Expand All @@ -248,15 +276,17 @@ cdef class UDPTransport(UVBaseTransport):

def sendto(self, data, addr=None):
if not data:
# Replicating asyncio logic here.
return

if self._conn_lost:
# TODO add warning
# Replicating asyncio logic here.
if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
aio_logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return

self._send(data, addr)
self._maybe_pause_protocol()


cdef void __uv_udp_on_receive(uv.uv_udp_t* handle,
Expand Down Expand Up @@ -357,17 +387,3 @@ cdef void __uv_udp_on_send(uv.uv_udp_send_t* req, int status) with gil:
udp._on_sent(exc)
except BaseException as exc:
udp._error(exc, False)


@ft_lru_cache()
def validate_address(object addr, int sock_family, int sock_type,
int sock_proto):
addrinfo = __static_getaddrinfo_pyaddr(
addr[0], addr[1],
uv.AF_UNSPEC, sock_type, sock_proto, 0)
if addrinfo is None:
raise ValueError(
'UDP.sendto(): address {!r} requires a DNS lookup'.format(addr))
if addrinfo[0] != sock_family:
raise ValueError(
'UDP.sendto(): {!r} socket family mismatch'.format(addr))
1 change: 1 addition & 0 deletions uvloop/includes/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ DEF FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
DEF FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB

DEF DEFAULT_FREELIST_SIZE = 250
DEF DNS_PYADDR_TO_SOCKADDR_CACHE_SIZE = 2048

DEF DEBUG_STACK_DEPTH = 10

Expand Down
2 changes: 2 additions & 0 deletions uvloop/includes/python.pxd
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cdef extern from "Python.h":
int PY_VERSION_HEX

unicode PyUnicode_FromString(const char *)

void* PyMem_RawMalloc(size_t n)
void* PyMem_RawRealloc(void *p, size_t n)
void* PyMem_RawCalloc(size_t nelem, size_t elsize)
Expand Down
1 change: 0 additions & 1 deletion uvloop/includes/stdlib.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ cdef errno_EBADF = errno.EBADF
cdef errno_EINVAL = errno.EINVAL

cdef ft_partial = functools.partial
cdef ft_lru_cache = functools.lru_cache

cdef gc_disable = gc.disable

Expand Down
7 changes: 3 additions & 4 deletions uvloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,6 @@ cdef extern from "uv.h" nogil:
const system.sockaddr* addr,
unsigned flags) with gil

# Buffers

uv_buf_t uv_buf_init(char* base, unsigned int len)

# Generic request functions
int uv_cancel(uv_req_t* req)

Expand Down Expand Up @@ -376,6 +372,9 @@ cdef extern from "uv.h" nogil:
int uv_udp_send(uv_udp_send_t* req, uv_udp_t* handle,
const uv_buf_t bufs[], unsigned int nbufs,
const system.sockaddr* addr, uv_udp_send_cb send_cb)
int uv_udp_try_send(uv_udp_t* handle,
const uv_buf_t bufs[], unsigned int nbufs,
const system.sockaddr* addr)
int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb)
int uv_udp_recv_stop(uv_udp_t* handle)
Expand Down
5 changes: 3 additions & 2 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ from .includes.python cimport (
PyMemoryView_FromMemory, PyBUF_WRITE,
PyMemoryView_FromObject, PyMemoryView_Check,
PyOS_AfterFork_Parent, PyOS_AfterFork_Child,
PyOS_BeforeFork
PyOS_BeforeFork,
PyUnicode_FromString
)
from .includes.flowcontrol cimport add_flowcontrol_defaults

Expand Down Expand Up @@ -2962,7 +2963,6 @@ cdef class Loop:

if rads is not None:
rai = (<AddrInfo>rads).data
sock = udp._get_socket()
while rai is not NULL:
if rai.ai_family != lai.ai_family:
rai = rai.ai_next
Expand Down Expand Up @@ -3088,6 +3088,7 @@ class _SyncSocketWriterFuture(aio_Future):

include "cbhandles.pyx"
include "pseudosock.pyx"
include "lru.pyx"

include "handles/handle.pyx"
include "handles/async_.pyx"
Expand Down
79 changes: 79 additions & 0 deletions uvloop/lru.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
cdef object _LRU_MARKER = object()


@cython.final
cdef class LruCache:

cdef:
object _dict
int _maxsize
object _dict_move_to_end
object _dict_get

# We use an OrderedDict for LRU implementation. Operations:
#
# * We use a simple `__setitem__` to push a new entry:
# `entries[key] = new_entry`
# That will push `new_entry` to the *end* of the entries dict.
#
# * When we have a cache hit, we call
# `entries.move_to_end(key, last=True)`
# to move the entry to the *end* of the entries dict.
#
# * When we need to remove entries to maintain `max_size`, we call
# `entries.popitem(last=False)`
# to remove an entry from the *beginning* of the entries dict.
#
# So new entries and hits are always promoted to the end of the
# entries dict, whereas the unused one will group in the
# beginning of it.

def __init__(self, *, maxsize):
if maxsize <= 0:
raise ValueError(
f'maxsize is expected to be greater than 0, got {maxsize}')

self._dict = col_OrderedDict()
self._dict_move_to_end = self._dict.move_to_end
self._dict_get = self._dict.get
self._maxsize = maxsize

cdef get(self, key, default):
o = self._dict_get(key, _LRU_MARKER)
if o is _LRU_MARKER:
return default
self._dict_move_to_end(key) # last=True
return o

cdef inline needs_cleanup(self):
return len(self._dict) > self._maxsize

cdef inline cleanup_one(self):
k, _ = self._dict.popitem(last=False)
return k

def __getitem__(self, key):
o = self._dict[key]
self._dict_move_to_end(key) # last=True
return o

def __setitem__(self, key, o):
if key in self._dict:
self._dict[key] = o
self._dict_move_to_end(key) # last=True
else:
self._dict[key] = o
while self.needs_cleanup():
self.cleanup_one()

def __delitem__(self, key):
del self._dict[key]

def __contains__(self, key):
return key in self._dict

def __len__(self):
return len(self._dict)

def __iter__(self):
return iter(self._dict)

0 comments on commit 46c5e9e

Please sign in to comment.