Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Boltdb-shipper] If S3 ListObjects returns the directory itself getDBNameFromObjectKey fails #3173

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (t *table) compact() error {
return
}

// The s3 client can also return the directory itself in the ListObjects.
if shipper_util.IsDirectory(objectKey) {
continue
}

var dbName string
dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,11 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {

level.Debug(spanLogger).Log("total-files-downloaded", len(objects))

objects = shipper_util.RemoveDirectories(objects)

// open all the downloaded dbs
for _, object := range objects {

dbName, err := getDBNameFromObjectKey(object.Key)
if err != nil {
return err
Expand Down Expand Up @@ -408,6 +411,8 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.
t.dbsMtx.RLock()
defer t.dbsMtx.RUnlock()

objects = shipper_util.RemoveDirectories(objects)

for _, object := range objects {
dbName, err := getDBNameFromObjectKey(object.Key)
if err != nil {
Expand Down Expand Up @@ -506,6 +511,11 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
break
}

// The s3 client can also return the directory itself in the ListObjects.
if shipper_util.IsDirectory(object.Key) {
continue
}

var dbName string
dbName, err = getDBNameFromObjectKey(object.Key)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/stores/shipper/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"runtime/debug"
"strings"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"go.etcd.io/bbolt"

Expand Down Expand Up @@ -155,3 +156,20 @@ func safeOpenBoltDbFile(path string, ret chan *result) {
res.boltdb = b
res.err = err
}

// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed.
func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject {
outgoing := make([]chunk.StorageObject, 0, len(incoming))
for _, o := range incoming {
if IsDirectory(o.Key) {
continue
}
outgoing = append(outgoing, o)
}
return outgoing
}

// IsDirectory will return true if the string ends in a forward slash
func IsDirectory(key string) bool {
return strings.HasSuffix(key, "/")
}
75 changes: 75 additions & 0 deletions pkg/storage/stores/shipper/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"path/filepath"
"testing"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
Expand Down Expand Up @@ -76,3 +78,76 @@ func Test_CompressFile(t *testing.T) {

require.Equal(t, testData, b)
}

func TestRemoveDirectories(t *testing.T) {
tests := []struct {
name string
incoming []chunk.StorageObject
expected []chunk.StorageObject
}{
{
name: "no trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "obj3"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "obj3"},
},
},
{
name: "one trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2/"},
{Key: "obj3"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj3"},
},
},
{
name: "only trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "/"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
},
},
{
name: "all trailing slash",
incoming: []chunk.StorageObject{
{Key: "/"},
{Key: "/"},
{Key: "/"},
},
expected: []chunk.StorageObject{},
},
{
name: "internal slash",
incoming: []chunk.StorageObject{
{Key: "test/test1"},
{Key: "te/st"},
{Key: "/sted"},
},
expected: []chunk.StorageObject{
{Key: "test/test1"},
{Key: "te/st"},
{Key: "/sted"},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expected, RemoveDirectories(test.incoming))
})
}
}