diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go
index ce53fdac0a58c..6a7a03a254501 100644
--- a/pkg/ingester-rf1/flush.go
+++ b/pkg/ingester-rf1/flush.go
@@ -97,12 +97,9 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {
 
 func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
 	start := time.Now()
-	defer func() {
-		i.metrics.flushDuration.Observe(time.Since(start).Seconds())
-		w.ReportMetrics()
-	}()
 
 	i.metrics.flushesTotal.Add(1)
+	defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) }()
 
 	buf := i.flushBuffers[j]
 	defer buf.Reset()
@@ -111,6 +108,9 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
 		return err
 	}
 
+	stats := wal.GetSegmentStats(w, time.Now())
+	wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)
+
 	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
 	if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil {
 		i.metrics.flushFailuresTotal.Inc()
@@ -121,7 +121,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
 		Block: w.Meta(id),
 	}); err != nil {
 		i.metrics.flushFailuresTotal.Inc()
-		return fmt.Errorf("metastore add block: %w", err)
+		return fmt.Errorf("failed to update metastore: %w", err)
 	}
 
 	return nil
diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go
index 8182449c6d596..0b5a6c5fd724a 100644
--- a/pkg/ingester-rf1/ingester.go
+++ b/pkg/ingester-rf1/ingester.go
@@ -244,7 +244,7 @@ func New(cfg Config, clientConfig client.Config,
 		MaxAge:         cfg.MaxSegmentAge,
 		MaxSegments:    int64(cfg.MaxSegments),
 		MaxSegmentSize: int64(cfg.MaxSegmentSize),
-	}, wal.NewMetrics(registerer))
+	}, wal.NewManagerMetrics(registerer))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ingester-rf1/metrics.go b/pkg/ingester-rf1/metrics.go
index 817fc9b46d648..91b3d398b7b3c 100644
--- a/pkg/ingester-rf1/metrics.go
+++ b/pkg/ingester-rf1/metrics.go
@@ -3,18 +3,37 @@ package ingesterrf1
 import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/grafana/loki/v3/pkg/storage/wal"
 )
 
-type flushMetrics struct {
+type ingesterMetrics struct {
+	autoForgetUnhealthyIngestersTotal prometheus.Counter
+	limiterEnabled                    prometheus.Gauge
+	// Shutdown marker for ingester scale down.
+	shutdownMarker     prometheus.Gauge
 	flushesTotal       prometheus.Counter
 	flushFailuresTotal prometheus.Counter
 	flushQueues        prometheus.Gauge
 	flushDuration      prometheus.Histogram
-	flushSizeBytes     prometheus.Histogram
+	flushSize          prometheus.Histogram
+	segmentMetrics     *wal.SegmentMetrics
 }
 
-func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
-	return &flushMetrics{
+func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
+	return &ingesterMetrics{
+		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
+			Help: "Total number of ingesters automatically forgotten.",
+		}),
+		limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "loki_ingester_rf1_limiter_enabled",
+			Help: "1 if the limiter is enabled, otherwise 0.",
+		}),
+		shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "loki_ingester_rf1_shutdown_marker",
+			Help: "1 if prepare shutdown has been called, 0 otherwise.",
+		}),
 		flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Name: "loki_ingester_rf1_flushes_total",
 			Help: "The total number of flushes.",
@@ -33,37 +52,12 @@ func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
 			Buckets:                     prometheus.ExponentialBuckets(0.001, 4, 8),
 			NativeHistogramBucketFactor: 1.1,
 		}),
-		flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+		flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Name:                        "loki_ingester_rf1_flush_size_bytes",
 			Help:                        "The flush size (as written to object storage).",
 			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
 			NativeHistogramBucketFactor: 1.1,
 		}),
-	}
-}
-
-type ingesterMetrics struct {
-	autoForgetUnhealthyIngestersTotal prometheus.Counter
-	limiterEnabled                    prometheus.Gauge
-	// Shutdown marker for ingester scale down.
-	shutdownMarker prometheus.Gauge
-	*flushMetrics
-}
-
-func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
-	return &ingesterMetrics{
-		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
-			Help: "Total number of ingesters automatically forgotten.",
-		}),
-		limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "loki_ingester_rf1_limiter_enabled",
-			Help: "1 if the limiter is enabled, otherwise 0.",
-		}),
-		shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "loki_ingester_rf1_shutdown_marker",
-			Help: "1 if prepare shutdown has been called, 0 otherwise.",
-		}),
-		flushMetrics: newFlushMetrics(r),
+		segmentMetrics: wal.NewSegmentMetrics(r),
 	}
 }
diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go
index 34f7ce7bcf7fa..fc23cb21e742f 100644
--- a/pkg/storage/wal/manager.go
+++ b/pkg/storage/wal/manager.go
@@ -12,12 +12,6 @@ import (
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
-const (
-	DefaultMaxAge         = 500 * time.Millisecond
-	DefaultMaxSegments    = 10
-	DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
-)
-
 var (
 	// ErrClosed is returned when the WAL is closed. It is a permanent error
 	// as once closed, a WAL cannot be re-opened.
@@ -109,31 +103,24 @@ type Manager struct {
 	clock quartz.Clock
 }
 
-// segment is similar to PendingSegment, however it is an internal struct used
-// in the available and pending lists. It contains a single-use result that is
-// returned to callers appending to the WAL and a re-usable segment that is reset
-// after each flush.
+// segment is an internal struct used in the available and pending lists. It
+// contains a single-use result that is returned to callers appending to the
+// WAL and a re-usable segment that is reset after each flush.
 type segment struct {
 	r *AppendResult
 	w *SegmentWriter
-
-	// moved is the time the segment was moved to the pending list. It is used
-	// to calculate the age of the segment. A segment is moved when it has
-	// exceeded the maximum age or the maximum size.
-	moved time.Time
 }
 
 // PendingSegment contains a result and the segment to be flushed.
 type PendingSegment struct {
 	Result *AppendResult
 	Writer *SegmentWriter
-	Moved  time.Time
 }
 
-func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
+func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
 	m := Manager{
 		cfg:       cfg,
-		metrics:   metrics.ManagerMetrics,
+		metrics:   metrics,
 		available: list.New(),
 		pending:   list.New(),
 		clock:     quartz.NewReal(),
@@ -142,7 +129,7 @@ func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
 	for i := int64(0); i < cfg.MaxSegments; i++ {
-		w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
+		w, err := NewWalSegmentWriter()
 		if err != nil {
 			return nil, err
 		}
@@ -205,11 +192,7 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
 	m.pending.Remove(el)
 	m.metrics.NumPending.Dec()
 	m.metrics.NumFlushing.Inc()
-	return &PendingSegment{
-		Result: s.r,
-		Writer: s.w,
-		Moved:  s.moved,
-	}, nil
+	return &PendingSegment{Result: s.r, Writer: s.w}, nil
 }
 
 // Put resets the segment and puts it back in the available list to accept
@@ -229,7 +212,6 @@ func (m *Manager) Put(s *PendingSegment) {
 // move the element from the available list to the pending list and sets the
 // relevant metrics.
 func (m *Manager) move(el *list.Element, s *segment) {
-	s.moved = m.clock.Now()
 	m.pending.PushBack(s)
 	m.metrics.NumPending.Inc()
 	m.available.Remove(el)
diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go
index 34d7565e03b54..93e10fbaa06a9 100644
--- a/pkg/storage/wal/manager_test.go
+++ b/pkg/storage/wal/manager_test.go
@@ -20,7 +20,7 @@ func TestManager_Append(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Append some data.
@@ -59,7 +59,7 @@ func TestManager_AppendFailed(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Append some data.
@@ -92,7 +92,7 @@ func TestManager_AppendFailedWALClosed(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    10,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Append some data.
@@ -126,7 +126,7 @@ func TestManager_AppendFailedWALFull(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    10,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Should be able to write 100KB of data, 10KB per segment.
@@ -161,7 +161,7 @@ func TestManager_AppendMaxAgeExceeded(t *testing.T) {
 		MaxAge:         100 * time.Millisecond,
 		MaxSegments:    1,
 		MaxSegmentSize: 8 * 1024 * 1024, // 8MB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Create a mock clock.
@@ -208,7 +208,7 @@ func TestManager_AppendMaxSizeExceeded(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Append 512B of data.
@@ -250,7 +250,7 @@ func TestManager_NextPending(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// There should be no segments waiting to be flushed as no data has been
@@ -286,7 +286,7 @@ func TestManager_NextPendingAge(t *testing.T) {
 		MaxAge:         100 * time.Millisecond,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Create a mock clock.
@@ -311,7 +311,7 @@ func TestManager_NextPendingAge(t *testing.T) {
 	s, err := m.NextPending()
 	require.NoError(t, err)
 	require.NotNil(t, s)
-	require.Equal(t, 100*time.Millisecond, s.Writer.Age(s.Moved))
+	require.Equal(t, 100*time.Millisecond, s.Writer.Age(clock.Now()))
 	m.Put(s)
 
 	// Append 1KB of data using two separate append requests, 1ms apart.
@@ -342,7 +342,7 @@ func TestManager_NextPendingAge(t *testing.T) {
 	s, err = m.NextPending()
 	require.NoError(t, err)
 	require.NotNil(t, s)
-	require.Equal(t, time.Millisecond, s.Writer.Age(s.Moved))
+	require.Equal(t, time.Millisecond, s.Writer.Age(clock.Now()))
 }
 
 func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
@@ -350,7 +350,7 @@ func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
 		MaxAge:         100 * time.Millisecond,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Create a mock clock.
@@ -392,7 +392,7 @@ func TestManager_NextPendingWALClosed(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// Append some data.
@@ -435,7 +435,7 @@ func TestManager_Put(t *testing.T) {
 		MaxAge:         30 * time.Second,
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(nil))
+	}, NewManagerMetrics(nil))
 	require.NoError(t, err)
 
 	// There should be 1 available and 0 pending segments.
@@ -482,7 +482,7 @@ func TestManager_Metrics(t *testing.T) {
 	m, err := NewManager(Config{
 		MaxSegments:    1,
 		MaxSegmentSize: 1024, // 1KB
-	}, NewMetrics(r))
+	}, NewManagerMetrics(r))
 	require.NoError(t, err)
 
 	metricNames := []string{
diff --git a/pkg/storage/wal/metrics.go b/pkg/storage/wal/metrics.go
index a0c0676c6c81c..194580959d518 100644
--- a/pkg/storage/wal/metrics.go
+++ b/pkg/storage/wal/metrics.go
@@ -7,8 +7,8 @@ import (
 
 type ManagerMetrics struct {
 	NumAvailable prometheus.Gauge
-	NumPending   prometheus.Gauge
 	NumFlushing  prometheus.Gauge
+	NumPending   prometheus.Gauge
 }
 
 func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics {
@@ -17,34 +17,35 @@ func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics {
 			Name: "wal_segments_available",
 			Help: "The number of WAL segments accepting writes.",
 		}),
-		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "wal_segments_pending",
-			Help: "The number of WAL segments waiting to be flushed.",
-		}),
 		NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Name: "wal_segments_flushing",
 			Help: "The number of WAL segments being flushed.",
 		}),
+		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_pending",
+			Help: "The number of WAL segments waiting to be flushed.",
+		}),
 	}
 }
 
 type SegmentMetrics struct {
-	outputSizeBytes prometheus.Histogram
-	inputSizeBytes  prometheus.Histogram
-	streams         prometheus.Histogram
-	tenants         prometheus.Histogram
+	age       prometheus.Histogram
+	size      prometheus.Histogram
+	streams   prometheus.Histogram
+	tenants   prometheus.Histogram
+	writeSize prometheus.Histogram
 }
 
 func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics {
 	return &SegmentMetrics{
-		outputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingester_rf1_segment_output_size_bytes",
-			Help:                        "The segment size as written to disk (compressed).",
-			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
+		age: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_segment_age_seconds",
+			Help:                        "The segment age (time between first append and flush).",
+			Buckets:                     prometheus.ExponentialBuckets(0.001, 4, 8),
 			NativeHistogramBucketFactor: 1.1,
 		}),
-		inputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingester_rf1_segment_input_size_bytes",
+		size: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_segment_size_bytes",
 			Help:                        "The segment size (uncompressed).",
 			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
 			NativeHistogramBucketFactor: 1.1,
@@ -61,17 +62,11 @@
 			Buckets:                     prometheus.ExponentialBuckets(1, 2, 10),
 			NativeHistogramBucketFactor: 1.1,
 		}),
-	}
-}
-
-type Metrics struct {
-	SegmentMetrics *SegmentMetrics
-	ManagerMetrics *ManagerMetrics
-}
-
-func NewMetrics(r prometheus.Registerer) *Metrics {
-	return &Metrics{
-		ManagerMetrics: NewManagerMetrics(r),
-		SegmentMetrics: NewSegmentMetrics(r),
+		writeSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_segment_write_size_bytes",
+			Help:                        "The segment size as written to disk (compressed).",
+			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
+			NativeHistogramBucketFactor: 1.1,
+		}),
 	}
 }
diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go
index a53763dfa15be..1ba28093eb074 100644
--- a/pkg/storage/wal/segment.go
+++ b/pkg/storage/wal/segment.go
@@ -47,7 +47,6 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
-	metrics    *SegmentMetrics
 	streams    map[streamID]*streamSegment
 	buf1       encoding.Encbuf
 	outputSize atomic.Int64
@@ -65,6 +64,45 @@ type SegmentWriter struct {
 	lastAppend time.Time
 }
 
+// SegmentStats contains the stats for a SegmentWriter.
+type SegmentStats struct {
+	// Age is the time between the first append and the flush.
+	Age time.Duration
+	// Idle is the time between the last append and the flush.
+	Idle      time.Duration
+	Streams   int
+	Tenants   int
+	Size      int64
+	WriteSize int64
+}
+
+// GetSegmentStats returns the stats for a SegmentWriter. The age of a segment
+// is calculated from t. WriteSize is zero if GetSegmentStats is called before
+// SegmentWriter.WriteTo.
+func GetSegmentStats(w *SegmentWriter, t time.Time) SegmentStats {
+	tenants := make(map[string]struct{}, 64)
+	for _, s := range w.streams {
+		tenants[s.tenantID] = struct{}{}
+	}
+	return SegmentStats{
+		Age:       t.Sub(w.firstAppend),
+		Idle:      t.Sub(w.lastAppend),
+		Streams:   len(w.streams),
+		Tenants:   len(tenants),
+		Size:      w.inputSize.Load(),
+		WriteSize: w.outputSize.Load(),
+	}
+}
+
+// ReportSegmentStats reports the stats as metrics.
+func ReportSegmentStats(s SegmentStats, m *SegmentMetrics) {
+	m.age.Observe(s.Age.Seconds())
+	m.streams.Observe(float64(s.Streams))
+	m.tenants.Observe(float64(s.Tenants))
+	m.size.Observe(float64(s.Size))
+	m.writeSize.Observe(float64(s.WriteSize))
+}
+
 type streamSegment struct {
 	lbls    labels.Labels
 	entries []*logproto.Entry
@@ -87,13 +125,12 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 }
 
 // NewWalSegmentWriter creates a new WalSegmentWriter.
-func NewWalSegmentWriter(m *SegmentMetrics) (*SegmentWriter, error) {
+func NewWalSegmentWriter() (*SegmentWriter, error) {
 	idxWriter, err := index.NewWriter()
 	if err != nil {
 		return nil, err
 	}
 	return &SegmentWriter{
-		metrics:   m,
 		streams:   make(map[streamID]*streamSegment, 64),
 		buf1:      encoding.EncWith(make([]byte, 0, 4)),
 		idxWriter: idxWriter,
@@ -159,19 +196,6 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 	}
 }
 
-// ReportMetrics for the writer. If called before WriteTo then the output size
-// histogram will observe 0.
-func (b *SegmentWriter) ReportMetrics() {
-	b.metrics.streams.Observe(float64(len(b.streams)))
-	tenants := make(map[string]struct{}, 64)
-	for _, s := range b.streams {
-		tenants[s.tenantID] = struct{}{}
-	}
-	b.metrics.tenants.Observe(float64(len(tenants)))
-	b.metrics.inputSizeBytes.Observe(float64(b.inputSize.Load()))
-	b.metrics.outputSizeBytes.Observe(float64(b.outputSize.Load()))
-}
-
 func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta {
 	var globalMinT, globalMaxT int64
 
diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go
index 15476f044331a..cbe42587bf7a2 100644
--- a/pkg/storage/wal/segment_test.go
+++ b/pkg/storage/wal/segment_test.go
@@ -105,7 +105,7 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
 			// Create a new WalSegmentWriter
-			w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+			w, err := NewWalSegmentWriter()
 			require.NoError(t, err)
 			// Append the entries
 			for _, batch := range tt.batches {
@@ -132,7 +132,7 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 }
 
 func TestMultiTenantWrite(t *testing.T) {
-	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+	w, err := NewWalSegmentWriter()
 	require.NoError(t, err)
 
 	dst := bytes.NewBuffer(nil)
@@ -202,7 +202,7 @@ func TestCompression(t *testing.T) {
 }
 
 func testCompression(t *testing.T, maxInputSize int64) {
-	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+	w, err := NewWalSegmentWriter()
 	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 	files := testdata.Files()
@@ -259,7 +259,7 @@ func testCompression(t *testing.T, maxInputSize int64) {
 }
 
 func TestReset(t *testing.T) {
-	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+	w, err := NewWalSegmentWriter()
 	require.NoError(t, err)
 
 	dst := bytes.NewBuffer(nil)
@@ -290,7 +290,7 @@ func TestReset(t *testing.T) {
 }
 
 func Test_Meta(t *testing.T) {
-	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+	w, err := NewWalSegmentWriter()
 	buff := bytes.NewBuffer(nil)
 	require.NoError(t, err)
 
@@ -381,7 +381,7 @@ func BenchmarkWrites(b *testing.B) {
 
 	dst := bytes.NewBuffer(make([]byte, 0, inputSize))
 
-	writer, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
+	writer, err := NewWalSegmentWriter()
 	require.NoError(b, err)
 
 	for _, d := range data {
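For reference, the flush-side flow after this patch is: write the segment, then collect a SegmentStats snapshot with GetSegmentStats and report it with ReportSegmentStats, replacing the removed SegmentWriter.ReportMetrics method. Below is a minimal sketch of that usage; the package and function names (walusage, flushOnce) and the bytes.Buffer destination are illustrative stand-ins, while the wal calls mirror the ones in the diff above.

package walusage

import (
	"bytes"
	"time"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

// flushOnce sketches the flush path after this change: WriteTo runs first so
// the compressed WriteSize is populated, then stats are computed outside the
// writer and reported against a SegmentMetrics (created elsewhere with
// wal.NewSegmentMetrics). The object-storage upload and failure metrics from
// the real flushSegment are omitted.
func flushOnce(w *wal.SegmentWriter, metrics *wal.SegmentMetrics) error {
	buf := bytes.NewBuffer(nil)
	if _, err := w.WriteTo(buf); err != nil {
		return err
	}
	stats := wal.GetSegmentStats(w, time.Now())
	wal.ReportSegmentStats(stats, metrics)
	return nil
}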