Skip to content

Commit

Permalink
🐽 Fix memory leak in chunked streams handling code (#3631)
Browse files Browse the repository at this point in the history
  • Loading branch information
socketpair committed Mar 24, 2019
1 parent 7345ffb commit e44fcac
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES/3631.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a memory leak: per-chunk split bookkeeping is now cleaned up during generic data reads.
5 changes: 5 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ def _read_nowait_chunk(self, n: int) -> bytes:
self._size -= len(data)
self._cursor += len(data)

chunk_splits = self._http_chunk_splits
# Prevent memory leak: drop useless chunk splits
while chunk_splits and chunk_splits[0] < self._cursor:
chunk_splits.pop(0)

if self._size < self._low_water and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
Expand Down
67 changes: 67 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""Tests for streams.py"""

import abc
import asyncio
import gc
import re
import types
from collections import defaultdict
from itertools import groupby
from unittest import mock

import pytest
Expand Down Expand Up @@ -30,6 +35,36 @@ def protocol():
return mock.Mock(_reading_paused=False)


# Types that must not be traversed when walking the object graph:
# singleton-like runtime types, Mock objects (which grow attributes on
# access), and ABC metaclasses (shared, effectively static state).
MEMLEAK_SKIP_TYPES = (
    *(getattr(types, name) for name in types.__all__ if name.endswith('Type')),
    mock.Mock,
    abc.ABCMeta,
)


def get_memory_usage(obj):
    """Return the number of reference edges reachable from *obj*.

    Every reference is counted, not just every distinct object, so a
    leak consisting of extra links to already-known objects is still
    detected: without link counting, [1, 2, 3] would be tracked the
    same as [1, 2, 3, 3, 3, 3].  Objects whose type is listed in
    MEMLEAK_SKIP_TYPES are not traversed.
    """
    # Reference count per object id, seeded with the root object.
    known = defaultdict(int)
    known[id(obj)] += 1
    frontier = [obj]

    # Breadth-first walk over gc-visible referents.
    while frontier:
        refs = gc.get_referents(*frontier)
        frontier = []
        for ref in refs:
            if isinstance(ref, MEMLEAK_SKIP_TYPES):
                continue
            known[id(ref)] += 1
            # Enqueue each distinct object exactly once — on first
            # sighting — so `frontier` never holds duplicates and no
            # separate deduplication pass is needed.
            if known[id(ref)] == 1:
                frontier.append(ref)

    return sum(known.values())

class TestStreamReader:

DATA = b'line1\nline2\nline3\n'
Expand Down Expand Up @@ -739,6 +774,9 @@ async def test_readchunk_with_other_read_calls(self) -> None:
stream.begin_http_chunk_receiving()
stream.feed_data(b'part2')
stream.end_http_chunk_receiving()
stream.begin_http_chunk_receiving()
stream.feed_data(b'part3')
stream.end_http_chunk_receiving()

data = await stream.read(7)
assert b'part1pa' == data
Expand All @@ -747,11 +785,40 @@ async def test_readchunk_with_other_read_calls(self) -> None:
assert b'rt2' == data
assert end_of_chunk

# Corner case between read/readchunk
data = await stream.read(5)
assert b'part3' == data

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert end_of_chunk

stream.feed_eof()

data, end_of_chunk = await stream.readchunk()
assert b'' == data
assert not end_of_chunk

async def test_chunksplits_memory_leak(self) -> None:
    """Ensure per-chunk split offsets do not accumulate in the stream.

    Regression test for #3631: consumed ``_http_chunk_splits`` entries
    must be dropped once the read cursor has passed them, otherwise the
    list grows without bound on long chunked responses.
    """
    stream = self._make_one()

    # Warm up internal buffers and move the cursor away from small
    # interned-integer offsets before taking the baseline measurement.
    stream.begin_http_chunk_receiving()
    stream.feed_data(b'Y' * 500)
    stream.end_http_chunk_receiving()
    await stream.read(500)

    before = get_memory_usage(stream)
    for _ in range(300):
        stream.begin_http_chunk_receiving()
        stream.feed_data(b'X')
        stream.end_http_chunk_receiving()
        # Each chunk is a single byte, so read exactly one byte per
        # iteration (the original `stream.read(N)` referenced an
        # undefined name and would raise NameError).
        await stream.read(1)
    after = get_memory_usage(stream)

    assert abs(after - before) == 0

async def test_read_empty_chunks(self) -> None:
"""Test that feeding empty chunks does not break stream"""
stream = self._make_one()
Expand Down

0 comments on commit e44fcac

Please sign in to comment.