From 02f9afece35daafb9d9f3edcd1805fcb7f50d802 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 27 Sep 2019 16:36:17 -0400 Subject: [PATCH] Revert "bpo-36889: Merge asyncio streams (GH-13251)" Reverts changes made in 3.8 to asyncio streams (23b4b697e5b6cc897696f9c0288c187d2d24bff2). --- Lib/asyncio/__init__.py | 38 -- Lib/asyncio/streams.py | 581 +++++-------------- Lib/asyncio/subprocess.py | 35 +- Lib/asyncio/windows_events.py | 2 +- Lib/test/test___all__.py | 36 +- Lib/test/test_asyncio/test_base_events.py | 5 +- Lib/test/test_asyncio/test_buffered_proto.py | 7 +- Lib/test/test_asyncio/test_pep492.py | 4 +- Lib/test/test_asyncio/test_server.py | 10 +- Lib/test/test_asyncio/test_sslproto.py | 37 +- Lib/test/test_asyncio/test_streams.py | 561 ++++++------------ Lib/test/test_asyncio/test_windows_events.py | 14 +- 12 files changed, 391 insertions(+), 939 deletions(-) diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index a6a29dbfecd507..28c2e2c429f34a 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -3,7 +3,6 @@ # flake8: noqa import sys -import warnings # This relies on each of the submodules having an __all__ variable. from .base_events import * @@ -44,40 +43,3 @@ else: from .unix_events import * # pragma: no cover __all__ += unix_events.__all__ - - -__all__ += ('StreamReader', 'StreamWriter', 'StreamReaderProtocol') # deprecated - - -def __getattr__(name): - global StreamReader, StreamWriter, StreamReaderProtocol - if name == 'StreamReader': - warnings.warn("StreamReader is deprecated since Python 3.8 " - "in favor of Stream, and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamReader as sr - StreamReader = sr - return StreamReader - if name == 'StreamWriter': - warnings.warn("StreamWriter is deprecated since Python 3.8 " - "in favor of Stream, and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamWriter as sw - StreamWriter = sw - return StreamWriter - if name == 'StreamReaderProtocol': - warnings.warn("Using asyncio internal class StreamReaderProtocol " - "is deprecated since Python 3.8 " - " and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamReaderProtocol as srp - StreamReaderProtocol = srp - return StreamReaderProtocol - - raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 204eaf7394c5bb..7639add8370934 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -1,19 +1,14 @@ __all__ = ( - 'Stream', 'StreamMode', - 'open_connection', 'start_server', - 'connect', 'connect_read_pipe', 'connect_write_pipe', - 'StreamServer') + 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', + 'open_connection', 'start_server') -import enum import socket import sys import warnings import weakref if hasattr(socket, 'AF_UNIX'): - __all__ += ('open_unix_connection', 'start_unix_server', - 'connect_unix', - 'UnixStreamServer') + __all__ += ('open_unix_connection', 'start_unix_server') from . import coroutines from . import events @@ -21,134 +16,12 @@ from . import format_helpers from . import protocols from .log import logger -from . import tasks +from .tasks import sleep _DEFAULT_LIMIT = 2 ** 16 # 64 KiB -class StreamMode(enum.Flag): - READ = enum.auto() - WRITE = enum.auto() - READWRITE = READ | WRITE - - -def _ensure_can_read(mode): - if not mode & StreamMode.READ: - raise RuntimeError("The stream is write-only") - - -def _ensure_can_write(mode): - if not mode & StreamMode.WRITE: - raise RuntimeError("The stream is read-only") - - -class _ContextManagerHelper: - __slots__ = ('_awaitable', '_result') - - def __init__(self, awaitable): - self._awaitable = awaitable - self._result = None - - def __await__(self): - return self._awaitable.__await__() - - async def __aenter__(self): - ret = await self._awaitable - result = await ret.__aenter__() - self._result = result - return result - - async def __aexit__(self, exc_type, exc_val, exc_tb): - return await self._result.__aexit__(exc_type, exc_val, exc_tb) - - -def connect(host=None, port=None, *, - limit=_DEFAULT_LIMIT, - ssl=None, family=0, proto=0, - flags=0, sock=None, local_addr=None, - server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): - # Design note: - # Don't use decorator approach but exilicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect(host, port, limit, - ssl, family, proto, - flags, sock, local_addr, - server_hostname, - ssl_handshake_timeout, - happy_eyeballs_delay, - interleave)) - - -async def _connect(host, port, - limit, - ssl, family, proto, - flags, sock, local_addr, - server_hostname, - ssl_handshake_timeout, - happy_eyeballs_delay, interleave): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READWRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.create_connection( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - host, port, - ssl=ssl, family=family, proto=proto, - flags=flags, sock=sock, local_addr=local_addr, - server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave) - return stream - - -def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT): - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_read_pipe(pipe, limit)) - - -async def _connect_read_pipe(pipe, limit): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READ, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.connect_read_pipe( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - pipe) - return stream - - -def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT): - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_write_pipe(pipe, limit)) - - -async def _connect_write_pipe(pipe, limit): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.WRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.connect_write_pipe( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - pipe) - return stream - - async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. @@ -168,11 +41,6 @@ async def open_connection(host=None, port=None, *, StreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.) """ - warnings.warn("open_connection() is deprecated since Python 3.8 " - "in favor of connect(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -183,7 +51,8 @@ async def open_connection(host=None, port=None, *, protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True) transport, _ = await loop.create_connection( lambda: protocol, host, port, **kwds) - writer = StreamWriter(transport, protocol, reader, loop) + writer = StreamWriter(transport, protocol, reader, loop, + _asyncio_internal=True) return reader, writer @@ -210,11 +79,6 @@ async def start_server(client_connected_cb, host=None, port=None, *, The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service. """ - warnings.warn("start_server() is deprecated since Python 3.8 " - "in favor of StreamServer(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -223,7 +87,8 @@ async def start_server(client_connected_cb, host=None, port=None, *, DeprecationWarning, stacklevel=2) def factory(): - reader = StreamReader(limit=limit, loop=loop) + reader = StreamReader(limit=limit, loop=loop, + _asyncio_internal=True) protocol = StreamReaderProtocol(reader, client_connected_cb, loop=loop, _asyncio_internal=True) @@ -232,194 +97,12 @@ def factory(): return await loop.create_server(factory, host, port, **kwds) -class _BaseStreamServer: - # Design notes. - # StreamServer and UnixStreamServer are exposed as FINAL classes, - # not function factories. - # async with serve(host, port) as server: - # server.start_serving() - # looks ugly. - # The class doesn't provide API for enumerating connected streams - # It can be a subject for improvements in Python 3.9 - - _server_impl = None - - def __init__(self, client_connected_cb, - /, - limit=_DEFAULT_LIMIT, - shutdown_timeout=60, - _asyncio_internal=False): - if not _asyncio_internal: - raise RuntimeError("_ServerStream is a private asyncio class") - self._client_connected_cb = client_connected_cb - self._limit = limit - self._loop = events.get_running_loop() - self._streams = {} - self._shutdown_timeout = shutdown_timeout - - def __init_subclass__(cls): - if not cls.__module__.startswith('asyncio.'): - raise TypeError(f"asyncio.{cls.__name__} " - "class cannot be inherited from") - - async def bind(self): - if self._server_impl is not None: - return - self._server_impl = await self._bind() - - def is_bound(self): - return self._server_impl is not None - - @property - def sockets(self): - # multiple value for socket bound to both IPv4 and IPv6 families - if self._server_impl is None: - return () - return self._server_impl.sockets - - def is_serving(self): - if self._server_impl is None: - return False - return self._server_impl.is_serving() - - async def start_serving(self): - await self.bind() - await self._server_impl.start_serving() - - async def serve_forever(self): - await self.start_serving() - await self._server_impl.serve_forever() - - async def close(self): - if self._server_impl is None: - return - self._server_impl.close() - streams = list(self._streams.keys()) - active_tasks = list(self._streams.values()) - if streams: - await tasks.wait([stream.close() for stream in streams]) - await self._server_impl.wait_closed() - self._server_impl = None - await self._shutdown_active_tasks(active_tasks) - - async def abort(self): - if self._server_impl is None: - return - self._server_impl.close() - streams = list(self._streams.keys()) - active_tasks = list(self._streams.values()) - if streams: - await tasks.wait([stream.abort() for stream in streams]) - await self._server_impl.wait_closed() - self._server_impl = None - await self._shutdown_active_tasks(active_tasks) - - async def __aenter__(self): - await self.bind() - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - await self.close() - - def _attach(self, stream, task): - self._streams[stream] = task - - def _detach(self, stream, task): - del self._streams[stream] - - async def _shutdown_active_tasks(self, active_tasks): - if not active_tasks: - return - # NOTE: tasks finished with exception are reported - # by the Task.__del__() method. - done, pending = await tasks.wait(active_tasks, - timeout=self._shutdown_timeout) - if not pending: - return - for task in pending: - task.cancel() - done, pending = await tasks.wait(pending, - timeout=self._shutdown_timeout) - for task in pending: - self._loop.call_exception_handler({ - "message": (f'{task!r} ignored cancellation request ' - f'from a closing {self!r}'), - "stream_server": self - }) - - def __repr__(self): - ret = [f'{self.__class__.__name__}'] - if self.is_serving(): - ret.append('serving') - if self.sockets: - ret.append(f'sockets={self.sockets!r}') - return '<' + ' '.join(ret) + '>' - - def __del__(self, _warn=warnings.warn): - if self._server_impl is not None: - _warn(f"unclosed stream server {self!r}", - ResourceWarning, source=self) - self._server_impl.close() - - -class StreamServer(_BaseStreamServer): - - def __init__(self, client_connected_cb, /, host=None, port=None, *, - limit=_DEFAULT_LIMIT, - family=socket.AF_UNSPEC, - flags=socket.AI_PASSIVE, sock=None, backlog=100, - ssl=None, reuse_address=None, reuse_port=None, - ssl_handshake_timeout=None, - shutdown_timeout=60): - super().__init__(client_connected_cb, - limit=limit, - shutdown_timeout=shutdown_timeout, - _asyncio_internal=True) - self._host = host - self._port = port - self._family = family - self._flags = flags - self._sock = sock - self._backlog = backlog - self._ssl = ssl - self._reuse_address = reuse_address - self._reuse_port = reuse_port - self._ssl_handshake_timeout = ssl_handshake_timeout - - async def _bind(self): - def factory(): - protocol = _ServerStreamProtocol(self, - self._limit, - self._client_connected_cb, - loop=self._loop, - _asyncio_internal=True) - return protocol - return await self._loop.create_server( - factory, - self._host, - self._port, - start_serving=False, - family=self._family, - flags=self._flags, - sock=self._sock, - backlog=self._backlog, - ssl=self._ssl, - reuse_address=self._reuse_address, - reuse_port=self._reuse_port, - ssl_handshake_timeout=self._ssl_handshake_timeout) - - if hasattr(socket, 'AF_UNIX'): # UNIX Domain Sockets are supported on this platform async def open_unix_connection(path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `open_connection` but works with UNIX Domain Sockets.""" - warnings.warn("open_unix_connection() is deprecated since Python 3.8 " - "in favor of connect_unix(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -431,58 +114,13 @@ async def open_unix_connection(path=None, *, _asyncio_internal=True) transport, _ = await loop.create_unix_connection( lambda: protocol, path, **kwds) - writer = StreamWriter(transport, protocol, reader, loop) + writer = StreamWriter(transport, protocol, reader, loop, + _asyncio_internal=True) return reader, writer - - def connect_unix(path=None, *, - limit=_DEFAULT_LIMIT, - ssl=None, sock=None, - server_hostname=None, - ssl_handshake_timeout=None): - """Similar to `connect()` but works with UNIX Domain Sockets.""" - # Design note: - # Don't use decorator approach but exilicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_unix(path, - limit, - ssl, sock, - server_hostname, - ssl_handshake_timeout)) - - - async def _connect_unix(path, - limit, - ssl, sock, - server_hostname, - ssl_handshake_timeout): - """Similar to `connect()` but works with UNIX Domain Sockets.""" - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READWRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.create_unix_connection( - lambda: _StreamProtocol(stream, - loop=loop, - _asyncio_internal=True), - path, - ssl=ssl, - sock=sock, - server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) - return stream - - async def start_unix_server(client_connected_cb, path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `start_server` but works with UNIX Domain Sockets.""" - warnings.warn("start_unix_server() is deprecated since Python 3.8 " - "in favor of UnixStreamServer(), and scheduled " - "for removal in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -491,7 +129,8 @@ async def start_unix_server(client_connected_cb, path=None, *, DeprecationWarning, stacklevel=2) def factory(): - reader = StreamReader(limit=limit, loop=loop) + reader = StreamReader(limit=limit, loop=loop, + _asyncio_internal=True) protocol = StreamReaderProtocol(reader, client_connected_cb, loop=loop, _asyncio_internal=True) @@ -499,42 +138,6 @@ def factory(): return await loop.create_unix_server(factory, path, **kwds) - class UnixStreamServer(_BaseStreamServer): - - def __init__(self, client_connected_cb, /, path=None, *, - limit=_DEFAULT_LIMIT, - sock=None, - backlog=100, - ssl=None, - ssl_handshake_timeout=None, - shutdown_timeout=60): - super().__init__(client_connected_cb, - limit=limit, - shutdown_timeout=shutdown_timeout, - _asyncio_internal=True) - self._path = path - self._sock = sock - self._backlog = backlog - self._ssl = ssl - self._ssl_handshake_timeout = ssl_handshake_timeout - - async def _bind(self): - def factory(): - protocol = _ServerStreamProtocol(self, - self._limit, - self._client_connected_cb, - loop=self._loop, - _asyncio_internal=True) - return protocol - return await self._loop.create_unix_server( - factory, - self._path, - start_serving=False, - sock=self._sock, - backlog=self._backlog, - ssl=self._ssl, - ssl_handshake_timeout=self._ssl_handshake_timeout) - class FlowControlMixin(protocols.Protocol): """Reusable flow control logic for StreamWriter.drain(). @@ -613,8 +216,6 @@ def _get_close_waiter(self, stream): raise NotImplementedError -# begin legacy stream APIs - class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): """Helper class to adapt between Protocol and StreamReader. @@ -624,47 +225,105 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): call inappropriate methods of the protocol.) """ + _source_traceback = None + def __init__(self, stream_reader, client_connected_cb=None, loop=None, *, _asyncio_internal=False): super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) - self._stream_reader = stream_reader + if stream_reader is not None: + self._stream_reader_wr = weakref.ref(stream_reader, + self._on_reader_gc) + self._source_traceback = stream_reader._source_traceback + else: + self._stream_reader_wr = None + if client_connected_cb is not None: + # This is a stream created by the `create_server()` function. + # Keep a strong reference to the reader until a connection + # is established. + self._strong_reader = stream_reader + self._reject_connection = False self._stream_writer = None + self._transport = None self._client_connected_cb = client_connected_cb self._over_ssl = False self._closed = self._loop.create_future() + def _on_reader_gc(self, wr): + transport = self._transport + if transport is not None: + # connection_made was called + context = { + 'message': ('An open stream object is being garbage ' + 'collected; call "stream.close()" explicitly.') + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + transport.abort() + else: + self._reject_connection = True + self._stream_reader_wr = None + + @property + def _stream_reader(self): + if self._stream_reader_wr is None: + return None + return self._stream_reader_wr() + def connection_made(self, transport): - self._stream_reader.set_transport(transport) + if self._reject_connection: + context = { + 'message': ('An open stream was garbage collected prior to ' + 'establishing network connection; ' + 'call "stream.close()" explicitly.') + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + transport.abort() + return + self._transport = transport + reader = self._stream_reader + if reader is not None: + reader.set_transport(transport) self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, - self._stream_reader, - self._loop) - res = self._client_connected_cb(self._stream_reader, + reader, + self._loop, + _asyncio_internal=True) + res = self._client_connected_cb(reader, self._stream_writer) if coroutines.iscoroutine(res): self._loop.create_task(res) + self._strong_reader = None def connection_lost(self, exc): - if self._stream_reader is not None: + reader = self._stream_reader + if reader is not None: if exc is None: - self._stream_reader.feed_eof() + reader.feed_eof() else: - self._stream_reader.set_exception(exc) + reader.set_exception(exc) if not self._closed.done(): if exc is None: self._closed.set_result(None) else: self._closed.set_exception(exc) super().connection_lost(exc) - self._stream_reader = None + self._stream_reader_wr = None self._stream_writer = None + self._transport = None def data_received(self, data): - self._stream_reader.feed_data(data) + reader = self._stream_reader + if reader is not None: + reader.feed_data(data) def eof_received(self): - self._stream_reader.feed_eof() + reader = self._stream_reader + if reader is not None: + reader.feed_eof() if self._over_ssl: # Prevent a warning in SSLProtocol.eof_received: # "returning true from eof_received() @@ -672,6 +331,9 @@ def eof_received(self): return False return True + def _get_close_waiter(self, stream): + return self._closed + def __del__(self): # Prevent reports about unhandled exceptions. # Better than self._closed._log_traceback = False hack @@ -680,6 +342,13 @@ def __del__(self): closed.exception() +def _swallow_unhandled_exception(task): + # Do a trick to suppress unhandled exception + # if stream.write() was used without await and + # stream.drain() was paused and resumed with an exception + task.exception() + + class StreamWriter: """Wraps a Transport. @@ -690,13 +359,21 @@ class StreamWriter: directly. """ - def __init__(self, transport, protocol, reader, loop): + def __init__(self, transport, protocol, reader, loop, + *, _asyncio_internal=False): + if not _asyncio_internal: + warnings.warn(f"{self.__class__} should be instaniated " + "by asyncio internals only, " + "please avoid its creation from user code", + DeprecationWarning) self._transport = transport self._protocol = protocol # drain() expects that the reader has an exception() method assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -710,9 +387,35 @@ def transport(self): def write(self, data): self._transport.write(data) + return self._fast_drain() def writelines(self, data): self._transport.writelines(data) + return self._fast_drain() + + def _fast_drain(self): + # The helper tries to use fast-path to return already existing complete future + # object if underlying transport is not paused and actual waiting for writing + # resume is not needed + if self._reader is not None: + # this branch will be simplified after merging reader with writer + exc = self._reader.exception() + if exc is not None: + fut = self._loop.create_future() + fut.set_exception(exc) + return fut + if not self._transport.is_closing(): + if self._protocol._connection_lost: + fut = self._loop.create_future() + fut.set_exception(ConnectionResetError('Connection lost')) + return fut + if not self._protocol._paused: + # fast path, the stream is not paused + # no need to wait for resume signal + return self._complete_fut + ret = self._loop.create_task(self.drain()) + ret.add_done_callback(_swallow_unhandled_exception) + return ret def write_eof(self): return self._transport.write_eof() @@ -721,13 +424,14 @@ def can_write_eof(self): return self._transport.can_write_eof() def close(self): - return self._transport.close() + self._transport.close() + return self._protocol._get_close_waiter(self) def is_closing(self): return self._transport.is_closing() async def wait_closed(self): - await self._protocol._closed + await self._protocol._get_close_waiter(self) def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) @@ -745,19 +449,25 @@ async def drain(self): if exc is not None: raise exc if self._transport.is_closing(): - # Yield to the event loop so connection_lost() may be - # called. Without this, _drain_helper() would return - # immediately, and code that calls - # write(...); await drain() - # in a loop would never call connection_lost(), so it - # would not see an error when the socket is closed. - await tasks.sleep(0, loop=self._loop) + # Wait for protocol.connection_lost() call + # Raise connection closing error if any, + # ConnectionResetError otherwise + await sleep(0) await self._protocol._drain_helper() class StreamReader: - def __init__(self, limit=_DEFAULT_LIMIT, loop=None): + _source_traceback = None + + def __init__(self, limit=_DEFAULT_LIMIT, loop=None, + *, _asyncio_internal=False): + if not _asyncio_internal: + warnings.warn(f"{self.__class__} should be instaniated " + "by asyncio internals only, " + "please avoid its creation from user code", + DeprecationWarning) + # The line length limit is a security feature; # it also doubles as half the buffer limit. @@ -775,6 +485,9 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None): self._exception = None self._transport = None self._paused = False + if self._loop.get_debug(): + self._source_traceback = format_helpers.extract_stack( + sys._getframe(1)) def __repr__(self): info = ['StreamReader'] @@ -1768,3 +1481,5 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() +======= +>>>>>>> parent of 23b4b697e5... bpo-36889: Merge asyncio streams (GH-13251) diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index e6bec71d6c7dac..d34b6118fdcf72 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -27,8 +27,6 @@ def __init__(self, limit, loop, *, _asyncio_internal=False): self._process_exited = False self._pipe_fds = [] self._stdin_closed = self._loop.create_future() - self._stdout_closed = self._loop.create_future() - self._stderr_closed = self._loop.create_future() def __repr__(self): info = [self.__class__.__name__] @@ -42,35 +40,30 @@ def __repr__(self): def connection_made(self, transport): self._transport = transport + stdout_transport = transport.get_pipe_transport(1) if stdout_transport is not None: - self.stdout = streams.Stream(mode=streams.StreamMode.READ, - transport=stdout_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) + self.stdout = streams.StreamReader(limit=self._limit, + loop=self._loop, + _asyncio_internal=True) self.stdout.set_transport(stdout_transport) self._pipe_fds.append(1) stderr_transport = transport.get_pipe_transport(2) if stderr_transport is not None: - self.stderr = streams.Stream(mode=streams.StreamMode.READ, - transport=stderr_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) + self.stderr = streams.StreamReader(limit=self._limit, + loop=self._loop, + _asyncio_internal=True) self.stderr.set_transport(stderr_transport) self._pipe_fds.append(2) stdin_transport = transport.get_pipe_transport(0) if stdin_transport is not None: - self.stdin = streams.Stream(mode=streams.StreamMode.WRITE, - transport=stdin_transport, - protocol=self, - loop=self._loop, - _asyncio_internal=True) + self.stdin = streams.StreamWriter(stdin_transport, + protocol=self, + reader=None, + loop=self._loop, + _asyncio_internal=True) def pipe_data_received(self, fd, data): if fd == 1: @@ -121,10 +114,6 @@ def _maybe_close_transport(self): def _get_close_waiter(self, stream): if stream is self.stdin: return self._stdin_closed - elif stream is self.stdout: - return self._stdout_closed - elif stream is self.stderr: - return self._stderr_closed class Process: diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index ac51109ff1a83d..ff7fd39665ea7b 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -653,7 +653,7 @@ async def connect_pipe(self, address): # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) - await tasks.sleep(delay) + await tasks.sleep(delay, loop=self._loop) return windows_utils.PipeHandle(handle) diff --git a/Lib/test/test___all__.py b/Lib/test/test___all__.py index c077881511b8ce..f6e82eb64ab025 100644 --- a/Lib/test/test___all__.py +++ b/Lib/test/test___all__.py @@ -30,27 +30,21 @@ def check_all(self, modname): raise NoAll(modname) names = {} with self.subTest(module=modname): - with support.check_warnings( - ("", DeprecationWarning), - ("", ResourceWarning), - quiet=True): - try: - exec("from %s import *" % modname, names) - except Exception as e: - # Include the module name in the exception string - self.fail("__all__ failure in {}: {}: {}".format( - modname, e.__class__.__name__, e)) - if "__builtins__" in names: - del names["__builtins__"] - if '__annotations__' in names: - del names['__annotations__'] - if "__warningregistry__" in names: - del names["__warningregistry__"] - keys = set(names) - all_list = sys.modules[modname].__all__ - all_set = set(all_list) - self.assertCountEqual(all_set, all_list, "in module {}".format(modname)) - self.assertEqual(keys, all_set, "in module {}".format(modname)) + try: + exec("from %s import *" % modname, names) + except Exception as e: + # Include the module name in the exception string + self.fail("__all__ failure in {}: {}: {}".format( + modname, e.__class__.__name__, e)) + if "__builtins__" in names: + del names["__builtins__"] + if '__annotations__' in names: + del names['__annotations__'] + keys = set(names) + all_list = sys.modules[modname].__all__ + all_set = set(all_list) + self.assertCountEqual(all_set, all_list, "in module {}".format(modname)) + self.assertEqual(keys, all_set, "in module {}".format(modname)) def walk_modules(self, basedir, modpath): for fn in sorted(os.listdir(basedir)): diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 811b37425dd287..33e728989e643f 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1152,9 +1152,8 @@ def test_create_server_stream_bittype(self): @unittest.skipUnless(hasattr(socket, 'AF_INET6'), 'no IPv6 support') def test_create_server_ipv6(self): async def main(): - with self.assertWarns(DeprecationWarning): - srv = await asyncio.start_server( - lambda: None, '::1', 0, loop=self.loop) + srv = await asyncio.start_server( + lambda: None, '::1', 0, loop=self.loop) try: self.assertGreater(len(srv.sockets), 0) finally: diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py index b1531fb9343f5e..f24e363ebfcfa3 100644 --- a/Lib/test/test_asyncio/test_buffered_proto.py +++ b/Lib/test/test_asyncio/test_buffered_proto.py @@ -58,10 +58,9 @@ async def on_server_client(reader, writer): writer.close() await writer.wait_closed() - with self.assertWarns(DeprecationWarning): - srv = self.loop.run_until_complete( - asyncio.start_server( - on_server_client, '127.0.0.1', 0)) + srv = self.loop.run_until_complete( + asyncio.start_server( + on_server_client, '127.0.0.1', 0)) addr = srv.sockets[0].getsockname() self.loop.run_until_complete( diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py index 11c0ce495d5261..297a3b3901d631 100644 --- a/Lib/test/test_asyncio/test_pep492.py +++ b/Lib/test/test_asyncio/test_pep492.py @@ -94,9 +94,7 @@ class StreamReaderTests(BaseTest): def test_readline(self): DATA = b'line1\nline2\nline3' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, _asyncio_internal=True) stream.feed_data(DATA) stream.feed_eof() diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py index 0e38e6c8ecd4c2..4e758ad12e600e 100644 --- a/Lib/test/test_asyncio/test_server.py +++ b/Lib/test/test_asyncio/test_server.py @@ -46,9 +46,8 @@ async def main(srv): async with srv: await srv.serve_forever() - with self.assertWarns(DeprecationWarning): - srv = self.loop.run_until_complete(asyncio.start_server( - serve, support.HOSTv4, 0, loop=self.loop, start_serving=False)) + srv = self.loop.run_until_complete(asyncio.start_server( + serve, support.HOSTv4, 0, loop=self.loop, start_serving=False)) self.assertFalse(srv.is_serving()) @@ -103,9 +102,8 @@ async def main(srv): await srv.serve_forever() with test_utils.unix_socket_path() as addr: - with self.assertWarns(DeprecationWarning): - srv = self.loop.run_until_complete(asyncio.start_unix_server( - serve, addr, loop=self.loop, start_serving=False)) + srv = self.loop.run_until_complete(asyncio.start_unix_server( + serve, addr, loop=self.loop, start_serving=False)) main_task = self.loop.create_task(main(srv)) diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 1c2285063ef6da..31c3830dadab8e 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -658,13 +658,12 @@ def server(sock): sock.close() async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop, - ssl_handshake_timeout=1.0) + reader, writer = await asyncio.open_connection( + *addr, + ssl=client_sslctx, + server_hostname='', + loop=self.loop, + ssl_handshake_timeout=1.0) with self.tcp_server(server, max_clients=1, @@ -698,13 +697,12 @@ def server(sock): sock.close() async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop, - ssl_handshake_timeout=1.0) + reader, writer = await asyncio.open_connection( + *addr, + ssl=client_sslctx, + server_hostname='', + loop=self.loop, + ssl_handshake_timeout=1.0) with self.tcp_server(server, max_clients=1, @@ -735,12 +733,11 @@ def server(sock): sock.close() async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop) + reader, writer = await asyncio.open_connection( + *addr, + ssl=client_sslctx, + server_hostname='', + loop=self.loop) self.assertEqual(await reader.readline(), b'A\n') writer.write(b'B') diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index eab71e80308fef..9492fed7c91b05 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1,8 +1,6 @@ """Tests for streams.py.""" -import contextlib import gc -import io import os import queue import pickle @@ -18,7 +16,6 @@ ssl = None import asyncio -from asyncio.streams import _StreamProtocol, _ensure_can_read, _ensure_can_write from test.test_asyncio import utils as test_utils @@ -26,24 +23,6 @@ def tearDownModule(): asyncio.set_event_loop_policy(None) -class StreamModeTests(unittest.TestCase): - def test__ensure_can_read_ok(self): - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READ)) - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_read_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - _ensure_can_read(asyncio.StreamMode.WRITE) - - def test__ensure_can_write_ok(self): - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.WRITE)) - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_write_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - _ensure_can_write(asyncio.StreamMode.READ) - - class StreamTests(test_utils.TestCase): DATA = b'line1\nline2\nline3\n' @@ -63,15 +42,13 @@ def tearDown(self): @mock.patch('asyncio.streams.events') def test_ctor_global_loop(self, m_events): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + stream = asyncio.StreamReader(_asyncio_internal=True) self.assertIs(stream._loop, m_events.get_event_loop.return_value) def _basetest_open_connection(self, open_connection_fut): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - with self.assertWarns(DeprecationWarning): - reader, writer = self.loop.run_until_complete(open_connection_fut) + reader, writer = self.loop.run_until_complete(open_connection_fut) writer.write(b'GET / HTTP/1.0\r\n\r\n') f = reader.readline() data = self.loop.run_until_complete(f) @@ -99,9 +76,7 @@ def _basetest_open_connection_no_loop_ssl(self, open_connection_fut): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) try: - with self.assertWarns(DeprecationWarning): - reader, writer = self.loop.run_until_complete( - open_connection_fut) + reader, writer = self.loop.run_until_complete(open_connection_fut) finally: asyncio.set_event_loop(None) writer.write(b'GET / HTTP/1.0\r\n\r\n') @@ -137,8 +112,7 @@ def test_open_unix_connection_no_loop_ssl(self): def _basetest_open_connection_error(self, open_connection_fut): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - with self.assertWarns(DeprecationWarning): - reader, writer = self.loop.run_until_complete(open_connection_fut) + reader, writer = self.loop.run_until_complete(open_connection_fut) writer._protocol.connection_lost(ZeroDivisionError()) f = reader.read() with self.assertRaises(ZeroDivisionError): @@ -161,26 +135,23 @@ def test_open_unix_connection_error(self): self._basetest_open_connection_error(conn_fut) def test_feed_empty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'') self.assertEqual(b'', stream._buffer) def test_feed_nonempty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(self.DATA) self.assertEqual(self.DATA, stream._buffer) def test_read_zero(self): # Read zero bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.read(0)) @@ -189,9 +160,8 @@ def test_read_zero(self): def test_read(self): # Read bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) read_task = asyncio.Task(stream.read(30), loop=self.loop) def cb(): @@ -204,9 +174,8 @@ def cb(): def test_read_line_breaks(self): # Read bytes without line breaks. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'line1') stream.feed_data(b'line2') @@ -217,9 +186,8 @@ def test_read_line_breaks(self): def test_read_eof(self): # Read bytes, stop at eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) read_task = asyncio.Task(stream.read(1024), loop=self.loop) def cb(): @@ -232,9 +200,8 @@ def cb(): def test_read_until_eof(self): # Read all bytes until eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) read_task = asyncio.Task(stream.read(-1), loop=self.loop) def cb(): @@ -249,9 +216,8 @@ def cb(): self.assertEqual(b'', stream._buffer) def test_read_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.read(2)) @@ -263,19 +229,16 @@ def test_read_exception(self): def test_invalid_limit(self): with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=0, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=0, loop=self.loop, + _asyncio_internal=True) with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=-1, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=-1, loop=self.loop, + _asyncio_internal=True) def test_read_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=3, loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.read(5)) self.assertEqual(b'chunk', data) @@ -284,9 +247,8 @@ def test_read_limit(self): def test_readline(self): # Read one line. 'readline' will need to wait for the data # to come from 'cb' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'chunk1 ') read_task = asyncio.Task(stream.readline(), loop=self.loop) @@ -301,12 +263,11 @@ def cb(): self.assertEqual(b' chunk4', stream._buffer) def test_readline_limit_with_existing_data(self): - # Read one line. The data is in Stream's buffer + # Read one line. The data is in StreamReader's buffer # before the event loop is run. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=3, loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'li') stream.feed_data(b'ne1\nline2\n') @@ -315,9 +276,8 @@ def test_readline_limit_with_existing_data(self): # The buffer should contain the remaining data after exception self.assertEqual(b'line2\n', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=3, loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'li') stream.feed_data(b'ne1') stream.feed_data(b'li') @@ -332,9 +292,8 @@ def test_readline_limit_with_existing_data(self): self.assertEqual(b'', stream._buffer) def test_at_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) self.assertFalse(stream.at_eof()) stream.feed_data(b'some data\n') @@ -349,12 +308,11 @@ def test_at_eof(self): self.assertTrue(stream.at_eof()) def test_readline_limit(self): - # Read one line. Streams are fed with data after + # Read one line. StreamReaders are fed with data after # their 'readline' methods are called. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop, + _asyncio_internal=True) def cb(): stream.feed_data(b'chunk1') stream.feed_data(b'chunk2') @@ -368,9 +326,8 @@ def cb(): # a ValueError it should be empty. self.assertEqual(b'', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop, + _asyncio_internal=True) def cb(): stream.feed_data(b'chunk1') stream.feed_data(b'chunk2\n') @@ -383,9 +340,8 @@ def cb(): self.assertEqual(b'chunk3\n', stream._buffer) # check strictness of the limit - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'1234567\n') line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'1234567\n', line) @@ -404,9 +360,8 @@ def cb(): def test_readline_nolimit_nowait(self): # All needed data for the first 'readline' call will be # in the buffer. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(self.DATA[:6]) stream.feed_data(self.DATA[6:]) @@ -416,9 +371,8 @@ def test_readline_nolimit_nowait(self): self.assertEqual(b'line2\nline3\n', stream._buffer) def test_readline_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'some data') stream.feed_eof() @@ -426,18 +380,16 @@ def test_readline_eof(self): self.assertEqual(b'some data', line) def test_readline_empty_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_eof() line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', line) def test_readline_read_byte_count(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(self.DATA) self.loop.run_until_complete(stream.readline()) @@ -448,9 +400,8 @@ def test_readline_read_byte_count(self): self.assertEqual(b'ine3\n', stream._buffer) def test_readline_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readline()) @@ -462,16 +413,14 @@ def test_readline_exception(self): self.assertEqual(b'', stream._buffer) def test_readuntil_separator(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) with self.assertRaisesRegex(ValueError, 'Separator should be'): self.loop.run_until_complete(stream.readuntil(separator=b'')) def test_readuntil_multi_chunks(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'lineAAA') data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA')) @@ -489,9 +438,8 @@ def test_readuntil_multi_chunks(self): self.assertEqual(b'xxx', stream._buffer) def test_readuntil_multi_chunks_1(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'QWEaa') stream.feed_data(b'XYaa') @@ -526,9 +474,8 @@ def test_readuntil_multi_chunks_1(self): self.assertEqual(b'', stream._buffer) def test_readuntil_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'some dataAA') stream.feed_eof() @@ -539,9 +486,8 @@ def test_readuntil_eof(self): self.assertEqual(b'', stream._buffer) def test_readuntil_limit_found_sep(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=3, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, limit=3, + _asyncio_internal=True) stream.feed_data(b'some dataAA') with self.assertRaisesRegex(asyncio.LimitOverrunError, @@ -559,9 +505,8 @@ def test_readuntil_limit_found_sep(self): def test_readexactly_zero_or_less(self): # Read exact number of bytes (zero or less). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.readexactly(0)) @@ -574,9 +519,8 @@ def test_readexactly_zero_or_less(self): def test_readexactly(self): # Read exact number of bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) n = 2 * len(self.DATA) read_task = asyncio.Task(stream.readexactly(n), loop=self.loop) @@ -592,9 +536,8 @@ def cb(): self.assertEqual(self.DATA, stream._buffer) def test_readexactly_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=3, loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.readexactly(5)) self.assertEqual(b'chunk', data) @@ -602,9 +545,8 @@ def test_readexactly_limit(self): def test_readexactly_eof(self): # Read exact number of bytes (eof). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) n = 2 * len(self.DATA) read_task = asyncio.Task(stream.readexactly(n), loop=self.loop) @@ -622,9 +564,8 @@ def cb(): self.assertEqual(b'', stream._buffer) def test_readexactly_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readexactly(2)) @@ -635,9 +576,8 @@ def test_readexactly_exception(self): ValueError, self.loop.run_until_complete, stream.readexactly(2)) def test_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) self.assertIsNone(stream.exception()) exc = ValueError() @@ -645,9 +585,8 @@ def test_exception(self): self.assertIs(stream.exception(), exc) def test_exception_waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) async def set_err(): stream.set_exception(ValueError()) @@ -660,9 +599,8 @@ async def set_err(): self.assertRaises(ValueError, t1.result) def test_exception_cancel(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) t = asyncio.Task(stream.readline(), loop=self.loop) test_utils.run_briefly(self.loop) @@ -717,9 +655,8 @@ def stop(self): self.server = None async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, loop=self.loop) + reader, writer = await asyncio.open_connection( + *addr, loop=self.loop) # send a line writer.write(b"hello world!\n") # read it back @@ -733,8 +670,7 @@ async def client(addr): # test the server variant with a coroutine as client handler server = MyServer(self.loop) - with self.assertWarns(DeprecationWarning): - addr = server.start() + addr = server.start() msg = self.loop.run_until_complete(asyncio.Task(client(addr), loop=self.loop)) server.stop() @@ -742,8 +678,7 @@ async def client(addr): # test the server variant with a callback as client handler server = MyServer(self.loop) - with self.assertWarns(DeprecationWarning): - addr = server.start_callback() + addr = server.start_callback() msg = self.loop.run_until_complete(asyncio.Task(client(addr), loop=self.loop)) server.stop() @@ -791,9 +726,8 @@ def stop(self): self.server = None async def client(path): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_unix_connection( - path, loop=self.loop) + reader, writer = await asyncio.open_unix_connection( + path, loop=self.loop) # send a line writer.write(b"hello world!\n") # read it back @@ -808,8 +742,7 @@ async def client(path): # test the server variant with a coroutine as client handler with test_utils.unix_socket_path() as path: server = MyServer(self.loop, path) - with self.assertWarns(DeprecationWarning): - server.start() + server.start() msg = self.loop.run_until_complete(asyncio.Task(client(path), loop=self.loop)) server.stop() @@ -818,8 +751,7 @@ async def client(path): # test the server variant with a callback as client handler with test_utils.unix_socket_path() as path: server = MyServer(self.loop, path) - with self.assertWarns(DeprecationWarning): - server.start_callback() + server.start_callback() msg = self.loop.run_until_complete(asyncio.Task(client(path), loop=self.loop)) server.stop() @@ -831,7 +763,7 @@ async def client(path): def test_read_all_from_pipe_reader(self): # See asyncio issue 168. This test is derived from the example # subprocess_attach_read_pipe.py, but we configure the - # Stream's limit so that twice it is less than the size + # StreamReader's limit so that twice it is less than the size # of the data writter. Also we must explicitly attach a child # watcher to the event loop. @@ -845,11 +777,10 @@ def test_read_all_from_pipe_reader(self): args = [sys.executable, '-c', code, str(wfd)] pipe = open(rfd, 'rb', 0) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=1, - _asyncio_internal=True) - protocol = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) + reader = asyncio.StreamReader(loop=self.loop, limit=1, + _asyncio_internal=True) + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop, + _asyncio_internal=True) transport, _ = self.loop.run_until_complete( self.loop.connect_read_pipe(lambda: protocol, pipe)) @@ -866,74 +797,28 @@ def test_read_all_from_pipe_reader(self): asyncio.set_child_watcher(None) os.close(wfd) - data = self.loop.run_until_complete(stream.read(-1)) + data = self.loop.run_until_complete(reader.read(-1)) self.assertEqual(data, b'data') def test_streamreader_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - reader = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + reader = asyncio.StreamReader(_asyncio_internal=True) self.assertIs(reader._loop, self.loop) def test_streamreaderprotocol_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - stream = mock.Mock() - protocol = _StreamProtocol(stream, _asyncio_internal=True) + reader = mock.Mock() + protocol = asyncio.StreamReaderProtocol(reader, _asyncio_internal=True) self.assertIs(protocol._loop, self.loop) - def test_drain_raises_deprecated(self): - # See http://bugs.python.org/issue25441 - - # This test should not use asyncio for the mock server; the - # whole point of the test is to test for a bug in drain() - # where it never gives up the event loop but the socket is - # closed on the server side. - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - q = queue.Queue() - - def server(): - # Runs in a separate thread. - with socket.create_server(('127.0.0.1', 0)) as sock: - addr = sock.getsockname() - q.put(addr) - clt, _ = sock.accept() - clt.close() - - async def client(host, port): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - host, port, loop=self.loop) - - while True: - writer.write(b"foo\n") - await writer.drain() - - # Start the server thread and wait for it to be listening. - thread = threading.Thread(target=server) - thread.setDaemon(True) - thread.start() - addr = q.get() - - # Should not be stuck in an infinite loop. - with self.assertRaises((ConnectionResetError, ConnectionAbortedError, - BrokenPipeError)): - self.loop.run_until_complete(client(*addr)) - - # Clean up the thread. (Only on success; on failure, it may - # be stuck in accept().) - thread.join() - self.assertEqual([], messages) - def test_drain_raises(self): # See http://bugs.python.org/issue25441 @@ -955,11 +840,12 @@ def server(): clt.close() async def client(host, port): - stream = await asyncio.connect(host, port) + reader, writer = await asyncio.open_connection( + host, port, loop=self.loop) while True: - stream.write(b"foo\n") - await stream.drain() + writer.write(b"foo\n") + await writer.drain() # Start the server thread and wait for it to be listening. thread = threading.Thread(target=server) @@ -978,62 +864,54 @@ async def client(host, port): self.assertEqual([], messages) def test___repr__(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - self.assertEqual("", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) + self.assertEqual("", repr(stream)) def test___repr__nondefault_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=123, - _asyncio_internal=True) - self.assertEqual("", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop, limit=123, + _asyncio_internal=True) + self.assertEqual("", repr(stream)) def test___repr__eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_eof() - self.assertEqual("", repr(stream)) + self.assertEqual("", repr(stream)) def test___repr__data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream.feed_data(b'data') - self.assertEqual("", repr(stream)) + self.assertEqual("", repr(stream)) def test___repr__exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) exc = RuntimeError() stream.set_exception(exc) - self.assertEqual("", + self.assertEqual("", repr(stream)) def test___repr__waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream._waiter = asyncio.Future(loop=self.loop) self.assertRegex( repr(stream), - r">") + r">") stream._waiter.set_result(None) self.loop.run_until_complete(stream._waiter) stream._waiter = None - self.assertEqual("", repr(stream)) + self.assertEqual("", repr(stream)) def test___repr__transport(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) stream._transport = mock.Mock() stream._transport.__repr__ = mock.Mock() stream._transport.__repr__.return_value = "" - self.assertEqual(">", - repr(stream)) + self.assertEqual(">", repr(stream)) def test_IncompleteReadError_pickleable(self): e = asyncio.IncompleteReadError(b'abc', 10) @@ -1052,11 +930,10 @@ def test_LimitOverrunError_pickleable(self): self.assertEqual(str(e), str(e2)) self.assertEqual(e.consumed, e2.consumed) - def test_wait_closed_on_close_deprecated(self): + def test_wait_closed_on_close(self): with test_utils.run_test_server() as httpd: - with self.assertWarns(DeprecationWarning): - rd, wr = self.loop.run_until_complete( - asyncio.open_connection(*httpd.address, loop=self.loop)) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, loop=self.loop)) wr.write(b'GET / HTTP/1.0\r\n\r\n') f = rd.readline() @@ -1070,28 +947,10 @@ def test_wait_closed_on_close_deprecated(self): self.assertTrue(wr.is_closing()) self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertFalse(stream.is_closing()) - stream.close() - self.assertTrue(stream.is_closing()) - self.loop.run_until_complete(stream.wait_closed()) - - def test_wait_closed_on_close_with_unread_data_deprecated(self): + def test_wait_closed_on_close_with_unread_data(self): with test_utils.run_test_server() as httpd: - with self.assertWarns(DeprecationWarning): - rd, wr = self.loop.run_until_complete( - asyncio.open_connection(*httpd.address, loop=self.loop)) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, loop=self.loop)) wr.write(b'GET / HTTP/1.0\r\n\r\n') f = rd.readline() @@ -1100,44 +959,32 @@ def test_wait_closed_on_close_with_unread_data_deprecated(self): wr.close() self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close_with_unread_data(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - stream.close() - self.loop.run_until_complete(stream.wait_closed()) - def test_del_stream_before_sock_closing(self): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - async def test(): - - with test_utils.run_test_server() as httpd: - stream = await asyncio.connect(*httpd.address) - sock = stream.get_extra_info('socket') - self.assertNotEqual(sock.fileno(), -1) + with test_utils.run_test_server() as httpd: + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, loop=self.loop)) + sock = wr.get_extra_info('socket') + self.assertNotEqual(sock.fileno(), -1) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + wr.write(b'GET / HTTP/1.0\r\n\r\n') + f = rd.readline() + data = self.loop.run_until_complete(f) + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - # drop refs to reader/writer - del stream - gc.collect() - # make a chance to close the socket - await asyncio.sleep(0) + # drop refs to reader/writer + del rd + del wr + gc.collect() + # make a chance to close the socket + test_utils.run_briefly(self.loop) - self.assertEqual(1, len(messages), messages) - self.assertEqual(sock.fileno(), -1) + self.assertEqual(1, len(messages)) + self.assertEqual(sock.fileno(), -1) - self.loop.run_until_complete(test()) - self.assertEqual(1, len(messages), messages) + self.assertEqual(1, len(messages)) self.assertEqual('An open stream object is being garbage ' 'collected; call "stream.close()" explicitly.', messages[0]['message']) @@ -1147,12 +994,11 @@ def test_del_stream_before_connection_made(self): self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - pr = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) - del stream + rd = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) + pr = asyncio.StreamReaderProtocol(rd, loop=self.loop, + _asyncio_internal=True) + del rd gc.collect() tr, _ = self.loop.run_until_complete( self.loop.create_connection( @@ -1169,14 +1015,14 @@ def test_del_stream_before_connection_made(self): def test_async_writer_api(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - await stream.close() + await wr.close() messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1186,18 +1032,18 @@ async def inner(httpd): self.assertEqual(messages, []) - def test_async_writer_api_exception_after_close(self): + def test_async_writer_api(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() + wr.close() with self.assertRaises(ConnectionResetError): - await stream.write(b'data') + await wr.write(b'data') messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1213,13 +1059,11 @@ def test_eof_feed_when_closing_writer(self): self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - with self.assertWarns(DeprecationWarning): - rd, wr = self.loop.run_until_complete( - asyncio.open_connection(*httpd.address, - loop=self.loop)) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, + loop=self.loop)) - wr.close() - f = wr.wait_closed() + f = wr.close() self.loop.run_until_complete(f) assert rd.at_eof() f = rd.read() @@ -1232,62 +1076,13 @@ def test_stream_reader_create_warning(self): with contextlib.suppress(AttributeError): del asyncio.StreamReader with self.assertWarns(DeprecationWarning): - asyncio.StreamReader + asyncio.StreamReader(loop=self.loop) - def test_stream_writer_create_warning(self): - with contextlib.suppress(AttributeError): - del asyncio.StreamWriter + def test_stream_reader_protocol_create_warning(self): + reader = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) with self.assertWarns(DeprecationWarning): - asyncio.StreamWriter - - def test_stream_reader_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.write(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.writelines([b'data', b'other']) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - stream.write_eof() - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.drain() - - self.loop.run_until_complete(inner()) - - def test_stream_writer_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.WRITE, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - stream.feed_data(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readline() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readuntil() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.read() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readexactly(10) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - async for chunk in stream: - pass - - self.loop.run_until_complete(inner()) - - def _basetest_connect(self, stream): - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() - self.loop.run_until_complete(stream.wait_closed()) + asyncio.StreamReaderProtocol(reader, loop=self.loop) self.assertEqual([], messages) @@ -1776,8 +1571,16 @@ async def test(): self.assertEqual(data, b'2345') os.close(rpipe) +======= + def test_stream_writer_create_warning(self): + reader = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) + proto = asyncio.StreamReaderProtocol(reader, loop=self.loop, + _asyncio_internal=True) + with self.assertWarns(DeprecationWarning): + asyncio.StreamWriter('transport', proto, reader, self.loop) +>>>>>>> parent of 23b4b697e5... bpo-36889: Merge asyncio streams (GH-13251) - self.loop.run_until_complete(test()) def test_stream_ctor_forbidden(self): with self.assertRaisesRegex(RuntimeError, diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 1e1c01d713b5c6..f2f9313a0d0b32 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -17,7 +17,6 @@ import asyncio from asyncio import windows_events -from asyncio.streams import _StreamProtocol from test.test_asyncio import utils as test_utils from test.support.script_helper import spawn_python @@ -102,16 +101,16 @@ async def _test_pipe(self): clients = [] for i in range(5): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, _asyncio_internal=True) - protocol = _StreamProtocol(stream, - loop=self.loop, - _asyncio_internal=True) + stream_reader = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) + protocol = asyncio.StreamReaderProtocol(stream_reader, + loop=self.loop, + _asyncio_internal=True) trans, proto = await self.loop.create_pipe_connection( lambda: protocol, ADDRESS) self.assertIsInstance(trans, asyncio.Transport) self.assertEqual(protocol, proto) - clients.append((stream, trans)) + clients.append((stream_reader, trans)) for i, (r, w) in enumerate(clients): w.write('lower-{}\n'.format(i).encode()) @@ -120,7 +119,6 @@ async def _test_pipe(self): response = await r.readline() self.assertEqual(response, 'LOWER-{}\n'.format(i).encode()) w.close() - await r.close() server.close()