diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index d90b028e8af81..a1008d88c8596 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -100,6 +100,9 @@ func (p *Reader) start(ctx context.Context) error { return errors.Wrap(err, "creating kafka reader client") } + p.metrics.phase.WithLabelValues("starting").Set(1) + p.metrics.phase.WithLabelValues("running").Set(0) + // We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from. lastCommittedOffset := p.fetchLastCommittedOffset(ctx) p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ @@ -141,6 +144,9 @@ func (p *Reader) start(ctx context.Context) error { // data from Kafka, and send it to the consumer. func (p *Reader) run(ctx context.Context) error { level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup) + p.metrics.phase.WithLabelValues("starting").Set(0) + p.metrics.phase.WithLabelValues("running").Set(1) + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -513,6 +519,7 @@ func isErrFetch(fetch kgo.Fetch) bool { } type readerMetrics struct { + phase *prometheus.GaugeVec receiveDelayWhenStarting prometheus.Observer receiveDelayWhenRunning prometheus.Observer recordsPerFetch prometheus.Histogram @@ -538,6 +545,10 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics { }, []string{"phase"}) return readerMetrics{ + phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "loki_ingest_storage_reader_phase", + Help: "The current phase of the consumer.", + }, []string{"phase"}), receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), kprom: client.NewReaderClientMetrics("partition-reader", reg),