From 654337863043f3a83a9bf2dfe7d7fb4868adb5e2 Mon Sep 17 00:00:00 2001 From: Jakub Drobena Date: Wed, 8 Nov 2023 14:39:11 +0100 Subject: [PATCH 1/3] Changed compression level to constant --- ccx_messaging/publishers/kafka_publisher.py | 4 ++-- test/publishers/kafka_publisher_test.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ccx_messaging/publishers/kafka_publisher.py b/ccx_messaging/publishers/kafka_publisher.py index 6376fc8..8446276 100644 --- a/ccx_messaging/publishers/kafka_publisher.py +++ b/ccx_messaging/publishers/kafka_publisher.py @@ -23,7 +23,7 @@ from ccx_messaging.error import CCXMessagingError log = logging.getLogger(__name__) - +DEFAULT_COMPRESSION = 9 class KafkaPublisher(Publisher): """ @@ -70,7 +70,7 @@ def __init__(self, outgoing_topic: str, kafka_broker_config: dict = None, **kwar def produce(self, outgoing_message: bytes): """Send the message though the Kafka producer.""" if self.compression: - self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=9)) + self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=DEFAULT_COMPRESSION)) else: self.producer.produce(self.topic, outgoing_message) self.producer.poll(0) diff --git a/test/publishers/kafka_publisher_test.py b/test/publishers/kafka_publisher_test.py index e119aed..4c54e13 100644 --- a/test/publishers/kafka_publisher_test.py +++ b/test/publishers/kafka_publisher_test.py @@ -106,6 +106,7 @@ } ), ] +DEFAULT_COMPRESSION = 9 def test_init(): """Check that init creates a valid object.""" @@ -125,7 +126,7 @@ def test_init_compression(): @pytest.mark.parametrize("input", VALID_INPUT_MSG) def test_compressing_enabled(input): input = bytes(json.dumps(input) + "\n",'utf-8') - expected_output = gzip.compress(input) + expected_output = gzip.compress(input,compresslevel=DEFAULT_COMPRESSION) kakfa_config = { "bootstrap.servers": "kafka:9092", "compression" : "gzip" From 58cf03c7e7ecd59a580bff1e3b97c8f04af74528 Mon Sep 17 00:00:00 2001 From: Jakub Drobena Date: Wed, 8 Nov 2023 14:48:03 +0100 Subject: [PATCH 2/3] Changed compression level to constant --- ccx_messaging/publishers/kafka_publisher.py | 4 ++-- test/publishers/kafka_publisher_test.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ccx_messaging/publishers/kafka_publisher.py b/ccx_messaging/publishers/kafka_publisher.py index 8446276..402e710 100644 --- a/ccx_messaging/publishers/kafka_publisher.py +++ b/ccx_messaging/publishers/kafka_publisher.py @@ -23,7 +23,7 @@ from ccx_messaging.error import CCXMessagingError log = logging.getLogger(__name__) -DEFAULT_COMPRESSION = 9 +BEST_COMPRESSION = 9 class KafkaPublisher(Publisher): """ @@ -70,7 +70,7 @@ def __init__(self, outgoing_topic: str, kafka_broker_config: dict = None, **kwar def produce(self, outgoing_message: bytes): """Send the message though the Kafka producer.""" if self.compression: - self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=DEFAULT_COMPRESSION)) + self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=BEST_COMPRESSION)) else: self.producer.produce(self.topic, outgoing_message) self.producer.poll(0) diff --git a/test/publishers/kafka_publisher_test.py b/test/publishers/kafka_publisher_test.py index 4c54e13..bdebaa3 100644 --- a/test/publishers/kafka_publisher_test.py +++ b/test/publishers/kafka_publisher_test.py @@ -106,7 +106,7 @@ } ), ] -DEFAULT_COMPRESSION = 9 +BEST_COMPRESSION = 9 def test_init(): """Check that init creates a valid object.""" @@ -126,7 +126,7 @@ def test_init_compression(): @pytest.mark.parametrize("input", VALID_INPUT_MSG) def test_compressing_enabled(input): input = bytes(json.dumps(input) + "\n",'utf-8') - expected_output = gzip.compress(input,compresslevel=DEFAULT_COMPRESSION) + expected_output = gzip.compress(input,compresslevel=BEST_COMPRESSION) kakfa_config = { "bootstrap.servers": "kafka:9092", "compression" : "gzip" From fd63c80b280b6dda163f5eb34e26f73d9db4e1e7 Mon Sep 17 00:00:00 2001 From: Jakub Drobena Date: Thu, 9 Nov 2023 11:36:59 +0100 Subject: [PATCH 3/3] Masking timestamp bytes --- test/publishers/kafka_publisher_test.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/test/publishers/kafka_publisher_test.py b/test/publishers/kafka_publisher_test.py index bdebaa3..a35cfd5 100644 --- a/test/publishers/kafka_publisher_test.py +++ b/test/publishers/kafka_publisher_test.py @@ -108,6 +108,15 @@ ] BEST_COMPRESSION = 9 +def timeStampMasking(message): + message=list(message) + message[4] = 0 + message[5] = 0 + message[6] = 0 + message[7] = 0 + message = bytes(message) + return message + def test_init(): """Check that init creates a valid object.""" kakfa_config = { @@ -126,7 +135,7 @@ def test_init_compression(): @pytest.mark.parametrize("input", VALID_INPUT_MSG) def test_compressing_enabled(input): input = bytes(json.dumps(input) + "\n",'utf-8') - expected_output = gzip.compress(input,compresslevel=BEST_COMPRESSION) + expected_output = timeStampMasking(gzip.compress(input,compresslevel=BEST_COMPRESSION)) kakfa_config = { "bootstrap.servers": "kafka:9092", "compression" : "gzip" @@ -134,7 +143,9 @@ def test_compressing_enabled(input): pub = KafkaPublisher(outgoing_topic="topic-name", **kakfa_config) pub.producer = MagicMock() pub.produce(input) - pub.producer.produce.assert_called_with("topic-name",expected_output) + outgoing_topic = pub.producer.produce.call_args[0][0] + outgoing_message = timeStampMasking(pub.producer.produce.call_args[0][1]) + assert outgoing_message == expected_output and outgoing_topic == "topic-name" @pytest.mark.parametrize("input", VALID_INPUT_MSG) def test_compressing_disabled(input):