diff --git a/e2e/producer.go b/e2e/producer.go index 98850f6..fecb60b 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -31,15 +31,15 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { // the SLA for producers. childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla) - s.endToEndMessagesProducedInFlight.Inc() + s.messagesProducedInFlight.Inc() s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { defer cancel() ackDuration := time.Since(startTime) - s.endToEndMessagesProducedInFlight.Dec() - s.endToEndMessagesProducedTotal.Inc() + s.messagesProducedInFlight.Dec() + s.messagesProducedTotal.Inc() if err != nil { - s.endToEndMessagesProducedFailed.Inc() + s.messagesProducedFailed.Inc() s.logger.Info("failed to produce message to end-to-end topic", zap.String("topic_name", r.Topic), zap.Int32("partition", r.Partition), diff --git a/e2e/service.go b/e2e/service.go index 5a44823..128cd48 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -30,12 +30,12 @@ type Service struct { partitionCount int // number of partitions of our test topic, used to send messages to all partitions // Metrics - endToEndMessagesProducedInFlight prometheus.Gauge - endToEndMessagesProducedTotal prometheus.Counter - endToEndMessagesProducedFailed prometheus.Counter - endToEndMessagesReceived prometheus.Counter - endToEndCommits prometheus.Counter - lostMessages prometheus.Counter + messagesProducedInFlight prometheus.Gauge + messagesProducedTotal prometheus.Counter + messagesProducedFailed prometheus.Counter + messagesReceived prometheus.Counter + offsetCommits prometheus.Counter + lostMessages prometheus.Counter endToEndAckLatency *prometheus.HistogramVec endToEndRoundtripLatency *prometheus.HistogramVec @@ -119,11 +119,11 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k // Low-level info // Users can construct alerts like "can't produce messages" themselves from those - svc.endToEndMessagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet") - svc.endToEndMessagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka") - svc.endToEndMessagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure") - svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") - svc.endToEndCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets") + svc.messagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet") + svc.messagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka") + svc.messagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure") + svc.messagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") + svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets") svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration") // Latency Histograms @@ -211,7 +211,7 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) { return // message is too old } - s.endToEndMessagesReceived.Inc() + s.messagesReceived.Inc() s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds()) } @@ -229,5 +229,5 @@ func (s *Service) onOffsetCommit(brokerId int32, duration time.Duration) { return } - s.endToEndCommits.Inc() + s.offsetCommits.Inc() }