Skip to content

Commit

Permalink
When a socket/fd is closed, wake up outstanding waiters
Browse files Browse the repository at this point in the history
Fixes gh-36, gh-459
  • Loading branch information
njsmith committed Jul 16, 2018
1 parent 79d6664 commit 0c3a8f8
Show file tree
Hide file tree
Showing 24 changed files with 392 additions and 136 deletions.
2 changes: 2 additions & 0 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,8 @@ Exceptions and warnings

.. autoexception:: ResourceBusyError

.. autoexception:: ClosedResourceError

.. autoexception:: RunFinishedError

.. autoexception:: TrioInternalError
Expand Down
40 changes: 40 additions & 0 deletions docs/source/reference-hazmat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,28 @@ All environments provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given socket to
become writable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_socket_close` while this
function is still working.


.. function:: notify_socket_close(sock)

Notifies Trio's internal I/O machinery that you are about to close
a socket.

This causes any operations currently waiting for this socket to
immediately raise :exc:`ClosedResourceError`.

This does *not* actually close the socket. Generally when closing a
socket, you should first call this function, and then close the
socket.

The given object *must* be exactly of type :func:`socket.socket`,
nothing else.

:raises TypeError:
if the given object is not of type :func:`socket.socket`.


Unix-specific API
Expand All @@ -174,6 +196,9 @@ Unix-like systems provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given fd to
become readable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_fd_close` while this
function is still working.


.. function:: wait_writable(fd)
Expand All @@ -192,6 +217,21 @@ Unix-like systems provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given fd to
become writable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_fd_close` while this
function is still working.

.. function:: notify_fd_close(fd):

Notifies Trio's internal I/O machinery that you are about to close
a file descriptor.

This causes any operations currently waiting for this file
descriptor to immediately raise :exc:`ClosedResourceError`.

This does *not* actually close the file descriptor. Generally when
closing a file descriptor, you should first call this function, and
then actually close it.


Kqueue-specific API
Expand Down
5 changes: 0 additions & 5 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ Abstract base classes

.. autoexception:: BrokenStreamError

.. autoexception:: ClosedStreamError

.. currentmodule:: trio.abc

.. autoclass:: trio.abc.Listener
Expand All @@ -158,9 +156,6 @@ Abstract base classes

.. currentmodule:: trio

.. autoexception:: ClosedListenerError



Generic stream tools
~~~~~~~~~~~~~~~~~~~~
Expand Down
18 changes: 18 additions & 0 deletions newsfragments/36.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Suppose one task is blocked trying to use a resource – for example,
reading from a socket – and while it's doing this, another task closes
the resource. Previously, this produced undefined behavior. Now,
closing a resource causes pending operations on that resource to
terminate immediately with a :exc:`ClosedResourceError`.

``ClosedStreamError`` and ``ClosedListenerError`` are now aliases for
:exc:`ClosedResourceError`, and deprecated.

For this to work, Trio needs to know when a resource has been closed.
To facilitate this, new functions have been added:
:func:`trio.hazmat.notify_fd_close` and
:func:`trio.hazmat.notify_socket_close`. If you're using Trio's
built-in wrappers like :cls:`~trio.SocketStream` or
:mod:`trio.socket`, then you don't need to worry about this, but if
you're using the low-level functions like
:func:`trio.hazmat.wait_readable`, you should make sure to call these
functions at appropriate times.
18 changes: 18 additions & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@
from . import ssl
# Not imported by default: testing

_deprecate.enable_attribute_deprecations(__name__)
__deprecated_attributes__ = {
"ClosedStreamError":
_deprecate.DeprecatedAttribute(
ClosedResourceError,
"0.5.0",
issue=36,
instead=ClosedResourceError
),
"ClosedListenerError":
_deprecate.DeprecatedAttribute(
ClosedResourceError,
"0.5.0",
issue=36,
instead=ClosedResourceError
),
}

_deprecate.enable_attribute_deprecations(hazmat.__name__)

# Temporary hack to make sure _result is loaded, just during the deprecation
Expand Down
24 changes: 19 additions & 5 deletions trio/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ async def aclose(self):
If the resource is already closed, then this method should silently
succeed.
Once this method completes, any other pending or future operations on
this resource should generally raise :exc:`ClosedResourceError`,
unless there's a good reason to do otherwise.
See also: :func:`trio.aclose_forcefully`.
"""
Expand Down Expand Up @@ -297,7 +301,9 @@ async def send_all(self, data):
:meth:`HalfCloseableStream.send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`send_all` is running.
Most low-level operations in trio provide a guarantee: if they raise
:exc:`trio.Cancelled`, this means that they had no effect, so the
Expand Down Expand Up @@ -328,7 +334,9 @@ async def wait_send_all_might_not_block(self):
:meth:`HalfCloseableStream.send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`wait_send_all_might_not_block` is running.
Note:
Expand Down Expand Up @@ -402,7 +410,9 @@ async def receive_some(self, max_bytes):
:meth:`receive_some` on the same stream at the same time.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`receive_some` is running.
"""

Expand Down Expand Up @@ -470,7 +480,9 @@ async def send_eof(self):
:meth:`send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`send_eof` is running.
"""

Expand All @@ -494,7 +506,9 @@ async def accept(self):
Raises:
trio.ResourceBusyError: if two tasks attempt to call
:meth:`accept` on the same listener at the same time.
trio.ClosedListenerError: if you already closed this listener.
trio.ClosedResourceError: if you previously closed this listener
object, or if another task closes this listener object while
:meth:`accept` is running.
Note that there is no ``BrokenListenerError``, because for listeners
there is no general condition of "the network/remote peer broke the
Expand Down
9 changes: 8 additions & 1 deletion trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ async def wait_socket_writable(sock):
raise TypeError("need a socket")
await wait_writable(sock)

__all__ += ["wait_socket_readable", "wait_socket_writable"]
def notify_socket_close(sock):
if type(sock) != _stdlib_socket.socket:
raise TypeError("need a socket")
notify_fd_close(sock)

__all__ += [
"wait_socket_readable", "wait_socket_writable", "notify_socket_close"
]
14 changes: 14 additions & 0 deletions trio/_core/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"WouldBlock",
"Cancelled",
"ResourceBusyError",
"ClosedResourceError",
]


Expand Down Expand Up @@ -103,3 +104,16 @@ class ResourceBusyError(Exception):
the data get scrambled.
"""


class ClosedResourceError(Exception):
"""Raised when attempting to use a resource after it has been closed.
Note that "closed" here means that *your* code closed the resource,
generally by calling a method with a name like ``close`` or ``aclose``. If
a problem arises elsewhere – for example, because of a network failure, or
because a remote peer closed their end of a connection – then that should
be indicated by a different exception class, like :exc:`BrokenStreamError`
or an :exc:`OSError` subclass.
"""
24 changes: 24 additions & 0 deletions trio/_core/_io_epoll.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import select
import attr
import outcome

from .. import _core
from . import _public
Expand Down Expand Up @@ -122,3 +123,26 @@ async def wait_readable(self, fd):
@_public
async def wait_writable(self, fd):
await self._epoll_wait(fd, "write_task")

@_public
def notify_fd_close(self, fd):
if not isinstance(fd, int):
fd = fd.fileno()
if fd not in self._registered:
return

waiters = self._registered[fd]

def interrupt(task):
exc = _core.ClosedResourceError("another task closed this fd")
_core.reschedule(task, outcome.Error(exc))

if waiters.write_task is not None:
interrupt(waiters.write_task)
waiters.write_task = None

if waiters.read_task is not None:
interrupt(waiters.read_task)
waiters.read_task = None

self._update_registrations(fd, True)
25 changes: 25 additions & 0 deletions trio/_core/_io_kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,28 @@ async def wait_readable(self, fd):
@_public
async def wait_writable(self, fd):
await self._wait_common(fd, select.KQ_FILTER_WRITE)

@_public
def notify_fd_close(self, fd):
if not isinstance(fd, int):
fd = fd.fileno()

for filter in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]:
key = (fd, filter)
receiver = self._registered.get(key)

if receiver is None:
continue

if type(receiver) is _core.Task:
event = select.kevent(fd, filter, select.KQ_EV_DELETE)
self._kqueue.control([event], 0)
exc = _core.ClosedResourceError("another task closed this fd")
_core.reschedule(receiver, outcome.Error(exc))
del self._registered[key]
else:
# XX this is an interesting example of a case where being able
# to close a queue would be useful...
raise NotImplementedError(
"can't close an fd that monitor_kevent is using"
)
13 changes: 13 additions & 0 deletions trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,19 @@ async def wait_socket_readable(self, sock):
async def wait_socket_writable(self, sock):
await self._wait_socket("write", sock)

@_public
def notify_socket_close(self, sock):
if type(sock) is not stdlib_socket.socket:
raise TypeError("need a stdlib socket")

for mode in ["read", "write"]:
if sock in self._socket_waiters[mode]:
task = self._socket_waiters[mode].pop(sock)
exc = _core.ClosedResourceError(
"another task closed this socket"
)
_core.reschedule(task, outcome.Error(exc))

# This has cffi-isms in it and is untested... but it demonstrates the
# logic we'll want when we start actually using overlapped I/O.
#
Expand Down
43 changes: 40 additions & 3 deletions trio/_core/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def socketpair():

wait_readable_options = [_core.wait_socket_readable]
wait_writable_options = [_core.wait_socket_writable]
notify_close_options = [_core.notify_socket_close]
if hasattr(_core, "wait_readable"):
wait_readable_options.append(_core.wait_readable)

Expand All @@ -52,16 +53,27 @@ async def wait_writable_fd(fileobj):
return await _core.wait_writable(fileobj.fileno())

wait_writable_options.append(wait_writable_fd)
if hasattr(_core, "notify_fd_close"):
notify_close_options.append(_core.notify_fd_close)

# Decorators that feed in different settings for wait_readable / wait_writable.
# Note that if you use both decorators on the same test, it will run all
# N**2 *combinations*
def notify_fd_close_rawfd(fileobj):
_core.notify_fd_close(fileobj.fileno())

notify_close_options.append(notify_fd_close_rawfd)

# Decorators that feed in different settings for wait_readable / wait_writable
# / notify_close.
# Note that if you use all three decorators on the same test, it will run all
# N**3 *combinations*
read_socket_test = pytest.mark.parametrize(
"wait_readable", wait_readable_options, ids=lambda fn: fn.__name__
)
write_socket_test = pytest.mark.parametrize(
"wait_writable", wait_writable_options, ids=lambda fn: fn.__name__
)
notify_close_test = pytest.mark.parametrize(
"notify_close", notify_close_options, ids=lambda fn: fn.__name__
)


async def test_wait_socket_type_checking(socketpair):
Expand Down Expand Up @@ -179,6 +191,31 @@ async def test_double_write(socketpair, wait_writable):
nursery.cancel_scope.cancel()


@read_socket_test
@write_socket_test
@notify_close_test
async def test_interrupted_by_close(
socketpair, wait_readable, wait_writable, notify_close
):
a, b = socketpair

async def reader():
with pytest.raises(_core.ClosedResourceError):
await wait_readable(a)

async def writer():
with pytest.raises(_core.ClosedResourceError):
await wait_writable(a)

fill_socket(a)

async with _core.open_nursery() as nursery:
nursery.start_soon(reader)
nursery.start_soon(writer)
await wait_all_tasks_blocked()
notify_close(a)


@read_socket_test
@write_socket_test
async def test_socket_simultaneous_read_write(
Expand Down
Loading

0 comments on commit 0c3a8f8

Please sign in to comment.