Feature/bugfix: make the HTTP client able to return HTTP chunks when chunked transfer encoding is used (#2150)

* implement http chunk parsing in http parser (C version)

* http chunk decoding: implement chunk signals in Python parser

* StreamReader: add tests for [begin|end]_chunk_receiving methods

* update documentation to clarify the difference between iter_any() and iter_chunks()

* add tests for http chunks parsing

* add changelog file for PR 2150

* http chunk parsing: readchunk() now returns tuples of (data, end_of_http_chunk)

* http chunk parsing: adapt iter_chunks() generator to new return format

* streams.py: use parentheses for line wrapping instead of backslash

* add unit tests for ChunkTupleAsyncStreamIterator

* do not catch EofStream in ChunkTupleAsyncStreamIterator

* change the behaviour of stream.readchunk when searching for the next http chunk

* add tests to the stream.readchunk() method

* http_parser.py: remove useless blank line

* update documentation in streams.rst

* update documentation in docs/client_reference.rst

* minor change to test_streams.py

* change formatting in streams.rst

* fix spelling errors in documentation

* streams.rst: replace 'boolean' with :class:`bool`
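
Taken together, these changes let client code recover the server's chunk boundaries. A minimal consumption sketch (illustrative, not part of the commit; ``read_http_chunks`` and ``url`` are placeholder names, and the server is assumed to reply with chunked transfer encoding):

    import aiohttp

    async def read_http_chunks(url):
        # Reassemble the server's original HTTP chunks from the
        # (data, end_of_http_chunk) tuples yielded by iter_chunks().
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                buffer = b""
                async for data, end_of_http_chunk in resp.content.iter_chunks():
                    buffer += data
                    if end_of_http_chunk:
                        print(buffer)  # one complete HTTP chunk
                        buffer = b""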
jlacoline authored and asvetlov committed Sep 1, 2017
1 parent 42361c5 commit e7c9390
Showing 10 changed files with 261 additions and 23 deletions.
29 changes: 29 additions & 0 deletions aiohttp/_http_parser.pyx
@@ -117,6 +117,8 @@ cdef class HttpParser:
self._csettings.on_body = cb_on_body
self._csettings.on_message_begin = cb_on_message_begin
self._csettings.on_message_complete = cb_on_message_complete
self._csettings.on_chunk_header = cb_on_chunk_header
self._csettings.on_chunk_complete = cb_on_chunk_complete

self._last_error = None

@@ -208,6 +210,11 @@ cdef class HttpParser:
self._payload.feed_eof()
self._payload = None

cdef _on_chunk_header(self):
self._payload.begin_http_chunk_receiving()

cdef _on_chunk_complete(self):
self._payload.end_http_chunk_receiving()

### Public API ###

@@ -436,6 +443,28 @@ cdef int cb_on_message_complete(cparser.http_parser* parser) except -1:
return 0


cdef int cb_on_chunk_header(cparser.http_parser* parser) except -1:
cdef HttpParser pyparser = <HttpParser>parser.data
try:
pyparser._on_chunk_header()
except BaseException as exc:
pyparser._last_error = exc
return -1
else:
return 0


cdef int cb_on_chunk_complete(cparser.http_parser* parser) except -1:
cdef HttpParser pyparser = <HttpParser>parser.data
try:
pyparser._on_chunk_complete()
except BaseException as exc:
pyparser._last_error = exc
return -1
else:
return 0


cdef parser_error_from_errno(cparser.http_errno errno):
cdef bytes desc = cparser.http_errno_description(errno)

13 changes: 9 additions & 4 deletions aiohttp/http_parser.py
@@ -538,6 +538,7 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'):
else:
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
self._chunk_size = size
self.payload.begin_http_chunk_receiving()
else:
self._chunk_tail = chunk
return False, None
@@ -547,18 +548,16 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'):
required = self._chunk_size
chunk_len = len(chunk)

if required >= chunk_len:
if required > chunk_len:
self._chunk_size = required - chunk_len
if self._chunk_size == 0:
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF

self.payload.feed_data(chunk, chunk_len)
return False, None
else:
self._chunk_size = 0
self.payload.feed_data(chunk[:required], required)
chunk = chunk[required:]
self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
self.payload.end_http_chunk_receiving()

# toss the CRLF at the end of the chunk
if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
@@ -644,6 +643,12 @@ def feed_eof(self):

self.out.feed_eof()

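    # Chunk boundary signals are passed straight through to the wrapped
    # output stream, so readchunk() still sees HTTP chunk boundaries when
    # the payload goes through this decoding wrapper.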
def begin_http_chunk_receiving(self):
self.out.begin_http_chunk_receiving()

def end_http_chunk_receiving(self):
self.out.end_http_chunk_receiving()


HttpRequestParser = HttpRequestParserPy
HttpResponseParser = HttpResponseParserPy
61 changes: 52 additions & 9 deletions aiohttp/streams.py
@@ -40,6 +40,14 @@ def __anext__(self):
raise StopAsyncIteration # NOQA
return rv

class ChunkTupleAsyncStreamIterator(AsyncStreamIterator):
@asyncio.coroutine
def __anext__(self):
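        # (b'', False) is the end-of-stream value returned by readchunk();
        # (b'', True) only marks an empty end-of-chunk and is still yielded.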
rv = yield from self.read_func()
if rv == (b'', False):
raise StopAsyncIteration # NOQA
return rv


class AsyncStreamReaderMixin:

@@ -58,20 +66,21 @@ def iter_chunked(self, n):
return AsyncStreamIterator(lambda: self.read(n))

def iter_any(self):
"""Returns an asynchronous iterator that yields slices of data
as they come.
"""Returns an asynchronous iterator that yields all the available
data as soon as it is received
Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(self.readany)

def iter_chunks(self):
"""Returns an asynchronous iterator that yields chunks of the
size as received by the server.
"""Returns an asynchronous iterator that yields chunks of data
as they are received from the server. The yielded objects are tuples
of (bytes, bool) as returned by the StreamReader.readchunk method.
Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(self.readchunk)
return ChunkTupleAsyncStreamIterator(self.readchunk)


class StreamReader(AsyncStreamReaderMixin):
Expand All @@ -96,6 +105,8 @@ def __init__(self, limit=DEFAULT_LIMIT, timer=None, loop=None):
loop = asyncio.get_event_loop()
self._loop = loop
self._size = 0
self._cursor = 0
self._http_chunk_splits = None
self._buffer = collections.deque()
self._buffer_offset = 0
self._eof = False
@@ -200,6 +211,7 @@ def unread_data(self, data):
self._buffer[0] = self._buffer[0][self._buffer_offset:]
self._buffer_offset = 0
self._size += len(data)
self._cursor -= len(data)
self._buffer.appendleft(data)

def feed_data(self, data):
@@ -218,6 +230,18 @@ def feed_data(self, data):
if not waiter.done():
waiter.set_result(False)

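    # HTTP chunk bookkeeping: the parser calls begin/end_http_chunk_receiving
    # around every chunk body; the end hook records the boundary as a byte
    # offset (total_bytes) in _http_chunk_splits, and readchunk() compares
    # those offsets against _cursor (bytes already consumed) to report
    # end_of_http_chunk to callers.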
def begin_http_chunk_receiving(self):
if self._http_chunk_splits is None:
self._http_chunk_splits = []

def end_http_chunk_receiving(self):
if self._http_chunk_splits is None:
raise RuntimeError("Called end_http_chunk_receiving without calling "
                   "begin_http_chunk_receiving first")
if not self._http_chunk_splits or \
self._http_chunk_splits[-1] != self.total_bytes:
self._http_chunk_splits.append(self.total_bytes)

@asyncio.coroutine
def _wait(self, func_name):
# StreamReader uses a future to link the protocol feed_data() method
@@ -320,16 +344,34 @@ def readany(self):

@asyncio.coroutine
def readchunk(self):
"""Returns a tuple of (data, end_of_http_chunk). When chunked transfer
encoding is used, end_of_http_chunk is a boolean indicating if the end
of the data corresponds to the end of an HTTP chunk; otherwise it is
always False.
"""
if self._exception is not None:
raise self._exception

if not self._buffer and not self._eof:
if (self._http_chunk_splits and
self._cursor == self._http_chunk_splits[0]):
# end of http chunk without available data
self._http_chunk_splits = self._http_chunk_splits[1:]
return (b"", True)
yield from self._wait('readchunk')

if self._buffer:
return self._read_nowait_chunk(-1)
if not self._buffer:
# end of file
return (b"", False)
elif self._http_chunk_splits is not None:
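            # pop recorded boundaries until one lies ahead of the cursor and
            # split the read there, so the caller sees the HTTP chunk end;
            # if none remain, return everything buffered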
while self._http_chunk_splits:
pos = self._http_chunk_splits[0]
self._http_chunk_splits = self._http_chunk_splits[1:]
if pos > self._cursor:
return (self._read_nowait(pos-self._cursor), True)
return (self._read_nowait(-1), False)
else:
return b""
return (self._read_nowait_chunk(-1), False)

@asyncio.coroutine
def readexactly(self, n):
@@ -378,6 +420,7 @@ def _read_nowait_chunk(self, n):
data = self._buffer.popleft()

self._size -= len(data)
self._cursor += len(data)
return data

def _read_nowait(self, n):
@@ -438,7 +481,7 @@ def readany(self):

@asyncio.coroutine
def readchunk(self):
return b''
return (b'', False)

@asyncio.coroutine
def readexactly(self, n):
1 change: 1 addition & 0 deletions changes/2150.feature
@@ -0,0 +1 @@
Make the HTTP client able to return HTTP chunks when chunked transfer encoding is used.
5 changes: 4 additions & 1 deletion docs/client_reference.rst
@@ -991,7 +991,10 @@ Response object

.. attribute:: content

Payload stream, contains response's BODY (:class:`StreamReader`).
Payload stream, which contains response's BODY (:class:`StreamReader`).
It supports various reading methods depending on the expected format.
When chunked transfer encoding is used by the server, it also allows
retrieving the actual HTTP chunks.

Reading from the stream may raise
:exc:`aiohttp.ClientPayloadError` if the response object is
28 changes: 27 additions & 1 deletion docs/streams.rst
@@ -74,6 +74,21 @@ Reading Methods
:return bytes: the given line


.. comethod:: StreamReader.readchunk()

Read a chunk of data as it was received from the server.

Returns a tuple of (data, end_of_HTTP_chunk).

When chunked transfer encoding is used, ``end_of_HTTP_chunk`` is a :class:`bool`
indicating whether the end of the data corresponds to the end of an HTTP chunk;
otherwise it is always ``False``.

:return tuple[bytes, bool]: a chunk of data and a :class:`bool` that is ``True``
when the end of the returned chunk corresponds
to the end of an HTTP chunk.
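
   A sketch of a typical read loop (here ``response`` stands for a client
   response obtained elsewhere; ``(b'', False)`` marks the end of the
   stream)::

      while True:
          data, end_of_http_chunk = await response.content.readchunk()
          if not data and not end_of_http_chunk:
              break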


Asynchronous Iteration Support
------------------------------

@@ -109,9 +124,20 @@ size limit and over any available data.

Iterates over data chunks as received from the server::

async for data in response.content.iter_chunks():
async for data, _ in response.content.iter_chunks():
print(data)

If chunked transfer encoding is used, the original HTTP chunk boundaries
can be recovered by reading the second element of the returned tuples::

buffer = b""

async for data, end_of_http_chunk in response.content.iter_chunks():
buffer += data
if end_of_http_chunk:
print(buffer)
buffer = b""


Helpers
-------
6 changes: 4 additions & 2 deletions tests/test_flowcontrol_streams.py
@@ -71,16 +71,18 @@ def test_readany_resume_paused(self):
def test_readchunk(self):
r = self._make_one()
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readchunk())
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertFalse(r._protocol.resume_reading.called)

def test_readchunk_resume_paused(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readchunk())
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertTrue(r._protocol.resume_reading.called)

def test_readexactly(self):
8 changes: 7 additions & 1 deletion tests/test_http_parser.py
@@ -488,6 +488,7 @@ def test_http_request_chunked_payload(parser):
parser.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


@@ -502,6 +503,7 @@ def test_http_request_chunked_payload_and_next_message(parser):
b'transfer-encoding: chunked\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()

assert len(messages) == 1
@@ -521,14 +523,17 @@ def test_http_request_chunked_payload_chunks(parser):
parser.feed_data(b'\n4')
parser.feed_data(b'\r')
parser.feed_data(b'\n')
parser.feed_data(b'line\r\n0\r\n')
parser.feed_data(b'li')
parser.feed_data(b'ne\r\n0\r\n')
parser.feed_data(b'test: test\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert not payload.is_eof()

parser.feed_data(b'\r\n')
assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


@@ -541,6 +546,7 @@ def test_parse_chunked_payload_chunk_extension(parser):
b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest: test\r\n\r\n')

assert b'dataline' == b''.join(d for d in payload._buffer)
assert [4, 8] == payload._http_chunk_splits
assert payload.is_eof()


21 changes: 21 additions & 0 deletions tests/test_py35/test_streams_35.py
@@ -81,3 +81,24 @@ async def test_stream_reader_iter(loop):
async for raw in create_stream(loop):
assert raw == next(it)
pytest.raises(StopIteration, next, it)


async def test_stream_reader_iter_chunks_no_chunked_encoding(loop):
it = iter([b'line1\nline2\nline3\n'])
async for data, end_of_chunk in create_stream(loop).iter_chunks():
assert (data, end_of_chunk) == (next(it), False)
pytest.raises(StopIteration, next, it)


async def test_stream_reader_iter_chunks_chunked_encoding(loop):
stream = streams.StreamReader(loop=loop)
for line in DATA.splitlines(keepends=True):
stream.begin_http_chunk_receiving()
stream.feed_data(line)
stream.end_http_chunk_receiving()
stream.feed_eof()

it = iter([b'line1\n', b'line2\n', b'line3\n'])
async for data, end_of_chunk in stream.iter_chunks():
assert (data, end_of_chunk) == (next(it), True)
pytest.raises(StopIteration, next, it)