diff --git a/ccx_messaging/publishers/kafka_publisher.py b/ccx_messaging/publishers/kafka_publisher.py index bf02b5fa..e2c9aab0 100644 --- a/ccx_messaging/publishers/kafka_publisher.py +++ b/ccx_messaging/publishers/kafka_publisher.py @@ -84,7 +84,7 @@ def publish(self, input_msg: dict, report: str): raise NotImplementedError() def error(self, input_msg: dict, ex: Exception): - """Handle pipeline errores by logging them.""" + """Handle pipeline errors by logging them.""" # The super call is probably unnecessary because the default behavior # is to do nothing, but let's call it in case it ever does anything. super().error(input_msg, ex) diff --git a/ccx_messaging/utils/clowder.py b/ccx_messaging/utils/clowder.py index 5837b10f..18562b71 100644 --- a/ccx_messaging/utils/clowder.py +++ b/ccx_messaging/utils/clowder.py @@ -36,35 +36,42 @@ def apply_clowder_config(manifest): pt_watcher = watcher break - clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0] - kafka_url = f"{clowder_broker_config.hostname}:{clowder_broker_config.port}" - logger.debug("Kafka URL: %s", kafka_url) - - kafka_broker_config = {"bootstrap.servers": kafka_url} - - if clowder_broker_config.cacert: - # Current Kafka library is not able to handle the CA file, only a path to it - # FIXME: Duplicating parameters in order to be used by both Kafka libraries - ssl_ca_location = app_common_python.LoadedConfig.kafka_ca() - kafka_broker_config["ssl.ca.location"] = ssl_ca_location - - if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl": - kafka_broker_config.update( - { - "sasl.mechanisms": clowder_broker_config.sasl.saslMechanism, - "sasl.username": clowder_broker_config.sasl.username, - "sasl.password": clowder_broker_config.sasl.password, - "security.protocol": clowder_broker_config.sasl.securityProtocol, - } - ) - - config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config - config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config - - if pt_watcher: - pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config - - logger.info("Kafka configuration updated from Clowder configuration") + numBrokers = app_common_python.LoadedConfig.kafka.brokers + if numBrokers > 0: + broker_addresses = [] + for broker in app_common_python.LoadedConfig.kafka.brokers: + if broker.Port: + broker_addresses.append(f"{broker.Hostname}:{broker.Port}") + else: + broker_addresses.append(broker.Hostname) + bootstrap_servers = "".join(broker_addresses, ",") + logger.debug("Kafka bootstrap server URLs: %s", bootstrap_servers) + kafka_broker_config = {"bootstrap.servers": bootstrap_servers} + + clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0] + if clowder_broker_config.cacert: + # Current Kafka library is not able to handle the CA file, only a path to it + # FIXME: Duplicating parameters in order to be used by both Kafka libraries + ssl_ca_location = app_common_python.LoadedConfig.kafka_ca() + kafka_broker_config["ssl.ca.location"] = ssl_ca_location + + if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl": + kafka_broker_config.update( + { + "sasl.mechanisms": clowder_broker_config.sasl.saslMechanism, + "sasl.username": clowder_broker_config.sasl.username, + "sasl.password": clowder_broker_config.sasl.password, + "security.protocol": clowder_broker_config.sasl.securityProtocol, + } + ) + + config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config + config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config + + if pt_watcher: + pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config + + logger.info("Kafka configuration updated from Clowder configuration") consumer_topic = config["service"]["consumer"]["kwargs"].get("incoming_topic") dlq_topic = config["service"]["consumer"]["kwargs"].get("dead_letter_queue_topic")