From 1313d3f055b2a273168dbb0e9857c2d7ce343dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Segura=20Lucas?= Date: Tue, 21 Mar 2023 16:32:53 +0100 Subject: [PATCH] Improve Kafka clowder configuration injection --- ccx_messaging/consumers/consumer.py | 7 ++- ccx_messaging/consumers/kafka_consumer.py | 28 ++++----- .../publishers/data_pipeline_publisher.py | 18 ++++-- ccx_messaging/publishers/sha_publisher.py | 11 +++- ccx_messaging/utils/clowder.py | 30 ++++------ ccx_messaging/utils/kafka_config.py | 25 ++++++++ .../watchers/payload_tracker_watcher.py | 23 ++++++-- test/consumers/consumer_test.py | 2 +- test/consumers/kafka_consumer_test.py | 8 ++- test/{utils.py => test_utils.py} | 0 test/utils/__init__.py | 15 +++++ test/utils/kafka_config_test.py | 57 +++++++++++++++++++ 12 files changed, 176 insertions(+), 48 deletions(-) rename test/{utils.py => test_utils.py} (100%) create mode 100644 test/utils/__init__.py create mode 100644 test/utils/kafka_config_test.py diff --git a/ccx_messaging/consumers/consumer.py b/ccx_messaging/consumers/consumer.py index e8d1bbd..f550b79 100644 --- a/ccx_messaging/consumers/consumer.py +++ b/ccx_messaging/consumers/consumer.py @@ -31,7 +31,7 @@ from ccx_messaging.error import CCXMessagingError from ccx_messaging.schemas import INPUT_MESSAGE_SCHEMA, IDENTITY_SCHEMA -from ccx_messaging.utils.kafka_config import producer_config +from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration LOG = logging.getLogger(__name__) @@ -64,6 +64,7 @@ def __init__( downloader, engine, incoming_topic, + kafka_broker_config=None, dead_letter_queue_topic=None, max_record_age=7200, retry_backoff_ms=1000, @@ -89,6 +90,9 @@ def __init__( super().__init__(publisher, downloader, engine, requeuer=requeuer) + kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) + kwargs.update(kafka_broker_cfg) + self.consumer = KafkaConsumer( incoming_topic, value_deserializer=self.deserialize, @@ -113,7 +117,6 
@@ def __init__( if self.dead_letter_queue_topic is not None: dlq_producer_config = producer_config(kwargs) - self.dlq_producer = KafkaProducer(**dlq_producer_config) def _consume(self, msg): diff --git a/ccx_messaging/consumers/kafka_consumer.py b/ccx_messaging/consumers/kafka_consumer.py index 44381dc..9f547aa 100644 --- a/ccx_messaging/consumers/kafka_consumer.py +++ b/ccx_messaging/consumers/kafka_consumer.py @@ -10,7 +10,7 @@ from ccx_messaging.error import CCXMessagingError from ccx_messaging.ingress import parse_ingress_message -from ccx_messaging.utils.kafka_config import producer_config +from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration LOG = logging.getLogger(__name__) @@ -26,10 +26,10 @@ def __init__( downloader, engine, incoming_topic, + kafka_broker_config=None, platform_service=None, dead_letter_queue_topic=None, max_record_age=7200, - retry_backoff_ms=1000, processing_timeout_s=0, **kwargs, ): @@ -37,25 +37,26 @@ def __init__( requeuer = kwargs.pop("requeuer", None) super().__init__(publisher, downloader, engine, requeuer=requeuer) + # Confluent initialization LOG.info( "Consuming topic '%s' from brokers %s as group '%s'", incoming_topic, - kwargs.get("bootstrap_servers", None), - kwargs.get("group_id", None), + kwargs.get("bootstrap.servers", None), + kwargs.get("group.id", None), ) - # Confluent initialization - config = { - "bootstrap.servers": kwargs.get("bootstrap_servers", ""), - "group.id": kwargs.get("group_id", None), - "retry.backoff.ms": retry_backoff_ms, - } + if kafka_broker_config: + kwargs.update(kafka_broker_config) + + if "retry.backoff.ms" not in kwargs: + kwargs["retry.backoff.ms"] = 1000 - self.consumer = ConfluentConsumer(config) + LOG.debug("Confluent Kafka consumer configuration arguments: %s", kwargs) + self.consumer = ConfluentConsumer(kwargs) self.consumer.subscribe([incoming_topic]) # Self handled vars - self.log_pattern = f"topic: {incoming_topic}, group_id: {kwargs.get('group_id', 
None)}" + self.log_pattern = f"topic: {incoming_topic}, group.id: {kwargs.get('group.id', None)}" # Service to filter in messages self.platform_service = platform_service @@ -74,7 +75,8 @@ def __init__( self.dead_letter_queue_topic = dead_letter_queue_topic if self.dead_letter_queue_topic is not None: - dlq_producer_config = producer_config(kwargs) + config = translate_kafka_configuration(kafka_broker_config) + dlq_producer_config = producer_config(config) self.dlq_producer = KafkaProducer(**dlq_producer_config) def get_url(self, input_msg: dict) -> str: diff --git a/ccx_messaging/publishers/data_pipeline_publisher.py b/ccx_messaging/publishers/data_pipeline_publisher.py index b7fc6e9..b5632eb 100644 --- a/ccx_messaging/publishers/data_pipeline_publisher.py +++ b/ccx_messaging/publishers/data_pipeline_publisher.py @@ -21,7 +21,7 @@ from kafka import KafkaProducer from ccx_messaging.error import CCXMessagingError -from ccx_messaging.utils.kafka_config import producer_config +from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration LOG = logging.getLogger(__name__) @@ -37,16 +37,24 @@ class DataPipelinePublisher(Publisher): Custom error handling for the whole pipeline is implemented here. 
""" - def __init__(self, outgoing_topic, bootstrap_servers, **kwargs): + def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs): """Construct a new `DataPipelinePublisher` given `kwargs` from the config YAML.""" self.topic = outgoing_topic self.bootstrap_servers = bootstrap_servers if self.topic is None: raise KeyError("outgoing_topic") - self.producer = KafkaProducer( - bootstrap_servers=self.bootstrap_servers, **producer_config(kwargs) - ) + kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) + kwargs.update(kafka_broker_cfg) + + if "bootstrap_servers" not in kwargs: + kwargs["bootstrap_servers"] = self.bootstrap_servers + else: + self.bootstrap_servers = kwargs["bootstrap_servers"] + + kwargs = producer_config(kwargs) + LOG.debug("Kafka publisher configuration arguments: %s", kwargs) + self.producer = KafkaProducer(**kwargs) LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers) self.outdata_schema_version = 2 diff --git a/ccx_messaging/publishers/sha_publisher.py b/ccx_messaging/publishers/sha_publisher.py index 4f1e70b..97d8a61 100644 --- a/ccx_messaging/publishers/sha_publisher.py +++ b/ccx_messaging/publishers/sha_publisher.py @@ -21,6 +21,7 @@ from kafka import KafkaProducer from ccx_messaging.error import CCXMessagingError +from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration LOG = logging.getLogger(__name__) @@ -36,14 +37,20 @@ class SHAPublisher(Publisher): Custom error handling for the whole pipeline is implemented here. 
""" - def __init__(self, outgoing_topic, bootstrap_servers, **kwargs): + def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs): """Construct a new `SHAPublisher` given `kwargs` from the config YAML.""" self.topic = outgoing_topic self.bootstrap_servers = bootstrap_servers if self.topic is None: raise KeyError("outgoing_topic") - self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, **kwargs) + kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) + kwargs.update(kafka_broker_cfg) + + if "bootstrap_servers" not in kwargs: + kwargs["bootstrap_servers"] = self.bootstrap_servers + + self.producer = KafkaProducer(**producer_config(kwargs)) LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers) self.outdata_schema_version = 2 diff --git a/ccx_messaging/utils/clowder.py b/ccx_messaging/utils/clowder.py index 5c6f581..5837b10 100644 --- a/ccx_messaging/utils/clowder.py +++ b/ccx_messaging/utils/clowder.py @@ -30,52 +30,46 @@ def apply_clowder_config(manifest): # Find the Payload Tracker watcher, as it might be affected by config changes pt_watcher_name = "ccx_messaging.watchers.payload_tracker_watcher.PayloadTrackerWatcher" - watcher = None + pt_watcher = None for watcher in config["service"]["watchers"]: if watcher["name"] == pt_watcher_name: + pt_watcher = watcher break clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0] kafka_url = f"{clowder_broker_config.hostname}:{clowder_broker_config.port}" logger.debug("Kafka URL: %s", kafka_url) - kafka_broker_config = { - "bootstrap_servers": kafka_url, - } + kafka_broker_config = {"bootstrap.servers": kafka_url} if clowder_broker_config.cacert: # Current Kafka library is not able to handle the CA file, only a path to it # FIXME: Duplicating parameters in order to be used by both Kafka libraries ssl_ca_location = app_common_python.LoadedConfig.kafka_ca() - kafka_broker_config["ssl_cafile"] = ssl_ca_location 
kafka_broker_config["ssl.ca.location"] = ssl_ca_location if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl": kafka_broker_config.update( { - "sasl_mechanism": clowder_broker_config.sasl.saslMechanism, - "sasl.mechanism": clowder_broker_config.sasl.saslMechanism, - "sasl_plain_username": clowder_broker_config.sasl.username, + "sasl.mechanisms": clowder_broker_config.sasl.saslMechanism, "sasl.username": clowder_broker_config.sasl.username, - "sasl_plain_password": clowder_broker_config.sasl.password, "sasl.password": clowder_broker_config.sasl.password, - "security_protocol": clowder_broker_config.sasl.securityProtocol, "security.protocol": clowder_broker_config.sasl.securityProtocol, } ) - config["service"]["consumer"]["kwargs"].update(kafka_broker_config) - config["service"]["publisher"]["kwargs"].update(kafka_broker_config) + config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config + config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config - if watcher: - watcher["kwargs"].update(kafka_broker_config) + if pt_watcher: + pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config logger.info("Kafka configuration updated from Clowder configuration") consumer_topic = config["service"]["consumer"]["kwargs"].get("incoming_topic") dlq_topic = config["service"]["consumer"]["kwargs"].get("dead_letter_queue_topic") - producer_topic = config["service"]["publisher"]["kwargs"].pop("outgoing_topic") - payload_tracker_topic = watcher["kwargs"].pop("topic") + producer_topic = config["service"]["publisher"]["kwargs"].get("outgoing_topic") + payload_tracker_topic = pt_watcher["kwargs"].pop("topic") if pt_watcher else None if consumer_topic in app_common_python.KafkaTopics: topic_cfg = app_common_python.KafkaTopics[consumer_topic] @@ -93,9 +87,9 @@ def apply_clowder_config(manifest): else: logger.warn("The publisher topic cannot be found in Clowder mapping. 
It can cause errors") - if payload_tracker_topic in app_common_python.KafkaTopics: + if pt_watcher and payload_tracker_topic in app_common_python.KafkaTopics: topic_cfg = app_common_python.KafkaTopics[payload_tracker_topic] - watcher["kwargs"]["topic"] = topic_cfg.name + pt_watcher["kwargs"]["topic"] = topic_cfg.name else: logger.warn( "The Payload Tracker watcher topic cannot be found in Clowder mapping. " diff --git a/ccx_messaging/utils/kafka_config.py b/ccx_messaging/utils/kafka_config.py index b687485..c907ce4 100644 --- a/ccx_messaging/utils/kafka_config.py +++ b/ccx_messaging/utils/kafka_config.py @@ -7,3 +7,28 @@ def producer_config(config): """Clean up the provided configuration in order to be used by a Kafka producer.""" producer_allowed_arguments = list(KafkaProducer.DEFAULT_CONFIG.keys()) return {key: value for key, value in config.items() if key in producer_allowed_arguments} + + +def translate_kafka_configuration(config: dict) -> dict: + """Transform a dict with default Kafka configuration to kafka-python configuration.""" + lib_config = {} + + if not config: + return {} + + keys_translation = { + "bootstrap.servers": "bootstrap_servers", + "ssl.ca.location": "ssl_cafile", + "sasl.mechanisms": "sasl_mechanism", + "sasl.username": "sasl_plain_username", + "sasl.password": "sasl_plain_password", + "security.protocol": "security_protocol", + } + + for kafka_key, lib_key in keys_translation.items(): + if kafka_key not in config: + continue + + lib_config[lib_key] = config[kafka_key] + + return lib_config diff --git a/ccx_messaging/watchers/payload_tracker_watcher.py b/ccx_messaging/watchers/payload_tracker_watcher.py index b7ec10f..1cbe78d 100644 --- a/ccx_messaging/watchers/payload_tracker_watcher.py +++ b/ccx_messaging/watchers/payload_tracker_watcher.py @@ -20,7 +20,7 @@ from kafka import KafkaProducer -from ccx_messaging.utils.kafka_config import producer_config +from ccx_messaging.utils.kafka_config import producer_config, 
translate_kafka_configuration from ccx_messaging.watchers.consumer_watcher import ConsumerWatcher LOG = logging.getLogger(__name__) @@ -29,16 +29,29 @@ class PayloadTrackerWatcher(ConsumerWatcher): """`Watcher` implementation to handle Payload Tracker updates.""" - def __init__(self, bootstrap_servers, topic, service_name="ccx-data-pipeline", **kwargs): + def __init__( + self, + bootstrap_servers, + topic, + service_name="ccx-data-pipeline", + kafka_broker_config=None, + **kwargs + ): """Construct a `PayloadTrackerWatcher` object.""" self.topic = topic if not self.topic: raise KeyError("topic") - self.kafka_prod = KafkaProducer( - bootstrap_servers=bootstrap_servers, **producer_config(kwargs) - ) + kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) + kwargs.update(kafka_broker_cfg) + + if "bootstrap_servers" not in kwargs: + kwargs["bootstrap_servers"] = bootstrap_servers + + kwargs = producer_config(kwargs) + LOG.debug("Kafka publisher for PayloadTrackerWatcher configuration arguments: %s", kwargs) + self.kafka_prod = KafkaProducer(**kwargs) self.service_name = service_name LOG.info( diff --git a/test/consumers/consumer_test.py b/test/consumers/consumer_test.py index 4513c9e..af4906d 100644 --- a/test/consumers/consumer_test.py +++ b/test/consumers/consumer_test.py @@ -29,7 +29,7 @@ from ccx_messaging.consumers.consumer import AnemicConsumer from ccx_messaging.error import CCXMessagingError -from test.utils import ( +from test.test_utils import ( mock_consumer_record, mock_consumer_process_no_action_catch_exception, ) diff --git a/test/consumers/kafka_consumer_test.py b/test/consumers/kafka_consumer_test.py index 229dbec..1362688 100644 --- a/test/consumers/kafka_consumer_test.py +++ b/test/consumers/kafka_consumer_test.py @@ -239,9 +239,13 @@ def test_consumer_init_direct(topic, group, server): """Test of our Consumer constructor, using direct configuration options.""" with patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") as 
mock_consumer_init: with patch("os.environ", new=dict()): - KafkaConsumer(None, None, None, topic, group_id=group, bootstrap_servers=[server]) + kwargs = { + "group.id": group, + "bootstrap.servers": server, + } + KafkaConsumer(None, None, None, topic, **kwargs) config = { - "bootstrap.servers": [server], + "bootstrap.servers": server, "group.id": group, "retry.backoff.ms": 1000, } diff --git a/test/utils.py b/test/test_utils.py similarity index 100% rename from test/utils.py rename to test/test_utils.py diff --git a/test/utils/__init__.py b/test/utils/__init__.py new file mode 100644 index 0000000..8116989 --- /dev/null +++ b/test/utils/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2023 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the ccx_messaging.utils submodule.""" diff --git a/test/utils/kafka_config_test.py b/test/utils/kafka_config_test.py new file mode 100644 index 0000000..1e0c333 --- /dev/null +++ b/test/utils/kafka_config_test.py @@ -0,0 +1,57 @@ +# Copyright 2023 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the ccx_messaging.utils.kafka_config module."""
+
+from ccx_messaging.utils.kafka_config import translate_kafka_configuration
+
+
+def test_translate_kafka_configuration():
+    """Check the translation of a default Kafka config to kafka-python configuration."""
+    kafka_config = {
+        "bootstrap.servers": "amazing-broker:9092",
+        "ssl.ca.location": "/tmp/awesome.crt",
+        "sasl.mechanisms": "PLAIN",
+        "sasl.username": "incredible_user",
+        "sasl.password": "supersecret_password",
+        "security.protocol": "sasl_ssl",
+    }
+
+    expected_lib_config = {
+        "bootstrap_servers": "amazing-broker:9092",
+        "ssl_cafile": "/tmp/awesome.crt",
+        "sasl_mechanism": "PLAIN",
+        "sasl_plain_username": "incredible_user",
+        "sasl_plain_password": "supersecret_password",
+        "security_protocol": "sasl_ssl",
+    }
+
+    assert translate_kafka_configuration(kafka_config) == expected_lib_config
+
+
+def test_translate_kafka_configuration_unexpected_keys_ignored():
+    """Check that the translation ignores unexpected broker configuration parameters."""
+    kafka_config = {
+        "retry_backoff_ms": 500,
+        "should_ignore_key": "I shouldn't be there",
+        "sasl.password": "supersecret_password",
+        "security.protocol": "sasl_ssl",
+    }
+
+    expected_lib_config = {
+        "sasl_plain_password": "supersecret_password",
+        "security_protocol": "sasl_ssl",
+    }
+
+    assert translate_kafka_configuration(kafka_config) == expected_lib_config