diff --git a/CHANGES/3631.bugfix b/CHANGES/3631.bugfix
new file mode 100644
index 00000000000..1acb123c4b2
--- /dev/null
+++ b/CHANGES/3631.bugfix
@@ -0,0 +1 @@
+Clean up per-chunk split data during generic data reads, fixing a memory leak.
diff --git a/aiohttp/streams.py b/aiohttp/streams.py
index be7da4b4f36..5e70654ead7 100644
--- a/aiohttp/streams.py
+++ b/aiohttp/streams.py
@@ -460,6 +460,11 @@ def _read_nowait_chunk(self, n: int) -> bytes:
         self._size -= len(data)
         self._cursor += len(data)
 
+        chunk_splits = self._http_chunk_splits
+        # Prevent memory leak: drop chunk splits already behind the cursor
+        while chunk_splits and chunk_splits[0] < self._cursor:
+            chunk_splits.pop(0)
+
         if self._size < self._low_water and self._protocol._reading_paused:
             self._protocol.resume_reading()
         return data
diff --git a/tests/test_streams.py b/tests/test_streams.py
index 1c11a77447f..7c4ce09ab88 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -1,7 +1,12 @@
 """Tests for streams.py"""
 
+import abc
 import asyncio
+import gc
 import re
+import types
+from collections import defaultdict
+from itertools import groupby
 from unittest import mock
 
 import pytest
@@ -30,6 +35,36 @@ def protocol():
     return mock.Mock(_reading_paused=False)
 
 
+MEMLEAK_SKIP_TYPES = (
+    *(getattr(types, name) for name in types.__all__ if name.endswith('Type')),
+    mock.Mock,
+    abc.ABCMeta,
+)
+
+def get_memory_usage(obj):
+    objs = [obj]
+    # A memory leak may be caused by leaked references to the same objects.
+    # Without reference counting, [1,2,3] looks the same as [1,2,3,3,3,3].
+    known = defaultdict(int)
+    known[id(obj)] += 1
+
+    while objs:
+        refs = gc.get_referents(*objs)
+        objs = []
+        for obj in refs:
+            if isinstance(obj, MEMLEAK_SKIP_TYPES):
+                continue
+            i = id(obj)
+            known[i] += 1
+            if known[i] == 1:
+                objs.append(obj)
+
+        # Deduplicate so the same object is not traversed twice
+        objs.sort(key=id)
+        objs = [next(g) for (i, g) in groupby(objs, id)]
+
+    return sum(known.values())
+
 class TestStreamReader:
 
     DATA = b'line1\nline2\nline3\n'
@@ -739,6 +774,9 @@ async def test_readchunk_with_other_read_calls(self) -> None:
         stream.begin_http_chunk_receiving()
         stream.feed_data(b'part2')
         stream.end_http_chunk_receiving()
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(b'part3')
+        stream.end_http_chunk_receiving()
 
         data = await stream.read(7)
         assert b'part1pa' == data
@@ -747,11 +785,40 @@ async def test_readchunk_with_other_read_calls(self) -> None:
         assert b'rt2' == data
         assert end_of_chunk
 
+        # Corner case between read() and readchunk()
+        data = await stream.read(5)
+        assert b'part3' == data
+
+        data, end_of_chunk = await stream.readchunk()
+        assert b'' == data
+        assert end_of_chunk
+
         stream.feed_eof()
+
         data, end_of_chunk = await stream.readchunk()
         assert b'' == data
         assert not end_of_chunk
 
+    async def test_chunksplits_memory_leak(self) -> None:
+        """Test for a memory leak in the chunk split bookkeeping."""
+        stream = self._make_one()
+
+        # Warm up: allocate baseline internal structures before measuring
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(b'Y' * 500)
+        stream.end_http_chunk_receiving()
+        await stream.read(500)
+
+        before = get_memory_usage(stream)
+        for _ in range(300):
+            stream.begin_http_chunk_receiving()
+            stream.feed_data(b'X')
+            stream.end_http_chunk_receiving()
+        await stream.read(300)
+        after = get_memory_usage(stream)
+
+        assert abs(after - before) == 0
+
     async def test_read_empty_chunks(self) -> None:
         """Test that feeding empty chunks does not break stream"""
         stream = self._make_one()