Skip to content

Commit

Permalink
Handle not found metas gracefully
Browse files Browse the repository at this point in the history
There is a time window between listing metas and fetching them from
object storage. This can lead to a race condition in which a meta is
not found in object storage, because it was deleted and superseded by
a newer meta in the meantime.

This can happen when querying recent bloom data that is still subject
to updates.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed May 2, 2024
1 parent 48bbf98 commit 2342f8d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
19 changes: 11 additions & 8 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -385,31 +386,33 @@ func (b *BloomClient) Stop() {

func (b *BloomClient) GetMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) {
results := make([]Meta, len(refs))
err := concurrency.ForEachJob(ctx, len(refs), b.concurrency, func(ctx context.Context, idx int) error {
_ = concurrency.ForEachJob(ctx, len(refs), b.concurrency, func(ctx context.Context, idx int) error {
meta, err := b.GetMeta(ctx, refs[idx])
if err != nil {
return err
level.Error(b.logger).Log("msg", "failed to get meta", "err", err)
}
results[idx] = meta
return nil
})
return results, err
return results, nil
}

// GetMeta fetches the meta file for given MetaRef from object storage and
// decodes the JSON data into a Meta.
// If the meta file is not found in storage or decoding fails, a Meta that
// carries the given MetaRef is returned along with the error.
func (b *BloomClient) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) {
meta := Meta{
MetaRef: ref,
}
meta := Meta{MetaRef: ref}
key := b.KeyResolver.Meta(ref).Addr()
reader, _, err := b.client.GetObject(ctx, key)
if err != nil {
return Meta{}, fmt.Errorf("failed to get meta file%s: %w", key, err)
return meta, fmt.Errorf("failed to fetch meta file %s: %w", key, err)
}
defer reader.Close()

err = json.NewDecoder(reader).Decode(&meta)
if err != nil {
return Meta{}, fmt.Errorf("failed to decode meta file %s: %w", key, err)
return meta, fmt.Errorf("failed to decode meta file %s: %w", key, err)
}
return meta, nil
}
Expand Down
19 changes: 14 additions & 5 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,20 @@ func TestBloomClient_GetMetas(t *testing.T) {
require.Equal(t, metas, []Meta{m1, m2})
})

t.Run("does not exist", func(t *testing.T) {
metas, err := c.GetMetas(ctx, []MetaRef{{}})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
require.Equal(t, metas, []Meta{{}})
t.Run("does not exist - yields empty meta", func(t *testing.T) {
ref := MetaRef{
Ref: Ref{
TenantID: "tenant",
TableName: "table",
Bounds: v1.FingerprintBounds{},
StartTimestamp: 1000,
EndTimestamp: 2000,
Checksum: 1234,
},
}
metas, err := c.GetMetas(ctx, []MetaRef{ref})
require.NoError(t, err)
require.Equal(t, metas, []Meta{{MetaRef: ref}})
})
}

Expand Down

0 comments on commit 2342f8d

Please sign in to comment.