Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

less opaque chunk keys on fs with v12 #5291

Merged
merged 2 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/chunk/local/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (f *fixture) Clients() (
return
}

chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder, chunk.SchemaConfig{})
chunkClient = objectclient.NewClient(oClient, objectclient.FSEncoder, chunk.SchemaConfig{})

tableClient, err = NewTableClient(f.dirname)
if err != nil {
Expand Down
39 changes: 30 additions & 9 deletions pkg/storage/chunk/objectclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/base64"
"strings"

"github.com/pkg/errors"

Expand All @@ -13,14 +14,30 @@ import (

// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
// from the underlying ObjectClient.
// The full SchemaConfig and Chunk are passed (rather than a pre-built key
// string) so encoders can vary their output per schema version as the
// format evolves across revisions.
type KeyEncoder func(schema chunk.SchemaConfig, chk chunk.Chunk) string

// Base64Encoder is used to encode chunk keys in base64 before storing/retrieving
// base64Encoder is used to encode chunk keys in base64 before storing/retrieving
// them from the ObjectClient
var Base64Encoder = func(key string) string {
var base64Encoder = func(key string) string {
return base64.StdEncoding.EncodeToString([]byte(key))
}

var FSEncoder = func(schema chunk.SchemaConfig, chk chunk.Chunk) string {
// Filesystem encoder pre-v12 encodes the chunk as one base64 string.
// This has the downside of making them opaque and storing all chunks in a single
// directory, hurting performance at scale and discoverability.
// Post v12, we respect the directory structure imposed by chunk keys.
key := schema.ExternalKey(chk)
if schema.VersionForChunk(chk) > 11 {
split := strings.LastIndexByte(key, '/')
encodedTail := base64Encoder(key[split+1:])
return strings.Join([]string{key[:split], encodedTail}, "/")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we use filepath.Join() here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use filepath to build the keys when using fsObjectClient because it converts / to the OS-specific separator using filepath.FromSlash and filepath.ToSlash. Basically, you would always use a / when using fsObjectClient, irrespective of OS type.

If at all, we could use path.Join with always uses /, but it does not seem necessary since we are anyways working with a / above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it converts / to os specific separator using filepath.FromSlash and filepath.ToSlash.

That's why I thought we should use it, to have files in individual directories, also on windows.
With strings.Join() it would result in files all in the same root directory? But on unix system it would be organized in subfolders.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I get something wrong here.

Copy link
Member Author

@owen-d owen-d Feb 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What @chaudum said seems sound to me. The only time I can think where this would be problematic is mounting the same file system on different OS's (i.e. migrating on an NFS mounted Loki deployment), but I'm not sure that's something we can reasonably protect against. I'll merge this one as is in the meantime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think it's weird to have different file/directory structure on Unix and Windows.


}
return base64Encoder(key)
}

const defaultMaxParallel = 150

// Client is used to store chunks in object store backends
Expand Down Expand Up @@ -56,7 +73,6 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
var (
chunkKeys []string
chunkBufs [][]byte
key string
)

for i := range chunks {
Expand All @@ -65,10 +81,11 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
return err
}

key = o.schema.ExternalKey(chunks[i])

var key string
if o.keyEncoder != nil {
key = o.keyEncoder(key)
key = o.keyEncoder(o.schema, chunks[i])
} else {
key = o.schema.ExternalKey(chunks[i])
}

chunkKeys = append(chunkKeys, key)
Expand Down Expand Up @@ -109,7 +126,7 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex

key := o.schema.ExternalKey(c)
if o.keyEncoder != nil {
key = o.keyEncoder(key)
key = o.keyEncoder(o.schema, c)
}

readCloser, size, err := o.store.GetObject(ctx, key)
Expand Down Expand Up @@ -137,7 +154,11 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
// DeleteChunk removes the chunk identified by chunkID for the given user
// from the underlying object store. When a key encoder is configured, the
// chunk ID is re-parsed so the encoder can derive the actual storage key.
func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
	if o.keyEncoder == nil {
		return o.store.DeleteObject(ctx, chunkID)
	}
	c, err := chunk.ParseExternalKey(userID, chunkID)
	if err != nil {
		return err
	}
	return o.store.DeleteObject(ctx, o.keyEncoder(o.schema, c))
}
Expand Down
79 changes: 79 additions & 0 deletions pkg/storage/chunk/objectclient/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package objectclient

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk"
)

// MustParseDayTime converts a YYYY-MM-DD date string into a chunk.DayTime,
// panicking on malformed input (test helper, hence the Must prefix).
func MustParseDayTime(s string) chunk.DayTime {
	parsed, err := time.Parse("2006-01-02", s)
	if err != nil {
		panic(err)
	}
	return chunk.DayTime{Time: model.TimeFromUnix(parsed.Unix())}
}

// TestFSEncoder verifies that FSEncoder fully base64-encodes keys for
// pre-v12 schema chunks, while for v12+ chunks it keeps the directory
// prefix intact and encodes only the last path segment.
func TestFSEncoder(t *testing.T) {
	schema := chunk.SchemaConfig{
		Configs: []chunk.PeriodConfig{
			{
				From:   MustParseDayTime("2020-01-01"),
				Schema: "v11",
			},
			{
				From:   MustParseDayTime("2022-01-01"),
				Schema: "v12",
			},
		},
	}

	// chunk that resolves to v11
	oldChunk := chunk.Chunk{
		UserID:      "fake",
		From:        MustParseDayTime("2020-01-02").Time,
		Through:     MustParseDayTime("2020-01-03").Time,
		Checksum:    123,
		Fingerprint: 456,
		ChecksumSet: true,
	}

	// chunk that resolves to v12
	newChunk := chunk.Chunk{
		UserID:      "fake",
		From:        MustParseDayTime("2022-01-02").Time,
		Through:     MustParseDayTime("2022-01-03").Time,
		Checksum:    123,
		Fingerprint: 456,
		ChecksumSet: true,
	}

	for _, tc := range []struct {
		desc string
		from string
		exp  string
	}{
		{
			desc: "before v12 encodes entire chunk",
			from: schema.ExternalKey(oldChunk),
			exp:  "ZmFrZS8xYzg6MTZmNjM4ZDQ0MDA6MTZmNjhiM2EwMDA6N2I=",
		},
		{
			// fixed duplicated word in the subtest name ("encodes encodes")
			desc: "v12+ encodes the non-directory trail",
			from: schema.ExternalKey(newChunk),
			exp:  "fake/1c8/MTdlMTgxNWY4MDA6MTdlMWQzYzU0MDA6N2I=",
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			chk, err := chunk.ParseExternalKey("fake", tc.from)
			require.Nil(t, err)
			require.Equal(t, tc.exp, FSEncoder(schema, chk))
		})
	}
}
8 changes: 8 additions & 0 deletions pkg/storage/chunk/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,14 @@ func (cfg SchemaConfig) ExternalKey(chunk Chunk) string {
}
}

// VersionForChunk returns the numeric schema version in effect at the
// chunk's From timestamp. Errors from the schema lookup and version parsing
// are deliberately ignored; callers must supply a valid, compatible
// schema/chunk pair.
func (cfg SchemaConfig) VersionForChunk(c Chunk) int {
	period, _ := cfg.SchemaForTime(c.From)
	version, _ := period.VersionAsInt()
	return version
}

// pre-checksum
func (cfg SchemaConfig) legacyExternalKey(chunk Chunk) string {
// This is the inverse of chunk.parseLegacyExternalKey, with "<user id>/" prepended.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, clien
if err != nil {
return nil, err
}
return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk, schemaCfg), nil
return objectclient.NewClientWithMaxParallel(store, objectclient.FSEncoder, cfg.MaxParallelGetChunk, schemaCfg), nil
case StorageTypeGrpc:
return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
if c.cfg.RetentionEnabled {
var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
encoder = objectclient.FSEncoder
}

chunkClient := objectclient.NewClient(objectClient, encoder, schemaConfig.SchemaConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestChunkRewriter(t *testing.T) {
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk}))
store.Stop()

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.Base64Encoder, schemaCfg.SchemaConfig)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.FSEncoder, schemaCfg.SchemaConfig)
for _, indexTable := range store.indexTables() {
err := indexTable.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(local.IndexBucketName)
Expand Down Expand Up @@ -668,7 +668,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, len(tc.expectedDeletedSeries))

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.Base64Encoder, schemaCfg.SchemaConfig)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.FSEncoder, schemaCfg.SchemaConfig)

for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
Expand Down