From 366f07262dd1490ed36fb5d4941b5e94617d7706 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 15:42:00 -0500 Subject: [PATCH 01/14] Bump pgproto to the latest --- asyncpg/pgproto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncpg/pgproto b/asyncpg/pgproto index c6491237..4de72c5d 160000 --- a/asyncpg/pgproto +++ b/asyncpg/pgproto @@ -1 +1 @@ -Subproject commit c64912370b8532df7a561389f139f838b3681670 +Subproject commit 4de72c5de696e5892a4209d94b9e3af0043ca7d5 From 13adb88e6c69ead0b718aa99090a6e62edb49db6 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 15:42:15 -0500 Subject: [PATCH 02/14] Bump Cython & uvloop deps --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 6bbb6ebf..c34c7565 100644 --- a/setup.py +++ b/setup.py @@ -27,12 +27,12 @@ from setuptools.command import sdist as setuptools_sdist -CYTHON_DEPENDENCY = 'Cython==0.29' +CYTHON_DEPENDENCY = 'Cython==0.29.14' # Minimal dependencies required to test asyncpg. TEST_DEPENDENCIES = [ 'flake8~=3.5.0', - 'uvloop~=0.12.0;platform_system!="Windows"', + 'uvloop~=0.14.0;platform_system!="Windows"', ] # Dependencies required to build documentation. From 9754a45fc33486281843b6678b11c751dc8c5bc7 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 15:49:33 -0500 Subject: [PATCH 03/14] Handle asyncio.CancelledError (now a BaseException) --- asyncpg/connect_utils.py | 4 ++-- asyncpg/connection.py | 4 ++-- asyncpg/pool.py | 10 +++++----- asyncpg/protocol/protocol.pyx | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 433ad3da..dc05cf37 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -541,7 +541,7 @@ async def _connect_addr(*, addr, loop, timeout, params, config, if timeout <= 0: raise asyncio.TimeoutError await asyncio.wait_for(connected, loop=loop, timeout=timeout) - except Exception: + except (Exception, asyncio.CancelledError): tr.close() raise @@ -614,7 +614,7 @@ async def _negotiate_ssl_connection(host, port, conn_factory, *, loop, ssl, try: return await conn_factory(sock=sock) # Must come after tr.close() - except Exception: + except (Exception, asyncio.CancelledError): sock.close() raise diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 8e841871..11e639d6 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1080,7 +1080,7 @@ async def close(self, *, timeout=None): try: if not self.is_closed(): await self._protocol.close(timeout) - except Exception: + except (Exception, asyncio.CancelledError): # If we fail to close gracefully, abort the connection. self._abort() raise @@ -1213,7 +1213,7 @@ async def _cancel(self, waiter): # the CancelledError, and don't want the loop to warn about # an unretrieved exception. pass - except Exception as ex: + except (Exception, asyncio.CancelledError) as ex: if not waiter.done(): waiter.set_exception(ex) finally: diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 64f4071e..b9576a4d 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -146,7 +146,7 @@ async def acquire(self) -> PoolConnectionProxy: if self._setup is not None: try: await self._setup(proxy) - except Exception as ex: + except (Exception, asyncio.CancelledError) as ex: # If a user-defined `setup` function fails, we don't # know if the connection is safe for re-use, hence # we close it. A new connection will be created @@ -204,7 +204,7 @@ async def release(self, timeout): budget -= time.monotonic() - started await self._con.reset(timeout=budget) - except Exception as ex: + except (Exception, asyncio.CancelledError) as ex: # If the `reset` call failed, terminate the connection. # A new one will be created when `acquire` is called # again. @@ -480,7 +480,7 @@ async def _get_new_connection(self): if self._init is not None: try: await self._init(con) - except Exception as ex: + except (Exception, asyncio.CancelledError) as ex: # If a user-defined `init` function fails, we don't # know if the connection is safe for re-use, hence # we close it. A new connection will be created @@ -587,7 +587,7 @@ async def _acquire_impl(): ch = await self._queue.get() # type: PoolConnectionHolder try: proxy = await ch.acquire() # type: PoolConnectionProxy - except Exception: + except (Exception, asyncio.CancelledError): self._queue.put_nowait(ch) raise else: @@ -679,7 +679,7 @@ async def close(self): ch.close() for ch in self._holders] await asyncio.gather(*close_coros, loop=self._loop) - except Exception: + except (Exception, asyncio.CancelledError): self.terminate() raise diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index 8ae000ce..b54b6004 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -350,7 +350,7 @@ cdef class BaseProtocol(CoreProtocol): sink(buffer), timeout=timer.get_remaining_budget(), loop=self.loop) - except Exception as ex: + except (Exception, asyncio.CancelledError) as ex: # Abort the COPY operation on any error in # output sink. self._request_cancel() @@ -476,7 +476,7 @@ cdef class BaseProtocol(CoreProtocol): else: raise apg_exc.InternalClientError('TimoutError was not raised') - except Exception as e: + except (Exception, asyncio.CancelledError) as e: self._write_copy_fail_msg(str(e)) self._request_cancel() # Make asyncio shut up about unretrieved QueryCanceledError From 786fbeddcb4ce19787ac953a65c7ec51bca8785f Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:02:54 -0500 Subject: [PATCH 04/14] Stop passing `loop=` parameter (deprecated in asyncio 3.8) --- asyncpg/_testbase/__init__.py | 2 +- asyncpg/_testbase/fuzzer.py | 46 ++++++++++------------ asyncpg/connect_utils.py | 12 +++--- asyncpg/pool.py | 14 +++---- asyncpg/protocol/protocol.pyx | 8 ++-- tests/test_adversity.py | 2 +- tests/test_cancellation.py | 8 ++-- tests/test_connect.py | 9 ++--- tests/test_copy.py | 16 ++++---- tests/test_execute.py | 4 +- tests/test_introspection.py | 4 +- tests/test_listeners.py | 23 +++++------ tests/test_pool.py | 74 +++++++++++++++++------------------ tests/test_prepare.py | 14 +++---- tests/test_test.py | 2 +- tests/test_timeout.py | 4 +- 16 files changed, 115 insertions(+), 127 deletions(-) diff --git a/asyncpg/_testbase/__init__.py b/asyncpg/_testbase/__init__.py index 2aecb0fe..baf55c1b 100644 --- a/asyncpg/_testbase/__init__.py +++ b/asyncpg/_testbase/__init__.py @@ -80,7 +80,7 @@ def wrapper(self, *args, __meth__=meth, **kwargs): coro = __meth__(self, *args, **kwargs) timeout = getattr(__meth__, '__timeout__', mcls.TEST_TIMEOUT) if timeout: - coro = asyncio.wait_for(coro, timeout, loop=self.loop) + coro = asyncio.wait_for(coro, timeout) try: self.loop.run_until_complete(coro) except asyncio.TimeoutError: diff --git a/asyncpg/_testbase/fuzzer.py b/asyncpg/_testbase/fuzzer.py index 649e5770..88f6e5c1 100644 --- a/asyncpg/_testbase/fuzzer.py +++ b/asyncpg/_testbase/fuzzer.py @@ -36,15 +36,13 @@ def __init__(self, *, listening_addr: str='127.0.0.1', self.listen_task = None async def _wait(self, work): - work_task = asyncio.ensure_future(work, loop=self.loop) - stop_event_task = asyncio.ensure_future(self.stop_event.wait(), - loop=self.loop) + work_task = asyncio.ensure_future(work) + stop_event_task = asyncio.ensure_future(self.stop_event.wait()) try: await asyncio.wait( [work_task, stop_event_task], - return_when=asyncio.FIRST_COMPLETED, - loop=self.loop) + return_when=asyncio.FIRST_COMPLETED) if self.stop_event.is_set(): raise StopServer() @@ -58,7 +56,8 @@ async def _wait(self, work): def start(self): started = threading.Event() - self.thread = threading.Thread(target=self._start, args=(started,)) + self.thread = threading.Thread( + target=self._start_thread, args=(started,)) self.thread.start() if not started.wait(timeout=2): raise RuntimeError('fuzzer proxy failed to start') @@ -70,13 +69,14 @@ def stop(self): def _stop(self): self.stop_event.set() - def _start(self, started_event): + def _start_thread(self, started_event): self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) - self.connectivity = asyncio.Event(loop=self.loop) + self.connectivity = asyncio.Event() self.connectivity.set() - self.connectivity_loss = asyncio.Event(loop=self.loop) - self.stop_event = asyncio.Event(loop=self.loop) + self.connectivity_loss = asyncio.Event() + self.stop_event = asyncio.Event() if self.listening_port is None: self.listening_port = cluster.find_available_port() @@ -92,7 +92,7 @@ def _start(self, started_event): self.loop.close() async def _main(self, started_event): - self.listen_task = asyncio.ensure_future(self.listen(), loop=self.loop) + self.listen_task = asyncio.ensure_future(self.listen()) # Notify the main thread that we are ready to go. started_event.set() try: @@ -100,7 +100,7 @@ async def _main(self, started_event): finally: for c in list(self.connections): c.close() - await asyncio.sleep(0.01, loop=self.loop) + await asyncio.sleep(0.01) if hasattr(self.loop, 'remove_reader'): self.loop.remove_reader(self.sock.fileno()) self.sock.close() @@ -176,15 +176,15 @@ def close(self): async def handle(self): self.proxy_to_backend_task = asyncio.ensure_future( - self.proxy_to_backend(), loop=self.loop) + self.proxy_to_backend()) self.proxy_from_backend_task = asyncio.ensure_future( - self.proxy_from_backend(), loop=self.loop) + self.proxy_from_backend()) try: await asyncio.wait( [self.proxy_to_backend_task, self.proxy_from_backend_task], - loop=self.loop, return_when=asyncio.FIRST_COMPLETED) + return_when=asyncio.FIRST_COMPLETED) finally: # Asyncio fails to properly remove the readers and writers @@ -201,17 +201,14 @@ async def handle(self): async def _read(self, sock, n): read_task = asyncio.ensure_future( - self.loop.sock_recv(sock, n), - loop=self.loop) + self.loop.sock_recv(sock, n)) conn_event_task = asyncio.ensure_future( - self.connectivity_loss.wait(), - loop=self.loop) + self.connectivity_loss.wait()) try: await asyncio.wait( [read_task, conn_event_task], - return_when=asyncio.FIRST_COMPLETED, - loop=self.loop) + return_when=asyncio.FIRST_COMPLETED) if self.connectivity_loss.is_set(): return None @@ -225,15 +222,14 @@ async def _read(self, sock, n): async def _write(self, sock, data): write_task = asyncio.ensure_future( - self.loop.sock_sendall(sock, data), loop=self.loop) + self.loop.sock_sendall(sock, data)) conn_event_task = asyncio.ensure_future( - self.connectivity_loss.wait(), loop=self.loop) + self.connectivity_loss.wait()) try: await asyncio.wait( [write_task, conn_event_task], - return_when=asyncio.FIRST_COMPLETED, - loop=self.loop) + return_when=asyncio.FIRST_COMPLETED) if self.connectivity_loss.is_set(): return None diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index dc05cf37..665fa06b 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -531,7 +531,7 @@ async def _connect_addr(*, addr, loop, timeout, params, config, before = time.monotonic() try: tr, pr = await asyncio.wait_for( - connector, timeout=timeout, loop=loop) + connector, timeout=timeout) except asyncio.CancelledError: connector.add_done_callback(_close_leaked_connection) raise @@ -540,7 +540,7 @@ async def _connect_addr(*, addr, loop, timeout, params, config, try: if timeout <= 0: raise asyncio.TimeoutError - await asyncio.wait_for(connected, loop=loop, timeout=timeout) + await asyncio.wait_for(connected, timeout=timeout) except (Exception, asyncio.CancelledError): tr.close() raise @@ -581,7 +581,7 @@ async def _negotiate_ssl_connection(host, port, conn_factory, *, loop, ssl, # accept SSLRequests. If the SSLRequest is accepted but either the SSL # negotiation fails or the PostgreSQL user isn't permitted to use SSL, # there's nothing that would attempt to reconnect with a non-SSL socket. - reader, writer = await asyncio.open_connection(host, port, loop=loop) + reader, writer = await asyncio.open_connection(host, port) tr = writer.transport try: @@ -632,18 +632,18 @@ async def _create_ssl_connection(protocol_factory, host, port, *, async def _open_connection(*, loop, addr, params: _ConnectionParameters): if isinstance(addr, str): - r, w = await asyncio.open_unix_connection(addr, loop=loop) + r, w = await asyncio.open_unix_connection(addr) else: if params.ssl: r, w = await _negotiate_ssl_connection( *addr, - functools.partial(asyncio.open_connection, loop=loop), + asyncio.open_connection, loop=loop, ssl=params.ssl, server_hostname=addr[0], ssl_is_advisory=params.ssl_is_advisory) else: - r, w = await asyncio.open_connection(*addr, loop=loop) + r, w = await asyncio.open_connection(*addr) _set_nodelay(_get_socket(w.transport)) return r, w diff --git a/asyncpg/pool.py b/asyncpg/pool.py index b9576a4d..7b477e4d 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -199,7 +199,7 @@ async def release(self, timeout): started = time.monotonic() await asyncio.wait_for( self._con._protocol._wait_for_cancellation(), - budget, loop=self._pool._loop) + budget) if budget is not None: budget -= time.monotonic() - started @@ -362,7 +362,7 @@ def __init__(self, *connect_args, self._holders = [] self._initialized = False self._initializing = False - self._queue = asyncio.LifoQueue(maxsize=self._maxsize, loop=self._loop) + self._queue = asyncio.LifoQueue(maxsize=self._maxsize) self._working_addr = None self._working_config = None @@ -424,7 +424,7 @@ async def _initialize(self): break connect_tasks.append(ch.connect()) - await asyncio.gather(*connect_tasks, loop=self._loop) + await asyncio.gather(*connect_tasks) def set_connect_args(self, dsn=None, **connect_kwargs): r"""Set the new connection arguments for this pool. @@ -604,7 +604,7 @@ async def _acquire_impl(): return await _acquire_impl() else: return await asyncio.wait_for( - _acquire_impl(), timeout=timeout, loop=self._loop) + _acquire_impl(), timeout=timeout) async def release(self, connection, *, timeout=None): """Release a database connection back to the pool. @@ -642,7 +642,7 @@ async def release(self, connection, *, timeout=None): # Use asyncio.shield() to guarantee that task cancellation # does not prevent the connection from being returned to the # pool properly. - return await asyncio.shield(ch.release(timeout), loop=self._loop) + return await asyncio.shield(ch.release(timeout)) async def close(self): """Attempt to gracefully close all connections in the pool. @@ -673,11 +673,11 @@ async def close(self): release_coros = [ ch.wait_until_released() for ch in self._holders] - await asyncio.gather(*release_coros, loop=self._loop) + await asyncio.gather(*release_coros) close_coros = [ ch.close() for ch in self._holders] - await asyncio.gather(*close_coros, loop=self._loop) + await asyncio.gather(*close_coros) except (Exception, asyncio.CancelledError): self.terminate() diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index b54b6004..857fb4cc 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -93,7 +93,7 @@ cdef class BaseProtocol(CoreProtocol): self.closing = False self.is_reading = True - self.writing_allowed = asyncio.Event(loop=self.loop) + self.writing_allowed = asyncio.Event() self.writing_allowed.set() self.timeout_handle = None @@ -348,8 +348,7 @@ cdef class BaseProtocol(CoreProtocol): with timer: await asyncio.wait_for( sink(buffer), - timeout=timer.get_remaining_budget(), - loop=self.loop) + timeout=timer.get_remaining_budget()) except (Exception, asyncio.CancelledError) as ex: # Abort the COPY operation on any error in # output sink. @@ -456,8 +455,7 @@ cdef class BaseProtocol(CoreProtocol): with timer: chunk = await asyncio.wait_for( iterator.__anext__(), - timeout=timer.get_remaining_budget(), - loop=self.loop) + timeout=timer.get_remaining_budget()) self._write_copy_data_msg(chunk) except builtins.StopAsyncIteration: pass diff --git a/tests/test_adversity.py b/tests/test_adversity.py index a0d153eb..54b4f481 100644 --- a/tests/test_adversity.py +++ b/tests/test_adversity.py @@ -62,6 +62,6 @@ def kill_connectivity(): workers = [worker(pool) for _ in range(concurrency)] self.loop.call_later(1, kill_connectivity) await asyncio.gather( - *workers, loop=self.loop, return_exceptions=True) + *workers, return_exceptions=True) finally: pool.terminate() diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index bc21309d..7bad9de1 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -55,7 +55,7 @@ async def test8(): with self.subTest(testfunc=test), self.assertRunUnder(1): st = await self.con.prepare('SELECT pg_sleep(20)') task = self.loop.create_task(st.fetch()) - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) task.cancel() with self.assertRaises(asyncio.CancelledError): @@ -67,7 +67,7 @@ async def test8(): async def test_cancellation_02(self): st = await self.con.prepare('SELECT 1') task = self.loop.create_task(st.fetch()) - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) task.cancel() self.assertEqual(await task, [(1,)]) @@ -76,7 +76,7 @@ async def test_cancellation_03(self): async with self.con.transaction(): task = self.loop.create_task( self.con.fetch('SELECT pg_sleep(20)')) - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) task.cancel() with self.assertRaises(asyncio.CancelledError): @@ -90,7 +90,7 @@ async def test_cancellation_03(self): async def test_cancellation_04(self): await self.con.fetchval('SELECT pg_sleep(0)') - waiter = asyncio.Future(loop=self.loop) + waiter = asyncio.Future() self.con._cancel_current_command(waiter) await waiter self.assertEqual(await self.con.fetchval('SELECT 42'), 42) diff --git a/tests/test_connect.py b/tests/test_connect.py index 5830b38e..f0767827 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -934,14 +934,14 @@ def test_connect_pgpass_inaccessible_directory(self): async def test_connect_args_validation(self): for val in {-1, 'a', True, False, 0}: with self.assertRaisesRegex(ValueError, 'greater than 0'): - await asyncpg.connect(command_timeout=val, loop=self.loop) + await asyncpg.connect(command_timeout=val) for arg in {'max_cacheable_statement_size', 'max_cached_statement_lifetime', 'statement_cache_size'}: for val in {None, -1, True, False}: with self.assertRaisesRegex(ValueError, 'greater or equal'): - await asyncpg.connect(**{arg: val}, loop=self.loop) + await asyncpg.connect(**{arg: val}) class TestConnection(tb.ConnectedTestCase): @@ -1040,8 +1040,7 @@ async def test_connection_implicit_host(self): con = await asyncpg.connect( port=conn_spec.get('port'), database=conn_spec.get('database'), - user=conn_spec.get('user'), - loop=self.loop) + user=conn_spec.get('user')) await con.close() @@ -1207,7 +1206,7 @@ async def worker(): self.assertEqual(await con.fetchval('SELECT 43'), 43) tasks = [worker() for _ in range(100)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) await pool.close() diff --git a/tests/test_copy.py b/tests/test_copy.py index 6bedb23c..257cc79c 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -233,7 +233,7 @@ async def test_copy_from_query_to_sink(self): async def writer(data): # Sleeping here to simulate slow output sink to test # backpressure. - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) f.write(data) await self.con.copy_from_query(''' @@ -259,7 +259,7 @@ async def test_copy_from_query_cancellation_explicit(self): async def writer(data): # Sleeping here to simulate slow output sink to test # backpressure. - await asyncio.sleep(0.5, loop=self.loop) + await asyncio.sleep(0.5) coro = self.con.copy_from_query(''' SELECT @@ -269,7 +269,7 @@ async def writer(data): ''', output=writer) task = self.loop.create_task(coro) - await asyncio.sleep(0.7, loop=self.loop) + await asyncio.sleep(0.7) task.cancel() with self.assertRaises(asyncio.CancelledError): @@ -279,7 +279,7 @@ async def writer(data): async def test_copy_from_query_cancellation_on_sink_error(self): async def writer(data): - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) raise RuntimeError('failure') coro = self.con.copy_from_query(''' @@ -308,7 +308,7 @@ async def writer(data): ''', output=writer) task = self.loop.create_task(coro) - await asyncio.sleep(0.7, loop=self.loop) + await asyncio.sleep(0.7) task.cancel() with self.assertRaises(asyncio.CancelledError): @@ -318,7 +318,7 @@ async def writer(data): async def test_copy_from_query_timeout_1(self): async def writer(data): - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) coro = self.con.copy_from_query(''' SELECT @@ -337,7 +337,7 @@ async def writer(data): async def test_copy_from_query_timeout_2(self): async def writer(data): try: - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) except asyncio.TimeoutError: raise else: @@ -569,7 +569,7 @@ def __aiter__(self): async def __anext__(self): self.rowcount += 1 - await asyncio.sleep(60, loop=self.loop) + await asyncio.sleep(60) return b'a1' * 50 + b'\t' + b'b1' * 50 + b'\n' with self.assertRaises(asyncio.TimeoutError): diff --git a/tests/test_execute.py b/tests/test_execute.py index ccde0993..5ecc100f 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -72,7 +72,7 @@ async def test_execute_script_interrupted_close(self): fut = self.loop.create_task( self.con.execute('''SELECT pg_sleep(10)''')) - await asyncio.sleep(0.2, loop=self.loop) + await asyncio.sleep(0.2) self.assertFalse(self.con.is_closed()) await self.con.close() @@ -85,7 +85,7 @@ async def test_execute_script_interrupted_terminate(self): fut = self.loop.create_task( self.con.execute('''SELECT pg_sleep(10)''')) - await asyncio.sleep(0.2, loop=self.loop) + await asyncio.sleep(0.2) self.assertFalse(self.con.is_closed()) self.con.terminate() diff --git a/tests/test_introspection.py b/tests/test_introspection.py index 92ebc7a4..eb3258f9 100644 --- a/tests/test_introspection.py +++ b/tests/test_introspection.py @@ -21,7 +21,7 @@ class SlowIntrospectionConnection(apg_con.Connection): async def _introspect_types(self, *args, **kwargs): self.introspect_count += 1 - await asyncio.sleep(0.4, loop=self._loop) + await asyncio.sleep(0.4) return await super()._introspect_types(*args, **kwargs) @@ -154,7 +154,7 @@ async def test_introspection_retries_after_cache_bust(self): # slow_intro_conn cache is now populated with intro_1_t async def wait_and_drop(): - await asyncio.sleep(0.1, loop=self.loop) + await asyncio.sleep(0.1) await slow_intro_conn.reload_schema_state() # Now, in parallel, run another query that diff --git a/tests/test_listeners.py b/tests/test_listeners.py index d4855c13..4879cd88 100644 --- a/tests/test_listeners.py +++ b/tests/test_listeners.py @@ -17,8 +17,8 @@ async def test_listen_01(self): async with self.create_pool(database='postgres') as pool: async with pool.acquire() as con: - q1 = asyncio.Queue(loop=self.loop) - q2 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() + q2 = asyncio.Queue() def listener1(*args): q1.put_nowait(args) @@ -46,25 +46,22 @@ def listener2(*args): await q1.get(), (con, con.get_server_pid(), 'test', 'aaaa')) with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(q2.get(), - timeout=0.05, loop=self.loop) + await asyncio.wait_for(q2.get(), timeout=0.05) await con.reset() await con.remove_listener('test', listener1) await con.execute("NOTIFY test, 'aaaa'") with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(q1.get(), - timeout=0.05, loop=self.loop) + await asyncio.wait_for(q1.get(), timeout=0.05) with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(q2.get(), - timeout=0.05, loop=self.loop) + await asyncio.wait_for(q2.get(), timeout=0.05) async def test_listen_02(self): async with self.create_pool(database='postgres') as pool: async with pool.acquire() as con1, pool.acquire() as con2: - q1 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() def listener1(*args): q1.put_nowait(args) @@ -82,7 +79,7 @@ async def test_listen_notletters(self): async with self.create_pool(database='postgres') as pool: async with pool.acquire() as con1, pool.acquire() as con2: - q1 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() def listener1(*args): q1.put_nowait(args) @@ -115,7 +112,7 @@ class TestLogListeners(tb.ConnectedTestCase): 'client_min_messages': 'notice' }) async def test_log_listener_01(self): - q1 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() def notice_callb(con, message): # Message fields depend on PG version, hide some values. @@ -194,7 +191,7 @@ async def raise_warning(): 'client_min_messages': 'notice' }) async def test_log_listener_02(self): - q1 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() cur_id = None @@ -235,7 +232,7 @@ def notice_callb(con, message): 'client_min_messages': 'notice' }) async def test_log_listener_03(self): - q1 = asyncio.Queue(loop=self.loop) + q1 = asyncio.Queue() async def raise_message(level, code): await self.con.execute(""" diff --git a/tests/test_pool.py b/tests/test_pool.py index b1894f3a..e51923e4 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -34,14 +34,14 @@ class SlowResetConnection(pg_connection.Connection): """Connection class to simulate races with Connection.reset().""" async def reset(self, *, timeout=None): - await asyncio.sleep(0.2, loop=self._loop) + await asyncio.sleep(0.2) return await super().reset(timeout=timeout) class SlowCancelConnection(pg_connection.Connection): """Connection class to simulate races with Connection._cancel().""" async def _cancel(self, waiter): - await asyncio.sleep(0.2, loop=self._loop) + await asyncio.sleep(0.2) return await super()._cancel(waiter) @@ -59,7 +59,7 @@ async def worker(): await pool.release(con) tasks = [worker() for _ in range(n)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) await pool.close() async def test_pool_02(self): @@ -74,7 +74,7 @@ async def worker(): await pool.release(con) tasks = [worker() for _ in range(n)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) async def test_pool_03(self): pool = await self.create_pool(database='postgres', @@ -121,11 +121,11 @@ async def worker(): self.assertEqual(await con.fetchval('SELECT 1'), 1) tasks = [worker() for _ in range(n)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) await pool.close() async def test_pool_06(self): - fut = asyncio.Future(loop=self.loop) + fut = asyncio.Future() async def setup(con): fut.set_result(con) @@ -159,8 +159,7 @@ async def user(pool): min_size=2, max_size=5, init=init, setup=setup) as pool: - users = asyncio.gather(*[user(pool) for _ in range(10)], - loop=self.loop) + users = asyncio.gather(*[user(pool) for _ in range(10)]) await users self.assertEqual(len(cons), 5) @@ -370,7 +369,7 @@ async def worker(): await pool.release(con) tasks = [worker() for _ in range(5)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) await pool.close() finally: @@ -393,11 +392,11 @@ async def worker(): task = self.loop.create_task(worker()) # Let the worker() run. - await asyncio.sleep(0.1, loop=self.loop) + await asyncio.sleep(0.1) # Cancel the worker. task.cancel() # Wait to make sure the cleanup has completed. - await asyncio.sleep(0.4, loop=self.loop) + await asyncio.sleep(0.4) # Check that the connection has been returned to the pool. self.assertEqual(pool._queue.qsize(), 1) @@ -414,11 +413,11 @@ async def worker(): task = self.loop.create_task(worker()) # Let the worker() run. - await asyncio.sleep(0.1, loop=self.loop) + await asyncio.sleep(0.1) # Cancel the worker. task.cancel() # Wait to make sure the cleanup has completed. - await asyncio.sleep(0.5, loop=self.loop) + await asyncio.sleep(0.5) # Check that the connection has been returned to the pool. self.assertEqual(pool._queue.qsize(), 1) @@ -431,8 +430,8 @@ async def sleep_and_release(): async with pool.acquire() as con: await con.execute('SELECT pg_sleep(1)') - asyncio.ensure_future(sleep_and_release(), loop=self.loop) - await asyncio.sleep(0.5, loop=self.loop) + asyncio.ensure_future(sleep_and_release()) + await asyncio.sleep(0.5) async with pool.acquire() as con: await con.fetchval('SELECT 1') @@ -462,8 +461,7 @@ async def test(pool): max_queries=1, connection_class=MyConnection, statement_cache_size=3) as pool: - await asyncio.gather(*[test(pool) for _ in range(N)], - loop=self.loop) + await asyncio.gather(*[test(pool) for _ in range(N)]) self.assertEqual(len(cons), N) @@ -499,34 +497,34 @@ async def get_xact_id(con): async def test_pool_connection_methods(self): async def test_fetch(pool): i = random.randint(0, 20) - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) r = await pool.fetch('SELECT {}::int'.format(i)) self.assertEqual(r, [(i,)]) return 1 async def test_fetchrow(pool): i = random.randint(0, 20) - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) r = await pool.fetchrow('SELECT {}::int'.format(i)) self.assertEqual(r, (i,)) return 1 async def test_fetchval(pool): i = random.randint(0, 20) - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) r = await pool.fetchval('SELECT {}::int'.format(i)) self.assertEqual(r, i) return 1 async def test_execute(pool): - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) r = await pool.execute('SELECT generate_series(0, 10)') self.assertEqual(r, 'SELECT {}'.format(11)) return 1 async def test_execute_with_arg(pool): i = random.randint(0, 20) - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) r = await pool.execute('SELECT generate_series(0, $1)', i) self.assertEqual(r, 'SELECT {}'.format(i + 1)) return 1 @@ -536,7 +534,7 @@ async def run(N, meth): min_size=5, max_size=10) as pool: coros = [meth(pool) for _ in range(N)] - res = await asyncio.gather(*coros, loop=self.loop) + res = await asyncio.gather(*coros) self.assertEqual(res, [1] * N) methods = [test_fetch, test_fetchrow, test_fetchval, @@ -549,7 +547,7 @@ async def run(N, meth): async def test_pool_connection_execute_many(self): async def worker(pool): - await asyncio.sleep(random.random() / 100, loop=self.loop) + await asyncio.sleep(random.random() / 100) await pool.executemany(''' INSERT INTO exmany VALUES($1, $2) ''', [ @@ -566,7 +564,7 @@ async def worker(pool): try: coros = [worker(pool) for _ in range(N)] - res = await asyncio.gather(*coros, loop=self.loop) + res = await asyncio.gather(*coros) self.assertEqual(res, [1] * N) n_rows = await pool.fetchval('SELECT count(*) FROM exmany') @@ -609,7 +607,7 @@ async def test_pool_max_inactive_time_02(self): 'SELECT 1') self.assertIs(pool._holders[0]._con, con) - await asyncio.sleep(1, loop=self.loop) + await asyncio.sleep(1) self.assertIs(pool._holders[0]._con, None) self.assertEqual( @@ -628,7 +626,7 @@ async def test_pool_max_inactive_time_03(self): con = pool._holders[0]._con await pool.execute('SELECT pg_sleep(0.5)') - await asyncio.sleep(0.6, loop=self.loop) + await asyncio.sleep(0.6) self.assertIs(pool._holders[0]._con, con) @@ -645,7 +643,7 @@ async def test_pool_max_inactive_time_04(self): async def worker(pool): nonlocal N - await asyncio.sleep(random.random() / 10 + 0.1, loop=self.loop) + await asyncio.sleep(random.random() / 10 + 0.1) async with pool.acquire() as con: if random.random() > 0.5: await con.execute('SELECT pg_sleep({:.2f})'.format( @@ -664,7 +662,7 @@ async def worker(pool): max_inactive_connection_lifetime=0.1) as pool: workers = [worker(pool) for _ in range(50)] - await asyncio.gather(*workers, loop=self.loop) + await asyncio.gather(*workers) self.assertGreaterEqual(N, 50) @@ -679,7 +677,7 @@ async def test_pool_max_inactive_time_05(self): self.assertIsNotNone(pool._holders[1]._con) await pool.execute('SELECT pg_sleep(0.3)') - await asyncio.sleep(0.3, loop=self.loop) + await asyncio.sleep(0.3) self.assertIs(pool._holders[0]._con, None) # The connection in the second holder was never used, @@ -788,7 +786,7 @@ async def worker(): async with pool.acquire() as connection: async with connection.transaction(): flag.set_result(True) - await asyncio.sleep(0.1, loop=self.loop) + await asyncio.sleep(0.1) conn_released = True @@ -807,7 +805,7 @@ async def test_pool_close_timeout(self): async def worker(): async with pool.acquire(): flag.set_result(True) - await asyncio.sleep(0.5, loop=self.loop) + await asyncio.sleep(0.5) task = self.loop.create_task(worker()) @@ -873,8 +871,8 @@ async def test_pool_set_connection_args(self): async def test_pool_init_race(self): pool = self.create_pool(database='postgres', min_size=1, max_size=1) - t1 = asyncio.ensure_future(pool, loop=self.loop) - t2 = asyncio.ensure_future(pool, loop=self.loop) + t1 = asyncio.ensure_future(pool) + t2 = asyncio.ensure_future(pool) await t1 with self.assertRaisesRegex( @@ -887,8 +885,8 @@ async def test_pool_init_race(self): async def test_pool_init_and_use_race(self): pool = self.create_pool(database='postgres', min_size=1, max_size=1) - pool_task = asyncio.ensure_future(pool, loop=self.loop) - await asyncio.sleep(0, loop=self.loop) + pool_task = asyncio.ensure_future(pool) + await asyncio.sleep(0) with self.assertRaisesRegex( asyncpg.InterfaceError, @@ -908,7 +906,7 @@ async def worker(): pool_backend_pid = await conn.fetchval( 'SELECT pg_backend_pid()') backend_pid_fut.set_result(pool_backend_pid) - await asyncio.sleep(0.2, loop=self.loop) + await asyncio.sleep(0.2) task = self.loop.create_task(worker()) try: @@ -990,7 +988,7 @@ async def worker(): await pool.release(con) tasks = [worker() for _ in range(n)] - await asyncio.gather(*tasks, loop=self.loop) + await asyncio.gather(*tasks) await pool.close() async def test_standby_cursors(self): diff --git a/tests/test_prepare.py b/tests/test_prepare.py index 8fc06e3e..c441b45a 100644 --- a/tests/test_prepare.py +++ b/tests/test_prepare.py @@ -88,7 +88,7 @@ async def test_prepare_06_interrupted_close(self): stmt = await self.con.prepare('''SELECT pg_sleep(10)''') fut = self.loop.create_task(stmt.fetch()) - await asyncio.sleep(0.2, loop=self.loop) + await asyncio.sleep(0.2) self.assertFalse(self.con.is_closed()) await self.con.close() @@ -104,7 +104,7 @@ async def test_prepare_07_interrupted_terminate(self): stmt = await self.con.prepare('''SELECT pg_sleep(10)''') fut = self.loop.create_task(stmt.fetchval()) - await asyncio.sleep(0.2, loop=self.loop) + await asyncio.sleep(0.2) self.assertFalse(self.con.is_closed()) self.con.terminate() @@ -364,7 +364,7 @@ async def test_prepare_19_concurrent_calls(self): # Wait for some time to make sure the first query is fully # prepared (!) and is now awaiting the results (!!). - await asyncio.sleep(0.01, loop=self.loop) + await asyncio.sleep(0.01) with self.assertRaisesRegex(asyncpg.InterfaceError, 'another operation'): @@ -386,7 +386,7 @@ async def test_prepare_20_concurrent_calls(self): vf = self.loop.create_task( meth('SELECT ROW(pg_sleep(0.1), 1)')) - await asyncio.sleep(0.01, loop=self.loop) + await asyncio.sleep(0.01) with self.assertRaisesRegex(asyncpg.InterfaceError, 'another operation'): @@ -477,7 +477,7 @@ async def test_prepare_24_max_lifetime(self): s = await self.con._prepare('SELECT 1', use_cache=True) self.assertIs(s._state, state) - await asyncio.sleep(1, loop=self.loop) + await asyncio.sleep(1) s = await self.con._prepare('SELECT 1', use_cache=True) self.assertIsNot(s._state, state) @@ -492,7 +492,7 @@ async def test_prepare_25_max_lifetime_reset(self): # Disable max_lifetime cache.set_max_lifetime(0) - await asyncio.sleep(1, loop=self.loop) + await asyncio.sleep(1) # The statement should still be cached (as we disabled the timeout). s = await self.con._prepare('SELECT 1', use_cache=True) @@ -512,7 +512,7 @@ async def test_prepare_26_max_lifetime_max_size(self): self.assertIsNot(s._state, state) # Check that nothing crashes after the initial timeout - await asyncio.sleep(1, loop=self.loop) + await asyncio.sleep(1) @tb.with_connection_options(max_cacheable_statement_size=50) async def test_prepare_27_max_cacheable_statement_size(self): diff --git a/tests/test_test.py b/tests/test_test.py index 9e04a183..25050aac 100644 --- a/tests/test_test.py +++ b/tests/test_test.py @@ -16,7 +16,7 @@ class BaseSimpleTestCase: async def test_tests_zero_error(self): - await asyncio.sleep(0.01, loop=self.loop) + await asyncio.sleep(0.01) 1 / 0 diff --git a/tests/test_timeout.py b/tests/test_timeout.py index e03a3387..c2bca631 100644 --- a/tests/test_timeout.py +++ b/tests/test_timeout.py @@ -38,7 +38,7 @@ async def test_timeout_02(self): async def test_timeout_03(self): task = self.loop.create_task( self.con.fetch('select pg_sleep(10)', timeout=0.2)) - await asyncio.sleep(0.05, loop=self.loop) + await asyncio.sleep(0.05) task.cancel() with self.assertRaises(asyncio.CancelledError), \ self.assertRunUnder(MAX_RUNTIME): @@ -139,7 +139,7 @@ async def test_command_timeout_01(self): class SlowPrepareConnection(pg_connection.Connection): """Connection class to test timeouts.""" async def _get_statement(self, query, timeout): - await asyncio.sleep(0.3, loop=self._loop) + await asyncio.sleep(0.3) return await super()._get_statement(query, timeout) From dd91aa35f059cb120e03e061bd52fd4955e4e4d2 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:20:11 -0500 Subject: [PATCH 05/14] Close asyncio.Streams explicitly (do not rely on GC) --- asyncpg/connect_utils.py | 4 +++- asyncpg/connection.py | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 665fa06b..94cdc7ed 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -610,7 +610,9 @@ async def _negotiate_ssl_connection(host, port, conn_factory, *, loop, ssl, sock = sock.dup() # Must come before tr.close() finally: - tr.close() + writer.close() + if hasattr(writer, 'wait_closed'): + await writer.wait_closed() try: return await conn_factory(sock=sock) # Must come after tr.close() diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 11e639d6..2f3c34dd 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1223,6 +1223,8 @@ async def _cancel(self, waiter): waiter.set_result(None) if w is not None: w.close() + if hasattr(w, 'wait_closed'): + await w.wait_closed() def _cancel_current_command(self, waiter): self._cancellations.add(self._loop.create_task(self._cancel(waiter))) From ad9c3879cef2ee266e50a55c47ff98985bea87d2 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:20:29 -0500 Subject: [PATCH 06/14] Initialize asyncio queues in Pool in an async-def context --- asyncpg/pool.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 7b477e4d..53f8bebf 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -308,7 +308,9 @@ class Pool: '_init', '_connect_args', '_connect_kwargs', '_working_addr', '_working_config', '_working_params', '_holders', '_initialized', '_initializing', '_closing', - '_closed', '_connection_class', '_generation') + '_closed', '_connection_class', '_generation', + '_setup', '_max_queries', '_max_inactive_connection_lifetime' + ) def __init__(self, *connect_args, min_size, @@ -362,7 +364,7 @@ def __init__(self, *connect_args, self._holders = [] self._initialized = False self._initializing = False - self._queue = asyncio.LifoQueue(maxsize=self._maxsize) + self._queue = None self._working_addr = None self._working_config = None @@ -377,15 +379,10 @@ def __init__(self, *connect_args, self._connect_args = connect_args self._connect_kwargs = connect_kwargs - for _ in range(max_size): - ch = PoolConnectionHolder( - self, - max_queries=max_queries, - max_inactive_time=max_inactive_connection_lifetime, - setup=setup) - - self._holders.append(ch) - self._queue.put_nowait(ch) + self._setup = setup + self._max_queries = max_queries + self._max_inactive_connection_lifetime = \ + max_inactive_connection_lifetime async def _async__init__(self): if self._initialized: @@ -404,6 +401,17 @@ async def _async__init__(self): self._initialized = True async def _initialize(self): + self._queue = asyncio.LifoQueue(maxsize=self._maxsize) + for _ in range(self._maxsize): + ch = PoolConnectionHolder( + self, + max_queries=self._max_queries, + max_inactive_time=self._max_inactive_connection_lifetime, + setup=self._setup) + + self._holders.append(ch) + self._queue.put_nowait(ch) + if self._minsize: # Since we use a LIFO queue, the first items in the queue will be # the last ones in `self._holders`. We want to pre-connect the From ddafaa5168c77020085b715d0724f59b59e92a44 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:28:21 -0500 Subject: [PATCH 07/14] Pin flake8 & pycodestyle; fix style issues --- .flake8 | 2 +- asyncpg/pool.py | 15 ++++++++------- setup.py | 6 +++++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/.flake8 b/.flake8 index bfc97a81..7cf64d1f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -ignore = E402,E731 +ignore = E402,E731,W504,E252 exclude = .git,__pycache__,build,dist,.eggs,.github,.local diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 53f8bebf..20a3234e 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -304,13 +304,14 @@ class Pool: Pools are created by calling :func:`~asyncpg.pool.create_pool`. """ - __slots__ = ('_queue', '_loop', '_minsize', '_maxsize', - '_init', '_connect_args', '_connect_kwargs', - '_working_addr', '_working_config', '_working_params', - '_holders', '_initialized', '_initializing', '_closing', - '_closed', '_connection_class', '_generation', - '_setup', '_max_queries', '_max_inactive_connection_lifetime' - ) + __slots__ = ( + '_queue', '_loop', '_minsize', '_maxsize', + '_init', '_connect_args', '_connect_kwargs', + '_working_addr', '_working_config', '_working_params', + '_holders', '_initialized', '_initializing', '_closing', + '_closed', '_connection_class', '_generation', + '_setup', '_max_queries', '_max_inactive_connection_lifetime' + ) def __init__(self, *connect_args, min_size, diff --git a/setup.py b/setup.py index c34c7565..5e2e1494 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,11 @@ # Minimal dependencies required to test asyncpg. TEST_DEPENDENCIES = [ - 'flake8~=3.5.0', + # pycodestyle is a dependency of flake8, but it must be frozen because + # their combination breaks too often + # (example breakage: https://gitlab.com/pycqa/flake8/issues/427) + 'pycodestyle~=2.5.0', + 'flake8~=3.7.9', 'uvloop~=0.14.0;platform_system!="Windows"', ] From c2cfff1b25c7ec9d25a0a769e816309d12103782 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:28:37 -0500 Subject: [PATCH 08/14] Adopt new Python 3.8 tuple.__hash__ algo for Record.__hash__ --- asyncpg/protocol/record/recordobj.c | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/asyncpg/protocol/record/recordobj.c b/asyncpg/protocol/record/recordobj.c index e1de8b01..2f468a33 100644 --- a/asyncpg/protocol/record/recordobj.c +++ b/asyncpg/protocol/record/recordobj.c @@ -115,6 +115,48 @@ record_length(ApgRecordObject *o) } +#if PY_VERSION_HEX >= 0x03080000 + +#if SIZEOF_PY_UHASH_T > 4 +#define _PyHASH_XXPRIME_1 ((Py_uhash_t)11400714785074694791ULL) +#define _PyHASH_XXPRIME_2 ((Py_uhash_t)14029467366897019727ULL) +#define _PyHASH_XXPRIME_5 ((Py_uhash_t)2870177450012600261ULL) +#define _PyHASH_XXROTATE(x) ((x << 31) | (x >> 33)) /* Rotate left 31 bits */ +#else +#define _PyHASH_XXPRIME_1 ((Py_uhash_t)2654435761UL) +#define _PyHASH_XXPRIME_2 ((Py_uhash_t)2246822519UL) +#define _PyHASH_XXPRIME_5 ((Py_uhash_t)374761393UL) +#define _PyHASH_XXROTATE(x) ((x << 13) | (x >> 19)) /* Rotate left 13 bits */ +#endif + +static Py_hash_t +record_hash(ApgRecordObject *v) +{ + Py_ssize_t i; + Py_uhash_t acc = _PyHASH_XXPRIME_5; + Py_ssize_t len = Py_SIZE(v); + PyObject **els = v->ob_item; + for (i = 0; i < len; i++) { + Py_uhash_t lane = PyObject_Hash(els[i]); + if (lane == (Py_uhash_t)-1) { + return -1; + } + acc += lane * _PyHASH_XXPRIME_2; + acc = _PyHASH_XXROTATE(acc); + acc *= _PyHASH_XXPRIME_1; + } + + /* Add input length, mangled to keep the historical value of hash(()). */ + acc += len ^ (_PyHASH_XXPRIME_5 ^ 3527539UL); + + if (acc == (Py_uhash_t)-1) { + return 1546275796; + } + return (Py_hash_t)acc; +} + +#else + static Py_hash_t record_hash(ApgRecordObject *v) { @@ -150,6 +192,8 @@ record_hash(ApgRecordObject *v) return (Py_hash_t)x; } +#endif + static PyObject * record_richcompare(PyObject *v, PyObject *w, int op) From cc3ae9fe0e160f52f83388a7a6b9140ed86bf174 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:38:19 -0500 Subject: [PATCH 09/14] CI: Enable Python 3.8 --- .ci/appveyor.yml | 2 ++ .ci/travis-build-wheels.sh | 7 +++++-- .ci/travis-release.sh | 7 +++++-- .travis.yml | 14 +++++++++++++- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/.ci/appveyor.yml b/.ci/appveyor.yml index 7a2ec05d..6723c440 100644 --- a/.ci/appveyor.yml +++ b/.ci/appveyor.yml @@ -18,6 +18,8 @@ environment: - PYTHON: "C:\\Python36-x64\\python.exe" - PYTHON: "C:\\Python37\\python.exe" - PYTHON: "C:\\Python37-x64\\python.exe" + - PYTHON: "C:\\Python38\\python.exe" + - PYTHON: "C:\\Python38-x64\\python.exe" branches: # Avoid building PR branches. diff --git a/.ci/travis-build-wheels.sh b/.ci/travis-build-wheels.sh index 751a22ae..3705c7d0 100755 --- a/.ci/travis-build-wheels.sh +++ b/.ci/travis-build-wheels.sh @@ -39,9 +39,12 @@ pip install -U -r ".ci/requirements-publish.txt" if [ "${TRAVIS_OS_NAME}" == "linux" ]; then for pyver in ${RELEASE_PYTHON_VERSIONS}; do ML_PYTHON_VERSION=$(python3 -c \ - "print('cp{maj}{min}-cp{maj}{min}m'.format( \ + "print('cp{maj}{min}-cp{maj}{min}{s}'.format( \ maj='${pyver}'.split('.')[0], \ - min='${pyver}'.split('.')[1]))") + min='${pyver}'.split('.')[1], + s='m' if tuple('${pyver}'.split('.')) < ('3', '8') \ + else ''))") + for arch in x86_64 i686; do ML_IMAGE="quay.io/pypa/manylinux1_${arch}" diff --git a/.ci/travis-release.sh b/.ci/travis-release.sh index c9c1e936..4b6a999a 100755 --- a/.ci/travis-release.sh +++ b/.ci/travis-release.sh @@ -30,8 +30,11 @@ P="${PYMODULE}-${PACKAGE_VERSION}" expected_wheels=() for pyver in ${RELEASE_PYTHON_VERSIONS}; do - pyver="${pyver//./}" - abitag="cp${pyver}-cp${pyver}m" + abitag=$(python -c \ + "print('cp{maj}{min}-cp{maj}{min}{s}'.format( \ + maj='${pyver}'.split('.')[0], \ + min='${pyver}'.split('.')[1], + s='m' if tuple('${pyver}'.split('.')) < ('3', '8') else ''))") for plat in "${release_platforms[@]}"; do expected_wheels+=("${P}-${abitag}-${plat}.whl") done diff --git a/.travis.yml b/.travis.yml index 63ca04d5..be029141 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language: generic env: global: - PYMODULE=asyncpg - - RELEASE_PYTHON_VERSIONS="3.5 3.6 3.7" + - RELEASE_PYTHON_VERSIONS="3.5 3.6 3.7 3.8" - S3_UPLOAD_USERNAME=oss-ci-bot - S3_UPLOAD_BUCKET=magicstack-oss-releases @@ -124,6 +124,15 @@ matrix: addons: apt: {packages: [postgresql-12]} + - os: linux + dist: xenial + sudo: true + language: python + python: "3.8" + env: BUILD=tests PGVERSION=12 + addons: + apt: {packages: [postgresql-12]} + # Build manylinux wheels. Each wheel will be tested, # so there is no need for BUILD=tests here. # Also use this job to publish the releases and build @@ -147,6 +156,9 @@ matrix: - os: osx env: BUILD=tests,wheels PYTHON_VERSION=3.7.4 PGVERSION=10 + - os: osx + env: BUILD=tests,wheels PYTHON_VERSION=3.8.0 PGVERSION=10 + cache: pip From 9ee0a439468539456346404a5aa751844f33ce9e Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 16:57:46 -0500 Subject: [PATCH 10/14] CI: Don't run uvloop tests on built wheels Latest uvloop doesn't run on manylinux1. --- Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/Makefile b/Makefile index 698f011a..d1033b66 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,6 @@ test: testinstalled: cd /tmp && $(PYTHON) $(ROOT)/tests/__init__.py - cd /tmp && USE_UVLOOP=1 $(PYTHON) $(ROOT)/tests/__init__.py quicktest: From 40277948f32484c9aff6184905700d40dfe4407c Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 19 Nov 2019 17:10:25 -0500 Subject: [PATCH 11/14] Handle Stream.wait_closed() windows bug gracefully --- asyncpg/compat.py | 11 +++++++++++ asyncpg/connect_utils.py | 3 +-- asyncpg/connection.py | 3 +-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/asyncpg/compat.py b/asyncpg/compat.py index ff4f27b4..99a561d0 100644 --- a/asyncpg/compat.py +++ b/asyncpg/compat.py @@ -79,3 +79,14 @@ def current_asyncio_task(loop): else: def current_asyncio_task(loop): return asyncio.Task.current_task(loop) + + +async def wait_closed(stream): + # Not all asyncio versions have StreamWriter.wait_closed(). + if hasattr(stream, 'wait_closed'): + try: + await stream.wait_closed() + except ConnectionResetError: + # On Windows wait_closed() sometimes propagates + # ConnectionResetError which is totally unnecessary. + pass diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 94cdc7ed..906afac5 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -611,8 +611,7 @@ async def _negotiate_ssl_connection(host, port, conn_factory, *, loop, ssl, sock = sock.dup() # Must come before tr.close() finally: writer.close() - if hasattr(writer, 'wait_closed'): - await writer.wait_closed() + await compat.wait_closed(writer) try: return await conn_factory(sock=sock) # Must come after tr.close() diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 2f3c34dd..298b55fa 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1223,8 +1223,7 @@ async def _cancel(self, waiter): waiter.set_result(None) if w is not None: w.close() - if hasattr(w, 'wait_closed'): - await w.wait_closed() + await compat.wait_closed(w) def _cancel_current_command(self, waiter): self._cancellations.add(self._loop.create_task(self._cancel(waiter))) From a9b3713027880dc58b47f70cf43b00ccdfc80089 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Wed, 20 Nov 2019 12:34:36 -0500 Subject: [PATCH 12/14] Skip test_adversity on Windows & Python 3.8 The way the test rig is written seems to be incompatible with ProactorEventLoop which is the dafault on Windows in Python 3.8 --- tests/test_adversity.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_adversity.py b/tests/test_adversity.py index 54b4f481..fa0f6ab1 100644 --- a/tests/test_adversity.py +++ b/tests/test_adversity.py @@ -8,12 +8,18 @@ import asyncio import os +import platform import unittest +import sys from asyncpg import _testbase as tb @unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing') +@unittest.skipIf( + platform.system() == 'Windows' and + sys.version_info >= (3, 8), + 'not compatible with ProactorEventLoop which is default in Python 3.8') class TestConnectionLoss(tb.ProxiedClusterTestCase): @tb.with_timeout(30.0) async def test_connection_close_timeout(self): From 3375fe9050593bb2e9c8621cfbfa5e1e6a3a93e7 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Wed, 20 Nov 2019 12:35:40 -0500 Subject: [PATCH 13/14] Use loop.start_tls() to upgrade connections to SSL The old way of TLS upgrade (openining a connection, asking postgres to do TLS and then duping the underlying socket) seems not to work anymore on Windows with Python 3.8. --- asyncpg/connect_utils.py | 175 +++++++++++++++++++++++++-------------- asyncpg/connection.py | 22 ++--- 2 files changed, 117 insertions(+), 80 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 906afac5..5e0a13be 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -504,6 +504,95 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, passfile, return addrs, params, config +class TLSUpgradeProto(asyncio.Protocol): + def __init__(self, loop, host, port, ssl_context, ssl_is_advisory): + self.on_data = _create_future(loop) + self.host = host + self.port = port + self.ssl_context = ssl_context + self.ssl_is_advisory = ssl_is_advisory + + def data_received(self, data): + if data == b'S': + self.on_data.set_result(True) + elif (self.ssl_is_advisory and + self.ssl_context.verify_mode == ssl_module.CERT_NONE and + data == b'N'): + # ssl_is_advisory will imply that ssl.verify_mode == CERT_NONE, + # since the only way to get ssl_is_advisory is from + # sslmode=prefer (or sslmode=allow). But be extra sure to + # disallow insecure connections when the ssl context asks for + # real security. + self.on_data.set_result(False) + else: + self.on_data.set_exception( + ConnectionError( + 'PostgreSQL server at "{host}:{port}" ' + 'rejected SSL upgrade'.format( + host=self.host, port=self.port))) + + def connection_lost(self, exc): + if not self.on_data.done(): + if exc is None: + exc = ConnectionError('unexpected connection_lost() call') + self.on_data.set_exception(exc) + + +async def _create_ssl_connection(protocol_factory, host, port, *, + loop, ssl_context, ssl_is_advisory=False): + + if ssl_context is True: + ssl_context = ssl_module.create_default_context() + + tr, pr = await loop.create_connection( + lambda: TLSUpgradeProto(loop, host, port, + ssl_context, ssl_is_advisory), + host, port) + + tr.write(struct.pack('!ll', 8, 80877103)) # SSLRequest message. + + try: + do_ssl_upgrade = await pr.on_data + except (Exception, asyncio.CancelledError): + tr.close() + raise + + if hasattr(loop, 'start_tls'): + if do_ssl_upgrade: + try: + new_tr = await loop.start_tls( + tr, pr, ssl_context, server_hostname=host) + except (Exception, asyncio.CancelledError): + tr.close() + raise + else: + new_tr = tr + + pg_proto = protocol_factory() + pg_proto.connection_made(new_tr) + new_tr.set_protocol(pg_proto) + + return new_tr, pg_proto + else: + conn_factory = functools.partial( + loop.create_connection, protocol_factory) + + if do_ssl_upgrade: + conn_factory = functools.partial( + conn_factory, ssl=ssl_context, server_hostname=host) + + sock = _get_socket(tr) + sock = sock.dup() + _set_nodelay(sock) + tr.close() + + try: + return await conn_factory(sock=sock) + except (Exception, asyncio.CancelledError): + sock.close() + raise + + async def _connect_addr(*, addr, loop, timeout, params, config, connection_class): assert loop is not None @@ -526,8 +615,6 @@ async def _connect_addr(*, addr, loop, timeout, params, config, else: connector = loop.create_connection(proto_factory, *addr) - connector = asyncio.ensure_future(connector) - before = time.monotonic() try: tr, pr = await asyncio.wait_for( @@ -575,79 +662,41 @@ async def _connect(*, loop, timeout, connection_class, **kwargs): raise last_error -async def _negotiate_ssl_connection(host, port, conn_factory, *, loop, ssl, - server_hostname, ssl_is_advisory=False): - # Note: ssl_is_advisory only affects behavior when the server does not - # accept SSLRequests. If the SSLRequest is accepted but either the SSL - # negotiation fails or the PostgreSQL user isn't permitted to use SSL, - # there's nothing that would attempt to reconnect with a non-SSL socket. - reader, writer = await asyncio.open_connection(host, port) - - tr = writer.transport - try: - sock = _get_socket(tr) - _set_nodelay(sock) - - writer.write(struct.pack('!ll', 8, 80877103)) # SSLRequest message. - await writer.drain() - resp = await reader.readexactly(1) - - if resp == b'S': - conn_factory = functools.partial( - conn_factory, ssl=ssl, server_hostname=server_hostname) - elif (ssl_is_advisory and - ssl.verify_mode == ssl_module.CERT_NONE and - resp == b'N'): - # ssl_is_advisory will imply that ssl.verify_mode == CERT_NONE, - # since the only way to get ssl_is_advisory is from sslmode=prefer - # (or sslmode=allow). But be extra sure to disallow insecure - # connections when the ssl context asks for real security. - pass - else: - raise ConnectionError( - 'PostgreSQL server at "{}:{}" rejected SSL upgrade'.format( - host, port)) - - sock = sock.dup() # Must come before tr.close() - finally: - writer.close() - await compat.wait_closed(writer) - - try: - return await conn_factory(sock=sock) # Must come after tr.close() - except (Exception, asyncio.CancelledError): - sock.close() - raise +async def _cancel(*, loop, addr, params: _ConnectionParameters, + backend_pid, backend_secret): + class CancelProto(asyncio.Protocol): -async def _create_ssl_connection(protocol_factory, host, port, *, - loop, ssl_context, ssl_is_advisory=False): - return await _negotiate_ssl_connection( - host, port, - functools.partial(loop.create_connection, protocol_factory), - loop=loop, - ssl=ssl_context, - server_hostname=host, - ssl_is_advisory=ssl_is_advisory) + def __init__(self): + self.on_disconnect = _create_future(loop) + def connection_lost(self, exc): + if not self.on_disconnect.done(): + self.on_disconnect.set_result(True) -async def _open_connection(*, loop, addr, params: _ConnectionParameters): if isinstance(addr, str): - r, w = await asyncio.open_unix_connection(addr) + tr, pr = await loop.create_unix_connection(CancelProto, addr) else: if params.ssl: - r, w = await _negotiate_ssl_connection( + tr, pr = await _create_ssl_connection( + CancelProto, *addr, - asyncio.open_connection, loop=loop, - ssl=params.ssl, - server_hostname=addr[0], + ssl_context=params.ssl, ssl_is_advisory=params.ssl_is_advisory) else: - r, w = await asyncio.open_connection(*addr) - _set_nodelay(_get_socket(w.transport)) + tr, pr = await loop.create_connection( + CancelProto, *addr) + _set_nodelay(_get_socket(tr)) + + # Pack a CancelRequest message + msg = struct.pack('!llll', 16, 80877102, backend_pid, backend_secret) - return r, w + try: + tr.write(msg) + await pr.on_disconnect + finally: + tr.close() def _get_socket(transport): diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 298b55fa..ef1b595d 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -10,7 +10,6 @@ import collections import collections.abc import itertools -import struct import sys import time import traceback @@ -1186,24 +1185,16 @@ async def _cleanup_stmts(self): await self._protocol.close_statement(stmt, protocol.NO_TIMEOUT) async def _cancel(self, waiter): - r = w = None - try: # Open new connection to the server - r, w = await connect_utils._open_connection( - loop=self._loop, addr=self._addr, params=self._params) - - # Pack CancelRequest message - msg = struct.pack('!llll', 16, 80877102, - self._protocol.backend_pid, - self._protocol.backend_secret) - - w.write(msg) - await r.read() # Wait until EOF + await connect_utils._cancel( + loop=self._loop, addr=self._addr, params=self._params, + backend_pid=self._protocol.backend_pid, + backend_secret=self._protocol.backend_secret) except ConnectionResetError as ex: # On some systems Postgres will reset the connection # after processing the cancellation command. - if r is None and not waiter.done(): + if not waiter.done(): waiter.set_exception(ex) except asyncio.CancelledError: # There are two scenarios in which the cancellation @@ -1221,9 +1212,6 @@ async def _cancel(self, waiter): compat.current_asyncio_task(self._loop)) if not waiter.done(): waiter.set_result(None) - if w is not None: - w.close() - await compat.wait_closed(w) def _cancel_current_command(self, waiter): self._cancellations.add(self._loop.create_task(self._cancel(waiter))) From 5e585efdcc7922e8d3c0fedccf8e43aac646bc7d Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Wed, 20 Nov 2019 13:00:18 -0500 Subject: [PATCH 14/14] Silence 3.8 warnings about implicit __int__ conversion --- asyncpg/pgproto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncpg/pgproto b/asyncpg/pgproto index 4de72c5d..484e3520 160000 --- a/asyncpg/pgproto +++ b/asyncpg/pgproto @@ -1 +1 @@ -Subproject commit 4de72c5de696e5892a4209d94b9e3af0043ca7d5 +Subproject commit 484e3520d8cb0514b7596a8f9eaa80f3f7b79d0c