Skip to content

Commit

Permalink
Use all Kafka bootstrap servers from Clowder config
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Nov 30, 2023
1 parent e0994cd commit 69525db
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 30 deletions.
2 changes: 1 addition & 1 deletion ccx_messaging/publishers/kafka_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def publish(self, input_msg: dict, report: str):
raise NotImplementedError()

def error(self, input_msg: dict, ex: Exception):
"""Handle pipeline errores by logging them."""
"""Handle pipeline errors by logging them."""
# The super call is probably unnecessary because the default behavior
# is to do nothing, but let's call it in case it ever does anything.
super().error(input_msg, ex)
Expand Down
65 changes: 36 additions & 29 deletions ccx_messaging/utils/clowder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,42 @@ def apply_clowder_config(manifest):
pt_watcher = watcher
break

clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0]
kafka_url = f"{clowder_broker_config.hostname}:{clowder_broker_config.port}"
logger.debug("Kafka URL: %s", kafka_url)

kafka_broker_config = {"bootstrap.servers": kafka_url}

if clowder_broker_config.cacert:
# Current Kafka library is not able to handle the CA file, only a path to it
# FIXME: Duplicating parameters in order to be used by both Kafka libraries
ssl_ca_location = app_common_python.LoadedConfig.kafka_ca()
kafka_broker_config["ssl.ca.location"] = ssl_ca_location

if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl":
kafka_broker_config.update(
{
"sasl.mechanisms": clowder_broker_config.sasl.saslMechanism,
"sasl.username": clowder_broker_config.sasl.username,
"sasl.password": clowder_broker_config.sasl.password,
"security.protocol": clowder_broker_config.sasl.securityProtocol,
}
)

config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config
config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config

if pt_watcher:
pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config

logger.info("Kafka configuration updated from Clowder configuration")
numBrokers = app_common_python.LoadedConfig.kafka.brokers
if numBrokers > 0:
broker_addresses = []
for broker in app_common_python.LoadedConfig.kafka.brokers:
if broker.Port:
broker_addresses.append(f"{broker.Hostname}:{broker.Port}")
else:
broker_addresses.append(broker.Hostname)
bootstrap_servers = "".join(broker_addresses, ",")
logger.debug("Kafka bootstrap server URLs: %s", bootstrap_servers)
kafka_broker_config = {"bootstrap.servers": bootstrap_servers}

clowder_broker_config = app_common_python.LoadedConfig.kafka.brokers[0]
if clowder_broker_config.cacert:
# Current Kafka library is not able to handle the CA file, only a path to it
# FIXME: Duplicating parameters in order to be used by both Kafka libraries
ssl_ca_location = app_common_python.LoadedConfig.kafka_ca()
kafka_broker_config["ssl.ca.location"] = ssl_ca_location

if BrokerConfigAuthtypeEnum.valueAsString(clowder_broker_config.authtype) == "sasl":
kafka_broker_config.update(
{
"sasl.mechanisms": clowder_broker_config.sasl.saslMechanism,
"sasl.username": clowder_broker_config.sasl.username,
"sasl.password": clowder_broker_config.sasl.password,
"security.protocol": clowder_broker_config.sasl.securityProtocol,
}
)

config["service"]["consumer"]["kwargs"]["kafka_broker_config"] = kafka_broker_config
config["service"]["publisher"]["kwargs"]["kafka_broker_config"] = kafka_broker_config

if pt_watcher:
pt_watcher["kwargs"]["kafka_broker_config"] = kafka_broker_config

logger.info("Kafka configuration updated from Clowder configuration")

consumer_topic = config["service"]["consumer"]["kwargs"].get("incoming_topic")
dlq_topic = config["service"]["consumer"]["kwargs"].get("dead_letter_queue_topic")
Expand Down

0 comments on commit 69525db

Please sign in to comment.