Skip to content

Commit

Permalink
Merge pull request #54 from RedHatInsights/remove_kafka_credentials_f…
Browse files Browse the repository at this point in the history
…rom_logs

Remove sensitive info from logs
  • Loading branch information
joselsegura authored Mar 27, 2023
2 parents c8fa05c + 929bac1 commit cd48c7d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
14 changes: 13 additions & 1 deletion ccx_messaging/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,19 @@ def __init__(
if "retry.backoff.ms" not in kwargs:
kwargs["retry.backoff.ms"] = 1000

LOG.debug("Confluent Kafka consumer configuration arguments: %s", kwargs)
LOG.debug(
"Confluent Kafka consumer configuration arguments: "
"Group: %s. "
"Server: %s. "
"Topic: %s. "
"Security protocol: %s. "
"",
kwargs.get("group.id"),
kwargs.get("bootstrap.servers"),
incoming_topic,
kwargs.get("security.protocol"),
)

self.consumer = ConfluentConsumer(kwargs)
self.consumer.subscribe([incoming_topic])

Expand Down
12 changes: 11 additions & 1 deletion ccx_messaging/publishers/data_pipeline_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,17 @@ def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None,
self.bootstrap_servers = kwargs["bootstrap_servers"]

kwargs = producer_config(kwargs)
LOG.debug("Kafka publisher configuration arguments: %s", kwargs)
LOG.debug(
"Confluent Kafka publisher configuration arguments: "
"Server: %s. "
"Topic: %s. "
"Security protocol: %s. "
"",
kwargs.get("bootstrap_servers"),
self.topic,
kwargs.get("security_protocol"),
)

self.producer = KafkaProducer(**kwargs)
LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers)
self.outdata_schema_version = 2
Expand Down
9 changes: 9 additions & 0 deletions ccx_messaging/publishers/kafka_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def __init__(self, outgoing_topic: str, kafka_broker_config: dict = None, **kwar
if "bootstrap.servers" not in kwargs:
raise KafkaException("Broker not configured")

log.debug(
"Confluent Kafka consumer configuration arguments: "
"Server: %s. "
"Topic: %s. "
"Security protocol: %s.",
kwargs.get("bootstrap.servers"),
self.topic,
kwargs.get("security.protocol"),
)
self.producer = Producer(kwargs)
log.info(
"Producing to topic '%s' on brokers %s", self.topic, kwargs.get("bootstrap.servers")
Expand Down

0 comments on commit cd48c7d

Please sign in to comment.