From d452033a71b625bb79aecdf49eabf050201f8e46 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim Date: Wed, 11 Aug 2021 16:25:49 +0200 Subject: [PATCH] Add partitionID in labels --- e2e/producer.go | 13 ++++++++----- e2e/service.go | 26 +++++++++++++++++--------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/e2e/producer.go b/e2e/producer.go index fecb60b..7bd00c0 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -31,15 +31,18 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { // the SLA for producers. childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla) - s.messagesProducedInFlight.Inc() + pID := strconv.Itoa(partition) + s.messagesProducedInFlight.WithLabelValues(pID).Inc() s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { defer cancel() ackDuration := time.Since(startTime) - s.messagesProducedInFlight.Dec() - s.messagesProducedTotal.Inc() + s.messagesProducedInFlight.WithLabelValues(pID).Dec() + s.messagesProducedTotal.WithLabelValues(pID).Inc() + // We add 0 in order to ensure that the "failed" metric series for that partition id is initialized as well. + s.messagesProducedFailed.WithLabelValues(pID).Add(0) if err != nil { - s.messagesProducedFailed.Inc() + s.messagesProducedFailed.WithLabelValues(pID).Inc() s.logger.Info("failed to produce message to end-to-end topic", zap.String("topic_name", r.Topic), zap.Int32("partition", r.Partition), @@ -47,7 +50,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { return } - s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds()) + s.endToEndAckLatency.WithLabelValues(pID).Observe(ackDuration.Seconds()) s.messageTracker.addToTracker(msg) }) } diff --git a/e2e/service.go b/e2e/service.go index 128cd48..be6f23b 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -30,9 +30,9 @@ type Service struct { partitionCount int // number of partitions of our test topic, used to send messages to all partitions // Metrics - messagesProducedInFlight prometheus.Gauge - messagesProducedTotal prometheus.Counter - messagesProducedFailed prometheus.Counter + messagesProducedInFlight *prometheus.GaugeVec + messagesProducedTotal *prometheus.CounterVec + messagesProducedFailed *prometheus.CounterVec messagesReceived prometheus.Counter offsetCommits prometheus.Counter lostMessages prometheus.Counter @@ -99,13 +99,21 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k Help: help, }) } - makeGauge := func(name string, help string) prometheus.Gauge { - return promauto.NewGauge(prometheus.GaugeOpts{ + makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec { + return promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: "end_to_end", Name: name, Help: help, - }) + }, labelNames) + } + makeGaugeVec := func(name string, labelNames []string, help string) *prometheus.GaugeVec { + return promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricNamespace, + Subsystem: "end_to_end", + Name: name, + Help: help, + }, labelNames) } makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec { return promauto.NewHistogramVec(prometheus.HistogramOpts{ @@ -119,9 +127,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k // Low-level info // Users can construct alerts like "can't produce messages" themselves from those - svc.messagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet") - svc.messagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka") - svc.messagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure") + svc.messagesProducedInFlight = makeGaugeVec("messages_produced_in_flight", []string{"partition_id"}, "Number of messages that kminion's end-to-end test produced but has not received an answer for yet") + svc.messagesProducedTotal = makeCounterVec("messages_produced_total", []string{"partition_id"}, "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka") + svc.messagesProducedFailed = makeCounterVec("messages_produced_failed_total", []string{"partition_id"}, "Number of messages failed to produce to Kafka because of a timeout or failure") svc.messagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets") svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration")