Add metrics for produced messages
weeco committed Aug 10, 2021
1 parent c0b7922 commit 1c46b56
Showing 4 changed files with 48 additions and 55 deletions.
5 changes: 2 additions & 3 deletions e2e/message_tracker.go
@@ -12,7 +12,7 @@ import (
// When we successfully send a message, it will be added to this tracker.
// Later, when we receive the message back in the consumer, the message is marked as completed and removed from the tracker.
// If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost.
// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration,
// but it would probably be a good idea to also have a metric that reports the number of lost messages.
//
// When we fail to send a message, it isn't tracked.
@@ -85,8 +85,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
	t.cache.Delete(msg.MessageID)
}

-func (t *messageTracker) onMessageExpired(key string, msg *EndToEndMessage) {
-
+func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) {
	if msg.hasArrived {
		// message did, in fact, arrive (doesn't matter here if it was soon enough or barely expired)
		// don't log anything
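The file-header comment above notes that a metric counting lost messages would be a useful addition. A minimal sketch of what that could look like, reusing the makeCounter helper from e2e/service.go; the endToEndMessagesLostTotal field, the metric name, and the tracker-to-service wiring are hypothetical, not part of this commit:

// Hypothetical: registered next to the other end-to-end metrics in NewService.
svc.endToEndMessagesLostTotal = makeCounter("messages_lost_total",
	"Number of messages that were produced but not received within the configured roundtrip SLA")

// Hypothetical: incremented at the point where onMessageExpired decides a message is lost.
if !msg.hasArrived {
	t.svc.endToEndMessagesLostTotal.Inc() // assumes the tracker keeps a reference back to the Service
}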
60 changes: 25 additions & 35 deletions e2e/producer.go
@@ -3,6 +3,7 @@ package e2e
import (
	"context"
	"encoding/json"
+	"strconv"
	"time"

	"github.com/google/uuid"
@@ -25,61 +26,50 @@ func (m *EndToEndMessage) creationTime() time.Time {

// Sends an EndToEndMessage to every partition
func (s *Service) produceLatencyMessages(ctx context.Context) {
-
	for i := 0; i < s.partitionCount; i++ {
		err := s.produceSingleMessage(ctx, i)
		if err != nil {
-			s.logger.Error("failed to produce to end-to-end topic",
+			s.logger.Error("failed to produce message to end-to-end topic",
				zap.String("topic_name", s.config.TopicManagement.Name),
				zap.Int("partition", i),
				zap.Error(err))
		}
	}
-
}

func (s *Service) produceSingleMessage(ctx context.Context, partition int) error {
-
	topicName := s.config.TopicManagement.Name
	record, msg := createEndToEndRecord(s.minionID, topicName, partition)

-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			startTime := time.Now()
-			s.endToEndMessagesProduced.Inc()
-
-			errCh := make(chan error)
-			s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
-				ackDuration := time.Since(startTime)
-
-				errCh <- err
-
-				// only notify ack if it is successful
-				if err == nil {
-					// notify service about ack
-					s.onAck(r.Partition, ackDuration)
-
-					// add to tracker
-					s.messageTracker.addToTracker(msg)
-				}
-			})
-
-			err := <-errCh
-			if err != nil {
-				s.logger.Error("error producing record", zap.Error(err))
-				return err
-			}
-			return nil
+	startTime := time.Now()
+
+	errCh := make(chan error)
+	s.endToEndMessagesProducedInFlight.Inc()
+	s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
+		ackDuration := time.Since(startTime)
+		s.endToEndMessagesProducedInFlight.Dec()
+		s.endToEndMessagesProducedTotal.Inc()
+
+		errCh <- err
+
+		if err == nil {
+			s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds())
+			s.messageTracker.addToTracker(msg)
+		} else {
+			s.endToEndMessagesProducedFailed.Inc()
+		}
+	})
+
+	err := <-errCh
+	if err != nil {
+		s.logger.Error("error producing record", zap.Error(err))
+		return err
+	}
+	return nil

}

func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) {
-
	message := &EndToEndMessage{
		MinionID:  minionID,
		MessageID: uuid.NewString(),
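The rewritten produceSingleMessage above pairs a gauge with two counters: the gauge is incremented before the asynchronous produce call and decremented in the ack callback, so at any instant it equals the number of outstanding produce requests, while messages_produced_total and messages_produced_failed_total only ever grow. A condensed, self-contained sketch of that pattern (the function and parameter names here are illustrative, not kminion's API):

package e2eexample

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kgo"
)

// produceTracked mirrors the metrics pattern introduced above: the gauge counts
// produce requests still awaiting a response, while the counters only grow.
func produceTracked(ctx context.Context, client *kgo.Client, rec *kgo.Record,
	inFlight prometheus.Gauge, total, failed prometheus.Counter) error {

	errCh := make(chan error, 1) // buffered, so the callback can never block on send
	inFlight.Inc()               // one more request is now outstanding
	client.Produce(ctx, rec, func(_ *kgo.Record, err error) {
		inFlight.Dec() // answered, successfully or not
		total.Inc()
		if err != nil {
			failed.Inc()
		}
		errCh <- err
	})
	return <-errCh // block until the broker responds, making the call synchronous
}

The commit's version uses an unbuffered channel, which is equally safe here because the producing goroutine is already blocked on <-errCh whenever the callback sends.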
36 changes: 20 additions & 16 deletions e2e/service.go
@@ -30,18 +30,19 @@ type Service struct {
	partitionCount int // number of partitions of our test topic, used to send messages to all partitions

	// Metrics
-	endToEndMessagesProduced prometheus.Counter
-	endToEndMessagesAcked    prometheus.Counter
-	endToEndMessagesReceived prometheus.Counter
-	endToEndCommits          prometheus.Counter
+	endToEndMessagesProducedInFlight prometheus.Gauge
+	endToEndMessagesProducedTotal    prometheus.Counter
+	endToEndMessagesProducedFailed   prometheus.Counter
+	endToEndMessagesReceived         prometheus.Counter
+	endToEndCommits                  prometheus.Counter

	endToEndAckLatency       *prometheus.HistogramVec
	endToEndRoundtripLatency *prometheus.HistogramVec
	endToEndCommitLatency    *prometheus.HistogramVec
}

// NewService creates a new instance of the e2e monitoring service (wow)
-func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string, ctx context.Context) (*Service, error) {
+func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
	minionID := uuid.NewString()
	groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID)

@@ -97,6 +98,14 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
			Help:      help,
		})
	}
+	makeGauge := func(name string, help string) prometheus.Gauge {
+		return promauto.NewGauge(prometheus.GaugeOpts{
+			Namespace: metricNamespace,
+			Subsystem: "end_to_end",
+			Name:      name,
+			Help:      help,
+		})
+	}
	makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
		return promauto.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: metricNamespace,
@@ -109,17 +118,18 @@

	// Low-level info
	// Users can construct alerts like "can't produce messages" themselves from those
-	svc.endToEndMessagesProduced = makeCounter("messages_produced_total", "Number of messages that kminion's end-to-end test has tried to send to kafka")
-	svc.endToEndMessagesAcked = makeCounter("messages_acked_total", "Number of messages kafka acknowledged as produced")
+	svc.endToEndMessagesProducedInFlight = makeGauge("messages_produced_in_flight", "Number of messages that kminion's end-to-end test produced but has not received an answer for yet")
+	svc.endToEndMessagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
+	svc.endToEndMessagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages that failed to be produced to Kafka because of a timeout or failure")
	svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it arrives within the configured roundtrip SLA (and it matches the minionID)")
	svc.endToEndCommits = makeCounter("commits_total", "Counts how many times kminion's end-to-end test has committed messages")

	// Latency Histograms
	// More detailed info about how long stuff took
	// Since histograms also have an 'infinite' bucket, they can be used to detect small hiccups and "lost" messages
-	svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partitionId"}, "Time until we received an ack for a produced message")
-	svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partitionId"}, "Time it took between sending (producing) and receiving (consuming) a message")
-	svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"groupCoordinatorBrokerId"}, "Time kafka took to respond to kminion's offset commit")
+	svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partition_id"}, "Time until we received an ack for a produced message")
+	svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partition_id"}, "Time it took between sending (producing) and receiving (consuming) a message")
+	svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"group_coordinator_broker_id"}, "Time kafka took to respond to kminion's offset commit")

	return svc, nil
}
@@ -193,12 +203,6 @@ func (s *Service) startOffsetCommits(ctx context.Context) {

}

-// called from e2e when a message is acknowledged
-func (s *Service) onAck(partitionId int32, duration time.Duration) {
-	s.endToEndMessagesAcked.Inc()
-	s.endToEndAckLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
-}

// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
	if duration > s.config.Consumer.RoundtripSla {
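The diff collapses the body of makeHistogramVec, but the surrounding lines show it takes the SLA (maxLatency) alongside the metric name, labels, and help text. One plausible reconstruction, assuming the buckets are derived from the SLA so that anything slower falls into the histogram's implicit +Inf bucket; the bucket math below is a guess for illustration, not the commit's actual code:

	// Assumed sketch of the collapsed helper body.
	makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
		return promauto.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: metricNamespace,
			Subsystem: "end_to_end",
			Name:      name,
			Help:      help,
			// Exponentially spaced buckets from 5ms up to the configured SLA; anything
			// slower lands in +Inf, which is what the comment above means by using the
			// "infinite" bucket to spot hiccups and lost messages.
			Buckets: prometheus.ExponentialBucketsRange(0.005, maxLatency.Seconds(), 10),
		}, labelNames)
	}

Renaming the labels to partition_id and group_coordinator_broker_id in this commit also brings them in line with Prometheus' snake_case label-naming convention.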
2 changes: 1 addition & 1 deletion main.go
@@ -73,11 +73,11 @@ func main() {
	// Create end to end testing service
	if cfg.Minion.EndToEnd.Enabled {
		e2eService, err := e2e.NewService(
+			ctx,
			cfg.Minion.EndToEnd,
			logger,
			kafkaSvc,
			cfg.Exporter.Namespace,
-			ctx,
		)
		if err != nil {
			logger.Fatal("failed to create end-to-end monitoring service: %w", zap.Error(err))
