From 1c46b5656b96b08815073971ffd874b1d1ecf088 Mon Sep 17 00:00:00 2001
From: Martin Schneppenheim
Date: Tue, 10 Aug 2021 10:15:36 +0200
Subject: [PATCH] Add metrics for produced messages

---
 e2e/message_tracker.go |  5 ++--
 e2e/producer.go        | 60 ++++++++++++++++++------------------------
 e2e/service.go         | 36 ++++++++++++++-----------
 main.go                |  2 +-
 4 files changed, 48 insertions(+), 55 deletions(-)

diff --git a/e2e/message_tracker.go b/e2e/message_tracker.go
index 6df56fb..8944a09 100644
--- a/e2e/message_tracker.go
+++ b/e2e/message_tracker.go
@@ -12,7 +12,7 @@ import (
 // When we successfully send a message, it will be added to this tracker.
 // Later, when we receive the message back in the consumer, the message is marked as completed and removed from the tracker.
 // If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost.
-// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration,
+// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration,
 // but it would probably be a good idea to also have a metric that reports the number of lost messages.
 //
 // When we fail to send a message, it isn't tracked.
@@ -85,8 +85,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
 	t.cache.Delete(msg.MessageID)
 }
 
-func (t *messageTracker) onMessageExpired(key string, msg *EndToEndMessage) {
-
+func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) {
 	if msg.hasArrived {
 		// message did, in fact, arrive (doesn't matter here if soon enough or barely expired)
 		// don't log anything
diff --git a/e2e/producer.go b/e2e/producer.go
index f2088ef..d0ce2f6 100644
--- a/e2e/producer.go
+++ b/e2e/producer.go
@@ -3,6 +3,7 @@ package e2e
 import (
 	"context"
 	"encoding/json"
+	"strconv"
 	"time"
 
 	"github.com/google/uuid"
@@ -25,61 +26,50 @@ func (m *EndToEndMessage) creationTime() time.Time {
 }
 
 // Sends an EndToEndMessage to every partition
 func (s *Service) produceLatencyMessages(ctx context.Context) {
-
 	for i := 0; i < s.partitionCount; i++ {
 		err := s.produceSingleMessage(ctx, i)
 		if err != nil {
-			s.logger.Error("failed to produce to end-to-end topic",
+			s.logger.Error("failed to produce message to end-to-end topic",
 				zap.String("topic_name", s.config.TopicManagement.Name),
 				zap.Int("partition", i),
 				zap.Error(err))
 		}
 	}
-
 }
 
 func (s *Service) produceSingleMessage(ctx context.Context, partition int) error {
-
 	topicName := s.config.TopicManagement.Name
 	record, msg := createEndToEndRecord(s.minionID, topicName, partition)
 
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			startTime := time.Now()
-			s.endToEndMessagesProduced.Inc()
-
-			errCh := make(chan error)
-			s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
-				ackDuration := time.Since(startTime)
-
-				errCh <- err
-
-				// only notify ack if it is successful
-				if err == nil {
-					// notify service about ack
-					s.onAck(r.Partition, ackDuration)
-
-					// add to tracker
-					s.messageTracker.addToTracker(msg)
-				}
-			})
-
-			err := <-errCh
-			if err != nil {
-				s.logger.Error("error producing record", zap.Error(err))
-				return err
-			}
-			return nil
+	startTime := time.Now()
+
+	errCh := make(chan error)
+	s.endToEndMessagesProducedInFlight.Inc()
+	s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
+		ackDuration := time.Since(startTime)
+		s.endToEndMessagesProducedInFlight.Dec()
+		s.endToEndMessagesProducedTotal.Inc()
+
+		errCh <- err
+
+		if err == nil {
+			s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds())
+			s.messageTracker.addToTracker(msg)
+		} else {
+			s.endToEndMessagesProducedFailed.Inc()
 		}
+	})
+
+	err := <-errCh
+	if err != nil {
+		s.logger.Error("error producing record", zap.Error(err))
+		return err
 	}
+	return nil
 }
 
 func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) {
-
 	message := &EndToEndMessage{
 		MinionID:  minionID,
 		MessageID: uuid.NewString(),
diff --git a/e2e/service.go b/e2e/service.go
index f138ecc..486e699 100644
--- a/e2e/service.go
+++ b/e2e/service.go
@@ -30,10 +30,11 @@ type Service struct {
 	partitionCount int // number of partitions of our test topic, used to send messages to all partitions
 
 	// Metrics
-	endToEndMessagesProduced prometheus.Counter
-	endToEndMessagesAcked    prometheus.Counter
-	endToEndMessagesReceived prometheus.Counter
-	endToEndCommits          prometheus.Counter
+	endToEndMessagesProducedInFlight prometheus.Gauge
+	endToEndMessagesProducedTotal    prometheus.Counter
+	endToEndMessagesProducedFailed   prometheus.Counter
+	endToEndMessagesReceived         prometheus.Counter
+	endToEndCommits                  prometheus.Counter
 
 	endToEndAckLatency       *prometheus.HistogramVec
 	endToEndRoundtripLatency *prometheus.HistogramVec
@@ -41,7 +42,7 @@
 }
 
 // NewService creates a new instance of the e2e monitoring service (wow)
-func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string, ctx context.Context) (*Service, error) {
+func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
 	minionID := uuid.NewString()
 	groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID)
 
@@ -97,6 +98,14 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
 			Help:      help,
 		})
 	}
+	makeGauge := func(name string, help string) prometheus.Gauge {
+		return promauto.NewGauge(prometheus.GaugeOpts{
+			Namespace: metricNamespace,
+			Subsystem: "end_to_end",
+			Name:      name,
+			Help:      help,
+		})
+	}
 	makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
 		return promauto.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: metricNamespace,
@@ -109,17 +118,18 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
 
 	// Low-level info
 	// Users can construct alerts like "can't produce messages" themselves from those
-	svc.endToEndMessagesProduced = makeCounter("messages_produced_total", "Number of messages that kminion's end-to-end test has tried to send to kafka")
-	svc.endToEndMessagesAcked = makeCounter("messages_acked_total", "Number of messages kafka acknowledged as produced")
+	svc.endToEndMessagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet")
+	svc.endToEndMessagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
+	svc.endToEndMessagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure")
 	svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it arrives within the configured roundtrip SLA (and it matches the minionID)")
 	svc.endToEndCommits = makeCounter("commits_total", "Counts how many times kminion's end-to-end test has committed messages")
 
 	// Latency Histograms
 	// More detailed info about how long stuff took
 	// Since histograms also have an 'infinite' bucket, they can be used to detect small hiccups and "lost" messages
-	svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partitionId"}, "Time until we received an ack for a produced message")
-	svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partitionId"}, "Time it took between sending (producing) and receiving (consuming) a message")
-	svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"groupCoordinatorBrokerId"}, "Time kafka took to respond to kminion's offset commit")
+	svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partition_id"}, "Time until we received an ack for a produced message")
+	svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partition_id"}, "Time it took between sending (producing) and receiving (consuming) a message")
+	svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"group_coordinator_broker_id"}, "Time kafka took to respond to kminion's offset commit")
 
 	return svc, nil
 }
@@ -193,12 +203,6 @@ func (s *Service) startOffsetCommits(ctx context.Context) {
 
 }
 
-// called from e2e when a message is acknowledged
-func (s *Service) onAck(partitionId int32, duration time.Duration) {
-	s.endToEndMessagesAcked.Inc()
-	s.endToEndAckLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
-}
-
 // called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
 func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
 	if duration > s.config.Consumer.RoundtripSla {
diff --git a/main.go b/main.go
index b04ed4f..1c6123c 100644
--- a/main.go
+++ b/main.go
@@ -73,11 +73,11 @@ func main() {
 	// Create end to end testing service
 	if cfg.Minion.EndToEnd.Enabled {
 		e2eService, err := e2e.NewService(
+			ctx,
 			cfg.Minion.EndToEnd,
 			logger,
 			kafkaSvc,
 			cfg.Exporter.Namespace,
-			ctx,
 		)
 		if err != nil {
 			logger.Fatal("failed to create end-to-end monitoring service: %w", zap.Error(err))
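
Note for reviewers (not part of the patch): the new produce path boils down to an "in-flight gauge plus total/failed counters" pattern around franz-go's asynchronous Produce callback, spread here across producer.go and service.go. The sketch below condenses it into one self-contained file; the package name, identifiers and bare metric names are illustrative only, whereas KMinion keeps the metrics as Service fields registered under the exporter namespace and the "end_to_end" subsystem.

// Reviewer sketch (not part of the patch). Package, identifiers and the bare
// metric names below are illustrative; KMinion registers these metrics as
// Service fields under the exporter namespace and the "end_to_end" subsystem.
package example

import (
	"context"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/twmb/franz-go/pkg/kgo"
)

var (
	producedInFlight = promauto.NewGauge(prometheus.GaugeOpts{Name: "messages_produced_in_flight", Help: "Produced messages still waiting for a broker response"})
	producedTotal    = promauto.NewCounter(prometheus.CounterOpts{Name: "messages_produced_total", Help: "Produce attempts that received any response"})
	producedFailed   = promauto.NewCounter(prometheus.CounterOpts{Name: "messages_produced_failed_total", Help: "Produce attempts that failed or timed out"})
	ackLatency       = promauto.NewHistogramVec(prometheus.HistogramOpts{Name: "produce_latency_seconds", Help: "Time until the broker acked a produced message"}, []string{"partition_id"})
)

// produceOne mirrors the flow of produceSingleMessage: the in-flight gauge
// brackets the asynchronous produce, and the counters and the latency
// histogram are only touched once Kafka answers (successfully or not).
func produceOne(ctx context.Context, client *kgo.Client, record *kgo.Record) error {
	start := time.Now()
	errCh := make(chan error)

	producedInFlight.Inc()
	client.Produce(ctx, record, func(r *kgo.Record, err error) {
		producedInFlight.Dec()
		producedTotal.Inc()
		if err == nil {
			ackLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(time.Since(start).Seconds())
		} else {
			producedFailed.Inc()
		}
		errCh <- err // unblock the caller once the broker has answered
	})

	return <-errCh
}

Because the callback both updates the metrics and sends on errCh, the caller still blocks until Kafka answers, which preserves the synchronous behaviour of the old for/select version without its extra nesting.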
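
Usage note (not part of the patch): messages_acked_total disappears and messages_produced_total changes meaning, since it is now incremented only once a response (success or failure/timeout) comes back rather than when a send is attempted; the partitionId and groupCoordinatorBrokerId labels also become partition_id and group_coordinator_broker_id, so existing dashboards and alerts built on the old names need updating. Assuming the exporter namespace is set to kminion, a produce-failure alert could watch something like rate(kminion_end_to_end_messages_produced_failed_total[5m]) > 0, and a kminion_end_to_end_messages_produced_in_flight value that stays elevated points at produces that never receive an answer.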