Skip to content

Commit

Permalink
Reconfigure tests to complete in a more timely manner and skip some i…
Browse files Browse the repository at this point in the history
…terations for Kafka 0.8.2 and Python 3.12 (#159)

* skip failing tests for PyPy since they work locally

* Reconfigure tests for PyPy and 3.12

* Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2

* Update test_partitioner.py

* Update test_producer.py

* Timeout tests after ten minutes

* Set 0.8.2.2 to be experimental from hereon

* Formally support PyPy 3.9
  • Loading branch information
wbarnha authored Mar 8, 2024
1 parent e796019 commit 38e159a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 4 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
include:
- python-version: "pypy3.9"
experimental: true
- python-version: "~3.13.0-0"
experimental: true
steps:
Expand Down Expand Up @@ -115,11 +114,11 @@ jobs:
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
kafka-version:
- "0.8.2.2"
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
Expand All @@ -128,6 +127,11 @@ jobs:
- "2.4.0"
- "2.5.0"
- "2.6.0"
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
continue-on-error: ${{ matrix.experimental }}
steps:
- name: Checkout the source code
uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import platform

import pytest

from logging import info
Expand Down Expand Up @@ -151,6 +153,9 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client):
group_description = kafka_admin_client.describe_consumer_groups(['test'])


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline."
)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
Expand Down
4 changes: 4 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import logging
import platform
import threading
import time

Expand Down Expand Up @@ -40,6 +41,9 @@ def test_consumer_topics(kafka_broker, topic):
consumer.close()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline."
)
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
Expand Down
1 change: 1 addition & 0 deletions test/test_partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest


from kafka.partitioner import DefaultPartitioner, murmur2


Expand Down
6 changes: 5 additions & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gc
import platform
import sys
import time
import threading

Expand All @@ -10,6 +11,7 @@
from test.testutil import env_kafka_version, random_string


@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
def test_buffer_pool():
pool = SimpleBufferPool(1000, 1000)

Expand All @@ -21,8 +23,8 @@ def test_buffer_pool():
buf2 = pool.allocate(1000, 1000)
assert buf2.read() == b''


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
if compression == 'lz4':
Expand Down Expand Up @@ -70,6 +72,7 @@ def test_end_to_end(kafka_broker, compression):

@pytest.mark.skipif(platform.python_implementation() != 'CPython',
reason='Test relies on CPython-specific gc policies')
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
def test_kafka_producer_gc_cleanup():
gc.collect()
threads = threading.active_count()
Expand All @@ -81,6 +84,7 @@ def test_kafka_producer_gc_cleanup():


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
Expand Down

0 comments on commit 38e159a

Please sign in to comment.