diff --git a/ccx_messaging/publishers/kafka_publisher.py b/ccx_messaging/publishers/kafka_publisher.py index 8446276..402e710 100644 --- a/ccx_messaging/publishers/kafka_publisher.py +++ b/ccx_messaging/publishers/kafka_publisher.py @@ -23,7 +23,7 @@ from ccx_messaging.error import CCXMessagingError log = logging.getLogger(__name__) -DEFAULT_COMPRESSION = 9 +BEST_COMPRESSION = 9 class KafkaPublisher(Publisher): """ @@ -70,7 +70,7 @@ def __init__(self, outgoing_topic: str, kafka_broker_config: dict = None, **kwar def produce(self, outgoing_message: bytes): """Send the message though the Kafka producer.""" if self.compression: - self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=DEFAULT_COMPRESSION)) + self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=BEST_COMPRESSION)) else: self.producer.produce(self.topic, outgoing_message) self.producer.poll(0) diff --git a/test/publishers/kafka_publisher_test.py b/test/publishers/kafka_publisher_test.py index 4c54e13..bdebaa3 100644 --- a/test/publishers/kafka_publisher_test.py +++ b/test/publishers/kafka_publisher_test.py @@ -106,7 +106,7 @@ } ), ] -DEFAULT_COMPRESSION = 9 +BEST_COMPRESSION = 9 def test_init(): """Check that init creates a valid object.""" @@ -126,7 +126,7 @@ def test_init_compression(): @pytest.mark.parametrize("input", VALID_INPUT_MSG) def test_compressing_enabled(input): input = bytes(json.dumps(input) + "\n",'utf-8') - expected_output = gzip.compress(input,compresslevel=DEFAULT_COMPRESSION) + expected_output = gzip.compress(input,compresslevel=BEST_COMPRESSION) kakfa_config = { "bootstrap.servers": "kafka:9092", "compression" : "gzip"