Skip to content

Commit

Permalink
feat: add gauge to track the partition_id (#14713)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Nov 1, 2024
1 parent c0856bf commit a142b3d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
11 changes: 9 additions & 2 deletions pkg/kafka/partition/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type readerMetrics struct {
partition prometheus.Gauge
phase *prometheus.GaugeVec
receiveDelay *prometheus.HistogramVec
recordsPerFetch prometheus.Histogram
Expand All @@ -25,6 +26,10 @@ type readerMetrics struct {
// newReaderMetrics initializes and returns a new set of metrics for the PartitionReader.
func newReaderMetrics(r prometheus.Registerer) readerMetrics {
return readerMetrics{
partition: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingest_storage_reader_partition_id",
Help: "The partition ID assigned to this reader.",
}),
phase: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_ingest_storage_reader_phase",
Help: "The current phase of the consumer.",
Expand Down Expand Up @@ -60,12 +65,14 @@ func newReaderMetrics(r prometheus.Registerer) readerMetrics {
}
}

func (m *readerMetrics) reportStarting() {
func (m *readerMetrics) reportStarting(partitionID int32) {
m.partition.Set(float64(partitionID))
m.phase.WithLabelValues(phaseStarting).Set(1)
m.phase.WithLabelValues(phaseRunning).Set(0)
}

func (m *readerMetrics) reportRunning() {
func (m *readerMetrics) reportRunning(partitionID int32) {
m.partition.Set(float64(partitionID))
m.phase.WithLabelValues(phaseStarting).Set(0)
m.phase.WithLabelValues(phaseRunning).Set(1)
}
4 changes: 2 additions & 2 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (p *Reader) start(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "creating kafka reader client")
}
p.metrics.reportStarting()
p.metrics.reportStarting(p.partitionID)

// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
Expand Down Expand Up @@ -142,7 +142,7 @@ func (p *Reader) start(ctx context.Context) error {
// data from Kafka, and send it to the consumer.
func (p *Reader) run(ctx context.Context) error {
level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
p.metrics.reportRunning()
p.metrics.reportRunning(p.partitionID)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down

0 comments on commit a142b3d

Please sign in to comment.