Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(blooms): Remove compression of .tar archived bloom blocks #14159

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/grafana/loki/v3/pkg/util/ring"
)

// TODO(chaudum): Make configurable via (per-tenant?) setting.
var blockCompressionAlgo = compression.EncNone

type Builder struct {
services.Service

Expand Down Expand Up @@ -404,7 +407,7 @@ func (b *Builder) processTask(
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk)
built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
block := v1.NewBlock(reader, v1.NewMetrics(nil))

buf := bytes.NewBuffer(nil)
if err := v1.TarGz(buf, block.Reader()); err != nil {
if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

Expand Down Expand Up @@ -1019,7 +1019,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)
// logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: []string{"bloom/invalid/block.tar.gz"},
Blocks: []string{"bloom/invalid/block.tar"},
}

ctx := user.InjectOrgID(context.Background(), tenantID)
Expand Down
7 changes: 2 additions & 5 deletions pkg/compression/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Encoding byte
const (
EncNone Encoding = iota
EncGZIP
EncDumb
EncDumb // not supported
EncLZ4_64k
EncSnappy
EncLZ4_256k
Expand Down Expand Up @@ -41,8 +41,6 @@ func (e Encoding) String() string {
return "gzip"
case EncNone:
return "none"
case EncDumb:
return "dumb"
case EncLZ4_64k:
return "lz4-64k"
case EncLZ4_256k:
Expand All @@ -58,7 +56,7 @@ func (e Encoding) String() string {
case EncZstd:
return "zstd"
default:
return "unknown"
panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding()))
}
}

Expand All @@ -70,7 +68,6 @@ func ParseEncoding(enc string) (Encoding, error) {
}
}
return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())

}

// SupportedEncoding returns the list of supported Encoding.
Expand Down
50 changes: 50 additions & 0 deletions pkg/compression/fileext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package compression

import "fmt"

// File extensions corresponding to the supported compression encodings.
const (
ExtNone = ""
ExtGZIP = ".gz"
ExtSnappy = ".sz"
ExtLZ4 = ".lz4"
ExtFlate = ".zz"
ExtZstd = ".zst"
)

// ToFileExtension returns the canonical file extension for the given
// compression encoding. It panics on encodings without an extension
// mapping (e.g. EncDumb), mirroring the behavior of Encoding.String.
func ToFileExtension(e Encoding) string {
	switch e {
	case EncNone:
		return ExtNone
	case EncGZIP:
		return ExtGZIP
	case EncSnappy:
		return ExtSnappy
	case EncFlate:
		return ExtFlate
	case EncZstd:
		return ExtZstd
	case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M:
		// All LZ4 variants share a single extension.
		return ExtLZ4
	default:
		panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding()))
	}
}

// FromFileExtension returns the compression encoding for the given file
// extension. Note the mapping is lossy for LZ4: all LZ4 variants share the
// ".lz4" extension, so it resolves to EncLZ4_4M. Unknown extensions panic.
func FromFileExtension(ext string) Encoding {
	switch ext {
	case ExtNone:
		return EncNone
	case ExtGZIP:
		return EncGZIP
	case ExtSnappy:
		return EncSnappy
	case ExtFlate:
		return EncFlate
	case ExtZstd:
		return EncZstd
	case ExtLZ4:
		return EncLZ4_4M
	default:
		panic(fmt.Sprintf("invalid file extension: %s", ext))
	}
}
36 changes: 27 additions & 9 deletions pkg/storage/bloom/v1/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,34 @@ import (
"github.com/grafana/loki/v3/pkg/compression"
)

const (
// ExtTar is the file extension of (optionally compressed) tar archives.
ExtTar = ".tar"
)

// TarEntry represents a single file to be written into a tar archive.
type TarEntry struct {
// Name is the entry's file name inside the archive.
Name string
// Size is the entry's size in bytes.
Size int64
// Body provides the entry's content.
Body io.ReadSeeker
}

func TarGz(dst io.Writer, reader BlockReader) error {
// TarCompress writes the block read from reader as a tar archive to dst,
// compressing the stream with the given encoding. EncNone produces a plain
// uncompressed .tar stream.
func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) (err error) {
	comprPool := compression.GetWriterPool(enc)
	comprWriter := comprPool.GetWriter(dst)
	defer func() {
		// Closing the compression writer performs the final flush. Ignoring a
		// flush failure would silently truncate the archive, so propagate it
		// unless Tar itself already failed.
		if cerr := comprWriter.Close(); cerr != nil && err == nil {
			err = cerr
		}
		comprPool.PutWriter(comprWriter)
	}()

	return Tar(comprWriter, reader)
}

func Tar(dst io.Writer, reader BlockReader) error {
itr, err := reader.TarEntries()
if err != nil {
return errors.Wrap(err, "error getting tar entries")
}

gzipper := compression.GetWriterPool(compression.EncGZIP).GetWriter(dst)
defer gzipper.Close()

tarballer := tar.NewWriter(gzipper)
tarballer := tar.NewWriter(dst)
defer tarballer.Close()

for itr.Next() {
Expand All @@ -49,13 +61,19 @@ func TarGz(dst io.Writer, reader BlockReader) error {
return itr.Err()
}

func UnTarGz(dst string, r io.Reader) error {
gzipper, err := compression.GetReaderPool(compression.EncGZIP).GetReader(r)
func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error {
comprPool := compression.GetReaderPool(enc)
comprReader, err := comprPool.GetReader(r)
if err != nil {
return errors.Wrap(err, "error getting gzip reader")
return errors.Wrapf(err, "error getting %s reader", enc.String())
}
defer comprPool.PutReader(comprReader)

return UnTar(dst, comprReader)
}

tarballer := tar.NewReader(gzipper)
func UnTar(dst string, r io.Reader) error {
tarballer := tar.NewReader(r)

for {
header, err := tarballer.Next()
Expand Down
91 changes: 88 additions & 3 deletions pkg/storage/bloom/v1/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestArchive(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.EncNone,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
Expand All @@ -40,9 +40,9 @@ func TestArchive(t *testing.T) {
reader := NewDirectoryBlockReader(dir1)

w := bytes.NewBuffer(nil)
require.Nil(t, TarGz(w, reader))
require.Nil(t, Tar(w, reader))

require.Nil(t, UnTarGz(dir2, w))
require.Nil(t, UnTar(dir2, w))

reader2 := NewDirectoryBlockReader(dir2)

Expand Down Expand Up @@ -78,3 +78,88 @@ func TestArchive(t *testing.T) {
require.Nil(t, err)
require.Equal(t, srcBloomsBytes, dstBloomsBytes)
}

// TestArchiveCompression round-trips a bloom block through TarCompress /
// UnTarCompress for every supported encoding and verifies the extracted
// files are byte-for-byte identical to the originals.
func TestArchiveCompression(t *testing.T) {
	t.Parallel()
	for _, tc := range []struct {
		enc compression.Encoding
	}{
		{compression.EncNone},
		{compression.EncGZIP},
		{compression.EncSnappy},
		{compression.EncLZ4_64k},
		{compression.EncLZ4_256k},
		{compression.EncLZ4_1M},
		{compression.EncLZ4_4M},
		{compression.EncFlate},
		{compression.EncZstd},
	} {
		t.Run(tc.enc.String(), func(t *testing.T) {
			// Write files to two dirs for comparison and ensure they're equal.
			dir1 := t.TempDir()
			dir2 := t.TempDir()

			numSeries := 100
			data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000)

			builder, err := NewBlockBuilder(
				BlockOptions{
					Schema: Schema{
						version:  CurrentSchemaVersion,
						encoding: compression.EncNone,
					},
					SeriesPageSize: 100,
					BloomPageSize:  10 << 10,
				},
				NewDirectoryBlockWriter(dir1),
			)
			require.Nil(t, err)

			itr := v2.NewSliceIter[SeriesWithBlooms](data)
			_, err = builder.BuildFrom(itr)
			require.Nil(t, err)

			reader := NewDirectoryBlockReader(dir1)

			w := bytes.NewBuffer(nil)
			require.Nil(t, TarCompress(tc.enc, w, reader))
			require.Nil(t, UnTarCompress(tc.enc, dir2, w))

			reader2 := NewDirectoryBlockReader(dir2)

			// Check Index is byte for byte equivalent.
			srcIndex, err := reader.Index()
			require.Nil(t, err)
			dstIndex, err := reader2.Index()
			require.Nil(t, err)
			requireSameBytes(t, srcIndex, dstIndex)

			// Check Blooms is byte for byte equivalent.
			srcBlooms, err := reader.Blooms()
			require.Nil(t, err)
			dstBlooms, err := reader2.Blooms()
			require.Nil(t, err)
			requireSameBytes(t, srcBlooms, dstBlooms)
		})
	}
}

// requireSameBytes rewinds both readers and asserts their entire contents
// are identical.
func requireSameBytes(t *testing.T, src, dst io.ReadSeeker) {
	t.Helper()
	_, err := src.Seek(0, io.SeekStart)
	require.Nil(t, err)
	_, err = dst.Seek(0, io.SeekStart)
	require.Nil(t, err)

	srcBytes, err := io.ReadAll(src)
	require.Nil(t, err)
	dstBytes, err := io.ReadAll(dst)
	require.Nil(t, err)
	require.Equal(t, srcBytes, dstBytes)
}
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ func loadBlockDirectories(root string, logger log.Logger) (keys []string, values
return nil
}

ref, err := resolver.ParseBlockKey(key(path))
// The block file extension (.tar) needs to be added so the key can be parsed.
// This is because the extension is stripped off when the tar archive is extracted.
ref, err := resolver.ParseBlockKey(key(path + blockExtension))
if err != nil {
return nil
}

if ok, clean := isBlockDir(path, logger); ok {
// the cache key must not contain the directory prefix
// therefore we use the defaultKeyResolver to resolve the block's address
key := defaultKeyResolver{}.Block(ref).Addr()
key := cacheKey(ref)
keys = append(keys, key)
values = append(values, NewBlockDirectory(ref, path))
level.Debug(logger).Log("msg", "found block directory", "path", path, "key", key)
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
)
Expand Down Expand Up @@ -63,7 +64,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
wd := t.TempDir()

// plain file
fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz"))
ext := blockExtension + compression.ExtGZIP
fp, _ := os.Create(filepath.Join(wd, "regular-file"+ext))
fp.Close()

// invalid directory
Expand Down Expand Up @@ -99,8 +101,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {

require.Equal(t, 1, len(c.entries))

key := validDir + ".tar.gz" // cache key must not contain directory prefix
elem, found := c.entries[key]
// cache key does neither contain directory prefix nor file extension suffix
elem, found := c.entries[validDir]
require.True(t, found)
blockDir := elem.Value.(*Entry).Value
require.Equal(t, filepath.Join(wd, validDir), blockDir.Path)
Expand Down
Loading
Loading