do not pause transport during set_parser stage
Nikolay Kim committed Jan 23, 2017
1 parent 516c55c commit a88e887
Showing 4 changed files with 60 additions and 8 deletions.
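
In outline: FlowControlStreamReader, FlowControlDataQueue and FlowControlChunksQueue gain an allow_pause flag that starts out False and is switched on only after the wrapper has successfully resumed the transport; StreamParser.set_parser() also turns it on explicitly once the parser has been primed with any buffered data. Every call to transport.pause_reading() is now guarded by that flag, so a buffer that fills up while the parser is still being wired in no longer pauses the transport. A minimal sketch of the gating pattern, with simplified names that are not the aiohttp classes:

    # Minimal sketch of the allow_pause gate (illustrative, not the aiohttp classes).
    class _FlowControlledBuffer:

        def __init__(self, transport, limit):
            self._transport = transport
            self._limit = limit
            self._size = 0
            self.allow_pause = False        # pausing disabled while setup is in progress

            # resume reading first; only once that worked is pausing allowed again
            try:
                self._transport.resume_reading()
            except (AttributeError, NotImplementedError):
                pass
            else:
                self.allow_pause = True

        def feed_data(self, data):
            self._size += len(data)
            # without allow_pause, an over-full buffer leaves the transport running
            if self.allow_pause and self._size > self._limit:
                try:
                    self._transport.pause_reading()
                except (AttributeError, NotImplementedError):
                    pass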
1 change: 1 addition & 0 deletions aiohttp/parsers.py
@@ -194,6 +194,7 @@ def set_parser(self, parser, output=None):
                 # parser still require more data
                 self._parser = p
                 self._output = output
+                self._output.allow_pause = True
 
         if self._eof:
             self.unset_parser()
12 changes: 8 additions & 4 deletions aiohttp/streams.py
@@ -534,6 +534,7 @@ def __init__(self, stream, limit=DEFAULT_LIMIT, *args, **kwargs):
 
         self._stream = stream
         self._b_limit = limit * 2
+        self.allow_pause = False
 
         # resume transport reading
         if stream.paused:
@@ -543,6 +544,7 @@ def __init__(self, stream, limit=DEFAULT_LIMIT, *args, **kwargs):
                 pass
             else:
                 self._stream.paused = False
+                self.allow_pause = True
 
     def _check_buffer_size(self):
         if self._stream.paused:
@@ -554,7 +556,7 @@ def _check_buffer_size(self):
                 else:
                     self._stream.paused = False
         else:
-            if self._buffer_size > self._b_limit:
+            if self.allow_pause and self._buffer_size > self._b_limit:
                 try:
                     self._stream.transport.pause_reading()
                 except (AttributeError, NotImplementedError):
@@ -567,7 +569,7 @@ def feed_data(self, data, size=0):
 
         super().feed_data(data)
 
-        if (not self._stream.paused and
+        if (self.allow_pause and not self._stream.paused and
                 not has_waiter and self._buffer_size > self._b_limit):
             try:
                 self._stream.transport.pause_reading()
@@ -611,6 +613,7 @@ def __init__(self, stream, *, limit=DEFAULT_LIMIT, loop=None):
 
         self._stream = stream
         self._limit = limit * 2
+        self.allow_pause = False
 
         # resume transport reading
         if stream.paused:
@@ -620,13 +623,14 @@ def __init__(self, stream, *, limit=DEFAULT_LIMIT, loop=None):
                 pass
             else:
                 self._stream.paused = False
+                self.allow_pause = True
 
     def feed_data(self, data, size):
         has_waiter = self._waiter is not None and not self._waiter.cancelled()
 
         super().feed_data(data, size)
 
-        if (not self._stream.paused and
+        if (self.allow_pause and not self._stream.paused and
                 not has_waiter and self._size > self._limit):
             try:
                 self._stream.transport.pause_reading()
@@ -648,7 +652,7 @@ def read(self):
                 else:
                     self._stream.paused = False
         else:
-            if self._size > self._limit:
+            if self.allow_pause and self._size > self._limit:
                 try:
                     self._stream.transport.pause_reading()
                 except (AttributeError, NotImplementedError):
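
The same allow_pause guard now sits in front of each of the three places that could pause the transport (_check_buffer_size, feed_data and read); the try/except around pause_reading() is unchanged, the flag only short-circuits the call. A rough usage sketch of the observable effect, using a mocked stream the way the tests below do (the mock setup here is an assumption for illustration, and it targets the aiohttp version this commit applies to):

    import asyncio
    from unittest import mock

    from aiohttp import streams

    loop = asyncio.new_event_loop()
    stream = mock.Mock()
    stream.paused = True    # pretend the transport was paused before hand-off

    reader = streams.FlowControlStreamReader(stream, limit=1, loop=loop)
    assert reader.allow_pause           # the mock's resume_reading() succeeded

    reader.allow_pause = False          # what the set_parser stage now looks like
    reader.feed_data(b'datadata', 8)    # well over the limit, transport left alone
    assert not stream.transport.pause_reading.called

    reader.allow_pause = True           # setup done, flow control active again
    reader.feed_data(b'datadata', 8)
    assert stream.transport.pause_reading.called

    loop.close()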
54 changes: 50 additions & 4 deletions tests/test_flowcontrol_streams.py
@@ -16,9 +16,11 @@ def setUp(self):
     def tearDown(self):
         self.loop.close()
 
-    def _make_one(self, *args, **kwargs):
-        return streams.FlowControlStreamReader(
+    def _make_one(self, allow_pause=True, *args, **kwargs):
+        out = streams.FlowControlStreamReader(
             self.stream, limit=1, loop=self.loop, *args, **kwargs)
+        out.allow_pause = allow_pause
+        return out
 
     def test_read(self):
         r = self._make_one()
@@ -28,6 +30,24 @@ def test_read(self):
         self.assertEqual(res, b'd')
         self.assertTrue(self.transp.resume_reading.called)
 
+    def test_pause_on_read(self):
+        r = self._make_one()
+        r.feed_data(b'test', 4)
+        r._stream.paused = False
+
+        res = self.loop.run_until_complete(r.read(1))
+        self.assertEqual(res, b't')
+        self.assertTrue(self.transp.pause_reading.called)
+
+    def test_no_pause_on_read_not_allow_pause(self):
+        r = self._make_one(allow_pause=False)
+        r.feed_data(b'test', 4)
+
+        res = self.loop.run_until_complete(r.read(1))
+        self.assertEqual(res, b't')
+        self.assertFalse(self.transp.pause_reading.called)
+        self.assertFalse(self.transp.resume_reading.called)
+
     def test_readline(self):
         r = self._make_one()
         r._stream.paused = True
@@ -58,6 +78,13 @@ def test_feed_data(self):
         r.feed_data(b'datadata', 8)
         self.assertTrue(self.transp.pause_reading.called)
 
+    def test_feed_data_no_allow_pause(self):
+        r = self._make_one()
+        r.allow_pause = False
+        r._stream.paused = False
+        r.feed_data(b'datadata', 8)
+        self.assertFalse(self.transp.pause_reading.called)
+
     def test_read_nowait(self):
         r = self._make_one()
         r._stream.paused = False
@@ -246,6 +273,21 @@ def test_no_pause_on_read_no_transport(self):
         self.assertIs(res, item)
         self.assertFalse(self.stream.paused)
 
+    def test_no_pause_on_allow_pause(self):
+        item = object()
+
+        out = self._make_one()
+        out.allow_pause = False
+        out._buffer.append((item, 100))
+        out._buffer.append((object(), 100))
+        out._buffer.append((object(), 100))
+        out._size = 300
+        self.stream.paused = False
+
+        res = self.loop.run_until_complete(out.read())
+        self.assertIs(res, item)
+        self.assertFalse(self.stream.paused)
+
 
 class TestFlowControlDataQueue(unittest.TestCase, FlowControlMixin):
 
@@ -258,8 +300,10 @@ def tearDown(self):
         self.loop.close()
 
     def _make_one(self, *args, **kwargs):
-        return streams.FlowControlDataQueue(
+        out = streams.FlowControlDataQueue(
             self.stream, limit=1, loop=self.loop, *args, **kwargs)
+        out.allow_pause = True
+        return out
 
 
 class TestFlowControlChunksQueue(unittest.TestCase, FlowControlMixin):
@@ -273,8 +317,10 @@ def tearDown(self):
         self.loop.close()
 
     def _make_one(self, *args, **kwargs):
-        return streams.FlowControlChunksQueue(
+        out = streams.FlowControlChunksQueue(
             self.stream, limit=1, loop=self.loop, *args, **kwargs)
+        out.allow_pause = True
+        return out
 
     def test_read_eof(self):
         out = self._make_one()
1 change: 1 addition & 0 deletions tests/test_stream_parser.py
@@ -213,6 +213,7 @@ def LinesParser(out, buf):
 def test_feed_parser(loop, lines_parser):
     stream = parsers.StreamParser(loop=loop)
     s = stream.set_parser(lines_parser)
+    assert s.allow_pause
 
     stream.feed_data(b'line1')
     stream.feed_data(b'\r\nline2\r\ndata')
