From 144a4000df0dd01d99b435752084164f44454071 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 14:20:35 +0300 Subject: [PATCH 1/8] Work on --- aiohttp/_http_parser.pyx | 19 +++++++++++++------ aiohttp/http_parser.py | 13 +++++++++---- aiohttp/payload.py | 10 +++++----- aiohttp/streams.py | 9 +++------ tests/test_http_parser.py | 12 ++++++------ tests/test_multipart.py | 5 ++--- tests/test_streams.py | 27 ++++++++++++++------------- tests/test_web_request.py | 14 +++++++------- 8 files changed, 59 insertions(+), 50 deletions(-) diff --git a/aiohttp/_http_parser.pyx b/aiohttp/_http_parser.pyx index 82e48d4d250..28736a322d5 100644 --- a/aiohttp/_http_parser.pyx +++ b/aiohttp/_http_parser.pyx @@ -303,6 +303,7 @@ cdef class HttpParser: object _payload_exception object _last_error bint _auto_decompress + int _limit str _content_encoding @@ -324,7 +325,8 @@ cdef class HttpParser: PyMem_Free(self._csettings) cdef _init(self, cparser.http_parser_type mode, - object protocol, object loop, object timer=None, + object protocol, object loop, int limit, + object timer=None, size_t max_line_size=8190, size_t max_headers=32768, size_t max_field_size=8190, payload_exception=None, bint response_with_body=True, bint read_until_eof=False, @@ -370,6 +372,7 @@ cdef class HttpParser: self._csettings.on_chunk_complete = cb_on_chunk_complete self._last_error = None + self._limit = limit cdef _process_header(self): if self._raw_name: @@ -454,7 +457,8 @@ cdef class HttpParser: self._read_until_eof) ): payload = StreamReader( - self._protocol, timer=self._timer, loop=self._loop) + self._protocol, timer=self._timer, loop=self._loop, + limit=self._limit) else: payload = EMPTY_PAYLOAD @@ -566,8 +570,10 @@ cdef class HttpRequestParser(HttpParser): def __init__(self, protocol, loop, timer=None, size_t max_line_size=8190, size_t max_headers=32768, size_t max_field_size=8190, payload_exception=None, - bint response_with_body=True, bint read_until_eof=False): - 
self._init(cparser.HTTP_REQUEST, protocol, loop, timer, + bint response_with_body=True, bint read_until_eof=False, + int limit=2**16, + ): + self._init(cparser.HTTP_REQUEST, protocol, loop, limit, timer, max_line_size, max_headers, max_field_size, payload_exception, response_with_body, read_until_eof) @@ -594,8 +600,9 @@ cdef class HttpResponseParser(HttpParser): size_t max_line_size=8190, size_t max_headers=32768, size_t max_field_size=8190, payload_exception=None, bint response_with_body=True, bint read_until_eof=False, - bint auto_decompress=True): - self._init(cparser.HTTP_RESPONSE, protocol, loop, timer, + bint auto_decompress=True, int limit=2**16 + ): + self._init(cparser.HTTP_RESPONSE, protocol, loop, limit, timer, max_line_size, max_headers, max_field_size, payload_exception, response_with_body, read_until_eof, auto_decompress) diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index bce7d158796..ea2df3b9008 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -178,7 +178,8 @@ def __init__(self, protocol: BaseProtocol, payload_exception: Optional[Type[BaseException]]=None, response_with_body: bool=True, read_until_eof: bool=False, - auto_decompress: bool=True) -> None: + auto_decompress: bool=True, + limit: int=2**16) -> None: self.protocol = protocol self.loop = loop self.max_line_size = max_line_size @@ -198,6 +199,7 @@ def __init__(self, protocol: BaseProtocol, self._payload = None self._payload_parser = None # type: Optional[HttpPayloadParser] self._auto_decompress = auto_decompress + self._limit = limit self._headers_parser = HeadersParser(max_line_size, max_headers, max_field_size) @@ -288,7 +290,8 @@ def feed_data( if ((length is not None and length > 0) or msg.chunked and not msg.upgrade): payload = StreamReader( - self.protocol, timer=self.timer, loop=loop) + self.protocol, timer=self.timer, loop=loop, + limit=self._limit) payload_parser = HttpPayloadParser( payload, length=length, chunked=msg.chunked, method=method, @@ 
-300,7 +303,8 @@ def feed_data( self._payload_parser = payload_parser elif method == METH_CONNECT: payload = StreamReader( - self.protocol, timer=self.timer, loop=loop) + self.protocol, timer=self.timer, loop=loop, + limit=self._limit) self._upgraded = True self._payload_parser = HttpPayloadParser( payload, method=msg.method, @@ -310,7 +314,8 @@ def feed_data( if (getattr(msg, 'code', 100) >= 199 and length is None and self.read_until_eof): payload = StreamReader( - self.protocol, timer=self.timer, loop=loop) + self.protocol, timer=self.timer, loop=loop, + limit=self._limit) payload_parser = HttpPayloadParser( payload, length=length, chunked=msg.chunked, method=method, diff --git a/aiohttp/payload.py b/aiohttp/payload.py index ccc64f38526..04f18f33f26 100644 --- a/aiohttp/payload.py +++ b/aiohttp/payload.py @@ -33,7 +33,7 @@ parse_mimetype, sentinel, ) -from .streams import DEFAULT_LIMIT, StreamReader +from .streams import StreamReader from .typedefs import JSONEncoder, _CIMultiDict __all__ = ('PAYLOAD_REGISTRY', 'get_payload', 'payload_type', 'Payload', @@ -295,12 +295,12 @@ async def write(self, writer: AbstractStreamWriter) -> None: loop = asyncio.get_event_loop() try: chunk = await loop.run_in_executor( - None, self._value.read, DEFAULT_LIMIT + None, self._value.read, 2**16 ) while chunk: await writer.write(chunk) chunk = await loop.run_in_executor( - None, self._value.read, DEFAULT_LIMIT + None, self._value.read, 2**16 ) finally: await loop.run_in_executor(None, self._value.close) @@ -345,12 +345,12 @@ async def write(self, writer: AbstractStreamWriter) -> None: loop = asyncio.get_event_loop() try: chunk = await loop.run_in_executor( - None, self._value.read, DEFAULT_LIMIT + None, self._value.read, 2**16 ) while chunk: await writer.write(chunk.encode(self._encoding)) chunk = await loop.run_in_executor( - None, self._value.read, DEFAULT_LIMIT + None, self._value.read, 2**16 ) finally: await loop.run_in_executor(None, self._value.close) diff --git 
a/aiohttp/streams.py b/aiohttp/streams.py index d3929dde1c8..e6441beca24 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -17,8 +17,6 @@ 'EMPTY_PAYLOAD', 'EofStream', 'StreamReader', 'DataQueue', 'FlowControlDataQueue') -DEFAULT_LIMIT = 2 ** 16 - _T = TypeVar('_T') @@ -105,8 +103,7 @@ class StreamReader(AsyncStreamReaderMixin): total_bytes = 0 - def __init__(self, protocol: BaseProtocol, - *, limit: int=DEFAULT_LIMIT, + def __init__(self, protocol: BaseProtocol, limit: int, *, timer: Optional[BaseTimerContext]=None, loop: asyncio.AbstractEventLoop) -> None: self._protocol = protocol @@ -133,7 +130,7 @@ def __repr__(self) -> str: info.append('%d bytes' % self._size) if self._eof: info.append('eof') - if self._low_water != DEFAULT_LIMIT: + if self._low_water != 2 ** 16: # default limit info.append('low=%d high=%d' % (self._low_water, self._high_water)) if self._waiter: info.append('w=%r' % self._waiter) @@ -601,7 +598,7 @@ class FlowControlDataQueue(DataQueue[_T]): It is a destination for parsed data.""" def __init__(self, protocol: BaseProtocol, *, - limit: int=DEFAULT_LIMIT, + limit: int=2**16, loop: asyncio.AbstractEventLoop) -> None: super().__init__(loop=loop) diff --git a/tests/test_http_parser.py b/tests/test_http_parser.py index ea996657316..cc9bc0d4d07 100644 --- a/tests/test_http_parser.py +++ b/tests/test_http_parser.py @@ -827,7 +827,7 @@ async def test_parse_chunked_payload_size_error(self, stream) -> None: http_exceptions.TransferEncodingError) async def test_parse_chunked_payload_split_end(self, protocol) -> None: - out = aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\n') p.feed_data(b'\r\n') @@ -836,7 +836,7 @@ async def test_parse_chunked_payload_split_end(self, protocol) -> None: assert b'asdf' == b''.join(out._buffer) async def test_parse_chunked_payload_split_end2(self, protocol) -> None: - out = 
aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\n\r') p.feed_data(b'\n') @@ -846,7 +846,7 @@ async def test_parse_chunked_payload_split_end2(self, protocol) -> None: async def test_parse_chunked_payload_split_end_trailers(self, protocol) -> None: - out = aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\n') p.feed_data(b'Content-MD5: 912ec803b2ce49e4a541068d495ab570\r\n') @@ -857,7 +857,7 @@ async def test_parse_chunked_payload_split_end_trailers(self, async def test_parse_chunked_payload_split_end_trailers2(self, protocol) -> None: - out = aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\n') p.feed_data(b'Content-MD5: 912ec803b2ce49e4a541068d495ab570\r\n\r') @@ -868,7 +868,7 @@ async def test_parse_chunked_payload_split_end_trailers2(self, async def test_parse_chunked_payload_split_end_trailers3(self, protocol) -> None: - out = aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\nContent-MD5: ') p.feed_data(b'912ec803b2ce49e4a541068d495ab570\r\n\r\n') @@ -878,7 +878,7 @@ async def test_parse_chunked_payload_split_end_trailers3(self, async def test_parse_chunked_payload_split_end_trailers4(self, protocol) -> None: - out = aiohttp.StreamReader(protocol, loop=None) + out = aiohttp.StreamReader(protocol, 2 ** 16, loop=None) p = HttpPayloadParser(out, chunked=True) p.feed_data(b'4\r\nasdf\r\n0\r\n' b'C') diff --git a/tests/test_multipart.py b/tests/test_multipart.py index 8f828b2338c..514f819ce80 100644 --- a/tests/test_multipart.py +++ b/tests/test_multipart.py @@ -18,7 +18,6 @@ ) 
from aiohttp.helpers import parse_mimetype from aiohttp.multipart import MultipartResponseWrapper -from aiohttp.streams import DEFAULT_LIMIT as stream_reader_default_limit from aiohttp.streams import StreamReader from aiohttp.test_utils import make_mocked_coro @@ -652,9 +651,9 @@ async def test_filename(self) -> None: assert 'foo.html' == part.filename async def test_reading_long_part(self, newline) -> None: - size = 2 * stream_reader_default_limit + size = 2 * 2 ** 16 protocol = mock.Mock(_reading_paused=False) - stream = StreamReader(protocol, loop=asyncio.get_event_loop()) + stream = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) stream.feed_data(b'0' * size + b'%s--:--' % newline) stream.feed_eof() obj = aiohttp.BodyPartReader(BOUNDARY, {}, stream, _newline=newline) diff --git a/tests/test_streams.py b/tests/test_streams.py index 2671b9d52ab..f3a16299b8c 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -24,7 +24,7 @@ def chunkify(seq, n): async def create_stream(): loop = asyncio.get_event_loop() protocol = mock.Mock(_reading_paused=False) - stream = streams.StreamReader(protocol, loop=loop) + stream = streams.StreamReader(protocol, 2 ** 16, loop=loop) stream.feed_data(DATA) stream.feed_eof() return stream @@ -73,6 +73,7 @@ class TestStreamReader: def _make_one(self, *args, **kwargs): loop = asyncio.get_event_loop() + kwargs.setdefault("limit", 2 ** 16) return streams.StreamReader(mock.Mock(_reading_paused=False), *args, **kwargs, loop=loop) @@ -1095,7 +1096,7 @@ async def set_err(): async def test_feed_data_waiters(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() eof_waiter = reader._eof_waiter = loop.create_future() @@ -1112,7 +1113,7 @@ async def test_feed_data_waiters(protocol) -> None: async def test_feed_data_completed_waiters(protocol) -> None: loop = 
asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() waiter.set_result(1) @@ -1123,7 +1124,7 @@ async def test_feed_data_completed_waiters(protocol) -> None: async def test_feed_eof_waiters(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() eof_waiter = reader._eof_waiter = loop.create_future() @@ -1138,7 +1139,7 @@ async def test_feed_eof_waiters(protocol) -> None: async def test_feed_eof_cancelled(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() eof_waiter = reader._eof_waiter = loop.create_future() @@ -1155,7 +1156,7 @@ async def test_feed_eof_cancelled(protocol) -> None: async def test_on_eof(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) on_eof = mock.Mock() reader.on_eof(on_eof) @@ -1176,7 +1177,7 @@ async def test_on_eof_empty_reader() -> None: async def test_on_eof_exc_in_callback(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) on_eof = mock.Mock() on_eof.side_effect = ValueError @@ -1200,7 +1201,7 @@ async def test_on_eof_exc_in_callback_empty_stream_reader() -> None: async def test_on_eof_eof_is_set(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) reader.feed_eof() on_eof = mock.Mock() @@ -1211,7 +1212,7 @@ async def test_on_eof_eof_is_set(protocol) -> 
None: async def test_on_eof_eof_is_set_exception(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) reader.feed_eof() on_eof = mock.Mock() @@ -1224,7 +1225,7 @@ async def test_on_eof_eof_is_set_exception(protocol) -> None: async def test_set_exception(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() eof_waiter = reader._eof_waiter = loop.create_future() @@ -1239,7 +1240,7 @@ async def test_set_exception(protocol) -> None: async def test_set_exception_cancelled(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) waiter = reader._waiter = loop.create_future() eof_waiter = reader._eof_waiter = loop.create_future() @@ -1257,7 +1258,7 @@ async def test_set_exception_cancelled(protocol) -> None: async def test_set_exception_eof_callbacks(protocol) -> None: loop = asyncio.get_event_loop() - reader = streams.StreamReader(protocol, loop=loop) + reader = streams.StreamReader(protocol, 2 ** 16, loop=loop) on_eof = mock.Mock() reader.on_eof(on_eof) @@ -1342,7 +1343,7 @@ async def test_stream_reader_iter_chunks_no_chunked_encoding() -> None: async def test_stream_reader_iter_chunks_chunked_encoding(protocol) -> None: loop = asyncio.get_event_loop() - stream = streams.StreamReader(protocol, loop=loop) + stream = streams.StreamReader(protocol, 2 ** 16, loop=loop) for line in DATA.splitlines(keepends=True): stream.begin_http_chunk_receiving() stream.feed_data(line) diff --git a/tests/test_web_request.py b/tests/test_web_request.py index 04266e3c405..246b2cce6ac 100644 --- a/tests/test_web_request.py +++ b/tests/test_web_request.py @@ -548,7 +548,7 @@ def test_clone_headers_dict() -> None: async 
def test_cannot_clone_after_read(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) payload.feed_data(b'data') payload.feed_eof() req = make_mocked_request('GET', '/path', payload=payload) @@ -558,7 +558,7 @@ async def test_cannot_clone_after_read(protocol) -> None: async def test_make_too_big_request(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) large_file = 1024 ** 2 * b'x' too_large_file = large_file + b'x' payload.feed_data(too_large_file) @@ -571,7 +571,7 @@ async def test_make_too_big_request(protocol) -> None: async def test_request_with_wrong_content_type_encoding(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) payload.feed_data(b'{}') payload.feed_eof() headers = {'Content-Type': 'text/html; charset=test'} @@ -583,7 +583,7 @@ async def test_request_with_wrong_content_type_encoding(protocol) -> None: async def test_make_too_big_request_same_size_to_max(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) large_file = 1024 ** 2 * b'x' payload.feed_data(large_file) payload.feed_eof() @@ -594,7 +594,7 @@ async def test_make_too_big_request_same_size_to_max(protocol) -> None: async def test_make_too_big_request_adjust_limit(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) large_file = 1024 ** 2 * b'x' too_large_file = large_file + b'x' payload.feed_data(too_large_file) @@ -607,7 +607,7 @@ async def test_make_too_big_request_adjust_limit(protocol) -> None: async def test_multipart_formdata(protocol) -> None: - payload = 
StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) payload.feed_data(b"""-----------------------------326931944431359\r Content-Disposition: form-data; name="a"\r \r @@ -628,7 +628,7 @@ async def test_multipart_formdata(protocol) -> None: async def test_make_too_big_request_limit_None(protocol) -> None: - payload = StreamReader(protocol, loop=asyncio.get_event_loop()) + payload = StreamReader(protocol, 2 ** 16, loop=asyncio.get_event_loop()) large_file = 1024 ** 2 * b'x' too_large_file = large_file + b'x' payload.feed_data(too_large_file) From f2e85ffc727359d719dd01b0d9a7bd504cf55b5f Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 14:40:24 +0300 Subject: [PATCH 2/8] Convert more --- aiohttp/_http_parser.pyx | 7 +++---- aiohttp/client_proto.py | 2 +- aiohttp/http_parser.py | 4 ++-- aiohttp/web_protocol.py | 2 +- tests/test_http_parser.py | 6 +++--- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/aiohttp/_http_parser.pyx b/aiohttp/_http_parser.pyx index 28736a322d5..b43976bf4b5 100644 --- a/aiohttp/_http_parser.pyx +++ b/aiohttp/_http_parser.pyx @@ -567,11 +567,10 @@ cdef class HttpParser: cdef class HttpRequestParser(HttpParser): - def __init__(self, protocol, loop, timer=None, + def __init__(self, protocol, loop, int limit, timer=None, size_t max_line_size=8190, size_t max_headers=32768, size_t max_field_size=8190, payload_exception=None, bint response_with_body=True, bint read_until_eof=False, - int limit=2**16, ): self._init(cparser.HTTP_REQUEST, protocol, loop, limit, timer, max_line_size, max_headers, max_field_size, @@ -596,11 +595,11 @@ cdef class HttpRequestParser(HttpParser): cdef class HttpResponseParser(HttpParser): - def __init__(self, protocol, loop, timer=None, + def __init__(self, protocol, loop, int limit, timer=None, size_t max_line_size=8190, size_t max_headers=32768, size_t max_field_size=8190, payload_exception=None, bint 
response_with_body=True, bint read_until_eof=False, - bint auto_decompress=True, int limit=2**16 + bint auto_decompress=True ): self._init(cparser.HTTP_RESPONSE, protocol, loop, limit, timer, max_line_size, max_headers, max_field_size, diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index 859398f4495..ff98caa0159 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -151,7 +151,7 @@ def set_response_params(self, *, timer: BaseTimerContext=None, self._reschedule_timeout() self._parser = HttpResponseParser( - self, self._loop, timer=timer, + self, self._loop, 2 ** 16, timer=timer, payload_exception=ClientPayloadError, response_with_body=not skip_payload, read_until_eof=read_until_eof, diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index ea2df3b9008..18a37a54fc8 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -168,6 +168,7 @@ class HttpParser(abc.ABC): def __init__(self, protocol: BaseProtocol, loop: asyncio.AbstractEventLoop, + limit: int, max_line_size: int=8190, max_headers: int=32768, max_field_size: int=8190, @@ -178,8 +179,7 @@ def __init__(self, protocol: BaseProtocol, payload_exception: Optional[Type[BaseException]]=None, response_with_body: bool=True, read_until_eof: bool=False, - auto_decompress: bool=True, - limit: int=2**16) -> None: + auto_decompress: bool=True) -> None: self.protocol = protocol self.loop = loop self.max_line_size = max_line_size diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 0e9597aee8f..6e8212f3bfa 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -179,7 +179,7 @@ def __init__(self, manager: 'Server', *, self._upgrade = False self._payload_parser = None # type: Any self._request_parser = HttpRequestParser( - self, loop, + self, loop, 2 ** 16, max_line_size=max_line_size, max_field_size=max_field_size, max_headers=max_headers, diff --git a/tests/test_http_parser.py b/tests/test_http_parser.py index cc9bc0d4d07..2d57a93e7d4 100644 
--- a/tests/test_http_parser.py +++ b/tests/test_http_parser.py @@ -41,7 +41,7 @@ def protocol(): @pytest.fixture(params=REQUEST_PARSERS) def parser(loop, protocol, request): # Parser implementations - return request.param(protocol, loop, + return request.param(protocol, loop, 2 ** 16, max_line_size=8190, max_headers=32768, max_field_size=8190) @@ -56,7 +56,7 @@ def request_cls(request): @pytest.fixture(params=RESPONSE_PARSERS) def response(loop, protocol, request): # Parser implementations - return request.param(protocol, loop, + return request.param(protocol, loop, 2 ** 16, max_line_size=8190, max_headers=32768, max_field_size=8190) @@ -732,7 +732,7 @@ def _test_parse_no_length_or_te_on_post(loop, protocol, request_cls): def test_parse_payload_response_without_body(loop, protocol, response_cls) -> None: - parser = response_cls(protocol, loop, response_with_body=False) + parser = response_cls(protocol, loop, 2 ** 16, response_with_body=False) text = (b'HTTP/1.1 200 Ok\r\n' b'content-length: 10\r\n\r\n') msg, payload = parser.feed_data(text)[0][0] From 4c6e08942381353f3f31840ac2632f53c5c3e791 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 15:05:04 +0300 Subject: [PATCH 3/8] Work on --- aiohttp/client.py | 26 +++++++++++++++++++------- aiohttp/client_proto.py | 5 +++-- aiohttp/web_protocol.py | 5 +++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index 7778d2cb877..c4dba2bc60d 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -183,7 +183,9 @@ class ClientSession: '_timeout', '_raise_for_status', '_auto_decompress', '_trust_env', '_default_headers', '_skip_auto_headers', '_request_class', '_response_class', - '_ws_response_class', '_trace_configs') + '_ws_response_class', '_trace_configs', + '_read_bufsize', + ) def __init__(self, *, connector: Optional[BaseConnector]=None, cookies: Optional[LooseCookies]=None, @@ -202,7 +204,8 @@ def __init__(self, *, connector: 
Optional[BaseConnector]=None, auto_decompress: bool=True, trust_env: bool=False, requote_redirect_url: bool=True, - trace_configs: Optional[List[TraceConfig]]=None) -> None: + trace_configs: Optional[List[TraceConfig]]=None, + read_bufsize: int=2**16) -> None: loop = get_running_loop() if connector is None: @@ -240,6 +243,7 @@ def __init__(self, *, connector: Optional[BaseConnector]=None, self._auto_decompress = auto_decompress self._trust_env = trust_env self._requote_redirect_url = requote_redirect_url + self._read_bufsize = read_bufsize # Convert to list of tuples if headers: @@ -315,7 +319,8 @@ async def _request( timeout: Union[ClientTimeout, object]=sentinel, ssl: Optional[Union[SSLContext, bool, Fingerprint]]=None, proxy_headers: Optional[LooseHeaders]=None, - trace_request_ctx: Optional[SimpleNamespace]=None + trace_request_ctx: Optional[SimpleNamespace]=None, + read_bufsize: Optional[int] = None ) -> ClientResponse: # NOTE: timeout clamps existing connect and read timeouts. We cannot @@ -371,6 +376,9 @@ async def _request( tm = TimeoutHandle(self._loop, real_timeout.total) handle = tm.start() + if read_bufsize is None: + read_bufsize = self._read_bufsize + traces = [ Trace( self, @@ -461,7 +469,8 @@ async def _request( skip_payload=method.upper() == 'HEAD', read_until_eof=read_until_eof, auto_decompress=self._auto_decompress, - read_timeout=real_timeout.sock_read) + read_timeout=real_timeout.sock_read, + read_bufsize=read_bufsize) try: try: @@ -1100,7 +1109,8 @@ def request( timeout: Union[ClientTimeout, object]=sentinel, cookies: Optional[LooseCookies]=None, version: HttpVersion=http.HttpVersion11, - connector: Optional[BaseConnector]=None + connector: Optional[BaseConnector]=None, + read_bufsize: Optional[int] = None, ) -> _SessionRequestContextManager: """Constructs and sends a request. Returns response object. 
method - HTTP method @@ -1161,5 +1171,7 @@ def request( raise_for_status=raise_for_status, read_until_eof=read_until_eof, proxy=proxy, - proxy_auth=proxy_auth,), - session) + proxy_auth=proxy_auth, + read_bufsize=read_bufsize), + session + ) diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index ff98caa0159..f6d191725ef 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -144,14 +144,15 @@ def set_response_params(self, *, timer: BaseTimerContext=None, skip_payload: bool=False, read_until_eof: bool=False, auto_decompress: bool=True, - read_timeout: Optional[float]=None) -> None: + read_timeout: Optional[float]=None, + read_bufsize: int = 2 ** 16) -> None: self._skip_payload = skip_payload self._read_timeout = read_timeout self._reschedule_timeout() self._parser = HttpResponseParser( - self, self._loop, 2 ** 16, timer=timer, + self, self._loop, read_bufsize, timer=timer, payload_exception=ClientPayloadError, response_with_body=not skip_payload, read_until_eof=read_until_eof, diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 6e8212f3bfa..30e9c387e2a 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -151,7 +151,8 @@ def __init__(self, manager: 'Server', *, max_line_size: int=8190, max_headers: int=32768, max_field_size: int=8190, - lingering_time: float=10.0): + lingering_time: float=10.0, + read_bufsize: int=2 ** 16): super().__init__(loop) @@ -179,7 +180,7 @@ def __init__(self, manager: 'Server', *, self._upgrade = False self._payload_parser = None # type: Any self._request_parser = HttpRequestParser( - self, loop, 2 ** 16, + self, loop, read_bufsize, max_line_size=max_line_size, max_field_size=max_field_size, max_headers=max_headers, From 2c82570df79f78cbd0f7fa455cde8aa1797a9f49 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 15:27:33 +0300 Subject: [PATCH 4/8] Work on --- aiohttp/client.py | 2 +- aiohttp/streams.py | 4 ++-- aiohttp/web_ws.py | 2 +- tests/test_http_parser.py 
| 19 +++++++++++++++++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index c4dba2bc60d..8c12a2561cc 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -794,7 +794,7 @@ async def _ws_connect( transport = conn.transport assert transport is not None reader = FlowControlDataQueue( - conn_proto, limit=2 ** 16, loop=self._loop) # type: FlowControlDataQueue[WSMessage] # noqa + conn_proto, 2 ** 16, loop=self._loop) # type: FlowControlDataQueue[WSMessage] # noqa conn_proto.set_parser( WebSocketReader(reader, max_msg_size), reader) writer = WebSocketWriter( diff --git a/aiohttp/streams.py b/aiohttp/streams.py index e6441beca24..416f459808f 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -597,8 +597,8 @@ class FlowControlDataQueue(DataQueue[_T]): It is a destination for parsed data.""" - def __init__(self, protocol: BaseProtocol, *, - limit: int=2**16, + def __init__(self, protocol: BaseProtocol, + limit: int, *, loop: asyncio.AbstractEventLoop) -> None: super().__init__(loop=loop) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 6ba7b419e2e..b8794f705bd 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -232,7 +232,7 @@ def _post_start(self, request: BaseRequest, loop = self._loop assert loop is not None self._reader = FlowControlDataQueue( - request._protocol, limit=2 ** 16, loop=loop) + request._protocol, 2 ** 16, loop=loop) request.protocol.set_parser(WebSocketReader( self._reader, self._max_msg_size, compress=self._compress)) # disable HTTP keepalive for WebSocket diff --git a/tests/test_http_parser.py b/tests/test_http_parser.py index 2d57a93e7d4..3aeb3cdf519 100644 --- a/tests/test_http_parser.py +++ b/tests/test_http_parser.py @@ -791,6 +791,7 @@ class TestParsePayload: async def test_parse_eof_payload(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, readall=True) p.feed_data(b'data') @@ -801,6 
+802,7 @@ async def test_parse_eof_payload(self, stream) -> None: async def test_parse_no_body(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, method='PUT') @@ -809,6 +811,7 @@ async def test_parse_no_body(self, stream) -> None: async def test_parse_length_payload_eof(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=4) @@ -819,6 +822,7 @@ async def test_parse_length_payload_eof(self, stream) -> None: async def test_parse_chunked_payload_size_error(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, chunked=True) with pytest.raises(http_exceptions.TransferEncodingError): @@ -889,6 +893,7 @@ async def test_parse_chunked_payload_split_end_trailers4(self, async def test_http_payload_parser_length(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=2) eof, tail = p.feed_data(b'1245') @@ -903,6 +908,7 @@ async def test_http_payload_parser_deflate(self, stream) -> None: length = len(COMPRESSED) out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=length, compression='deflate') p.feed_data(COMPRESSED) @@ -917,6 +923,7 @@ async def test_http_payload_parser_deflate_no_hdrs(self, stream) -> None: length = len(COMPRESSED) out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=length, compression='deflate') p.feed_data(COMPRESSED) @@ -929,6 +936,7 @@ async def test_http_payload_parser_deflate_light(self, stream) -> None: length = len(COMPRESSED) out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=length, compression='deflate') 
p.feed_data(COMPRESSED) @@ -937,6 +945,7 @@ async def test_http_payload_parser_deflate_light(self, stream) -> None: async def test_http_payload_parser_deflate_split(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, compression='deflate', readall=True) # Feeding one correct byte should be enough to choose exact @@ -948,6 +957,7 @@ async def test_http_payload_parser_deflate_split(self, stream) -> None: async def test_http_payload_parser_deflate_split_err(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, compression='deflate', readall=True) # Feeding one wrong byte should be enough to choose exact @@ -959,6 +969,7 @@ async def test_http_payload_parser_deflate_split_err(self, stream) -> None: async def test_http_payload_parser_length_zero(self, stream) -> None: out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser(out, length=0) assert p.done @@ -968,6 +979,7 @@ async def test_http_payload_parser_length_zero(self, stream) -> None: async def test_http_payload_brotli(self, stream) -> None: compressed = brotli.compress(b'brotli data') out = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) p = HttpPayloadParser( out, length=len(compressed), compression='br') @@ -980,6 +992,7 @@ class TestDeflateBuffer: async def test_feed_data(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'deflate') @@ -992,6 +1005,7 @@ async def test_feed_data(self, stream) -> None: async def test_feed_data_err(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'deflate') @@ -1006,6 +1020,7 @@ async def test_feed_data_err(self, stream) -> None: async def test_feed_eof(self, stream) -> 
None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'deflate') @@ -1018,6 +1033,7 @@ async def test_feed_eof(self, stream) -> None: async def test_feed_eof_err_deflate(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'deflate') @@ -1030,6 +1046,7 @@ async def test_feed_eof_err_deflate(self, stream) -> None: async def test_feed_eof_no_err_gzip(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'gzip') @@ -1042,6 +1059,7 @@ async def test_feed_eof_no_err_gzip(self, stream) -> None: async def test_feed_eof_no_err_brotli(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'br') @@ -1054,6 +1072,7 @@ async def test_feed_eof_no_err_brotli(self, stream) -> None: async def test_empty_body(self, stream) -> None: buf = aiohttp.FlowControlDataQueue(stream, + 2 ** 16, loop=asyncio.get_event_loop()) dbuf = DeflateBuffer(buf, 'deflate') dbuf.feed_eof() From d6f4c0ae0db35d49167e0417ed1bc6f9604455b8 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 15:46:49 +0300 Subject: [PATCH 5/8] More tests --- aiohttp/streams.py | 3 +++ tests/test_client_functional.py | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 416f459808f..afe449f8e11 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -138,6 +138,9 @@ def __repr__(self) -> str: info.append('e=%r' % self._exception) return '<%s>' % ' '.join(info) + def get_read_buffer_limits(self) -> Tuple[int, int]: + return (self._low_water, self._high_water) + def exception(self) -> Optional[BaseException]: return self._exception diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 8fe0e2e1d4d..44b43a510ff 
100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -3006,3 +3006,29 @@ async def handler(request): with pytest.raises(aiohttp.ServerTimeoutError): async with await client.get('/') as resp: await resp.read() + + +async def test_read_bufsize_session_default(aiohttp_client) -> None: + async def handler(request): + return web.Response(body=b'1234567') + + app = web.Application() + app.add_routes([web.get('/', handler)]) + + client = await aiohttp_client(app, read_bufsize=2) + + async with await client.get('/') as resp: + assert resp.content.get_read_buffer_limits() == (2, 4) + + +async def test_read_bufsize_explicit(aiohttp_client) -> None: + async def handler(request): + return web.Response(body=b'1234567') + + app = web.Application() + app.add_routes([web.get('/', handler)]) + + client = await aiohttp_client(app) + + async with await client.get('/', read_bufsize=4) as resp: + assert resp.content.get_read_buffer_limits() == (4, 8) From 282c64ccfc9d4b59daba07d5585f4582d0c4a043 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 16:26:27 +0300 Subject: [PATCH 6/8] More tests --- tests/test_web_functional.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_web_functional.py b/tests/test_web_functional.py index 5c011c2a006..b87905821b2 100644 --- a/tests/test_web_functional.py +++ b/tests/test_web_functional.py @@ -1923,3 +1923,19 @@ async def on_prepare(request, response): resp = await client.get('/') assert resp.status == 404 assert resp.headers['X-Custom'] == 'val' + + +async def test_read_bufsize(aiohttp_client) -> None: + + async def handler(request): + ret = request.content.get_read_buffer_limits() + data = await request.text() # read posted data + return web.Response(text=f"{data} {ret!r}") + + app = web.Application(handler_args={"read_bufsize": 2}) + app.router.add_post('/', handler) + + client = await aiohttp_client(app) + resp = await client.post('/', data=b'data') + assert 
resp.status == 200 + assert await resp.text() == "data (2, 4)" From 2d60416e0d8af296911d4d908e7b4f79603269a9 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 16:44:11 +0300 Subject: [PATCH 7/8] Add docs --- docs/client_reference.rst | 23 ++++++++++++++++++++++- docs/spelling_wordlist.txt | 2 ++ docs/web_reference.rst | 6 ++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/docs/client_reference.rst b/docs/client_reference.rst index 738cb286db8..2ac53a8f91e 100644 --- a/docs/client_reference.rst +++ b/docs/client_reference.rst @@ -46,6 +46,7 @@ The client session supports the context manager protocol for self closing. raise_for_status=False, \ connector_owner=True, \ auto_decompress=True, \ + read_bufsize=2**16, \ requote_redirect_url=False, \ trust_env=False, \ trace_configs=None) @@ -153,6 +154,11 @@ The client session supports the context manager protocol for self closing. .. versionadded:: 2.3 + :param int read_bufsize: Size of the read buffer (:attr:`ClientResponse.content`). + 64 KiB by default. + + .. versionadded:: 3.7 + :param bool trust_env: Get proxies information from *HTTP_PROXY* / *HTTPS_PROXY* environment variables if the parameter is ``True`` (``False`` by default). @@ -296,7 +302,9 @@ The client session supports the context manager protocol for self closing. auth=None, allow_redirects=True,\ max_redirects=10,\ compress=None, chunked=None, expect100=False, raise_for_status=None,\ - read_until_eof=True, proxy=None, proxy_auth=None,\ + read_until_eof=True, \ + read_bufsize=None, \ + proxy=None, proxy_auth=None,\ timeout=sentinel, ssl=None, \ proxy_headers=None) :async-with: @@ -418,6 +426,12 @@ The client session supports the context manager protocol for self closing. does not have Content-Length header. ``True`` by default (optional). + :param int read_bufsize: Size of the read buffer (:attr:`ClientResponse.content`). + ``None`` by default, + it means that the session global value is used. + + .. 
versionadded:: 3.7 + :param proxy: Proxy URL, :class:`str` or :class:`~yarl.URL` (optional) :param aiohttp.BasicAuth proxy_auth: an object that represents proxy HTTP @@ -705,6 +719,7 @@ certification chaining. encoding='utf-8', \ version=HttpVersion(major=1, minor=1), \ compress=None, chunked=None, expect100=False, raise_for_status=False, \ + read_bufsize=None, \ connector=None, \ read_until_eof=True, timeout=sentinel) :async-with: @@ -766,6 +781,12 @@ certification chaining. does not have Content-Length header. ``True`` by default (optional). + :param int read_bufsize: Size of the read buffer (:attr:`ClientResponse.content`). + ``None`` by default, + it means that the session global value is used. + + .. versionadded:: 3.7 + :param timeout: a :class:`ClientTimeout` settings structure, 300 seconds (5min) total timeout by default. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index bbb8837c8a2..e187f65b559 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -138,6 +138,8 @@ keepalive keepalived keepalives keepaliving +KiB +kib kwarg latin lifecycle diff --git a/docs/web_reference.rst b/docs/web_reference.rst index 915e9f886b3..297a3a10d4e 100644 --- a/docs/web_reference.rst +++ b/docs/web_reference.rst @@ -2544,6 +2544,12 @@ application on specific TCP or Unix socket, e.g.:: reads and ignores additional data coming from the client when lingering close is on. Use ``0`` to disable lingering on server channel closing. + :param int read_bufsize: Size of the read buffer (:attr:`BaseRequest.content`). + ``None`` by default, + it means that the session global value is used. + + .. versionadded:: 3.7 + .. 
attribute:: app From 8559588ee2fbe1708a3bba4c08d22f8bdae7759e Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 17 Oct 2020 16:45:38 +0300 Subject: [PATCH 8/8] Add CHANGES --- CHANGES/4453.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/4453.feature diff --git a/CHANGES/4453.feature b/CHANGES/4453.feature new file mode 100644 index 00000000000..bf6df98b969 --- /dev/null +++ b/CHANGES/4453.feature @@ -0,0 +1 @@ +Allow configuring the buffer size of the input stream by passing the ``read_bufsize`` argument.