Skip to content

Commit

Permalink
Merge pull request #50 from RedHatInsights/improve_clowder_config_inj…
Browse files Browse the repository at this point in the history
…ection

Improve Kafka clowder configuration injection
  • Loading branch information
joselsegura authored Mar 22, 2023
2 parents 968cf71 + 1313d3f commit f1d40b6
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 48 deletions.
7 changes: 5 additions & 2 deletions ccx_messaging/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.schemas import INPUT_MESSAGE_SCHEMA, IDENTITY_SCHEMA
from ccx_messaging.utils.kafka_config import producer_config
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,6 +64,7 @@ def __init__(
downloader,
engine,
incoming_topic,
kafka_broker_config=None,
dead_letter_queue_topic=None,
max_record_age=7200,
retry_backoff_ms=1000,
Expand All @@ -89,6 +90,9 @@ def __init__(

super().__init__(publisher, downloader, engine, requeuer=requeuer)

kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config)
kwargs.update(kafka_broker_cfg)

self.consumer = KafkaConsumer(
incoming_topic,
value_deserializer=self.deserialize,
Expand All @@ -113,7 +117,6 @@ def __init__(

if self.dead_letter_queue_topic is not None:
dlq_producer_config = producer_config(kwargs)

self.dlq_producer = KafkaProducer(**dlq_producer_config)

def _consume(self, msg):
Expand Down
28 changes: 15 additions & 13 deletions ccx_messaging/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.ingress import parse_ingress_message
from ccx_messaging.utils.kafka_config import producer_config
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration


LOG = logging.getLogger(__name__)
Expand All @@ -26,36 +26,37 @@ def __init__(
downloader,
engine,
incoming_topic,
kafka_broker_config=None,
platform_service=None,
dead_letter_queue_topic=None,
max_record_age=7200,
retry_backoff_ms=1000,
processing_timeout_s=0,
**kwargs,
):
"""Initialise the KafkaConsumer object and related handlers."""
requeuer = kwargs.pop("requeuer", None)
super().__init__(publisher, downloader, engine, requeuer=requeuer)

# Confluent initialization
LOG.info(
"Consuming topic '%s' from brokers %s as group '%s'",
incoming_topic,
kwargs.get("bootstrap_servers", None),
kwargs.get("group_id", None),
kwargs.get("bootstrap.servers", None),
kwargs.get("group.id", None),
)

# Confluent initialization
config = {
"bootstrap.servers": kwargs.get("bootstrap_servers", ""),
"group.id": kwargs.get("group_id", None),
"retry.backoff.ms": retry_backoff_ms,
}
if kafka_broker_config:
kwargs.update(kafka_broker_config)

if "retry.backoff.ms" not in kwargs:
kwargs["retry.backoff.ms"] = 1000

self.consumer = ConfluentConsumer(config)
LOG.debug("Confluent Kafka consumer configuration arguments: %s", kwargs)
self.consumer = ConfluentConsumer(kwargs)
self.consumer.subscribe([incoming_topic])

# Self handled vars
self.log_pattern = f"topic: {incoming_topic}, group_id: {kwargs.get('group_id', None)}"
self.log_pattern = f"topic: {incoming_topic}, group.id: {kwargs.get('group.id', None)}"

# Service to filter in messages
self.platform_service = platform_service
Expand All @@ -74,7 +75,8 @@ def __init__(
self.dead_letter_queue_topic = dead_letter_queue_topic

if self.dead_letter_queue_topic is not None:
dlq_producer_config = producer_config(kwargs)
config = translate_kafka_configuration(kafka_broker_config)
dlq_producer_config = producer_config(config)
self.dlq_producer = KafkaProducer(**dlq_producer_config)

def get_url(self, input_msg: dict) -> str:
Expand Down
18 changes: 13 additions & 5 deletions ccx_messaging/publishers/data_pipeline_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from kafka import KafkaProducer

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.utils.kafka_config import producer_config
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration

LOG = logging.getLogger(__name__)

Expand All @@ -37,16 +37,24 @@ class DataPipelinePublisher(Publisher):
Custom error handling for the whole pipeline is implemented here.
"""

def __init__(self, outgoing_topic, bootstrap_servers, **kwargs):
def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs):
"""Construct a new `DataPipelinePublisher` given `kwargs` from the config YAML."""
self.topic = outgoing_topic
self.bootstrap_servers = bootstrap_servers
if self.topic is None:
raise KeyError("outgoing_topic")

self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers, **producer_config(kwargs)
)
kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config)
kwargs.update(kafka_broker_cfg)

if "bootstrap_servers" not in kwargs:
kwargs["bootstrap_servers"] = self.bootstrap_servers
else:
self.bootstrap_servers = kwargs["bootstrap_servers"]

kwargs = producer_config(kwargs)
LOG.debug("Kafka publisher configuration arguments: %s", kwargs)
self.producer = KafkaProducer(**kwargs)
LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers)
self.outdata_schema_version = 2

Expand Down
11 changes: 9 additions & 2 deletions ccx_messaging/publishers/sha_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kafka import KafkaProducer

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration

LOG = logging.getLogger(__name__)

Expand All @@ -36,14 +37,20 @@ class SHAPublisher(Publisher):
Custom error handling for the whole pipeline is implemented here.
"""

def __init__(self, outgoing_topic, bootstrap_servers, **kwargs):
def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs):
"""Construct a new `SHAPublisher` given `kwargs` from the config YAML."""
self.topic = outgoing_topic
self.bootstrap_servers = bootstrap_servers
if self.topic is None:
raise KeyError("outgoing_topic")

self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, **kwargs)
kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config)
kwargs.update(kafka_broker_cfg)

if "bootstrap_servers" not in kwargs:
kwargs["bootstrap_servers"] = self.bootstrap_servers

self.producer = KafkaProducer(**producer_config(kwargs))
LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers)
self.outdata_schema_version = 2

Expand Down
30 changes: 12 additions & 18 deletions ccx_messaging/utils/clowder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,52 +30,46 @@ def apply_clowder_config(manifest):

# Find the Payload Tracker watcher, as it might be affected by config changes
pt_watcher_name = "ccx_messaging.watchers.payload_tracker_watcher.PayloadTrackerWatcher"
watcher = None
pt_watcher = None
for watcher in config["service"]["watchers"]:
if watcher["name"] == pt_watcher_name:
pt_watcher = watcher
break

clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0]
kafka_url = f"{clowder_broker_config.hostname}:{clowder_broker_config.port}"
logger.debug("Kafka URL: %s", kafka_url)

kafka_broker_config = {
"bootstrap_servers": kafka_url,
}
kafka_broker_config = {"bootstrap.servers": kafka_url}

if clowder_broker_config.cacert:
# Current Kafka library is not able to handle the CA file, only a path to it
# FIXME: Duplicating parameters in order to be used by both Kafka libraries
ssl_ca_location = app_common_python.LoadedConfig.kafka_ca()
kafka_broker_config["ssl_cafile"] = ssl_ca_location
kafka_broker_config["ssl.ca.location"] = ssl_ca_location

if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl":
kafka_broker_config.update(
{
"sasl_mechanism": clowder_broker_config.sasl.saslMechanism,
"sasl.mechanism": clowder_broker_config.sasl.saslMechanism,
"sasl_plain_username": clowder_broker_config.sasl.username,
"sasl.mechanisms": clowder_broker_config.sasl.saslMechanism,
"sasl.username": clowder_broker_config.sasl.username,
"sasl_plain_password": clowder_broker_config.sasl.password,
"sasl.password": clowder_broker_config.sasl.password,
"security_protocol": clowder_broker_config.sasl.securityProtocol,
"security.protocol": clowder_broker_config.sasl.securityProtocol,
}
)

config["service"]["consumer"]["kwargs"].update(kafka_broker_config)
config["service"]["publisher"]["kwargs"].update(kafka_broker_config)
config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config
config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config

if watcher:
watcher["kwargs"].update(kafka_broker_config)
if pt_watcher:
pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config

logger.info("Kafka configuration updated from Clowder configuration")

consumer_topic = config["service"]["consumer"]["kwargs"].get("incoming_topic")
dlq_topic = config["service"]["consumer"]["kwargs"].get("dead_letter_queue_topic")
producer_topic = config["service"]["publisher"]["kwargs"].pop("outgoing_topic")
payload_tracker_topic = watcher["kwargs"].pop("topic")
producer_topic = config["service"]["publisher"]["kwargs"].get("outgoing_topic")
payload_tracker_topic = pt_watcher["kwargs"].pop("topic") if pt_watcher else None

if consumer_topic in app_common_python.KafkaTopics:
topic_cfg = app_common_python.KafkaTopics[consumer_topic]
Expand All @@ -93,9 +87,9 @@ def apply_clowder_config(manifest):
else:
logger.warn("The publisher topic cannot be found in Clowder mapping. It can cause errors")

if payload_tracker_topic in app_common_python.KafkaTopics:
if pt_watcher and payload_tracker_topic in app_common_python.KafkaTopics:
topic_cfg = app_common_python.KafkaTopics[payload_tracker_topic]
watcher["kwargs"]["topic"] = topic_cfg.name
pt_watcher["kwargs"]["topic"] = topic_cfg.name
else:
logger.warn(
"The Payload Tracker watcher topic cannot be found in Clowder mapping. "
Expand Down
25 changes: 25 additions & 0 deletions ccx_messaging/utils/kafka_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,28 @@ def producer_config(config):
"""Clean up the provided configuration in order to be used by a Kafka producer."""
producer_allowed_arguments = list(KafkaProducer.DEFAULT_CONFIG.keys())
return {key: value for key, value in config.items() if key in producer_allowed_arguments}


def translate_kafka_configuration(config: dict) -> dict:
"""Transform a dict with default Kafka configuration to kafka-python configuration."""
lib_config = {}

if not config:
return {}

keys_translation = {
"bootstrap.servers": "bootstrap_servers",
"ssl.ca.location": "ssl_cafile",
"sasl.mechanisms": "sasl_mechanism",
"sasl.username": "sasl_plain_username",
"sasl.password": "sasl_plain_password",
"security.protocol": "security_protocol",
}

for kafka_key, lib_key in keys_translation.items():
if kafka_key not in config:
continue

lib_config[lib_key] = config[kafka_key]

return lib_config
23 changes: 18 additions & 5 deletions ccx_messaging/watchers/payload_tracker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from kafka import KafkaProducer

from ccx_messaging.utils.kafka_config import producer_config
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration
from ccx_messaging.watchers.consumer_watcher import ConsumerWatcher

LOG = logging.getLogger(__name__)
Expand All @@ -29,16 +29,29 @@
class PayloadTrackerWatcher(ConsumerWatcher):
"""`Watcher` implementation to handle Payload Tracker updates."""

def __init__(self, bootstrap_servers, topic, service_name="ccx-data-pipeline", **kwargs):
def __init__(
self,
bootstrap_servers,
topic,
service_name="ccx-data-pipeline",
kafka_broker_config=None,
**kwargs
):
"""Construct a `PayloadTrackerWatcher` object."""
self.topic = topic

if not self.topic:
raise KeyError("topic")

self.kafka_prod = KafkaProducer(
bootstrap_servers=bootstrap_servers, **producer_config(kwargs)
)
kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config)
kwargs.update(kafka_broker_cfg)

if "bootstrap_servers" not in kwargs:
kwargs["bootstrap_servers"] = bootstrap_servers

kwargs = producer_config(kwargs)
LOG.debug("Kafka publisher for PayloadTrackerWatcher configuration arguments: %s", kwargs)
self.kafka_prod = KafkaProducer(**kwargs)
self.service_name = service_name

LOG.info(
Expand Down
2 changes: 1 addition & 1 deletion test/consumers/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ccx_messaging.consumers.consumer import AnemicConsumer
from ccx_messaging.error import CCXMessagingError

from test.utils import (
from test.test_utils import (
mock_consumer_record,
mock_consumer_process_no_action_catch_exception,
)
Expand Down
8 changes: 6 additions & 2 deletions test/consumers/kafka_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,13 @@ def test_consumer_init_direct(topic, group, server):
"""Test of our Consumer constructor, using direct configuration options."""
with patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") as mock_consumer_init:
with patch("os.environ", new=dict()):
KafkaConsumer(None, None, None, topic, group_id=group, bootstrap_servers=[server])
kwargs = {
"group.id": group,
"bootstrap.servers": server,
}
KafkaConsumer(None, None, None, topic, **kwargs)
config = {
"bootstrap.servers": [server],
"bootstrap.servers": server,
"group.id": group,
"retry.backoff.ms": 1000,
}
Expand Down
File renamed without changes.
15 changes: 15 additions & 0 deletions test/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2023 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the ccx_messaging.utils submodule."""
Loading

0 comments on commit f1d40b6

Please sign in to comment.