diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 288354acd49db..6c93f8e3887fd 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -6,15 +6,13 @@ from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition from confluent_kafka.admin import AdminClient -from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID - class KafkaClient: def __init__(self, config, log) -> None: self.config = config self.log = log self._kafka_client = None - self.topic_partition_cache = {} + self._consumer = None @property def kafka_client(self): @@ -31,7 +29,7 @@ def kafka_client(self): return self._kafka_client - def __create_consumer(self, consumer_group): + def open_consumer(self, consumer_group): config = { "bootstrap.servers": self.config._kafka_connect_str, "group.id": consumer_group, @@ -41,7 +39,12 @@ def __create_consumer(self, consumer_group): } config.update(self.__get_authentication_config()) - return Consumer(config, logger=self.log) + self._consumer = Consumer(config, logger=self.log) + self.log.debug("Consumer instance %s created for group %s", self._consumer, consumer_group) + + def close_consumer(self): + self.log.debug("Closing consumer instance %s", self._consumer) + self._consumer.close() def __get_authentication_config(self): config = { @@ -79,252 +82,105 @@ def __get_authentication_config(self): return config - def get_highwater_offsets(self, consumer_offsets): - self.log.debug('Getting highwater offsets') - - cluster_id = "" - highwater_offsets = {} - topics_with_consumer_offset = set() - topic_partition_with_consumer_offset = set() - - if not self.config._monitor_all_broker_highwatermarks: - for _, topic, partition in consumer_offsets: - topics_with_consumer_offset.add(topic) - topic_partition_with_consumer_offset.add((topic, partition)) 
- - topic_partition_checked = set() - - for consumer_group, _topic, _partition in consumer_offsets: - self.log.debug('CONSUMER GROUP: %s', consumer_group) - if (_topic, _partition) in topic_partition_checked: - self.log.debug('Highwater offset already collected for topic %s with partition %s', _topic, _partition) - continue - - topic_partitions_for_highwater_offsets = set() - - consumer = self.__create_consumer(consumer_group) - self.log.debug("Consumer instance %s created for group %s", consumer, consumer_group) - cluster_metadata = consumer.list_topics(timeout=self.config._request_timeout) - try: - cluster_id = cluster_metadata.cluster_id - except AttributeError: - self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group) - topics = cluster_metadata.topics - - for topic in topics: - if topic in KAFKA_INTERNAL_TOPICS: - self.log.debug("Skipping internal topic %s", topic) - continue - if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset: - self.log.debug("Skipping non-relevant topic %s", topic) - continue - - for partition in topics[topic].partitions: - if ( - self.config._monitor_all_broker_highwatermarks - or (topic, partition) in topic_partition_with_consumer_offset - ): - # Setting offset to -1 will return the latest highwater offset while calling offsets_for_times - # Reference: https://github.com/fede1024/rust-rdkafka/issues/460 - topic_partitions_for_highwater_offsets.add( - TopicPartition(topic=topic, partition=partition, offset=-1) - ) - self.log.debug('TOPIC: %s', topic) - self.log.debug('PARTITION: %s', partition) - else: - self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic) - - if len(topic_partitions_for_highwater_offsets) > 0: - self.log.debug( - 'Querying %s highwater offsets for consumer group %s', - len(topic_partitions_for_highwater_offsets), - consumer_group, - ) - for topic_partition_with_highwater_offset in consumer.offsets_for_times( - 
partitions=list(topic_partitions_for_highwater_offsets), - timeout=self.config._request_timeout, - ): - self.log.debug('Topic partition with highwater offset: %s', topic_partition_with_highwater_offset) - topic = topic_partition_with_highwater_offset.topic - partition = topic_partition_with_highwater_offset.partition - offset = topic_partition_with_highwater_offset.offset - highwater_offsets[(topic, partition)] = offset - self.log.debug("Adding %s %s to checked set to facilitate early exit", topic, partition) - topic_partition_checked.add((topic, partition)) - else: - self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group) - - self.log.debug("Closing consumer instance %s", consumer) - consumer.close() - - self.log.debug('Got %s highwater offsets', len(highwater_offsets)) - return highwater_offsets, cluster_id + def consumer_get_cluster_id_and_list_topics(self, consumer_group): + cluster_metadata = self._consumer.list_topics(timeout=self.config._request_timeout) + try: + # TODO: remove this try-except, the attribute is always present. 
+ cluster_id = cluster_metadata.cluster_id + except AttributeError: + self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group) + return "", [] + return (cluster_id, [(name, list(metadata.partitions)) for name, metadata in cluster_metadata.topics.items()]) + + def consumer_offsets_for_times(self, partitions): + topicpartitions_for_querying = [ + # Setting offset to -1 will return the latest highwater offset while calling offsets_for_times + # Reference: https://github.com/fede1024/rust-rdkafka/issues/460 + TopicPartition(topic=topic, partition=partition, offset=-1) + for topic, partition in partitions + ] + return [ + (tp.topic, tp.partition, tp.offset) + for tp in self._consumer.offsets_for_times( + partitions=topicpartitions_for_querying, timeout=self.config._request_timeout + ) + ] def get_partitions_for_topic(self, topic): - if partitions := self.topic_partition_cache.get(topic): - return partitions - try: cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout) except KafkaException as e: self.log.error("Received exception when getting partitions for topic %s: %s", topic, e) - return None - else: - topic_metadata = cluster_metadata.topics[topic] - partitions = list(topic_metadata.partitions.keys()) - self.topic_partition_cache[topic] = partitions - return partitions + return [] + topic_metadata = cluster_metadata.topics[topic] + return list(topic_metadata.partitions) def request_metadata_update(self): # https://github.com/confluentinc/confluent-kafka-python/issues/594 self.kafka_client.list_topics(None, timeout=self.config._request_timeout) - def get_consumer_offsets(self): - # {(consumer_group, topic, partition): offset} - self.log.debug('Getting consumer offsets') - consumer_offsets = {} - - consumer_groups = self._get_consumer_groups() - self.log.debug('Identified %s consumer groups', len(consumer_groups)) + def list_consumer_groups(self): + groups = [] + try: + groups_res = 
self.kafka_client.list_consumer_groups().result() + for valid_group in groups_res.valid: + self.log.debug("Discovered consumer group: %s", valid_group.group_id) + groups.append(valid_group.group_id) + except Exception as e: + self.log.error("Failed to collect consumer groups: %s", e) + return groups + + def list_consumer_group_offsets(self, groups): + """ + For every group and (optionally) its topics and partitions retrieve consumer offsets. - futures = self._get_consumer_offset_futures(consumer_groups) - self.log.debug('%s futures to be waited on', len(futures)) + As input expects a list of tuples: (consumer_group_id, topic_partitions). + topic_partitions are either None to indicate we want all topics and partitions OR a list of (topic, partition). - for future in as_completed(futures): + Returns a list of tuples with members: + 1. group id + 2. list of tuples: (topic, partition, offset) + """ + futures = [] + for consumer_group, topic_partitions in groups: + topic_partitions = ( + topic_partitions if topic_partitions is None else [TopicPartition(t, p) for t, p in topic_partitions] + ) + futures.append( + self.kafka_client.list_consumer_group_offsets( + [ConsumerGroupTopicPartitions(group_id=consumer_group, topic_partitions=topic_partitions)] + )[consumer_group] + ) + offsets = [] + for completed in as_completed(futures): try: - response_offset_info = future.result() + response_offset_info = completed.result() except KafkaException as e: - self.log.debug("Failed to read consumer offsets for future %s: %s", future, e) - else: - consumer_group = response_offset_info.group_id - topic_partitions = response_offset_info.topic_partitions - - self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group) - self.log.debug('RESULT TOPIC PARTITIONS: %s', topic_partitions) - - for topic_partition in topic_partitions: - topic = topic_partition.topic - partition = topic_partition.partition - offset = topic_partition.offset - - self.log.debug('RESULTS TOPIC: %s', topic) - 
self.log.debug('RESULTS PARTITION: %s', partition) - self.log.debug('RESULTS OFFSET: %s', offset) - - if topic_partition.error: - self.log.debug( - "Encountered error: %s. Occurred with topic: %s; partition: [%s]", - topic_partition.error.str(), - topic_partition.topic, - str(topic_partition.partition), - ) - continue - - if offset == OFFSET_INVALID: - continue - - if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex: - consumer_offsets[(consumer_group, topic, partition)] = offset - else: - to_match = f"{consumer_group},{topic},{partition}" - if self.config._consumer_groups_compiled_regex.match(to_match): - consumer_offsets[(consumer_group, topic, partition)] = offset - - self.log.debug('Got %s consumer offsets', len(consumer_offsets)) - return consumer_offsets - - def _get_consumer_groups(self): - # Get all consumer groups to monitor - consumer_groups = [] - if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex: - consumer_groups_future = self.kafka_client.list_consumer_groups() - try: - list_consumer_groups_result = consumer_groups_future.result() - for valid_consumer_group in list_consumer_groups_result.valid: - self.log.debug("Discovered consumer group: %s", valid_consumer_group.group_id) - - consumer_groups.extend( - valid_consumer_group.group_id - for valid_consumer_group in list_consumer_groups_result.valid - if valid_consumer_group.group_id != "" - ) - except Exception as e: - self.log.error("Failed to collect consumer groups: %s", e) - return consumer_groups - else: - return self.config._consumer_groups - - def get_consumer_group_state(self, consumer_group): - consumer_group_state = "" - # Get the consumer group state if present - consumer_groups_future = self._describe_consumer_groups(consumer_group) - consumer_groups_result = consumer_groups_future[consumer_group].result() - self.log.debug( - "Consumer group: %s in state %s", - consumer_groups_result.group_id, - 
consumer_groups_result.state, - ) - consumer_group_result_state = str(consumer_groups_result.state) - consumer_group_state = consumer_group_result_state.split('.')[1] - - return consumer_group_state - - def _list_consumer_group_offsets(self, cg_tp): - """ - :returns: A dict of futures for each group, keyed by the group id. - The future result() method returns :class:`ConsumerGroupTopicPartitions`. - - :rtype: dict[str, future] - """ - return self.kafka_client.list_consumer_group_offsets([cg_tp]) + self.log.debug("Failed to read consumer offsets for future %s: %s", completed, e) + continue + tpo = [] + for tp in response_offset_info.topic_partitions: + if tp.error: + self.log.debug( + "Encountered error: %s. Occurred with topic: %s; partition: [%s]", + tp.error.str(), + tp.topic, + str(tp.partition), + ) + continue + tpo.append((tp.topic, tp.partition, tp.offset)) + offsets.append((response_offset_info.group_id, tpo)) + return offsets - def _describe_consumer_groups(self, consumer_group): + def describe_consumer_groups(self, consumer_group): """ :returns: A dict of futures for each group, keyed by the group_id. The future result() method returns :class:`ConsumerGroupDescription`. 
:rtype: dict[str, future] """ - return self.kafka_client.describe_consumer_groups([consumer_group]) + desc = self.kafka_client.describe_consumer_groups([consumer_group])[consumer_group].result() + return (desc.group_id, desc.state.value) def close_admin_client(self): self._kafka_client = None - - def _get_consumer_offset_futures(self, consumer_groups): - futures = [] - - # If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later) - if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex: - for consumer_group in consumer_groups: - futures.append( - self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group] - ) - return futures - - for consumer_group in consumer_groups: - # If topics are specified - topics = consumer_groups.get(consumer_group) - if not topics: - futures.append( - self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group] - ) - continue - - for topic in topics: - # If partitions are defined - if partitions := topics[topic]: - topic_partitions = [TopicPartition(topic, partition) for partition in partitions] - # If partitions are not defined - else: - # get all the partitions for this topic - partitions = self.get_partitions_for_topic(topic) - - topic_partitions = [TopicPartition(topic, partition) for partition in partitions] - - futures.append( - self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group, topic_partitions))[ - consumer_group - ] - ) - - return futures diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index ca5887039aed5..554ef4f6d3373 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -8,6 +8,7 @@ from datadog_checks.base import AgentCheck, is_affirmative from 
datadog_checks.kafka_consumer.client import KafkaClient from datadog_checks.kafka_consumer.config import KafkaConfig +from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID MAX_TIMESTAMPS = 1000 @@ -22,6 +23,7 @@ def __init__(self, name, init_config, instances): self._data_streams_enabled = is_affirmative(self.instance.get('data_streams_enabled', False)) self._max_timestamps = int(self.instance.get('timestamp_history_size', MAX_TIMESTAMPS)) self.client = KafkaClient(self.config, self.log) + self.topic_partition_cache = {} self.check_initializations.insert(0, self.config.validate_config) def check(self, _): @@ -40,7 +42,7 @@ def check(self, _): try: # Fetch consumer offsets # Expected format: {(consumer_group, topic, partition): offset} - consumer_offsets = self.client.get_consumer_offsets() + consumer_offsets = self.get_consumer_offsets() except Exception: self.log.exception("There was a problem collecting consumer offsets from Kafka.") # don't raise because we might get valid broker offsets @@ -54,7 +56,7 @@ def check(self, _): if len(consumer_offsets) < self._context_limit: # Fetch highwater offsets # Expected format: ({(topic, partition): offset}, cluster_id) - highwater_offsets, cluster_id = self.client.get_highwater_offsets(consumer_offsets) + highwater_offsets, cluster_id = self.get_highwater_offsets(consumer_offsets) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) self._add_broker_timestamps(broker_timestamps, highwater_offsets) @@ -95,6 +97,74 @@ def check(self, _): if self.config._close_admin_client: self.client.close_admin_client() + def get_consumer_offsets(self): + # {(consumer_group, topic, partition): offset} + self.log.debug('Getting consumer offsets') + consumer_offsets = {} + + consumer_groups = self._get_consumer_groups() + self.log.debug('Identified %s consumer groups', len(consumer_groups)) + + offsets = self._get_offsets_for_groups(consumer_groups) + 
self.log.debug('%s futures to be waited on', len(offsets)) + + for consumer_group, topic_partitions in offsets: + + self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group) + + for topic, partition, offset in topic_partitions: + self.log.debug('RESULTS TOPIC: %s', topic) + self.log.debug('RESULTS PARTITION: %s', partition) + self.log.debug('RESULTS OFFSET: %s', offset) + + if offset == OFFSET_INVALID: + continue + + if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex: + consumer_offsets[(consumer_group, topic, partition)] = offset + else: + to_match = f"{consumer_group},{topic},{partition}" + if self.config._consumer_groups_compiled_regex.match(to_match): + consumer_offsets[(consumer_group, topic, partition)] = offset + + self.log.debug('Got %s consumer offsets', len(consumer_offsets)) + return consumer_offsets + + def _get_consumer_groups(self): + # Get all consumer groups to monitor + if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex: + return [grp for grp in self.client.list_consumer_groups() if grp] + else: + return self.config._consumer_groups + + def _get_offsets_for_groups(self, consumer_groups): + groups = [] + + # If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later) + if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex: + for consumer_group in consumer_groups: + groups.append((consumer_group, None)) + return self.client.list_consumer_group_offsets(groups) + + for consumer_group in consumer_groups: + # If topics are specified + topics = consumer_groups.get(consumer_group) + if not topics: + groups.append((consumer_group, None)) + continue + + for topic, partitions in topics.items(): + if not partitions: + if topic in self.topic_partition_cache: + partitions = self.topic_partition_cache[topic] + else: + partitions = self.topic_partition_cache[topic] = 
self.client.get_partitions_for_topic(topic) + topic_partitions = [(topic, p) for p in partitions] + + groups.append((consumer_group, topic_partitions)) + + return self.client.list_consumer_group_offsets(groups) + def _load_broker_timestamps(self, persistent_cache_key): """Loads broker timestamps from persistent cache.""" broker_timestamps = defaultdict(dict) @@ -139,7 +209,7 @@ def report_consumer_offsets_and_lag( reported_contexts = 0 self.log.debug("Reporting consumer offsets and lag metrics") for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items(): - consumer_group_state = self.client.get_consumer_group_state(consumer_group) + consumer_group_state = self.get_consumer_group_state(consumer_group) if reported_contexts >= contexts_limit: self.log.debug( "Reported contexts number %s greater than or equal to contexts limit of %s, returning", @@ -207,7 +277,7 @@ def report_consumer_offsets_and_lag( self.gauge('estimated_consumer_lag', lag, tags=consumer_group_tags) reported_contexts += 1 else: - if partitions is None: + if not partitions: msg = ( "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions " "in the cluster, so skipping reporting these offsets." 
@@ -221,6 +291,82 @@ def report_consumer_offsets_and_lag( self.client.request_metadata_update() # force metadata update on next poll() self.log.debug('%s consumer offsets reported', reported_contexts) + def get_consumer_group_state(self, consumer_group): + consumer_group_state = "" + # Get the consumer group state if present + group_id, consumer_group_state = self.client.describe_consumer_groups(consumer_group) + self.log.debug( + "Consumer group: %s in state %s", + group_id, + consumer_group_state, + ) + return consumer_group_state + + def get_highwater_offsets(self, consumer_offsets): + self.log.debug('Getting highwater offsets') + + cluster_id = "" + highwater_offsets = {} + topics_with_consumer_offset = set() + topic_partition_with_consumer_offset = set() + + if not self.config._monitor_all_broker_highwatermarks: + for _, topic, partition in consumer_offsets: + topics_with_consumer_offset.add(topic) + topic_partition_with_consumer_offset.add((topic, partition)) + + topic_partition_checked = set() + + for consumer_group, _topic, _partition in consumer_offsets: + self.log.debug('CONSUMER GROUP: %s', consumer_group) + if (_topic, _partition) in topic_partition_checked: + self.log.debug('Highwater offset already collected for topic %s with partition %s', _topic, _partition) + continue + + topic_partitions_for_highwater_offsets = set() + + self.client.open_consumer(consumer_group) + cluster_id, topics = self.client.consumer_get_cluster_id_and_list_topics(consumer_group) + + for topic, partitions in topics: + if topic in KAFKA_INTERNAL_TOPICS: + self.log.debug("Skipping internal topic %s", topic) + continue + if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset: + self.log.debug("Skipping non-relevant topic %s", topic) + continue + + for partition in partitions: + if ( + self.config._monitor_all_broker_highwatermarks + or (topic, partition) in topic_partition_with_consumer_offset + ): + 
topic_partitions_for_highwater_offsets.add((topic, partition)) + self.log.debug('TOPIC: %s', topic) + self.log.debug('PARTITION: %s', partition) + else: + self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic) + + if len(topic_partitions_for_highwater_offsets) > 0: + self.log.debug( + 'Querying %s highwater offsets for consumer group %s', + len(topic_partitions_for_highwater_offsets), + consumer_group, + ) + for topic, partition, offset in self.client.consumer_offsets_for_times( + partitions=topic_partitions_for_highwater_offsets + ): + highwater_offsets[(topic, partition)] = offset + self.log.debug("Adding %s %s to checked set to facilitate early exit", topic, partition) + topic_partition_checked.add((topic, partition)) + else: + self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group) + + self.client.close_consumer() + + self.log.debug('Got %s highwater offsets', len(highwater_offsets)) + return highwater_offsets, cluster_id + def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" event_dict = { diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 569a4e8915c9e..f38bda65e8c23 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -1,20 +1,50 @@ # (C) Datadog, Inc. 
2023-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -import concurrent.futures import logging from contextlib import nullcontext as does_not_raise import mock import pytest -from confluent_kafka.admin._group import ConsumerGroupListing, ListConsumerGroupsResult from datadog_checks.kafka_consumer import KafkaCheck +from datadog_checks.kafka_consumer.client import KafkaClient from datadog_checks.kafka_consumer.kafka_consumer import _get_interpolated_timestamp pytestmark = [pytest.mark.unit] +def fake_consumer_offsets_for_times(partitions): + """In our testing environment the offset is 80 for all partitions and topics.""" + + return [(t, p, 80) for t, p in partitions] + + +def seed_mock_client(): + """Set some common defaults for the mock client to kafka.""" + client = mock.create_autospec(KafkaClient) + client.list_consumer_groups.return_value = ["consumer_group1"] + client.get_partitions_for_topic.return_value = ['partition1'] + client.list_consumer_group_offsets.return_value = [("consumer_group1", [("topic1", "partition1", 2)])] + client.describe_consumer_groups.return_value = ('consumer_group', 'STABLE') + client.consumer_get_cluster_id_and_list_topics.return_value = ( + "cluster_id", + # topics + [ + # Used in unit tests + ('topic1', ["partition1"]), + ('topic2', ["partition2"]), + # Copied from integration tests + ('dc', [0, 1]), + ('unconsumed_topic', [0, 1]), + ('marvel', [0, 1]), + ('__consumer_offsets', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ], + ) + client.consumer_offsets_for_times = fake_consumer_offsets_for_times + return client + + @pytest.mark.parametrize( 'legacy_config, kafka_client_config, value', [ @@ -149,24 +179,15 @@ def test_oauth_config( # TODO: After these tests are finished and the revamp is complete, # the tests should be refactored to be parameters instead of separate tests -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") -def 
test_when_consumer_lag_less_than_zero_then_emit_event( - mock_generic_client, check, kafka_instance, dd_run_check, aggregator -): +def test_when_consumer_lag_less_than_zero_then_emit_event(check, kafka_instance, dd_run_check, aggregator): # Given - # consumer_offset = {(consumer_group, topic, partition): offset} - consumer_offset = {("consumer_group1", "topic1", "partition1"): 2} - # highwater_offset = {(topic, partition): offset} - highwater_offset = {("topic1", "partition1"): 1} - mock_client = mock.MagicMock() - mock_client.get_consumer_offsets.return_value = consumer_offset - mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id") - mock_client.get_partitions_for_topic.return_value = ['partition1'] - mock_client.get_consumer_group_state.return_value = "STABLE" - mock_generic_client.return_value = mock_client + mock_client = seed_mock_client() + # We need the consumer offset to be higher than the highwater offset. + mock_client.list_consumer_group_offsets.return_value = [("consumer_group1", [("topic1", "partition1", 81)])] + kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = mock_client # When - kafka_consumer_check = check(kafka_instance) dd_run_check(kafka_consumer_check) # Then @@ -216,24 +237,16 @@ def test_when_consumer_lag_less_than_zero_then_emit_event( ) -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") -def test_when_partition_is_none_then_emit_warning_log( - mock_generic_client, check, kafka_instance, dd_run_check, aggregator, caplog -): +def test_when_no_partitions_then_emit_warning_log(check, kafka_instance, dd_run_check, aggregator, caplog): # Given - # consumer_offset = {(consumer_group, topic, partition): offset} - consumer_offset = {("consumer_group1", "topic1", "partition1"): 2} - # highwater_offset = {(topic, partition): offset} - highwater_offset = {("topic1", "partition1"): 1} - mock_client = mock.MagicMock() - mock_client.get_consumer_offsets.return_value = 
consumer_offset - mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id") - mock_client.get_partitions_for_topic.return_value = None - mock_generic_client.return_value = mock_client caplog.set_level(logging.WARNING) - # When + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = [] kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = mock_client + + # When dd_run_check(kafka_consumer_check) # Then @@ -261,24 +274,18 @@ def test_when_partition_is_none_then_emit_warning_log( assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_partition_not_in_partitions_then_emit_warning_log( - mock_generic_client, check, kafka_instance, dd_run_check, aggregator, caplog + check, kafka_instance, dd_run_check, aggregator, caplog ): # Given - # consumer_offset = {(consumer_group, topic, partition): offset} - consumer_offset = {("consumer_group1", "topic1", "partition1"): 2} - # highwater_offset = {(topic, partition): offset} - highwater_offset = {("topic1", "partition1"): 1} - mock_client = mock.MagicMock() - mock_client.get_consumer_offsets.return_value = consumer_offset - mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id") - mock_client.get_partitions_for_topic.return_value = ['partition2'] - mock_generic_client.return_value = mock_client caplog.set_level(logging.WARNING) - # When + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = ['partition2'] kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = mock_client + + # When dd_run_check(kafka_consumer_check) # Then @@ -306,54 +313,44 @@ def test_when_partition_not_in_partitions_then_emit_warning_log( assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def 
test_when_highwater_metric_count_hit_context_limit_then_no_more_highwater_metrics( - mock_generic_client, kafka_instance, dd_run_check, aggregator, caplog + check, kafka_instance, dd_run_check, aggregator, caplog ): # Given - # consumer_offset = {(consumer_group, topic, partition): offset} - consumer_offset = {("consumer_group1", "topic1", "partition1"): 2} - # highwater_offset = {(topic, partition): offset} - highwater_offset = {("topic1", "partition1"): 3, ("topic2", "partition2"): 3} - mock_client = mock.MagicMock() - mock_client.get_consumer_offsets.return_value = consumer_offset - mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id") - mock_client.get_partitions_for_topic.return_value = ['partition1'] - mock_generic_client.return_value = mock_client caplog.set_level(logging.WARNING) + mock_client = seed_mock_client() + kafka_consumer_check = check(kafka_instance, init_config={'max_partition_contexts': 2}) + kafka_consumer_check.client = mock_client + # When - kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 2}, [kafka_instance]) dd_run_check(kafka_consumer_check) # Then - aggregator.assert_metric("kafka.broker_offset", count=2) - aggregator.assert_metric("kafka.consumer_offset", count=0) + aggregator.assert_metric("kafka.broker_offset", count=1) + aggregator.assert_metric("kafka.consumer_offset", count=1) aggregator.assert_metric("kafka.consumer_lag", count=0) - expected_warning = "Discovered 3 metric contexts" + expected_warning = "Discovered 2 metric contexts" assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_consumer_metric_count_hit_context_limit_then_no_more_consumer_metrics( - mock_generic_client, kafka_instance, dd_run_check, aggregator, caplog + check, kafka_instance, dd_run_check, aggregator, caplog ): # Given - # consumer_offset = {(consumer_group, topic, partition): offset} - consumer_offset = {("consumer_group1", 
"topic1", "partition1"): 2, ("consumer_group1", "topic2", "partition2"): 2} - # highwater_offset = {(topic, partition): offset} - highwater_offset = {("topic1", "partition1"): 3, ("topic2", "partition2"): 3} - mock_client = mock.MagicMock() - mock_client.get_consumer_offsets.return_value = consumer_offset - mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id") - mock_client.get_partitions_for_topic.return_value = ['partition1'] - mock_generic_client.return_value = mock_client caplog.set_level(logging.DEBUG) + mock_client = seed_mock_client() + mock_client.list_consumer_group_offsets.return_value = [ + ("consumer_group1", [("topic1", "partition1", 2)]), + ("consumer_group1", [("topic2", "partition2", 2)]), + ] + kafka_consumer_check = check(kafka_instance, init_config={'max_partition_contexts': 3}) + kafka_consumer_check.client = mock_client + # When - kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 3}, [kafka_instance]) dd_run_check(kafka_consumer_check) # Then @@ -369,19 +366,13 @@ def test_when_consumer_metric_count_hit_context_limit_then_no_more_consumer_metr def test_when_empty_string_consumer_group_then_skip(kafka_instance): - consumer_groups_result = ListConsumerGroupsResult( - valid=[ - ConsumerGroupListing(group_id="", is_simple_consumer_group=True), # Should be filtered out - ConsumerGroupListing(group_id="my_consumer", is_simple_consumer_group=True), - ] - ) - kafka_instance['monitor_unlisted_consumer_groups'] = True - future = concurrent.futures.Future() - future.set_result(consumer_groups_result) - - with mock.patch("datadog_checks.kafka_consumer.client.AdminClient.list_consumer_groups", return_value=future): + kafka_instance["monitor_unlisted_consumer_groups"] = True + with mock.patch( + "datadog_checks.kafka_consumer.kafka_consumer.KafkaClient.list_consumer_groups", + return_value=["", "my_consumer"], + ): kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - assert 
kafka_consumer_check.client._get_consumer_groups() == ["my_consumer"] + assert kafka_consumer_check._get_consumer_groups() == ["my_consumer"] def test_get_interpolated_timestamp():