Skip to content

Commit

Permalink
Handling CMK AccessDenied errors (#5420)
Browse files Browse the repository at this point in the history
* Handling CMK Errors

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* lint

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* add test on BucketStores

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fixing race

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* lint

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Implementing error handling on labels apis

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* handling errors from thanos SG

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* creating IsOneOfTheExpectedErrors func

Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
  • Loading branch information
alanprot authored Jul 3, 2023
1 parent 1dcb72d commit e2e5bcf
Show file tree
Hide file tree
Showing 41 changed files with 767 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43
github.com/uber/jaeger-client-go v2.30.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd h1:asQ0HomkaUXZuR3J7daBEusMS++3hkYsYM6u8gpmPWM=
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 h1:UHyTPGdDHAoNHuSce5cJ2vEi6g1v8D5ZFBWZ61uTHSM=
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// Give up cleaning if we get access denied
level.Warn(userLogger).Log("msg", err.Error())
return nil
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
return err
}
Expand Down
49 changes: 32 additions & 17 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package compactor
import (
"context"
"crypto/rand"
"errors"
"fmt"
"path"
"strings"
Expand All @@ -17,14 +16,12 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -57,6 +54,37 @@ func TestBlocksCleaner(t *testing.T) {
}
}

func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"

bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

// Create blocks.
ctx := context.Background()
deletionDelay := 12 * time.Hour
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
}

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
err := cleaner.cleanUser(ctx, userID, true)
require.NoError(t, err)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)

Expand Down Expand Up @@ -254,7 +282,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
createDeletionMark(t, bucketClient, userID, block4, now.Add(-deletionDelay).Add(-time.Hour))

// To emulate a failure deleting a block, we wrap the bucket client in a mocked one.
bucketClient = &mockBucketFailure{
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
DeleteFailures: []string{path.Join(userID, block3.String(), metadata.MetaFilename)},
}
Expand Down Expand Up @@ -658,19 +686,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
}
}

// mockBucketFailure embeds an objstore.Bucket and carries the list of object
// names for which its Delete method returns a mocked error.
type mockBucketFailure struct {
objstore.Bucket

// DeleteFailures lists the object names whose deletion must fail.
DeleteFailures []string
}

// Delete returns a mocked error when name is listed in DeleteFailures;
// otherwise the call is forwarded to the embedded bucket.
func (m *mockBucketFailure) Delete(ctx context.Context, name string) error {
	if !util.StringsContain(m.DeleteFailures, name) {
		return m.Bucket.Delete(ctx, name)
	}
	return errors.New("mocked delete failure")
}

// mockConfigProvider is a test helper type holding a retention period per map key.
// NOTE(review): the keys are presumably tenant/user IDs — confirm against the code that fills the map.
type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/blocks_finder_bucket_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -62,6 +64,11 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,
// so the bucket index hasn't been created yet.
return nil, nil, nil
}

if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
return nil, nil, validation.AccessDeniedError(err.Error())
}

if err != nil {
return nil, nil, err
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/blocks_finder_bucket_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -241,3 +243,21 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn

return finder
}

// TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied checks that GetBlocks
// returns an error of type validation.AccessDeniedError when the bucket is
// configured with a Get failure (the mocked key access denied error) for the
// tenant's bucket index object.
func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) {
	const userID = "user-1"

	indexPath := path.Join(userID, "bucket-index.json.gz")

	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
	bkt = &cortex_testutil.MockBucketFailure{
		Bucket:      bkt,
		GetFailures: map[string]error{indexPath: cortex_testutil.ErrKeyAccessDeniedError},
	}

	_, _, err := prepareBucketIndexBlocksFinder(t, bkt).GetBlocks(context.Background(), userID, 0, 100)

	// Only the dynamic type is compared, so the message of the reference value is irrelevant.
	require.IsType(t, validation.AccessDeniedError("error"), err)
}
3 changes: 3 additions & 0 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func TranslateToPromqlAPIError(err error) error {
case validation.LimitError:
// This will be returned with status code 422 by Prometheus API.
return err
case validation.AccessDeniedError:
// This will be returned with status code 422 by Prometheus API.
return err
default:
if errors.Is(err, context.Canceled) {
return err // 422
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func TestApiStatusCodes(t *testing.T) {
expectedCode: 422,
},

{
err: validation.AccessDeniedError("access denied"),
expectedString: "access denied",
expectedCode: 422,
},

{
err: promql.ErrTooManySamples("query execution"),
expectedString: "too many samples",
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")

ErrCustomerManagedKeyAccessDenied = errors.New("access denied: customer key")
)

// Config holds configuration for accessing long-term storage.
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/bucket/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"github.com/thanos-io/objstore"
)

var errObjectDoesNotExist = errors.New("object does not exist")
var (
errObjectDoesNotExist = errors.New("object does not exist")
errKeyPermissionDenied = errors.New("object key permission denied")
)

// ClientMock mocks objstore.Bucket
type ClientMock struct {
Expand Down Expand Up @@ -175,6 +178,11 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool {
return err == errObjectDoesNotExist
}

// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError().
// It reports whether err is, or wraps, the mock's key-permission-denied sentinel.
func (m *ClientMock) IsCustomerManagedKeyError(err error) bool {
	// errors.Is still matches the sentinel after a caller has wrapped it with
	// extra context, which a plain == comparison would miss.
	return errors.Is(err, errKeyPermissionDenied)
}

// Attributes mocks objstore.Bucket.Attributes()
func (m *ClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
args := m.Called(ctx, name)
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/bucket/prefixed_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,23 @@ func (b *PrefixedBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError returns true if the permissions for the key used to encrypt
// the object were revoked. The decision is delegated to the wrapped bucket.
func (b *PrefixedBucketClient) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

// Attributes returns attributes of the specified object.
// The given name is first mapped through fullName and the result is looked up
// in the wrapped bucket.
func (b *PrefixedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, b.fullName(name))
}

// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *PrefixedBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.WithExpectedErrs(fn)
}

// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *PrefixedBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation
if lastErr == nil {
return nil
}
if b.bucket.IsObjNotFoundErr(lastErr) {
if b.bucket.IsObjNotFoundErr(lastErr) || b.bucket.IsCustomerManagedKeyError(lastErr) {
return lastErr
}
retries.Wait()
Expand Down Expand Up @@ -194,6 +194,10 @@ func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError forwards the check to the wrapped bucket and
// returns its answer unchanged.
func (b *BucketWithRetries) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

// Close closes the wrapped bucket and returns the error it reports, if any.
func (b *BucketWithRetries) Close() error {
return b.bucket.Close()
}
57 changes: 55 additions & 2 deletions pkg/storage/bucket/s3/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"testing"
Expand All @@ -13,6 +14,49 @@ import (
"github.com/thanos-io/objstore"
)

// Sentinel errors used by the tests in this file.
var (
// errNotFound is the error that mockBucket.IsObjNotFoundErr recognizes.
errNotFound = errors.New("not found")
// errKeyDenied is the error that mockBucket.IsCustomerManagedKeyError recognizes.
errKeyDenied = errors.New("key denied")
)

// TestBucketWithRetries_ShouldRetry verifies that BucketWithRetries does not
// retry a Get when the wrapped bucket classifies the failure as "object not
// found" or as a customer-managed key error: the wrapped bucket must be called
// exactly as many times as the test case states.
func TestBucketWithRetries_ShouldRetry(t *testing.T) {
	t.Parallel()

	cases := map[string]struct {
		err        error
		retryCount int // expected number of calls made to the wrapped bucket
	}{
		"should not retry on not found": {
			err:        errNotFound,
			retryCount: 1,
		},
		"should not retry on key access denied": {
			err:        errKeyDenied,
			retryCount: 1,
		},
	}

	for name, tc := range cases {
		// Use the subtest's own *testing.T so that a failure is attributed to
		// the named case rather than to the parent test.
		t.Run(name, func(t *testing.T) {
			m := mockBucket{
				FailCount:   3,
				errToReturn: tc.err,
			}

			b := BucketWithRetries{
				logger:           log.NewNopLogger(),
				bucket:           &m,
				operationRetries: 5,
				retryMinBackoff:  10 * time.Millisecond,
				retryMaxBackoff:  time.Second,
			}

			// The returned values are deliberately ignored: only the number of
			// attempts made against the wrapped bucket is under test here.
			_, _ = b.Get(context.Background(), "something")
			require.Equal(t, tc.retryCount, m.calledCount)
		})
	}
}

func TestBucketWithRetries_UploadSeekable(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -102,6 +146,9 @@ func (f *fakeReader) Read(p []byte) (n int, err error) {
// mockBucket is the objstore.Bucket test double used by the tests in this file.
type mockBucket struct {
// NOTE(review): presumably the number of calls that fail before one succeeds;
// its use is not visible in this part of the file — confirm.
FailCount int
// NOTE(review): presumably the bytes captured by Upload — confirm.
uploadedContent []byte
// errToReturn is the error returned by Get.
errToReturn error

// calledCount counts the calls made to Get.
calledCount int
}

// Upload mocks objstore.Bucket.Upload()
Expand Down Expand Up @@ -135,7 +182,8 @@ func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error,

// Get mocks objstore.Bucket.Get()
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return nil, nil
m.calledCount++
return nil, m.errToReturn
}

// GetRange mocks objstore.Bucket.GetRange()
Expand All @@ -150,7 +198,12 @@ func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {

// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
return false
return err == errNotFound
}

// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError().
// It reports whether err is, or wraps, the errKeyDenied test sentinel.
func (m *mockBucket) IsCustomerManagedKeyError(err error) bool {
	// errors.Is keeps matching when the sentinel has been wrapped with context,
	// which a plain == comparison would miss.
	return errors.Is(err, errKeyDenied)
}

// ObjectSize mocks objstore.Bucket.Attributes()
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError implements objstore.Bucket.
// The check is delegated to the wrapped bucket.
func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

// Attributes implements objstore.Bucket.
// The call is forwarded to the wrapped bucket with the name unchanged.
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, name)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/tsdb/bucketindex/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {

if errors.Is(err, ErrIndexNotFound) {
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID)
} else {
// We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just
// started to remote write and its blocks haven't uploaded to storage yet).
Expand Down Expand Up @@ -196,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) {
l.loadAttempts.Inc()
startTime := time.Now()
idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger)
if err != nil && !errors.Is(err, ErrIndexNotFound) {
if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
l.loadFailures.Inc()
level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err)
return
Expand Down
Loading

0 comments on commit e2e5bcf

Please sign in to comment.