Skip to content

Commit

Permalink
Add partitionID in labels
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 11, 2021
1 parent 5e11446 commit d452033
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
13 changes: 8 additions & 5 deletions e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,26 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
// the SLA for producers.
childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla)

s.messagesProducedInFlight.Inc()
pID := strconv.Itoa(partition)
s.messagesProducedInFlight.WithLabelValues(pID).Inc()
s.client.Produce(childCtx, record, func(r *kgo.Record, err error) {
defer cancel()
ackDuration := time.Since(startTime)
s.messagesProducedInFlight.Dec()
s.messagesProducedTotal.Inc()
s.messagesProducedInFlight.WithLabelValues(pID).Dec()
s.messagesProducedTotal.WithLabelValues(pID).Inc()
// We add 0 in order to ensure that the "failed" metric series for that partition id is initialized as well.
s.messagesProducedFailed.WithLabelValues(pID).Add(0)

if err != nil {
s.messagesProducedFailed.Inc()
s.messagesProducedFailed.WithLabelValues(pID).Inc()
s.logger.Info("failed to produce message to end-to-end topic",
zap.String("topic_name", r.Topic),
zap.Int32("partition", r.Partition),
zap.Error(err))
return
}

s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds())
s.endToEndAckLatency.WithLabelValues(pID).Observe(ackDuration.Seconds())
s.messageTracker.addToTracker(msg)
})
}
Expand Down
26 changes: 17 additions & 9 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ type Service struct {
partitionCount int // number of partitions of our test topic, used to send messages to all partitions

// Metrics
messagesProducedInFlight prometheus.Gauge
messagesProducedTotal prometheus.Counter
messagesProducedFailed prometheus.Counter
messagesProducedInFlight *prometheus.GaugeVec
messagesProducedTotal *prometheus.CounterVec
messagesProducedFailed *prometheus.CounterVec
messagesReceived prometheus.Counter
offsetCommits prometheus.Counter
lostMessages prometheus.Counter
Expand Down Expand Up @@ -99,13 +99,21 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
Help: help,
})
}
makeGauge := func(name string, help string) prometheus.Gauge {
return promauto.NewGauge(prometheus.GaugeOpts{
makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
return promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: metricNamespace,
Subsystem: "end_to_end",
Name: name,
Help: help,
})
}, labelNames)
}
makeGaugeVec := func(name string, labelNames []string, help string) *prometheus.GaugeVec {
return promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Subsystem: "end_to_end",
Name: name,
Help: help,
}, labelNames)
}
makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
return promauto.NewHistogramVec(prometheus.HistogramOpts{
Expand All @@ -119,9 +127,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k

// Low-level info
// Users can construct alerts like "can't produce messages" themselves from those
svc.messagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet")
svc.messagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
svc.messagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure")
svc.messagesProducedInFlight = makeGaugeVec("messages_produced_in_flight", []string{"partition_id"}, "Number of messages that kminion's end-to-end test produced but has not received an answer for yet")
svc.messagesProducedTotal = makeCounterVec("messages_produced_total", []string{"partition_id"}, "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
svc.messagesProducedFailed = makeCounterVec("messages_produced_failed_total", []string{"partition_id"}, "Number of messages failed to produce to Kafka because of a timeout or failure")
svc.messagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets")
svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration")
Expand Down

0 comments on commit d452033

Please sign in to comment.