chore: Move compression tooling into separate package
Compression tooling has historically been part of the chunkenc (chunk
encoding) package for legacy reasons.

Since more components now use this tooling, it is easier to keep it in a
separate package. This also eliminates the confusion around "encoding",
which has incorrectly been used as a synonym for "compression" in the past.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Sep 18, 2024
1 parent 02b26a8 commit 01ef96d
Showing 53 changed files with 694 additions and 655 deletions.
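
Across all 53 files the change is the same mechanical rename: the Encoding type, its Enc* constants, ParseEncoding, and the reader/writer pools are now imported from pkg/compression instead of pkg/chunkenc. A representative before/after sketch (illustrative only; blockEncoding is a hypothetical helper, not code from this diff):

package main

import (
	"fmt"

	// Before this commit, the same identifiers were imported from
	// "github.com/grafana/loki/v3/pkg/chunkenc".
	"github.com/grafana/loki/v3/pkg/compression"
)

// blockEncoding is a hypothetical helper showing the new call shape;
// only the package qualifier changes, not the API.
func blockEncoding(name string) (compression.Encoding, error) {
	enc, err := compression.ParseEncoding(name)
	if err != nil {
		return 0, fmt.Errorf("failed to parse block encoding: %w", err)
	}
	return enc, nil
}

func main() {
	enc, err := blockEncoding("snappy")
	if err != nil {
		panic(err)
	}
	fmt.Println(enc) // snappy
}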
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/builder.go
@@ -21,7 +21,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -333,7 +333,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to get client: %w", err)
}

-	blockEnc, err := chunkenc.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
+	blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
@@ -10,7 +10,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser

func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
-	for _, enc := range []chunkenc.Encoding{chunkenc.EncNone, chunkenc.EncGZIP, chunkenc.EncSnappy} {
+	for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
4 changes: 2 additions & 2 deletions pkg/bloombuild/common/tsdb.go
@@ -13,7 +13,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
baseStore "github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
}
defer data.Close()

-	decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP)
+	decompressorPool := compression.GetReaderPool(compression.EncGZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")
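
The hunk above shows the common consumer pattern for the relocated pools: fetch a pooled decompressor for a known codec, read through it, and hand it back. A minimal sketch of that pattern, assuming the pool also exposes a PutReader counterpart to GetReader (only GetReader appears in this excerpt):

import (
	"io"

	"github.com/pkg/errors"

	"github.com/grafana/loki/v3/pkg/compression"
)

// readGzipped decompresses a gzip payload via the shared reader pool.
func readGzipped(data io.Reader) ([]byte, error) {
	pool := compression.GetReaderPool(compression.EncGZIP)
	decompressor, err := pool.GetReader(data)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get decompressor")
	}
	// PutReader is assumed as the usual return-to-pool counterpart;
	// it is not visible in this diff.
	defer pool.PutReader(decompressor)
	return io.ReadAll(decompressor)
}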
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
@@ -23,7 +23,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

-	blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0)
+	blockOpts := v1.NewBlockOptions(compression.EncNone, 4, 1, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
@@ -69,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}

-func (c *dumbChunk) Encoding() Encoding { return EncNone }
+func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
84 changes: 2 additions & 82 deletions pkg/chunkenc/interface.go
@@ -5,9 +5,9 @@ import (
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
@@ -48,86 +48,6 @@ func IsOutOfOrderErr(err error) bool {
return err == ErrOutOfOrder || IsErrTooFarBehind(err)
}

-// Encoding is the identifier for a chunk encoding.
-type Encoding byte
-
-// The different available encodings.
-// Make sure to preserve the order, as these numeric values are written to the chunks!
-const (
-	EncNone Encoding = iota
-	EncGZIP
-	EncDumb
-	EncLZ4_64k
-	EncSnappy
-	EncLZ4_256k
-	EncLZ4_1M
-	EncLZ4_4M
-	EncFlate
-	EncZstd
-)
-
-var supportedEncoding = []Encoding{
-	EncNone,
-	EncGZIP,
-	EncLZ4_64k,
-	EncSnappy,
-	EncLZ4_256k,
-	EncLZ4_1M,
-	EncLZ4_4M,
-	EncFlate,
-	EncZstd,
-}
-
-func (e Encoding) String() string {
-	switch e {
-	case EncGZIP:
-		return "gzip"
-	case EncNone:
-		return "none"
-	case EncDumb:
-		return "dumb"
-	case EncLZ4_64k:
-		return "lz4-64k"
-	case EncLZ4_256k:
-		return "lz4-256k"
-	case EncLZ4_1M:
-		return "lz4-1M"
-	case EncLZ4_4M:
-		return "lz4"
-	case EncSnappy:
-		return "snappy"
-	case EncFlate:
-		return "flate"
-	case EncZstd:
-		return "zstd"
-	default:
-		return "unknown"
-	}
-}
-
-// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
-func ParseEncoding(enc string) (Encoding, error) {
-	for _, e := range supportedEncoding {
-		if strings.EqualFold(e.String(), enc) {
-			return e, nil
-		}
-	}
-	return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())
-
-}
-
-// SupportedEncoding returns the list of supported Encoding.
-func SupportedEncoding() string {
-	var sb strings.Builder
-	for i := range supportedEncoding {
-		sb.WriteString(supportedEncoding[i].String())
-		if i != len(supportedEncoding)-1 {
-			sb.WriteString(", ")
-		}
-	}
-	return sb.String()
-}

// Chunk is the interface for the compressed logs chunk format.
type Chunk interface {
Bounds() (time.Time, time.Time)
@@ -148,7 +68,7 @@ type Chunk interface {
UncompressedSize() int
CompressedSize() int
Close() error
-	Encoding() Encoding
+	Encoding() compression.Encoding
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

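
Per the commit message, the deleted block above moves to pkg/compression essentially unchanged, as every call site in this diff confirms. A round-trip sketch against the relocated API (grounded in the removed String and ParseEncoding implementations):

package main

import (
	"fmt"
	"log"

	"github.com/grafana/loki/v3/pkg/compression"
)

func main() {
	// Parsing is case-insensitive (strings.EqualFold), and "lz4" maps
	// to EncLZ4_4M per the String() method shown above.
	enc, err := compression.ParseEncoding("LZ4")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(enc) // lz4

	// EncDumb is not in supportedEncoding, so its name does not parse back.
	if _, err := compression.ParseEncoding("dumb"); err != nil {
		fmt.Println(err) // invalid encoding: dumb, supported: none, gzip, ...
	}
}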
23 changes: 0 additions & 23 deletions pkg/chunkenc/interface_test.go
@@ -7,29 +7,6 @@ import (
"github.com/stretchr/testify/require"
)

-func TestParseEncoding(t *testing.T) {
-	tests := []struct {
-		enc     string
-		want    Encoding
-		wantErr bool
-	}{
-		{"gzip", EncGZIP, false},
-		{"bad", 0, true},
-	}
-	for _, tt := range tests {
-		t.Run(tt.enc, func(t *testing.T) {
-			got, err := ParseEncoding(tt.enc)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if got != tt.want {
-				t.Errorf("ParseEncoding() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}

func TestIsOutOfOrderErr(t *testing.T) {
now := time.Now()

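
The deleted TestParseEncoding presumably moves alongside the code; a sketch of the relocated test in the new package (assumed location, not shown in this excerpt):

package compression

import "testing"

func TestParseEncoding(t *testing.T) {
	tests := []struct {
		enc     string
		want    Encoding
		wantErr bool
	}{
		{"gzip", EncGZIP, false},
		{"bad", 0, true},
	}
	for _, tt := range tests {
		t.Run(tt.enc, func(t *testing.T) {
			got, err := ParseEncoding(tt.enc)
			if (err != nil) != tt.wantErr {
				t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("ParseEncoding() = %v, want %v", got, tt.want)
			}
		})
	}
}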
35 changes: 18 additions & 17 deletions pkg/chunkenc/memchunk.go
@@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
@@ -131,7 +132,7 @@ type MemChunk struct {
head HeadBlock

format byte
-	encoding Encoding
+	encoding compression.Encoding
headFmt HeadBlockFmt

// compressed size of chunk. Set when chunk is cut or while decoding chunk from storage.
@@ -196,7 +197,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error
return false, nil
}

-func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
+func (hb *headBlock) Serialise(pool compression.WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
@@ -354,7 +355,7 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
-func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}

@@ -369,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
}

// NewMemChunk returns a new in-mem chunk.
-func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
panicIfInvalidFormat(format, head)

symbolizer := newSymbolizer()
@@ -413,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
bc.format = version
switch version {
case ChunkFormatV1:
-		bc.encoding = EncGZIP
+		bc.encoding = compression.EncGZIP
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
-		enc := Encoding(db.byte())
+		enc := compression.Encoding(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
@@ -535,7 +536,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
if fromCheckpoint {
bc.symbolizer = symbolizerFromCheckpoint(lb)
} else {
-		symbolizer, err := symbolizerFromEnc(lb, GetReaderPool(bc.encoding))
+		symbolizer, err := symbolizerFromEnc(lb, compression.GetReaderPool(bc.encoding))
if err != nil {
return nil, err
}
@@ -653,7 +654,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
}
} else {
var err error
-		n, crcHash, err = c.symbolizer.SerializeTo(w, GetWriterPool(c.encoding))
+		n, crcHash, err = c.symbolizer.SerializeTo(w, compression.GetWriterPool(c.encoding))
if err != nil {
return offset, errors.Wrap(err, "write structured metadata")
}
@@ -776,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt
}

// Encoding implements Chunk.
-func (c *MemChunk) Encoding() Encoding {
+func (c *MemChunk) Encoding() compression.Encoding {
return c.encoding
}

@@ -941,7 +942,7 @@ func (c *MemChunk) cut() error {
return nil
}

-	b, err := c.head.Serialise(GetWriterPool(c.encoding))
+	b, err := c.head.Serialise(compression.GetWriterPool(c.encoding))
if err != nil {
return err
}
@@ -1172,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
-	enc Encoding
+	enc compression.Encoding
format byte
symbolizer *symbolizer
block
@@ -1182,14 +1183,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite
if len(b.b) == 0 {
return iter.NoopEntryIterator
}
-	return newEntryIterator(ctx, GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
+	return newEntryIterator(ctx, compression.GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
}

func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopSampleIterator
}
-	return newSampleIterator(ctx, GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
+	return newSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
}

func (b block) Offset() int {
Expand Down Expand Up @@ -1339,7 +1340,7 @@ type bufferedIterator struct {
stats *stats.Context

reader io.Reader
-	pool ReaderPool
+	pool compression.ReaderPool
symbolizer *symbolizer

err error
@@ -1358,7 +1359,7 @@ type bufferedIterator struct {
closed bool
}

-func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator {
+func newBufferedIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(len(b)))
return &bufferedIterator{
@@ -1619,7 +1620,7 @@ func (si *bufferedIterator) close() {
si.origBytes = nil
}

-func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator {
+func newEntryIterator(ctx context.Context, pool compression.ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator {
return &entryBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
pipeline: pipeline,
Expand Down Expand Up @@ -1671,7 +1672,7 @@ func (e *entryBufferedIterator) Close() error {
return e.bufferedIterator.Close()
}

-func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
+func newSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
return &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
extractor: extractor,
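
The newByteChunk hunk above also explains the warning on the moved constants ("make sure to preserve the order"): chunk format v1 predates the encoding byte and is always gzip, while v2+ persist the encoding as a raw byte in the chunk header, making the numeric values an on-disk contract. A minimal sketch of that contract (writeHeader/readHeader are hypothetical helpers; the real logic lives in writeTo and newByteChunk):

import (
	"io"

	"github.com/grafana/loki/v3/pkg/compression"
)

// writeHeader persists the format version followed by the raw encoding
// byte; the byte written today must decode to the same algorithm later.
func writeHeader(w io.Writer, format byte, enc compression.Encoding) error {
	_, err := w.Write([]byte{format, byte(enc)})
	return err
}

// readHeader reads the two bytes back in the same order.
func readHeader(r io.Reader) (byte, compression.Encoding, error) {
	var hdr [2]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return 0, 0, err
	}
	return hdr[0], compression.Encoding(hdr[1]), nil
}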