
Prometheus requires that all meters with the same name have the same set of tag keys #3508

Closed
dazito opened this issue Oct 31, 2022 · 5 comments
Labels
duplicate A duplicate of another issue

Comments

@dazito

dazito commented Oct 31, 2022

Describe the bug
While I was investigating a missing Kafka metric in our monitoring system, I found that it was caused by our application (Spring Boot using Micrometer) not publishing the metric kafka.consumer.fetch.manager.records.consumed.total.

The application has two Kafka consumers, let's call them the query-routing and query-tracking consumers. They are configured via the @KafkaListener annotation, and each consumer has its own instance of ConcurrentKafkaListenerContainerFactory.

The query-routing consumer is configured as:

@Configuration
@EnableKafka
public class QueryRoutingConfiguration {
    
    @Bean(name = "queryRoutingContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, RoutingInfo> kafkaListenerContainerFactory(MeterRegistry meterRegistry) {
        
        Map<String, Object> consumerConfigs = new HashMap<>();
        // For brevity I removed the configs as they are trivial configs like bootstrap servers and serializers

        DefaultKafkaConsumerFactory<String, RoutingInfo> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs);
        
        consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

        ConcurrentKafkaListenerContainerFactory<String, RoutingInfo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setIdleEventInterval(5000L);

        return factory;
    }
}

And the query-tracking consumer is configured as:

@Configuration
@EnableKafka
public class QueryTrackingConfiguration {

    private static final FixedBackOff NO_ATTEMPTS = new FixedBackOff(Duration.ofSeconds(0).toMillis(), 0L);
    

    @Bean(name = "queryTrackingContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, QueryTrackingMessage> kafkaListenerContainerFactory(MeterRegistry meterRegistry) {

        Map<String, Object> consumerConfigs = new HashMap<>();
        // For brevity I removed the configs as they are trivial configs like bootstrap servers and serializers
        DefaultKafkaConsumerFactory<String, QueryTrackingMessage> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs);

        consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

        ConcurrentKafkaListenerContainerFactory<String, QueryTrackingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);

        DefaultErrorHandler deusErrorHandler = new DefaultErrorHandler(NO_ATTEMPTS);
        factory.setCommonErrorHandler(deusErrorHandler);

        return factory;
    }
}

The MeterRegistryConfigurator bean configuration is set as:

@Configuration
public class MeterRegistryConfigurator {
    private static final Logger LOG = LoggerFactory.getLogger(MeterRegistryConfigurator.class);
    private static final String PREFIX = "dps";

    @Bean
    MeterRegistryCustomizer<MeterRegistry> meterRegistryCustomizer() {
        return registry -> registry.config()
            .onMeterAdded(meter -> LOG.info("onMeterAdded: {}", meter.getId().getName()))
            .onMeterRemoved(meter -> LOG.info("onMeterRemoved: {}", meter.getId().getName()))
            .onMeterRegistrationFailed(
                (id, s) -> LOG.info("onMeterRegistrationFailed - id '{}' value '{}'", id.getName(), s))
            .meterFilter(PrefixMetricFilter.withPrefix(PREFIX))
            .meterFilter(
                MeterFilter.deny(id ->
                    id.getName().startsWith(PREFIX + ".jvm")
                        || id.getName().startsWith(PREFIX + ".system")
                        || id.getName().startsWith(PREFIX + ".process")
                        || id.getName().startsWith(PREFIX + ".logback")
                        || id.getName().startsWith(PREFIX + ".tomcat"))
            )
            .meterFilter(MeterFilter.ignoreTags("host", "host.name"))
            .namingConvention(NamingConvention.snakeCase);
    }
}

The @KafkaListener for each consumer is set as:

@KafkaListener(
    id = "query-routing",
    idIsGroup = true,
    topics = "${query-routing.consumer.topic}",
    groupId = "${query-routing.consumer.groupId}",
    containerFactory = "queryRoutingContainerFactory")
public void listenForMessages(ConsumerRecord<String, RoutingInfo> record) {
    // Handle each record ...
}

and

@KafkaListener(
    id = "query-tracking",
    idIsGroup = true,
    topics = "${query-tracking.consumer.topic}",
    groupId = "${query-tracking.consumer.groupId}",
    containerFactory = "queryTrackingContainerFactory"
)
public void listenForMessages(List<ConsumerRecord<String, QueryTrackingMessage>> consumerRecords, Acknowledgment ack) {
    // Handle each record ...
}

When the application starts up and I go to the actuator/prometheus endpoint, I can see the metric for both consumers:

# HELP dps_kafka_consumer_fetch_manager_records_consumed_total The total number of records consumed
# TYPE dps_kafka_consumer_fetch_manager_records_consumed_total counter
dps_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-qf-query-tracking-consumer-1",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-qf-query-tracking-consumer-1",} 7.0
dps_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-QF-Routing-f5d0d9f1-e261-407b-954d-5d217211dee0-2",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-QF-Routing-f5d0d9f1-e261-407b-954d-5d217211dee0-2",} 0.0

But a few seconds later there is a new call to io.micrometer.core.instrument.binder.kafka.KafkaMetrics#checkAndBindMetrics, which removes a set of metrics (including kafka.consumer.fetch.manager.records.consumed.total):

onMeterRegistrationFailed - dps.kafka.consumer.fetch.manager.records.consumed.total string Prometheus requires that all meters with the same name have the same set of tag keys. There is already an existing meter named 'dps.kafka.consumer.fetch.manager.records.consumed.total' containing tag keys [client_id, kafka_version, spring_id]. The meter you are attempting to register has keys [client_id, kafka_version, spring_id, topic].

Going to actuator/prometheus again will only show the metric for the query-routing consumer:

# HELP deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total The total number of records consumed for a topic
# TYPE deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total counter
deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-QF-Routing-0a739a21-4764-411a-9cc6-0e60293b40b4-2",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-QF-Routing-0a739a21-4764-411a-9cc6-0e60293b40b4-2",theKey="routing",topic="QF_query_routing_v1",} 0.0

As you can see above, the metric for the query-tracking consumer is gone. As the log says, the meter being registered has the keys [client_id, kafka_version, spring_id, topic]. The issue is that I cannot find where this metric with a topic key is being registered, which triggers io.micrometer.core.instrument.binder.kafka.KafkaMetrics#checkAndBindMetrics and removes the metric for the query-tracking consumer.
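
For context, the tag-key constraint itself can be reproduced with two plain counters against a PrometheusMeterRegistry; this is only a minimal sketch of the behaviour described in the log above, with a made-up meter name and tag values:

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class TagKeyConflictDemo {

    public static void main(String[] args) {
        PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

        // The first registration fixes the tag keys for this meter name: [client_id]
        Counter first = registry.counter("records.consumed.total", "client_id", "consumer-1");
        first.increment();

        // The second registration uses the same name but an extra "topic" key: [client_id, topic].
        // Depending on Micrometer version/configuration the registry either throws here or
        // silently drops the meter, so it never shows up in the scrape output.
        try {
            registry.counter("records.consumed.total",
                    "client_id", "consumer-2", "topic", "some-topic").increment();
        } catch (IllegalArgumentException e) {
            System.out.println("registration rejected: " + e.getMessage());
        }

        System.out.println(registry.scrape());
    }
}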

Environment

  • Micrometer version: 1.9.5
  • Micrometer registry: prometheus
  • OS: Linux, running in a Docker image inside a Kubernetes pod
  • Java version: 11

To Reproduce
How to reproduce the bug:
Please check the description above.

Expected behavior
I would expect the Kafka metrics for both consumers to be published to Spring Actuator, even if the two metrics have different sets of tags.

Additional context

@dazito
Author

dazito commented Oct 31, 2022

Some of my findings related to this issue:

At bootstrap, the MeterRegistry will register/bind a set of metrics, but without a topic tag.
This binding is performed by io.micrometer.core.instrument.binder.kafka.KafkaMetrics#bindTo.

As you can see in the bindTo method, there is a scheduler:

scheduler.scheduleAtFixedRate(() -> checkAndBindMetrics(registry), getRefreshIntervalInMillis(),
        getRefreshIntervalInMillis(), TimeUnit.MILLISECONDS);

which fires every ~30 seconds to look for new metrics to bind.

So the first time this scheduled task runs (~30 seconds after bootstrap), it finds the query-tracking and query-routing consumer metrics and tries to bind them. By now, both consumers' metrics have an extra tag named topic (which holds the topic name), and when the task tries to bind these new metrics, the MeterRegistry rejects them, because there is already a metric registered (from bootstrap) with the same name but a different set of tag keys (it is missing the topic tag).
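
A possible way to pin down where the topic-tagged meter comes from (just a sketch, building on the onMeterRegistrationFailed callback from the MeterRegistryConfigurator above) would be to log the full meter id, including its tags, together with the registration call site:

// Sketch: log the complete meter id (name + tags) and a stack trace for the registration
// call site whenever a registration is rejected.
registry.config().onMeterRegistrationFailed((id, reason) ->
    LOG.info("onMeterRegistrationFailed - name '{}' tags {} reason '{}'",
        id.getName(), id.getTags(), reason, new Exception("registration call site")));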

@garyrussell

I assume the extra tag appears after the consumer is assigned partitions from the topic, which happens after initialization.

https://stackoverflow.com/questions/74226882/meter-registration-fails-on-spring-boot-kafka-consumer-with-prometheus-meterregi/74227336#74227336

@eledhwen

I believe I have encountered the same issue in a slightly different context:

We have a bunch of components producing similarly structured counters, and for a given counter name, if all calls use the same set of labels, everything works as expected:

//provided a valid MeterRegistry registry

registry.counter("my_metric", Set.of(
  Tag.of("some_label", "value_1"),
  Tag.of("other_label", "value_2")
)).increment(1);

/* Someplace else */
registry.counter("my_metric", Set.of(
  Tag.of("some_label", "value_2"),
  Tag.of("other_label", "value_3")
)).increment(1);

However, if some calls use a different set of labels, e.g.:

//provided a valid MeterRegistry registry

registry.counter("my_metric", Set.of(
  Tag.of("some_label", "value_1"),
  Tag.of("other_label", "value_2")
)).increment(1);

/* Someplace else */
registry.counter("my_metric", Set.of(
  Tag.of("some_label", "value_1")
)).increment(1);

Some of those sets will be absent from the Prometheus endpoint but not from the MeterRegistry itself (i.e. in Spring they do show up in /actuator/metrics). I didn't dive too deep into this issue, but clearly those metrics are properly computed and tracked; they're just dropped when generating the Prometheus response.

@cachescrubber
Contributor

I think this is a duplicate of #877

@marcingrzejszczak
Contributor

Seems like a duplicate of #877

@marcingrzejszczak closed this as not planned on Dec 20, 2023
@marcingrzejszczak added the duplicate label on Dec 20, 2023