From 8c3c17a8c9ac9b7abc66da18e34d6053bc5591fa Mon Sep 17 00:00:00 2001 From: Mario Date: Thu, 2 Jan 2025 14:01:30 +0100 Subject: [PATCH] Fix registering of kafka read client metrics --- modules/blockbuilder/blockbuilder.go | 2 +- modules/generator/generator.go | 2 +- pkg/ingest/reader_client.go | 2 +- pkg/ingest/writer_client.go | 4 ++++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 29ca0895c3c..a248b52e88d 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -117,7 +117,7 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { b.kafkaClient, err = ingest.NewReaderClient( b.cfg.IngestStorageConfig.Kafka, - ingest.NewReaderClientMetrics(blockBuilderServiceName, nil), + ingest.NewReaderClientMetrics(blockBuilderServiceName, prometheus.DefaultRegisterer), b.logger, ) if err != nil { diff --git a/modules/generator/generator.go b/modules/generator/generator.go index d0cf926ce8a..afb859c428e 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -164,7 +164,7 @@ func (g *Generator) starting(ctx context.Context) (err error) { if g.cfg.Ingest.Enabled { g.kafkaClient, err = ingest.NewReaderClient( g.cfg.Ingest.Kafka, - ingest.NewReaderClientMetrics("generator", nil), + ingest.NewReaderClientMetrics("generator", prometheus.DefaultRegisterer), g.logger, ) if err != nil { diff --git a/pkg/ingest/reader_client.go b/pkg/ingest/reader_client.go index a594f165fed..7467a9542ca 100644 --- a/pkg/ingest/reader_client.go +++ b/pkg/ingest/reader_client.go @@ -39,7 +39,7 @@ func NewReaderClient(kafkaCfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo } func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics { - return kprom.NewMetrics("loki_ingest_storage_reader", + return kprom.NewMetrics("tempo_ingest_storage_reader", kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)), // Do not export the client ID, because we use it to specify options to the backend. kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) diff --git a/pkg/ingest/writer_client.go b/pkg/ingest/writer_client.go index 2f1542f5af4..4f4769e0041 100644 --- a/pkg/ingest/writer_client.go +++ b/pkg/ingest/writer_client.go @@ -201,6 +201,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi bufferedProduceBytes: promauto.With(reg).NewSummary( prometheus.SummaryOpts{ Namespace: "tempo", + Subsystem: "distributor", Name: "buffered_produce_bytes", Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001}, @@ -210,16 +211,19 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi bufferedProduceBytesLimit: promauto.With(reg).NewGauge( prometheus.GaugeOpts{ Namespace: "tempo", + Subsystem: "distributor", Name: "buffered_produce_bytes_limit", Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", }), produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Namespace: "tempo", + Subsystem: "distributor", Name: "produce_requests_total", Help: "Total number of produce requests issued to Kafka.", }), produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", + Subsystem: "distributor", Name: "produce_failures_total", Help: "Total number of failed produce requests issued to Kafka.", }, []string{"reason"}),