diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fe5ff9d0422..1079db9cadd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ ##### Enhancements ##### Fixes +* [6152](https://github.com/grafana/loki/pull/6152) **slim-bean**: Fixes unbounded ingester memory growth when live tailing under specific circumstances. * [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values ##### Changes * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time. diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 4d034d7acb37..1be7d34fdd0a 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -28,8 +28,7 @@ type tailer struct { id uint32 orgID string matchers []*labels.Matcher - pipeline syntax.Pipeline - expr syntax.Expr + expr syntax.LogSelectorExpr pipelineMtx sync.Mutex sendChan chan *logproto.Stream @@ -52,7 +51,9 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta if err != nil { return nil, err } - pipeline, err := expr.Pipeline() + // Make sure we can build a pipeline. The stream processing code doesn't have a place to handle + // this error so make sure we handle it here. + _, err = expr.Pipeline() if err != nil { return nil, err } @@ -61,7 +62,6 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta return &tailer{ orgID: orgID, matchers: matchers, - pipeline: pipeline, sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse), conn: conn, droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams), @@ -135,8 +135,13 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { } func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream { + // Build a new pipeline for each call because the pipeline builds a cache of labels + // and if we don't start with a new pipeline that cache will grow unbounded. + // The error is ignored because it would be handled in the constructor of the tailer. + pipeline, _ := t.expr.Pipeline() + // Optimization: skip filtering entirely, if no filter is set - if log.IsNoopPipeline(t.pipeline) { + if log.IsNoopPipeline(pipeline) { return []*logproto.Stream{&stream} } // pipeline are not thread safe and tailer can process multiple stream at once. @@ -145,7 +150,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log streams := map[uint64]*logproto.Stream{} - sp := t.pipeline.ForStream(lbs) + sp := pipeline.ForStream(lbs) for _, e := range stream.Entries { newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line) if !ok {