Skip to content

Commit

Permalink
Test streams (#2720)
Browse files Browse the repository at this point in the history
* fixes #2717 Makes unittest example work (#2718)

* Convert tests

* Convert more

* Convert even more

* Convert more

* Convert more

* Increase timeout to fix flaky test
  • Loading branch information
asvetlov authored Feb 11, 2018
1 parent 9a7d853 commit 00b1f7f
Show file tree
Hide file tree
Showing 3 changed files with 553 additions and 617 deletions.
2 changes: 1 addition & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ def test_not_expired_ttl(self):
async def test_expired_ttl(self, loop):
dns_cache_table = _DNSCacheTable(ttl=0.01)
dns_cache_table.add('localhost', ['127.0.0.1'])
await asyncio.sleep(0.01, loop=loop)
await asyncio.sleep(0.02, loop=loop)
assert dns_cache_table.expired('localhost')

def test_next_addrs(self, dns_cache_table):
Expand Down
278 changes: 126 additions & 152 deletions tests/test_flowcontrol_streams.py
Original file line number Diff line number Diff line change
@@ -1,157 +1,131 @@
import asyncio
import unittest
from unittest import mock

import pytest

from aiohttp import streams


class TestFlowControlStreamReader(unittest.TestCase):

def setUp(self):
self.protocol = mock.Mock(_reading_paused=False)
self.transp = self.protocol.transport
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)

def tearDown(self):
self.loop.close()

def _make_one(self, allow_pause=True, *args, **kwargs):
out = streams.StreamReader(
self.protocol, limit=1, loop=self.loop, *args, **kwargs)
out._allow_pause = allow_pause
return out

def test_read(self):
r = self._make_one()
r.feed_data(b'da', 2)
res = self.loop.run_until_complete(r.read(1))
self.assertEqual(res, b'd')
self.assertFalse(r._protocol.resume_reading.called)

def test_read_resume_paused(self):
r = self._make_one()
r.feed_data(b'test', 4)
r._protocol._reading_paused = True

res = self.loop.run_until_complete(r.read(1))
self.assertEqual(res, b't')
self.assertTrue(r._protocol.pause_reading.called)

def test_readline(self):
r = self._make_one()
r.feed_data(b'd\n', 5)
res = self.loop.run_until_complete(r.readline())
self.assertEqual(res, b'd\n')
self.assertFalse(r._protocol.resume_reading.called)

def test_readline_resume_paused(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'd\n', 5)
res = self.loop.run_until_complete(r.readline())
self.assertEqual(res, b'd\n')
self.assertTrue(r._protocol.resume_reading.called)

def test_readany(self):
r = self._make_one()
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readany())
self.assertEqual(res, b'data')
self.assertFalse(r._protocol.resume_reading.called)

def test_readany_resume_paused(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readany())
self.assertEqual(res, b'data')
self.assertTrue(r._protocol.resume_reading.called)

def test_readchunk(self):
r = self._make_one()
r.feed_data(b'data', 4)
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertFalse(r._protocol.resume_reading.called)

def test_readchunk_resume_paused(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'data', 4)
res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk())
self.assertEqual(res, b'data')
self.assertFalse(end_of_http_chunk)
self.assertTrue(r._protocol.resume_reading.called)

def test_readexactly(self):
r = self._make_one()
r.feed_data(b'data', 4)
res = self.loop.run_until_complete(r.readexactly(3))
self.assertEqual(res, b'dat')
self.assertFalse(r._protocol.resume_reading.called)

def test_feed_data(self):
r = self._make_one()
r._protocol._reading_paused = False
r.feed_data(b'datadata', 8)
self.assertTrue(r._protocol.pause_reading.called)

def test_read_nowait(self):
r = self._make_one()
r._protocol._reading_paused = True
r.feed_data(b'data1', 5)
r.feed_data(b'data2', 5)
r.feed_data(b'data3', 5)
res = self.loop.run_until_complete(r.read(5))
self.assertTrue(res == b'data1')
self.assertTrue(r._protocol.resume_reading.call_count == 0)

res = r.read_nowait(5)
self.assertTrue(res == b'data2')
self.assertTrue(r._protocol.resume_reading.call_count == 0)

res = r.read_nowait(5)
self.assertTrue(res == b'data3')
self.assertTrue(r._protocol.resume_reading.call_count == 1)

r._protocol._reading_paused = False
res = r.read_nowait(5)
self.assertTrue(res == b'')
self.assertTrue(r._protocol.resume_reading.call_count == 1)


class FlowControlMixin:

def test_feed_pause(self):
out = self._make_one()
out._protocol._reading_paused = False
out.feed_data(object(), 100)

self.assertTrue(out._protocol.pause_reading.called)

def test_resume_on_read(self):
out = self._make_one()
out.feed_data(object(), 100)

out._protocol._reading_paused = True
self.loop.run_until_complete(out.read())
self.assertTrue(out._protocol.resume_reading.called)


class TestFlowControlDataQueue(unittest.TestCase, FlowControlMixin):

def setUp(self):
self.protocol = mock.Mock()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)

def tearDown(self):
self.loop.close()

def _make_one(self, *args, **kwargs):
out = streams.FlowControlDataQueue(
self.protocol, limit=1, loop=self.loop, *args, **kwargs)
out._allow_pause = True
return out
@pytest.fixture
def protocol():
return mock.Mock(_reading_paused=False)


@pytest.fixture
def stream(loop, protocol):
out = streams.StreamReader(protocol, limit=1, loop=loop)
out._allow_pause = True
return out


@pytest.fixture
def buffer(loop, protocol):
out = streams.FlowControlDataQueue(protocol, limit=1, loop=loop)
out._allow_pause = True
return out


class TestFlowControlStreamReader:

async def test_read(self, stream):
stream.feed_data(b'da', 2)
res = await stream.read(1)
assert res == b'd'
assert not stream._protocol.resume_reading.called

async def test_read_resume_paused(self, stream):
stream.feed_data(b'test', 4)
stream._protocol._reading_paused = True

res = await stream.read(1)
assert res == b't'
assert stream._protocol.pause_reading.called

async def test_readline(self, stream):
stream.feed_data(b'd\n', 5)
res = await stream.readline()
assert res == b'd\n'
assert not stream._protocol.resume_reading.called

async def test_readline_resume_paused(self, stream):
stream._protocol._reading_paused = True
stream.feed_data(b'd\n', 5)
res = await stream.readline()
assert res == b'd\n'
assert stream._protocol.resume_reading.called

async def test_readany(self, stream):
stream.feed_data(b'data', 4)
res = await stream.readany()
assert res == b'data'
assert not stream._protocol.resume_reading.called

async def test_readany_resume_paused(self, stream):
stream._protocol._reading_paused = True
stream.feed_data(b'data', 4)
res = await stream.readany()
assert res == b'data'
assert stream._protocol.resume_reading.called

async def test_readchunk(self, stream):
stream.feed_data(b'data', 4)
res, end_of_http_chunk = await stream.readchunk()
assert res == b'data'
assert not end_of_http_chunk
assert not stream._protocol.resume_reading.called

async def test_readchunk_resume_paused(self, stream):
stream._protocol._reading_paused = True
stream.feed_data(b'data', 4)
res, end_of_http_chunk = await stream.readchunk()
assert res == b'data'
assert not end_of_http_chunk
assert stream._protocol.resume_reading.called

async def test_readexactly(self, stream):
stream.feed_data(b'data', 4)
res = await stream.readexactly(3)
assert res == b'dat'
assert not stream._protocol.resume_reading.called

async def test_feed_data(self, stream):
stream._protocol._reading_paused = False
stream.feed_data(b'datadata', 8)
assert stream._protocol.pause_reading.called

async def test_read_nowait(self, stream):
stream._protocol._reading_paused = True
stream.feed_data(b'data1', 5)
stream.feed_data(b'data2', 5)
stream.feed_data(b'data3', 5)
res = await stream.read(5)
assert res == b'data1'
assert stream._protocol.resume_reading.call_count == 0

res = stream.read_nowait(5)
assert res == b'data2'
assert stream._protocol.resume_reading.call_count == 0

res = stream.read_nowait(5)
assert res == b'data3'
assert stream._protocol.resume_reading.call_count == 1

stream._protocol._reading_paused = False
res = stream.read_nowait(5)
assert res == b''
assert stream._protocol.resume_reading.call_count == 1


class TestFlowControlDataQueue:

def test_feed_pause(self, buffer):
buffer._protocol._reading_paused = False
buffer.feed_data(object(), 100)

assert buffer._protocol.pause_reading.called

async def test_resume_on_read(self, buffer):
buffer.feed_data(object(), 100)

buffer._protocol._reading_paused = True
await buffer.read()
assert buffer._protocol.resume_reading.called
Loading

0 comments on commit 00b1f7f

Please sign in to comment.