From bfe97d724f34277baa4cd9f9b25764e718997c46 Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Thu, 11 Jul 2024 16:56:25 +0100
Subject: [PATCH] feat: Add metrics to WAL Manager (#13490)

---
 pkg/ingester-rf1/ingester.go    |  2 +-
 pkg/storage/wal/manager.go      | 42 +++++++++++++-
 pkg/storage/wal/manager_test.go | 99 +++++++++++++++++++++++++++++++--
 3 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go
index 8d1b63f74e89..581da7c8a438 100644
--- a/pkg/ingester-rf1/ingester.go
+++ b/pkg/ingester-rf1/ingester.go
@@ -257,7 +257,7 @@ func New(cfg Config, clientConfig client.Config,
 		MaxAge:         wal.DefaultMaxAge,
 		MaxSegments:    wal.DefaultMaxSegments,
 		MaxSegmentSize: wal.DefaultMaxSegmentSize,
-	})
+	}, wal.NewMetrics(registerer))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go
index e1a21f504b34..d8cc04b2d960 100644
--- a/pkg/storage/wal/manager.go
+++ b/pkg/storage/wal/manager.go
@@ -6,6 +6,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
@@ -81,6 +83,29 @@ type Config struct {
 	MaxSegmentSize int64
 }
 
+type Metrics struct {
+	NumAvailable prometheus.Gauge
+	NumPending   prometheus.Gauge
+	NumFlushing  prometheus.Gauge
+}
+
+func NewMetrics(r prometheus.Registerer) *Metrics {
+	return &Metrics{
+		NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_available",
+			Help: "The number of WAL segments accepting writes.",
+		}),
+		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_pending",
+			Help: "The number of WAL segments waiting to be flushed.",
+		}),
+		NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_flushing",
+			Help: "The number of WAL segments being flushed.",
+		}),
+	}
+}
+
 // Manager buffers segments in memory, and keeps track of which segments are
 // available and which are waiting to be flushed. The maximum number of
 // segments that can be buffered in memory, and their maximum age and maximum
@@ -97,7 +122,8 @@ type Config struct {
 // and returned to the available list. This allows the manager to apply back-pressure
 // and avoid congestion collapse due to excessive timeouts and retries.
 type Manager struct {
-	cfg Config
+	cfg     Config
+	metrics *Metrics
 
 	// available is a list of segments that are available and accepting data.
 	// All segments other than the segment at the front of the list are empty,
@@ -135,13 +161,16 @@ type PendingItem struct {
 	Writer *SegmentWriter
 }
 
-func NewManager(cfg Config) (*Manager, error) {
+func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 	m := Manager{
 		cfg:       cfg,
+		metrics:   metrics,
 		available: list.New(),
 		pending:   list.New(),
 		shutdown:  make(chan struct{}),
 	}
+	m.metrics.NumPending.Set(0)
+	m.metrics.NumFlushing.Set(0)
 	for i := int64(0); i < cfg.MaxSegments; i++ {
 		w, err := NewWalSegmentWriter()
 		if err != nil {
@@ -151,6 +180,7 @@ func NewManager(cfg Config) (*Manager, error) {
 			r: &AppendResult{done: make(chan struct{})},
 			w: w,
 		})
+		m.metrics.NumAvailable.Inc()
 	}
 	return &m, nil
 }
@@ -171,7 +201,9 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	// the closed list to be flushed.
 	if time.Since(it.firstAppendedAt) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
 		m.pending.PushBack(it)
+		m.metrics.NumPending.Inc()
 		m.available.Remove(el)
+		m.metrics.NumAvailable.Dec()
 	}
 	return it.r, nil
 }
@@ -189,7 +221,9 @@ func (m *Manager) NextPending() (*PendingItem, error) {
 		it := el.Value.(*item)
 		if !it.firstAppendedAt.IsZero() && time.Since(it.firstAppendedAt) >= m.cfg.MaxAge {
 			m.pending.PushBack(it)
+			m.metrics.NumPending.Inc()
 			m.available.Remove(el)
+			m.metrics.NumAvailable.Dec()
 		}
 	}
 	// If the pending list is still empty return nil.
@@ -200,6 +234,8 @@ func (m *Manager) NextPending() (*PendingItem, error) {
 	el := m.pending.Front()
 	it := el.Value.(*item)
 	m.pending.Remove(el)
+	m.metrics.NumPending.Dec()
+	m.metrics.NumFlushing.Inc()
 	return &PendingItem{Result: it.r, Writer: it.w}, nil
 }
 
@@ -209,6 +245,8 @@ func (m *Manager) Put(it *PendingItem) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	it.Writer.Reset()
+	m.metrics.NumFlushing.Dec()
+	m.metrics.NumAvailable.Inc()
 	m.available.PushBack(&item{
 		r: &AppendResult{done: make(chan struct{})},
 		w: it.Writer,
diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go
index 25e2e2cdd988..461cc05f1243 100644
--- a/pkg/storage/wal/manager_test.go
+++ b/pkg/storage/wal/manager_test.go
@@ -6,17 +6,19 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/loki/v3/pkg/logproto"
-
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
 func TestManager_Append(t *testing.T) {
 	m, err := NewManager(Config{
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	})
+	}, NewMetrics(nil))
 	require.NoError(t, err)
 
 	// Append some data.
@@ -96,7 +98,7 @@ func TestManager_Append_ErrFull(t *testing.T) {
 	m, err := NewManager(Config{
 		MaxSegments:    10,
 		MaxSegmentSize: 1024, // 1KB
-	})
+	}, NewMetrics(nil))
 	require.NoError(t, err)
 
 	// Should be able to write to all 10 segments of 1KB each.
@@ -140,7 +142,7 @@ func TestManager_NextPending(t *testing.T) {
 		MaxAge:         DefaultMaxAge,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	})
+	}, NewMetrics(nil))
 	require.NoError(t, err)
 
 	// There should be no items as no data has been written.
@@ -195,7 +197,7 @@ func TestManager_Put(t *testing.T) {
 	m, err := NewManager(Config{
 		MaxSegments:    10,
 		MaxSegmentSize: 1024, // 1KB
-	})
+	}, NewMetrics(nil))
 	require.NoError(t, err)
 
 	// There should be 10 available segments, and 0 pending.
@@ -242,3 +244,88 @@ func TestManager_Put(t *testing.T) {
 	// The segment should be reset.
 	require.Equal(t, int64(0), it.Writer.InputSize())
 }
+
+func TestManager_Metrics(t *testing.T) {
+	r := prometheus.NewRegistry()
+	m, err := NewManager(Config{
+		MaxSegments:    1,
+		MaxSegmentSize: 1024, // 1KB
+	}, NewMetrics(r))
+	require.NoError(t, err)
+
+	metricNames := []string{
+		"wal_segments_available",
+		"wal_segments_flushing",
+		"wal_segments_pending",
+	}
+	expected := `
+# HELP wal_segments_available The number of WAL segments accepting writes.
+# TYPE wal_segments_available gauge
+wal_segments_available 1
+# HELP wal_segments_flushing The number of WAL segments being flushed.
+# TYPE wal_segments_flushing gauge
+wal_segments_flushing 0
+# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
+# TYPE wal_segments_pending gauge
+wal_segments_pending 0
+`
+	require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
+
+	// Appending 1KB of data.
+	lbs := labels.Labels{{Name: "foo", Value: "bar"}}
+	entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("b", 1024)}}
+	_, err = m.Append(AppendRequest{
+		TenantID:  "1",
+		Labels:    lbs,
+		LabelsStr: lbs.String(),
+		Entries:   entries,
+	})
+	require.NoError(t, err)
+
+	// This should move the segment from the available to the pending list.
+	expected = `
+# HELP wal_segments_available The number of WAL segments accepting writes.
+# TYPE wal_segments_available gauge
+wal_segments_available 0
+# HELP wal_segments_flushing The number of WAL segments being flushed.
+# TYPE wal_segments_flushing gauge
+wal_segments_flushing 0
+# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
+# TYPE wal_segments_pending gauge
+wal_segments_pending 1
+`
+	require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
+
+	// Get the segment from the pending list.
+	it, err := m.NextPending()
+	require.NoError(t, err)
+	require.NotNil(t, it)
+	expected = `
+# HELP wal_segments_available The number of WAL segments accepting writes.
+# TYPE wal_segments_available gauge
+wal_segments_available 0
+# HELP wal_segments_flushing The number of WAL segments being flushed.
+# TYPE wal_segments_flushing gauge
+wal_segments_flushing 1
+# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
+# TYPE wal_segments_pending gauge
+wal_segments_pending 0
+`
+	require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
+
+	// Reset the segment and put it back in the available list.
+	require.NoError(t, m.Put(it))
+	expected = `
+# HELP wal_segments_available The number of WAL segments accepting writes.
+# TYPE wal_segments_available gauge
+wal_segments_available 1
+# HELP wal_segments_flushing The number of WAL segments being flushed.
+# TYPE wal_segments_flushing gauge
+wal_segments_flushing 0
+# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
+# TYPE wal_segments_pending gauge
+wal_segments_pending 0
+`
+	require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
+
+}
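
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new NewManager signature might be wired up with a Prometheus registerer, mirroring the ingester-rf1 change above. The package name walexample, the function newWAL, and the import path for the wal package (inferred from the file paths in the diff) are assumptions.

package walexample

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

// newWAL constructs a WAL Manager whose segment gauges (available, pending,
// flushing) are registered against reg. Passing a nil registerer to
// wal.NewMetrics creates unregistered gauges, which is what the updated
// tests do with NewMetrics(nil).
func newWAL(reg prometheus.Registerer) (*wal.Manager, error) {
	return wal.NewManager(wal.Config{
		MaxAge:         wal.DefaultMaxAge,
		MaxSegments:    wal.DefaultMaxSegments,
		MaxSegmentSize: wal.DefaultMaxSegmentSize,
	}, wal.NewMetrics(reg))
}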