From b44ef33084260c305189d3f32558f0e01bf80e6a Mon Sep 17 00:00:00 2001
From: Trevor Whitney
Date: Fri, 26 Jul 2024 16:12:32 -0600
Subject: [PATCH] feat: instrument failed chunk encoding/decoding (#13684)

(cherry picked from commit 5a87ccb648ee3bf48a3704643ae9923d64651aed)
---
 pkg/ingester/flush.go                     | 13 ++++++---
 pkg/ingester/metrics.go                   | 30 +++++++++++++++----
 pkg/storage/chunk/chunk.go                | 35 +++++++++++++++++++++--
 pkg/storage/chunk/client/metrics.go       | 17 +++++++++++
 pkg/storage/chunk/client/object_client.go | 10 ++++++-
 5 files changed, 92 insertions(+), 13 deletions(-)

diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index e6e22f72f097e..d851d9e4addec 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -2,6 +2,7 @@ package ingester
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"net/http"
 	"sync"
@@ -62,7 +63,7 @@ func (i *Ingester) Flush() {
 }
 
 // TransferOut implements ring.FlushTransferer
-// Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more.
+// Noop implementation because ingesters have a WAL now that does not require transferring chunks any more.
 // We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
 func (i *Ingester) TransferOut(_ context.Context) error {
 	return ring.ErrTransferDisabled
@@ -179,7 +180,6 @@ func (i *Ingester) flushLoop(j int) {
 
 		m := util_log.WithUserID(op.userID, l)
 		err := i.flushOp(m, op)
-
 		if err != nil {
 			level.Error(m).Log("msg", "failed to flush", "err", err)
 		}
@@ -410,10 +410,15 @@ func (i *Ingester) encodeChunk(ctx context.Context, ch *chunk.Chunk, desc *chunk
 	}
 	start := time.Now()
 	chunkBytesSize := desc.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
-	if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
-		return fmt.Errorf("chunk encoding: %w", err)
+	if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize)), i.logger); err != nil {
+		if !errors.Is(err, chunk.ErrChunkDecode) {
+			return fmt.Errorf("chunk encoding: %w", err)
+		}
+
+		i.metrics.chunkDecodeFailures.WithLabelValues(ch.UserID).Inc()
 	}
 	i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
+	i.metrics.chunksEncoded.WithLabelValues(ch.UserID).Inc()
 	return nil
 }
 
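Review note on pkg/ingester/flush.go: encodeChunk now deliberately swallows chunk.ErrChunkDecode. A freshly encoded chunk that fails its decode round-trip is counted per tenant via chunkDecodeFailures and the flush continues; any other encoding error still aborts the flush. Below is a minimal, self-contained sketch of that sentinel-error pattern; the names errDecode, encode, and flushOne are illustrative stand-ins for this note, not part of the patch.

	package main

	import (
		"errors"
		"fmt"
	)

	// errDecode stands in for chunk.ErrChunkDecode: a package-level
	// sentinel that callers can test for with errors.Is.
	var errDecode = errors.New("error decoding freshly created chunk")

	// encode stands in for (*Chunk).EncodeTo in this sketch.
	func encode(corrupt bool) error {
		if corrupt {
			return errDecode
		}
		return nil
	}

	// flushOne mirrors encodeChunk's control flow: unknown errors abort
	// the flush, known decode failures are counted and tolerated.
	func flushOne(corrupt bool) error {
		if err := encode(corrupt); err != nil {
			if !errors.Is(err, errDecode) {
				return fmt.Errorf("chunk encoding: %w", err)
			}
			fmt.Println(`chunk_decode_failures_total{user="tenant-a"} +1`)
		}
		return nil
	}

	func main() {
		fmt.Println(flushOne(true))  // decode failure: counted, flush continues, prints <nil>
		fmt.Println(flushOne(false)) // clean encode: prints <nil>
	}
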
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index ad190285ccd08..ff4db43747676 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -50,6 +50,8 @@ type ingesterMetrics struct {
 	chunksFlushFailures     prometheus.Counter
 	chunksFlushedPerReason  *prometheus.CounterVec
 	chunkLifespan           prometheus.Histogram
+	chunksEncoded           *prometheus.CounterVec
+	chunkDecodeFailures     *prometheus.CounterVec
 	flushedChunksStats      *analytics.Counter
 	flushedChunksBytesStats *analytics.Statistics
 	flushedChunksLinesStats *analytics.Statistics
@@ -252,12 +254,28 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
 			// 1h -> 8hr
 			Buckets: prometheus.LinearBuckets(1, 1, 8),
 		}),
-		flushedChunksStats:            analytics.NewCounter("ingester_flushed_chunks"),
-		flushedChunksBytesStats:       analytics.NewStatistics("ingester_flushed_chunks_bytes"),
-		flushedChunksLinesStats:       analytics.NewStatistics("ingester_flushed_chunks_lines"),
-		flushedChunksAgeStats:         analytics.NewStatistics("ingester_flushed_chunks_age_seconds"),
-		flushedChunksLifespanStats:    analytics.NewStatistics("ingester_flushed_chunks_lifespan_seconds"),
-		flushedChunksUtilizationStats: analytics.NewStatistics("ingester_flushed_chunks_utilization"),
+		chunksEncoded: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "ingester_chunks_encoded_total",
+			Help:      "The total number of chunks encoded in the ingester.",
+		}, []string{"user"}),
+		chunkDecodeFailures: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "ingester_chunk_decode_failures_total",
+			Help:      "The number of freshly encoded chunks that failed to decode.",
+		}, []string{"user"}),
+		flushedChunksStats:      analytics.NewCounter("ingester_flushed_chunks"),
+		flushedChunksBytesStats: analytics.NewStatistics("ingester_flushed_chunks_bytes"),
+		flushedChunksLinesStats: analytics.NewStatistics("ingester_flushed_chunks_lines"),
+		flushedChunksAgeStats: analytics.NewStatistics(
+			"ingester_flushed_chunks_age_seconds",
+		),
+		flushedChunksLifespanStats: analytics.NewStatistics(
+			"ingester_flushed_chunks_lifespan_seconds",
+		),
+		flushedChunksUtilizationStats: analytics.NewStatistics(
+			"ingester_flushed_chunks_utilization",
+		),
 		chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: constants.Loki,
 			Name:      "ingester_chunks_created_total",
diff --git a/pkg/storage/chunk/chunk.go b/pkg/storage/chunk/chunk.go
index e807b5fb87798..6f050f8cbd01d 100644
--- a/pkg/storage/chunk/chunk.go
+++ b/pkg/storage/chunk/chunk.go
@@ -3,6 +3,7 @@ package chunk
 import (
 	"bytes"
 	"encoding/binary"
+	"fmt"
 	"hash/crc32"
 	"reflect"
 	"strconv"
@@ -12,6 +13,8 @@ import (
 
 	errs "errors"
 
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/golang/snappy"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/pkg/errors"
@@ -19,6 +22,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
 
 var (
@@ -27,6 +31,7 @@ var (
 	ErrMetadataLength  = errs.New("chunk metadata wrong length")
 	ErrDataLength      = errs.New("chunk data wrong length")
 	ErrSliceOutOfRange = errs.New("chunk can't be sliced out of its data range")
+	ErrChunkDecode     = errs.New("error decoding freshly created chunk")
 )
 
 var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
@@ -227,11 +232,11 @@ var writerPool = sync.Pool{
 
 // Encode writes the chunk into a buffer, and calculates the checksum.
 func (c *Chunk) Encode() error {
-	return c.EncodeTo(nil)
+	return c.EncodeTo(nil, util_log.Logger)
 }
 
 // EncodeTo is like Encode but you can provide your own buffer to use.
-func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
+func (c *Chunk) EncodeTo(buf *bytes.Buffer, log log.Logger) error {
 	if buf == nil {
 		buf = bytes.NewBuffer(nil)
 	}
@@ -275,6 +280,32 @@ func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
 	// Now work out the checksum
 	c.encoded = buf.Bytes()
 	c.Checksum = crc32.Checksum(c.encoded, castagnoliTable)
+
+	newCh := Chunk{
+		ChunkRef: logproto.ChunkRef{
+			UserID:      c.UserID,
+			Fingerprint: c.Fingerprint,
+			From:        c.From,
+			Through:     c.Through,
+			Checksum:    c.Checksum,
+		},
+	}
+
+	if err := newCh.Decode(NewDecodeContext(), c.encoded); err != nil {
+		externalKey := fmt.Sprintf(
+			"%s/%x/%x:%x:%x",
+			c.UserID,
+			c.Fingerprint,
+			int64(c.From),
+			int64(c.Through),
+			c.Checksum,
+		)
+		level.Error(log).
+			Log("msg", "error decoding freshly created chunk", "err", err, "key", externalKey)
+
+		return ErrChunkDecode
+	}
+
 	return nil
 }
 
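Review note on pkg/storage/chunk/chunk.go: EncodeTo now verifies its own output. After computing the checksum it decodes c.encoded into a throwaway Chunk; on failure it logs the chunk's external key ("<user>/<fingerprint>/<from>:<through>:<checksum>" in hex) and returns the new ErrChunkDecode sentinel. Note the signature change as well: EncodeTo takes a go-kit log.Logger, and Encode passes util_log.Logger, so any out-of-tree callers of EncodeTo must be updated. Below is a self-contained sketch of the encode-then-verify round trip over a toy CRC32-framed payload; the framing is illustrative only, not Loki's chunk format.

	package main

	import (
		"encoding/binary"
		"errors"
		"fmt"
		"hash/crc32"
	)

	var castagnoli = crc32.MakeTable(crc32.Castagnoli)

	// decode checks the trailing CRC32, standing in for (*Chunk).Decode.
	func decode(buf []byte) ([]byte, error) {
		if len(buf) < 4 {
			return nil, errors.New("chunk data wrong length")
		}
		payload := buf[:len(buf)-4]
		sum := binary.BigEndian.Uint32(buf[len(buf)-4:])
		if crc32.Checksum(payload, castagnoli) != sum {
			return nil, errors.New("invalid chunk checksum")
		}
		return payload, nil
	}

	// encode appends a CRC32, then immediately decodes its own output —
	// the same self-check the patched EncodeTo performs before returning.
	func encode(payload []byte) ([]byte, error) {
		buf := make([]byte, len(payload)+4)
		copy(buf, payload)
		binary.BigEndian.PutUint32(buf[len(payload):], crc32.Checksum(payload, castagnoli))

		if _, err := decode(buf); err != nil {
			return nil, fmt.Errorf("error decoding freshly created chunk: %w", err)
		}
		return buf, nil
	}

	func main() {
		out, err := encode([]byte("hello"))
		fmt.Println(len(out), err) // 9 <nil>
	}
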
diff --git a/pkg/storage/chunk/client/metrics.go b/pkg/storage/chunk/client/metrics.go
index 76ca20a1bac5f..dfe789e7af2e3 100644
--- a/pkg/storage/chunk/client/metrics.go
+++ b/pkg/storage/chunk/client/metrics.go
@@ -29,6 +29,7 @@ type ChunkClientMetrics struct {
 	chunksSizePutPerUser     *prometheus.CounterVec
 	chunksFetchedPerUser     *prometheus.CounterVec
 	chunksSizeFetchedPerUser *prometheus.CounterVec
+	chunkDecodeFailures      *prometheus.CounterVec
 }
 
 func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
@@ -53,6 +54,11 @@ func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
 			Name:      "chunk_store_fetched_chunk_bytes_total",
 			Help:      "Total bytes fetched in chunks per user.",
 		}, []string{"user"}),
+		chunkDecodeFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "chunk_store_decode_failures_total",
+			Help:      "Total chunk decoding failures.",
+		}, []string{"user"}),
 	}
 }
 
@@ -85,6 +91,17 @@ func (c MetricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk)
 func (c MetricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
 	chks, err := c.Client.GetChunks(ctx, chunks)
 	if err != nil {
+		// Get chunks fetches chunks in parallel, and returns any error. As a result we don't know which chunk failed,
+		// so we increment the metric for all tenants with chunks in the request. I think in practice we're only ever
+		// fetching chunks for a single tenant at a time anyway?
+		affectedUsers := map[string]struct{}{}
+		for _, chk := range chks {
+			affectedUsers[chk.UserID] = struct{}{}
+		}
+		for user := range affectedUsers {
+			c.metrics.chunkDecodeFailures.WithLabelValues(user).Inc()
+		}
+
 		return chks, err
 	}
 
diff --git a/pkg/storage/chunk/client/object_client.go b/pkg/storage/chunk/client/object_client.go
index 460c9566f6e76..9a57b769d67c3 100644
--- a/pkg/storage/chunk/client/object_client.go
+++ b/pkg/storage/chunk/client/object_client.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/base64"
+	"fmt"
 	"io"
 	"strings"
 	"time"
@@ -185,7 +186,14 @@ func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
 	}
 
 	if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
-		return chunk.Chunk{}, errors.WithStack(err)
+		return chunk.Chunk{}, errors.WithStack(
+			fmt.Errorf(
+				"failed to decode chunk '%s' for tenant `%s`: %w",
+				key,
+				c.ChunkRef.UserID,
+				err,
+			),
+		)
 	}
 	return c, nil
 }
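Review note on the chunk client: the patch adds three per-tenant counters, all labeled by "user": loki_ingester_chunks_encoded_total and loki_ingester_chunk_decode_failures_total on the ingester flush path, and loki_chunk_store_decode_failures_total on the fetch path. Because GetChunks fetches in parallel and cannot tell which chunk failed, it increments the counter for every tenant present in the failed request, and getChunk errors are now wrapped with the chunk key and tenant for easier debugging. Below is a hedged sketch of how such a counter behaves, wiring up prometheus/client_golang directly with a stand-alone registry rather than Loki's own promauto registration.

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/testutil"
	)

	func main() {
		// Stand-alone registry for the sketch; Loki registers via promauto.With(reg).
		reg := prometheus.NewRegistry()
		decodeFailures := prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "chunk_store_decode_failures_total",
			Help:      "Total chunk decoding failures.",
		}, []string{"user"})
		reg.MustRegister(decodeFailures)

		// What the patched GetChunks error path does for each affected tenant.
		decodeFailures.WithLabelValues("tenant-a").Inc()

		fmt.Println(testutil.ToFloat64(decodeFailures.WithLabelValues("tenant-a"))) // 1
	}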