diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 33f2758fb7a0..1d051d9c171a 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -107,6 +107,11 @@ func (t *table) compact() error { return } + // The s3 client can also return the directory itself in the ListObjects. + if shipper_util.IsDirectory(objectKey) { + continue + } + var dbName string dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey) if err != nil { diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 2dc42178a62e..1f1d224f7e9b 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -209,8 +209,11 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { level.Debug(spanLogger).Log("total-files-downloaded", len(objects)) + objects = shipper_util.RemoveDirectories(objects) + // open all the downloaded dbs for _, object := range objects { + dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return err @@ -408,6 +411,8 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk. t.dbsMtx.RLock() defer t.dbsMtx.RUnlock() + objects = shipper_util.RemoveDirectories(objects) + for _, object := range objects { dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { @@ -506,6 +511,11 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO break } + // The s3 client can also return the directory itself in the ListObjects. + if shipper_util.IsDirectory(object.Key) { + continue + } + var dbName string dbName, err = getDBNameFromObjectKey(object.Key) if err != nil { diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 8861043948d1..5189e1607121 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -8,6 +8,7 @@ import ( "runtime/debug" "strings" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" "go.etcd.io/bbolt" @@ -155,3 +156,20 @@ func safeOpenBoltDbFile(path string, ret chan *result) { res.boltdb = b res.err = err } + +// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed. +func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject { + outgoing := make([]chunk.StorageObject, 0, len(incoming)) + for _, o := range incoming { + if IsDirectory(o.Key) { + continue + } + outgoing = append(outgoing, o) + } + return outgoing +} + +// IsDirectory will return true if the string ends in a forward slash +func IsDirectory(key string) bool { + return strings.HasSuffix(key, "/") +} diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/shipper/util/util_test.go index 4943cad55cce..03d39f07025a 100644 --- a/pkg/storage/stores/shipper/util/util_test.go +++ b/pkg/storage/stores/shipper/util/util_test.go @@ -7,7 +7,9 @@ import ( "path/filepath" "testing" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" @@ -76,3 +78,76 @@ func Test_CompressFile(t *testing.T) { require.Equal(t, testData, b) } + +func TestRemoveDirectories(t *testing.T) { + tests := []struct { + name string + incoming []chunk.StorageObject + expected []chunk.StorageObject + }{ + { + name: "no trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "obj3"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "obj3"}, + }, + }, + { + name: "one trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2/"}, + {Key: "obj3"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj3"}, + }, + }, + { + name: "only trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "/"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + }, + }, + { + name: "all trailing slash", + incoming: []chunk.StorageObject{ + {Key: "/"}, + {Key: "/"}, + {Key: "/"}, + }, + expected: []chunk.StorageObject{}, + }, + { + name: "internal slash", + incoming: []chunk.StorageObject{ + {Key: "test/test1"}, + {Key: "te/st"}, + {Key: "/sted"}, + }, + expected: []chunk.StorageObject{ + {Key: "test/test1"}, + {Key: "te/st"}, + {Key: "/sted"}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, RemoveDirectories(test.incoming)) + }) + } +}