Remove compression of .tar bloom blocks
Decompression is a CPU-intensive task, especially gunzipping. The gain from
compressing a tar archive of storage-optimized binary blocks is negligible.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Sep 18, 2024
1 parent 7e6b4c2 commit bcbe6c7
Showing 12 changed files with 25 additions and 103 deletions.
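
The change boils down to dropping the gzip layer around block archives: blocks are uploaded and downloaded as plain .tar files, so readers no longer pay the gunzip cost on every block fetch. As an illustration only (stdlib-based sketch; TarDirectory and its signature are assumptions, not the actual v1.Tar helper in this repository), writing a block directory into an uncompressed tar stream could look like this:

package bloomsketch

import (
	"archive/tar"
	"io"
	"os"
	"path/filepath"
)

// TarDirectory writes every regular file under dir into w as a plain,
// uncompressed tar stream. Compared to a .tar.gz output there is no
// gzip.Writer in the chain, so producing (and later extracting) the archive
// is a straight byte copy rather than a CPU-heavy (de)compression pass.
func TarDirectory(w io.Writer, dir string) error {
	tw := tar.NewWriter(w)

	walkErr := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() {
			return err
		}
		rel, err := filepath.Rel(dir, path)
		if err != nil {
			return err
		}
		hdr, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return err
		}
		hdr.Name = rel
		if err := tw.WriteHeader(hdr); err != nil {
			return err
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(tw, f)
		return err
	})
	if walkErr != nil {
		tw.Close()
		return walkErr
	}
	// Close flushes the tar footer; there is no longer a gzip writer to close.
	return tw.Close()
}
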
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
@@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
block := v1.NewBlock(reader, v1.NewMetrics(nil))

buf := bytes.NewBuffer(nil)
-if err := v1.TarGz(buf, block.Reader()); err != nil {
+if err := v1.Tar(buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

@@ -1019,7 +1019,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
-//logger := log.NewLogfmtLogger(os.Stdout)
+// logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway_test.go
@@ -165,7 +165,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
-Blocks: []string{"bloom/invalid/block.tar.gz"},
+Blocks: []string{"bloom/invalid/block.tar"},
}

ctx := user.InjectOrgID(context.Background(), tenantID)
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
@@ -64,7 +64,7 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
wd := t.TempDir()

// plain file
-fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz"))
+fp, _ := os.Create(filepath.Join(wd, "regular-file.tar"))
fp.Close()

// invalid directory
@@ -100,7 +100,7 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {

require.Equal(t, 1, len(c.entries))

-key := validDir + v1.ExtTarGz // cache key must not contain directory prefix
+key := validDir + v1.ExtTar // cache key must not contain directory prefix
elem, found := c.entries[key]
require.True(t, found)
blockDir := elem.Value.(*Entry).Value
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -227,10 +227,10 @@ func BlockFrom(tenant, table string, blk *v1.Block) (Block, error) {

// TODO(owen-d): pool
buf := bytes.NewBuffer(nil)
-err := v1.TarGz(buf, blk.Reader())
+err := v1.Tar(buf, blk.Reader())

if err != nil {
-return Block{}, errors.Wrap(err, "archiving+compressing block")
+return Block{}, err
}

reader := bytes.NewReader(buf.Bytes())
@@ -321,14 +321,14 @@ func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirector
defer rc.Close()

path := b.fsResolver.Block(ref).LocalPath()
-// the block directory should not contain the .tar.gz extension
-path = strings.TrimSuffix(path, v1.ExtTarGz)
+// the block directory should not contain the .tar extension
+path = strings.TrimSuffix(path, v1.ExtTar)
err = util.EnsureDirectory(path)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err)
}

-err = v1.UnTarGz(path, rc)
+err = v1.UnTar(path, rc)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err)
}
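
On the read path, GetBlock now extracts the downloaded object as a plain tar stream after trimming the .tar suffix from the local path, as the hunk above shows. A minimal extraction sketch built on the standard library only (UnTarInto is an assumed name, not the actual v1.UnTar implementation):

package bloomsketch

import (
	"archive/tar"
	"io"
	"os"
	"path/filepath"
)

// UnTarInto reads an uncompressed tar stream from r and writes its regular
// files into dir. With no gzip.Reader involved, extraction is essentially a
// copy from the object store response body into local files.
// NOTE: a production implementation should also reject entry names that
// escape dir (e.g. containing "..").
func UnTarInto(dir string, r io.Reader) error {
	tr := tar.NewReader(r)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return nil // end of archive
		}
		if err != nil {
			return err
		}
		if hdr.Typeflag != tar.TypeReg {
			continue // skip anything that is not a regular file
		}
		dst := filepath.Join(dir, filepath.Clean(hdr.Name))
		if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
			return err
		}
		f, err := os.Create(dst)
		if err != nil {
			return err
		}
		if _, err := io.Copy(f, tr); err != nil {
			f.Close()
			return err
		}
		if err := f.Close(); err != nil {
			return err
		}
	}
}

The caller remains responsible for creating the target directory and stripping the .tar extension from the resolved path, as in the diff above.
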
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -201,13 +201,13 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min
day := start.Unix() / step

tmpDir := t.TempDir()
-fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
+fp, _ := os.CreateTemp(t.TempDir(), "*.tar")

blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)

-err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
+err = v1.Tar(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)

_, _ = fp.Seek(0, 0)
@@ -277,13 +277,13 @@ func TestBloomClient_PutBlock(t *testing.T) {
start := parseTime("2024-02-05 12:00")

tmpDir := t.TempDir()
-fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
+fp, _ := os.CreateTemp(t.TempDir(), "*.tar")

blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)

-err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
+err = v1.Tar(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)

block := Block{
29 changes: 0 additions & 29 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils.go

This file was deleted.

49 changes: 0 additions & 49 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
@@ -408,9 +408,9 @@ func (f *Fetcher) loadBlocksFromFS(_ context.Context, refs []BlockRef) ([]BlockD

for _, ref := range refs {
path := f.localFSResolver.Block(ref).LocalPath()
-// the block directory does not contain the .tar.gz extension
+// the block directory does not contain the .tar extension
// since it is stripped when the archive is extracted into a folder
-path = strings.TrimSuffix(path, v1.ExtTarGz)
+path = strings.TrimSuffix(path, v1.ExtTar)
if ok, clean := f.isBlockDir(path); ok {
blockDirs = append(blockDirs, NewBlockDirectory(ref, path))
} else {
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
@@ -336,9 +336,9 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) {
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}},
}
dirs := []string{
-strings.TrimSuffix(resolver.Block(refs[0]).LocalPath(), v1.ExtTarGz),
-strings.TrimSuffix(resolver.Block(refs[1]).LocalPath(), v1.ExtTarGz),
-strings.TrimSuffix(resolver.Block(refs[2]).LocalPath(), v1.ExtTarGz),
+strings.TrimSuffix(resolver.Block(refs[0]).LocalPath(), v1.ExtTar),
+strings.TrimSuffix(resolver.Block(refs[1]).LocalPath(), v1.ExtTar),
+strings.TrimSuffix(resolver.Block(refs[2]).LocalPath(), v1.ExtTar),
}

createBlockDir(t, dirs[1])
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/resolver.go
@@ -18,7 +18,7 @@ const (
BlocksPrefix = "blocks"

metaExtension = ".json"
-blockExtension = v1.ExtTarGz
+blockExtension = v1.ExtTar
)

// KeyResolver is an interface for resolving keys to locations.
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/resolver_test.go
@@ -46,7 +46,7 @@ func TestResolver_ParseBlockKey(t *testing.T) {
// encode block ref as string
loc := r.Block(ref)
path := loc.LocalPath()
require.Equal(t, "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd.tar.gz", path)
require.Equal(t, "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd.tar", path)

// parse encoded string into block ref
parsed, err := r.ParseBlockKey(key(path))
@@ -87,7 +87,7 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) {
loc := r.Meta(metaRef)
require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath())
loc = r.Block(blockRef)
require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath())
require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath())
})

t.Run("multiple prefixes", func(t *testing.T) {
@@ -96,6 +96,6 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) {
loc := r.Meta(metaRef)
require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath())
loc = r.Block(blockRef)
require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath())
require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath())
})
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go
@@ -109,13 +109,13 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min

func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) {
tmpDir := t.TempDir()
-fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
+fp, _ := os.CreateTemp(t.TempDir(), "*.tar")

blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)

-err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
+err = v1.Tar(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)

_, _ = fp.Seek(0, 0)
