Refactor: move logic out of client for easier unit testing #19183

Merged: 10 commits, Dec 5, 2024

kafka_consumer/datadog_checks/kafka_consumer/client.py: 314 changes (85 additions, 229 deletions)
@@ -6,15 +6,13 @@
 from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition
 from confluent_kafka.admin import AdminClient
 
-from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID
-
 
 class KafkaClient:
     def __init__(self, config, log) -> None:
         self.config = config
         self.log = log
         self._kafka_client = None
-        self.topic_partition_cache = {}
+        self._consumer = None
 
     @property
     def kafka_client(self):
@@ -31,7 +29,7 @@ def kafka_client(self):
 
         return self._kafka_client
 
-    def __create_consumer(self, consumer_group):
+    def open_consumer(self, consumer_group):
         config = {
             "bootstrap.servers": self.config._kafka_connect_str,
             "group.id": consumer_group,
@@ -41,7 +39,12 @@ def __create_consumer(self, consumer_group):
         }
         config.update(self.__get_authentication_config())
 
-        return Consumer(config, logger=self.log)
+        self._consumer = Consumer(config, logger=self.log)
+        self.log.debug("Consumer instance %s created for group %s", self._consumer, consumer_group)
+
+    def close_consumer(self):
+        self.log.debug("Closing consumer instance %s", self._consumer)
+        self._consumer.close()
 
     def __get_authentication_config(self):
         config = {
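
The consumer lifecycle is now split into named `open_consumer`/`close_consumer` methods, with the instance held on `self._consumer`, which gives unit tests a seam to patch instead of a consumer created and closed deep inside a query method. A minimal sketch of how that seam could be exercised; the patch target and the `MagicMock` config/log doubles are illustrative assumptions, not part of this PR:

```python
from unittest import mock

from datadog_checks.kafka_consumer.client import KafkaClient


def test_open_and_close_consumer():
    # Patch the confluent_kafka Consumer as imported by the client module,
    # so no broker connection is attempted.
    with mock.patch("datadog_checks.kafka_consumer.client.Consumer") as consumer_cls:
        client = KafkaClient(config=mock.MagicMock(), log=mock.MagicMock())
        client.open_consumer("my_consumer_group")
        # The consumer is held on the client rather than returned.
        assert client._consumer is consumer_cls.return_value
        client.close_consumer()
        consumer_cls.return_value.close.assert_called_once()
```
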
@@ -79,252 +82,105 @@ def __get_authentication_config(self):
 
         return config
 
-    def get_highwater_offsets(self, consumer_offsets):
-        self.log.debug('Getting highwater offsets')
-
-        cluster_id = ""
-        highwater_offsets = {}
-        topics_with_consumer_offset = set()
-        topic_partition_with_consumer_offset = set()
-
-        if not self.config._monitor_all_broker_highwatermarks:
-            for _, topic, partition in consumer_offsets:
-                topics_with_consumer_offset.add(topic)
-                topic_partition_with_consumer_offset.add((topic, partition))
-
-        topic_partition_checked = set()
-
-        for consumer_group, _topic, _partition in consumer_offsets:
-            self.log.debug('CONSUMER GROUP: %s', consumer_group)
-            if (_topic, _partition) in topic_partition_checked:
-                self.log.debug('Highwater offset already collected for topic %s with partition %s', _topic, _partition)
-                continue
-
-            topic_partitions_for_highwater_offsets = set()
-
-            consumer = self.__create_consumer(consumer_group)
-            self.log.debug("Consumer instance %s created for group %s", consumer, consumer_group)
-            cluster_metadata = consumer.list_topics(timeout=self.config._request_timeout)
-            try:
-                cluster_id = cluster_metadata.cluster_id
-            except AttributeError:
-                self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group)
-            topics = cluster_metadata.topics
-
-            for topic in topics:
-                if topic in KAFKA_INTERNAL_TOPICS:
-                    self.log.debug("Skipping internal topic %s", topic)
-                    continue
-                if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset:
-                    self.log.debug("Skipping non-relevant topic %s", topic)
-                    continue
-
-                for partition in topics[topic].partitions:
-                    if (
-                        self.config._monitor_all_broker_highwatermarks
-                        or (topic, partition) in topic_partition_with_consumer_offset
-                    ):
-                        # Setting offset to -1 will return the latest highwater offset while calling offsets_for_times
-                        # Reference: https://github.com/fede1024/rust-rdkafka/issues/460
-                        topic_partitions_for_highwater_offsets.add(
-                            TopicPartition(topic=topic, partition=partition, offset=-1)
-                        )
-                        self.log.debug('TOPIC: %s', topic)
-                        self.log.debug('PARTITION: %s', partition)
-                    else:
-                        self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic)
-
-            if len(topic_partitions_for_highwater_offsets) > 0:
-                self.log.debug(
-                    'Querying %s highwater offsets for consumer group %s',
-                    len(topic_partitions_for_highwater_offsets),
-                    consumer_group,
-                )
-                for topic_partition_with_highwater_offset in consumer.offsets_for_times(
-                    partitions=list(topic_partitions_for_highwater_offsets),
-                    timeout=self.config._request_timeout,
-                ):
-                    self.log.debug('Topic partition with highwater offset: %s', topic_partition_with_highwater_offset)
-                    topic = topic_partition_with_highwater_offset.topic
-                    partition = topic_partition_with_highwater_offset.partition
-                    offset = topic_partition_with_highwater_offset.offset
-                    highwater_offsets[(topic, partition)] = offset
-                    self.log.debug("Adding %s %s to checked set to facilitate early exit", topic, partition)
-                    topic_partition_checked.add((topic, partition))
-            else:
-                self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group)
-
-            self.log.debug("Closing consumer instance %s", consumer)
-            consumer.close()
-
-        self.log.debug('Got %s highwater offsets', len(highwater_offsets))
-        return highwater_offsets, cluster_id
+    def consumer_get_cluster_id_and_list_topics(self, consumer_group):
+        cluster_metadata = self._consumer.list_topics(timeout=self.config._request_timeout)
+        try:
+            # TODO: remove this try-except, the attribute is always present.
+            cluster_id = cluster_metadata.cluster_id
+        except AttributeError:
+            self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group)
+            return "", []
+        return (cluster_id, [(name, list(metadata.partitions)) for name, metadata in cluster_metadata.topics.items()])
+
+    def consumer_offsets_for_times(self, partitions):
+        topicpartitions_for_querying = [
+            # Setting offset to -1 will return the latest highwater offset while calling offsets_for_times
+            # Reference: https://github.com/fede1024/rust-rdkafka/issues/460
+            TopicPartition(topic=topic, partition=partition, offset=-1)
+            for topic, partition in partitions
+        ]
+        return [
+            (tp.topic, tp.partition, tp.offset)
+            for tp in self._consumer.offsets_for_times(
+                partitions=topicpartitions_for_querying, timeout=self.config._request_timeout
+            )
+        ]
 
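`consumer_offsets_for_times` keeps the pre-existing trick documented in the comment: `offsets_for_times` looks up offsets by timestamp, and a timestamp of -1 (passed through the `offset` field) resolves to the latest, i.e. high-watermark, offset. A hypothetical call, assuming an open consumer and a topic `orders` with two partitions:

```python
client.open_consumer("my_consumer_group")
# Returns plain tuples rather than TopicPartition objects, so callers
# and tests do not need confluent_kafka types.
highwaters = client.consumer_offsets_for_times([("orders", 0), ("orders", 1)])
# e.g. [("orders", 0, 1042), ("orders", 1, 998)]
client.close_consumer()
```
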
     def get_partitions_for_topic(self, topic):
-        if partitions := self.topic_partition_cache.get(topic):
-            return partitions
-
         try:
             cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
         except KafkaException as e:
             self.log.error("Received exception when getting partitions for topic %s: %s", topic, e)
-            return None
-        else:
-            topic_metadata = cluster_metadata.topics[topic]
-            partitions = list(topic_metadata.partitions.keys())
-            self.topic_partition_cache[topic] = partitions
-            return partitions
+            return []
+        topic_metadata = cluster_metadata.topics[topic]
+        return list(topic_metadata.partitions)
 
     def request_metadata_update(self):
         # https://github.com/confluentinc/confluent-kafka-python/issues/594
         self.kafka_client.list_topics(None, timeout=self.config._request_timeout)
 
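`request_metadata_update` remains the documented workaround from the linked confluent-kafka-python issue: an AdminClient `list_topics` call forces a refresh of the cached cluster metadata. A hypothetical caller, purely for illustration of when the refresh helps:

```python
# Retry a partition lookup once after forcing a metadata refresh, in case
# the topic was created after the admin client cached its metadata.
partitions = client.get_partitions_for_topic("orders")
if not partitions:
    client.request_metadata_update()
    partitions = client.get_partitions_for_topic("orders")
```
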
-    def get_consumer_offsets(self):
-        # {(consumer_group, topic, partition): offset}
-        self.log.debug('Getting consumer offsets')
-        consumer_offsets = {}
-
-        consumer_groups = self._get_consumer_groups()
-        self.log.debug('Identified %s consumer groups', len(consumer_groups))
+    def list_consumer_groups(self):
+        groups = []
+        try:
+            groups_res = self.kafka_client.list_consumer_groups().result()
+            for valid_group in groups_res.valid:
+                self.log.debug("Discovered consumer group: %s", valid_group.group_id)
+                groups.append(valid_group.group_id)
+        except Exception as e:
+            self.log.error("Failed to collect consumer groups: %s", e)
+        return groups
+
+    def list_consumer_group_offsets(self, groups):
+        """
+        For every group and (optionally) its topics and partitions retrieve consumer offsets.
 
-        futures = self._get_consumer_offset_futures(consumer_groups)
-        self.log.debug('%s futures to be waited on', len(futures))
+        As input expects a list of tuples: (consumer_group_id, topic_partitions).
+        topic_partitions are either None to indicate we want all topics and partitions OR a list of (topic, partition).
 
-        for future in as_completed(futures):
+        Returns a list of tuples with members:
+        1. group id
+        2. list of tuples: (topic, partition, offset)
+        """
+        futures = []
+        for consumer_group, topic_partitions in groups:
+            topic_partitions = (
+                topic_partitions if topic_partitions is None else [TopicPartition(t, p) for t, p in topic_partitions]
+            )
+            futures.append(
+                self.kafka_client.list_consumer_group_offsets(
+                    [ConsumerGroupTopicPartitions(group_id=consumer_group, topic_partitions=topic_partitions)]
+                )[consumer_group]
+            )
+        offsets = []
+        for completed in as_completed(futures):
             try:
-                response_offset_info = future.result()
+                response_offset_info = completed.result()
             except KafkaException as e:
-                self.log.debug("Failed to read consumer offsets for future %s: %s", future, e)
-            else:
-                consumer_group = response_offset_info.group_id
-                topic_partitions = response_offset_info.topic_partitions
-
-                self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group)
-                self.log.debug('RESULT TOPIC PARTITIONS: %s', topic_partitions)
-
-                for topic_partition in topic_partitions:
-                    topic = topic_partition.topic
-                    partition = topic_partition.partition
-                    offset = topic_partition.offset
-
-                    self.log.debug('RESULTS TOPIC: %s', topic)
-                    self.log.debug('RESULTS PARTITION: %s', partition)
-                    self.log.debug('RESULTS OFFSET: %s', offset)
-
-                    if topic_partition.error:
-                        self.log.debug(
-                            "Encountered error: %s. Occurred with topic: %s; partition: [%s]",
-                            topic_partition.error.str(),
-                            topic_partition.topic,
-                            str(topic_partition.partition),
-                        )
-                        continue
-
-                    if offset == OFFSET_INVALID:
-                        continue
-
-                    if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex:
-                        consumer_offsets[(consumer_group, topic, partition)] = offset
-                    else:
-                        to_match = f"{consumer_group},{topic},{partition}"
-                        if self.config._consumer_groups_compiled_regex.match(to_match):
-                            consumer_offsets[(consumer_group, topic, partition)] = offset
-
-        self.log.debug('Got %s consumer offsets', len(consumer_offsets))
-        return consumer_offsets
-
-    def _get_consumer_groups(self):
-        # Get all consumer groups to monitor
-        consumer_groups = []
-        if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
-            consumer_groups_future = self.kafka_client.list_consumer_groups()
-            try:
-                list_consumer_groups_result = consumer_groups_future.result()
-                for valid_consumer_group in list_consumer_groups_result.valid:
-                    self.log.debug("Discovered consumer group: %s", valid_consumer_group.group_id)
-
-                consumer_groups.extend(
-                    valid_consumer_group.group_id
-                    for valid_consumer_group in list_consumer_groups_result.valid
-                    if valid_consumer_group.group_id != ""
-                )
-            except Exception as e:
-                self.log.error("Failed to collect consumer groups: %s", e)
-            return consumer_groups
-        else:
-            return self.config._consumer_groups
-
-    def get_consumer_group_state(self, consumer_group):
-        consumer_group_state = ""
-        # Get the consumer group state if present
-        consumer_groups_future = self._describe_consumer_groups(consumer_group)
-        consumer_groups_result = consumer_groups_future[consumer_group].result()
-        self.log.debug(
-            "Consumer group: %s in state %s",
-            consumer_groups_result.group_id,
-            consumer_groups_result.state,
-        )
-        consumer_group_result_state = str(consumer_groups_result.state)
-        consumer_group_state = consumer_group_result_state.split('.')[1]
-
-        return consumer_group_state
-
-    def _list_consumer_group_offsets(self, cg_tp):
-        """
-        :returns: A dict of futures for each group, keyed by the group id.
-        The future result() method returns :class:`ConsumerGroupTopicPartitions`.
-
-        :rtype: dict[str, future]
-        """
-        return self.kafka_client.list_consumer_group_offsets([cg_tp])
self.log.debug("Failed to read consumer offsets for future %s: %s", completed, e)
continue
tpo = []
for tp in response_offset_info.topic_partitions:
if tp.error:
self.log.debug(
"Encountered error: %s. Occurred with topic: %s; partition: [%s]",
tp.error.str(),
tp.topic,
str(tp.partition),
)
continue
tpo.append((tp.topic, tp.partition, tp.offset))
offsets.append((response_offset_info.group_id, tpo))
return offsets

-    def _describe_consumer_groups(self, consumer_group):
+    def describe_consumer_groups(self, consumer_group):
         """
         :returns: A dict of futures for each group, keyed by the group_id.
         The future result() method returns :class:`ConsumerGroupDescription`.
 
         :rtype: dict[str, future]
         """
-        return self.kafka_client.describe_consumer_groups([consumer_group])
+        desc = self.kafka_client.describe_consumer_groups([consumer_group])[consumer_group].result()
+        return (desc.group_id, desc.state.value)
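
Note that the docstring above still describes the old futures-based contract; the method now resolves the future itself and returns a plain `(group_id, state)` tuple. A hypothetical call, with an invented group name:

```python
# state is the raw value of the confluent_kafka ConsumerGroupState enum.
group_id, state = client.describe_consumer_groups("checkout")
```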

     def close_admin_client(self):
         self._kafka_client = None
 
-    def _get_consumer_offset_futures(self, consumer_groups):
-        futures = []
-
-        # If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later)
-        if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
-            for consumer_group in consumer_groups:
-                futures.append(
-                    self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group]
-                )
-            return futures
-
-        for consumer_group in consumer_groups:
-            # If topics are specified
-            topics = consumer_groups.get(consumer_group)
-            if not topics:
-                futures.append(
-                    self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group]
-                )
-                continue
-
-            for topic in topics:
-                # If partitions are defined
-                if partitions := topics[topic]:
-                    topic_partitions = [TopicPartition(topic, partition) for partition in partitions]
-                # If partitions are not defined
-                else:
-                    # get all the partitions for this topic
-                    partitions = self.get_partitions_for_topic(topic)
-
-                    topic_partitions = [TopicPartition(topic, partition) for partition in partitions]
-
-                futures.append(
-                    self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group, topic_partitions))[
-                        consumer_group
-                    ]
-                )
-
-        return futures