From 7b55dc950fa417251d7e4d28efe1c0eda668bcfa Mon Sep 17 00:00:00 2001 From: Wouter D'Haeseleer Date: Thu, 14 Jan 2021 18:22:22 +0100 Subject: [PATCH 1/2] The s3 client can also return the directory itself in the ListObjects If that's the case the getDBNameFromObjectKey will fail to split the names based on `/` --- pkg/storage/stores/shipper/compactor/table.go | 6 ++++++ pkg/storage/stores/shipper/downloads/table.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 33f2758fb7a0..860f3b7c099f 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -107,6 +108,11 @@ func (t *table) compact() error { return } + // The s3 client can also return the directory itself in the ListObjects. + if strings.HasSuffix(objectKey, "/") { + continue + } + var dbName string dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey) if err != nil { diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 2dc42178a62e..e0cb27648530 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -211,6 +211,12 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { // open all the downloaded dbs for _, object := range objects { + + // The s3 client can also return the directory itself in the ListObjects. + if strings.HasSuffix(object.Key, "/") { + continue + } + dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return err @@ -409,6 +415,10 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk. defer t.dbsMtx.RUnlock() for _, object := range objects { + // The s3 client can also return the directory itself in the ListObjects. + if strings.HasSuffix(object.Key, "/") { + continue + } dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return nil, nil, err @@ -506,6 +516,11 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO break } + // The s3 client can also return the directory itself in the ListObjects. + if strings.HasSuffix(object.Key, "/") { + continue + } + var dbName string dbName, err = getDBNameFromObjectKey(object.Key) if err != nil { From ea7f6371217e0ea1bc56e0e3e41e1214a2a8edd1 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Fri, 26 Feb 2021 11:59:55 -0500 Subject: [PATCH 2/2] refactor out a function with some tests --- pkg/storage/stores/shipper/compactor/table.go | 3 +- pkg/storage/stores/shipper/downloads/table.go | 15 ++-- pkg/storage/stores/shipper/util/util.go | 18 +++++ pkg/storage/stores/shipper/util/util_test.go | 75 +++++++++++++++++++ 4 files changed, 99 insertions(+), 12 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 860f3b7c099f..1d051d9c171a 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -109,7 +108,7 @@ func (t *table) compact() error { } // The s3 client can also return the directory itself in the ListObjects. - if strings.HasSuffix(objectKey, "/") { + if shipper_util.IsDirectory(objectKey) { continue } diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index e0cb27648530..1f1d224f7e9b 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -209,14 +209,11 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { level.Debug(spanLogger).Log("total-files-downloaded", len(objects)) + objects = shipper_util.RemoveDirectories(objects) + // open all the downloaded dbs for _, object := range objects { - // The s3 client can also return the directory itself in the ListObjects. - if strings.HasSuffix(object.Key, "/") { - continue - } - dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return err @@ -414,11 +411,9 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk. t.dbsMtx.RLock() defer t.dbsMtx.RUnlock() + objects = shipper_util.RemoveDirectories(objects) + for _, object := range objects { - // The s3 client can also return the directory itself in the ListObjects. - if strings.HasSuffix(object.Key, "/") { - continue - } dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return nil, nil, err @@ -517,7 +512,7 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO } // The s3 client can also return the directory itself in the ListObjects. - if strings.HasSuffix(object.Key, "/") { + if shipper_util.IsDirectory(object.Key) { continue } diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 8861043948d1..5189e1607121 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -8,6 +8,7 @@ import ( "runtime/debug" "strings" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" "go.etcd.io/bbolt" @@ -155,3 +156,20 @@ func safeOpenBoltDbFile(path string, ret chan *result) { res.boltdb = b res.err = err } + +// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed. +func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject { + outgoing := make([]chunk.StorageObject, 0, len(incoming)) + for _, o := range incoming { + if IsDirectory(o.Key) { + continue + } + outgoing = append(outgoing, o) + } + return outgoing +} + +// IsDirectory will return true if the string ends in a forward slash +func IsDirectory(key string) bool { + return strings.HasSuffix(key, "/") +} diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/shipper/util/util_test.go index 4943cad55cce..03d39f07025a 100644 --- a/pkg/storage/stores/shipper/util/util_test.go +++ b/pkg/storage/stores/shipper/util/util_test.go @@ -7,7 +7,9 @@ import ( "path/filepath" "testing" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" @@ -76,3 +78,76 @@ func Test_CompressFile(t *testing.T) { require.Equal(t, testData, b) } + +func TestRemoveDirectories(t *testing.T) { + tests := []struct { + name string + incoming []chunk.StorageObject + expected []chunk.StorageObject + }{ + { + name: "no trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "obj3"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "obj3"}, + }, + }, + { + name: "one trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2/"}, + {Key: "obj3"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj3"}, + }, + }, + { + name: "only trailing slash", + incoming: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + {Key: "/"}, + }, + expected: []chunk.StorageObject{ + {Key: "obj1"}, + {Key: "obj2"}, + }, + }, + { + name: "all trailing slash", + incoming: []chunk.StorageObject{ + {Key: "/"}, + {Key: "/"}, + {Key: "/"}, + }, + expected: []chunk.StorageObject{}, + }, + { + name: "internal slash", + incoming: []chunk.StorageObject{ + {Key: "test/test1"}, + {Key: "te/st"}, + {Key: "/sted"}, + }, + expected: []chunk.StorageObject{ + {Key: "test/test1"}, + {Key: "te/st"}, + {Key: "/sted"}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, RemoveDirectories(test.incoming)) + }) + } +}