From 8bc83f736e3052ec4b6a8a2ab122ceaf6183e9e6 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Tue, 28 Dec 2021 19:37:27 +0300 Subject: [PATCH 1/6] tests: explicitly close loops --- tests/_testutil.py | 12 ++++++++---- tests/conftest.py | 6 ++++-- tests/test_consumer.py | 1 + tests/test_producer.py | 1 + 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/_testutil.py b/tests/_testutil.py index 7aac6ec6..5a4b5f02 100644 --- a/tests/_testutil.py +++ b/tests/_testutil.py @@ -422,10 +422,14 @@ def random_string(length): def wait_kafka(kafka_host, kafka_port, timeout=60): - loop = asyncio.get_event_loop() - return loop.run_until_complete( - _wait_kafka(kafka_host, kafka_port, timeout) - ) + loop = asyncio.new_event_loop() + try: + res = loop.run_until_complete( + _wait_kafka(kafka_host, kafka_port, timeout) + ) + finally: + loop.close() + return res async def _wait_kafka(kafka_host, kafka_port, timeout): diff --git a/tests/conftest.py b/tests/conftest.py index 0d6083e2..4c6db502 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,7 +52,9 @@ def docker(request): image = request.config.getoption('--docker-image') if not image: return None - return libdocker.from_env() + client = libdocker.from_env() + yield client + client.close() @pytest.fixture(scope='class') @@ -231,7 +233,7 @@ def kafka_server(request, docker, docker_ip_address, kafka_sasl_ssl_port, container ) finally: - container.remove(force=True) + container.stop() else: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f37f0578..4034577e 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -130,6 +130,7 @@ def test_create_consumer_no_running_loop(self): loop.run_until_complete(consumer.getone()) finally: loop.run_until_complete(consumer.stop()) + loop.close() @run_until_complete async def test_consumer_context_manager(self): diff --git a/tests/test_producer.py b/tests/test_producer.py index 5a890259..5da7684c 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -148,6 +148,7 @@ def test_create_producer_no_running_loop(self): self.assertEqual(resp.offset, 0) finally: loop.run_until_complete(producer.stop()) + loop.close() @run_until_complete async def test_producer_context_manager(self): From 4289a8ce500f8a9a4651a96b9fb14b62d9bf42cc Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 29 Dec 2021 15:00:24 +0300 Subject: [PATCH 2/6] --amend --- pytest.ini | 1 + tests/conftest.py | 11 ++++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pytest.ini b/pytest.ini index 6171b335..6bbb1cdf 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,7 @@ [pytest] filterwarnings = error + # https://github.com/docker/docker-py/issues/1293 ignore:.*docker.sock.*:ResourceWarning markers = ssl: Tests that require SSL certificates to run diff --git a/tests/conftest.py b/tests/conftest.py index 4c6db502..1a250007 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,11 +50,12 @@ def pytest_configure(config): @pytest.fixture(scope='session') def docker(request): image = request.config.getoption('--docker-image') - if not image: - return None - client = libdocker.from_env() - yield client - client.close() + if image: + client = libdocker.from_env() + yield client + client.close() + else: + yield None @pytest.fixture(scope='class') From 512e0d97a10190a3d163d617676fe4979abf3148 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 29 Dec 2021 12:36:48 +0300 Subject: [PATCH 3/6] Clean up if timed out while connecting --- aiokafka/conn.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/aiokafka/conn.py b/aiokafka/conn.py index b8c5a788..bdc84e53 100644 --- a/aiokafka/conn.py +++ b/aiokafka/conn.py @@ -227,11 +227,15 @@ async def connect(self): self._idle_handle = loop.call_soon( self._idle_check, weakref.ref(self)) - if self._version_hint and self._version_hint >= (0, 10): - await self._do_version_lookup() + try: + if self._version_hint and self._version_hint >= (0, 10): + await self._do_version_lookup() - if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]: - await self._do_sasl_handshake() + if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]: + await self._do_sasl_handshake() + except: # noqa: E722 + self.close() + raise return reader, writer From 6a9e639a16caf91d4a91b3d678abdf409d2324a6 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 29 Dec 2021 12:52:42 +0300 Subject: [PATCH 4/6] Fix AttributeError in tests --- tests/test_consumer.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 4034577e..5a68c299 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1291,6 +1291,7 @@ async def test_rebalance_listener_with_coroutines(self): await self.send_messages(1, list(range(10, 20))) main_self = self + faults = [] class SimpleRebalanceListener(ConsumerRebalanceListener): def __init__(self, consumer): @@ -1306,16 +1307,24 @@ async def on_partitions_revoked(self, revoked): # Confirm that coordinator is actually waiting for callback to # complete await asyncio.sleep(0.2) - main_self.assertTrue( - self.consumer._coordinator.needs_join_prepare) + try: + main_self.assertTrue( + self.consumer._coordinator._rejoin_needed_fut.done()) + except Exception as exc: + # Exceptions here are intercepted by GroupCoordinator + faults.append(exc) async def on_partitions_assigned(self, assigned): self.assign_mock(assigned) # Confirm that coordinator is actually waiting for callback to # complete await asyncio.sleep(0.2) - main_self.assertFalse( - self.consumer._coordinator.needs_join_prepare) + try: + main_self.assertFalse( + self.consumer._coordinator._rejoin_needed_fut.done()) + except Exception as exc: + # Exceptions here are intercepted by GroupCoordinator + faults.append(exc) tp0 = TopicPartition(self.topic, 0) tp1 = TopicPartition(self.topic, 1) @@ -1334,6 +1343,8 @@ async def on_partitions_assigned(self, assigned): self.assertEqual(msg.value, b"10") listener1.revoke_mock.assert_called_with(set()) listener1.assign_mock.assert_called_with({tp0, tp1}) + if faults: + raise faults[0] # By adding a 2nd consumer we trigger rebalance consumer2 = AIOKafkaConsumer( @@ -1368,6 +1379,8 @@ async def on_partitions_assigned(self, assigned): self.assertEqual(listener2.revoke_mock.call_count, 1) listener2.assign_mock.assert_called_with(c2_assignment) self.assertEqual(listener2.assign_mock.call_count, 1) + if faults: + raise faults[0] @run_until_complete async def test_rebalance_listener_no_deadlock_callbacks(self): From 49ce49bf89820a57293c18f9f4acc924539dc8d1 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 29 Dec 2021 15:48:26 +0300 Subject: [PATCH 5/6] Move away from deprecated distutils --- aiokafka/util.py | 10 +++++++--- pytest.ini | 1 + setup.py | 11 ++++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/aiokafka/util.py b/aiokafka/util.py index 21de4b2b..5dab710c 100644 --- a/aiokafka/util.py +++ b/aiokafka/util.py @@ -1,10 +1,10 @@ import asyncio import os from asyncio import AbstractEventLoop -from distutils.version import StrictVersion -from typing import Awaitable, Dict, Tuple, TypeVar, Union +from typing import Awaitable, Dict, Tuple, TypeVar, Union, cast import async_timeout +from packaging.version import Version from .structs import OffsetAndMetadata, TopicPartition @@ -40,7 +40,11 @@ async def wait_for(fut: Awaitable[T], timeout: Union[None, int, float] = None) - def parse_kafka_version(api_version: str) -> Tuple[int, int, int]: - version = StrictVersion(api_version).version + parsed = Version(api_version).release + if not 2 <= len(parsed) <= 3: + raise ValueError(api_version) + version = cast(Tuple[int, int, int], (parsed + (0,))[:3]) + if not (0, 9) <= version < (3, 0): raise ValueError(api_version) return version diff --git a/pytest.ini b/pytest.ini index 6bbb1cdf..53d0874d 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,5 +3,6 @@ filterwarnings = error # https://github.com/docker/docker-py/issues/1293 ignore:.*docker.sock.*:ResourceWarning + ignore:distutils Version classes are deprecated:DeprecationWarning:docker markers = ssl: Tests that require SSL certificates to run diff --git a/setup.py b/setup.py index 95c80c20..ab2e09d9 100644 --- a/setup.py +++ b/setup.py @@ -2,11 +2,11 @@ import platform import re import sys -from distutils.command.bdist_rpm import bdist_rpm as _bdist_rpm -from distutils.command.build_ext import build_ext -from distutils.errors import CCompilerError, DistutilsExecError, DistutilsPlatformError from setuptools import Extension, setup +from setuptools.command.bdist_rpm import bdist_rpm as _bdist_rpm +from setuptools.command.build_ext import build_ext +from setuptools.errors import CCompilerError, ExecError, PlatformError # Those are needed to build _hton for windows @@ -88,13 +88,13 @@ class ve_build_ext(build_ext): def run(self): try: build_ext.run(self) - except (DistutilsPlatformError, FileNotFoundError): + except (PlatformError, FileNotFoundError): raise BuildFailed() def build_extension(self, ext): try: build_ext.build_extension(self, ext) - except (CCompilerError, DistutilsExecError, DistutilsPlatformError, ValueError): + except (CCompilerError, ExecError, PlatformError, ValueError): raise BuildFailed() @@ -102,6 +102,7 @@ def build_extension(self, ext): "async-timeout", "kafka-python>=2.0.0", "dataclasses>=0.5; python_version<'3.7'", + "packaging", ] PY_VER = sys.version_info From 451d045c8a1d40700d8a9010130c862670ca98d5 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 29 Dec 2021 16:14:09 +0300 Subject: [PATCH 6/6] Move away from deprecated distutils --- pytest.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pytest.ini b/pytest.ini index 53d0874d..c3d76d9c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,5 +4,7 @@ filterwarnings = # https://github.com/docker/docker-py/issues/1293 ignore:.*docker.sock.*:ResourceWarning ignore:distutils Version classes are deprecated:DeprecationWarning:docker + # Actually comes from docker importing distutils on Windows + ignore:the imp module is deprecated in favour of importlib:DeprecationWarning:pywintypes markers = ssl: Tests that require SSL certificates to run