diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 328ec9b323d05..89685c73522ba 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -695,16 +695,21 @@ func (si *bufferedIterator) close() { current.BytesDecompressed += si.bytesDecompressed current.BytesCompressed += int64(len(si.origBytes)) }) - si.pool.PutReader(si.reader) - BufReaderPool.Put(si.bufReader) + if si.reader != nil { + si.pool.PutReader(si.reader) + si.reader = nil + } + if si.bufReader != nil { + BufReaderPool.Put(si.bufReader) + si.bufReader = nil + } + if si.buf != nil { BytesBufferPool.Put(si.buf) + si.buf = nil } si.origBytes = nil - si.bufReader = nil - si.buf = nil si.decBuf = nil - si.reader = nil } func (si *bufferedIterator) Labels() string { return "" } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 4bd2fe9acaf36..af1fc8eafc1bb 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -13,6 +13,7 @@ import ( "github.com/dustin/go-humanize" "github.com/grafana/loki/pkg/chunkenc/testdata" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/require" ) @@ -394,6 +395,47 @@ func TestChunkSize(t *testing.T) { } } +func TestIteratorClose(t *testing.T) { + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + for _, test := range []func(iter iter.EntryIterator, t *testing.T){ + func(iter iter.EntryIterator, t *testing.T) { + // close without iterating + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + func(iter iter.EntryIterator, t *testing.T) { + // close after iterating + for iter.Next() { + _ = iter.Entry() + } + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + func(iter iter.EntryIterator, t *testing.T) { + // close after a single iteration + iter.Next() + _ = iter.Entry() + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + } { + c := NewMemChunk(enc) + inserted := fillChunk(c) + iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil) + if err != nil { + t.Fatal(err) + } + test(iter, t) + } + + }) + } +} + var result []Chunk func BenchmarkWrite(b *testing.B) { diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 51736ee82ed95..9e48edacbb4ab 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -522,7 +522,12 @@ func (i *entryIteratorBackward) Entry() logproto.Entry { return i.cur } -func (i *entryIteratorBackward) Close() error { return nil } +func (i *entryIteratorBackward) Close() error { + if !i.loaded { + return i.forwardIter.Close() + } + return nil +} func (i *entryIteratorBackward) Error() error { return nil } @@ -608,7 +613,12 @@ func (i *entryIteratorForward) Entry() logproto.Entry { return i.cur.entry } -func (i *entryIteratorForward) Close() error { return nil } +func (i *entryIteratorForward) Close() error { + if !i.loaded { + return i.backwardIter.Close() + } + return nil +} func (i *entryIteratorForward) Error() error { return nil }