From d362df7a98a4ba409a10a5e24b869fb8c41cdbc8 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 10 Jun 2021 16:44:39 +0530 Subject: [PATCH] ignore download of missing boltdb files possibly removed during compaction (#3563) --- pkg/storage/stores/shipper/downloads/table.go | 17 +++++++- .../stores/shipper/downloads/table_test.go | 41 ++++++++++++++++--- 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 3574d6d71ba5..5eb228a10890 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -2,6 +2,7 @@ package downloads import ( "context" + "errors" "fmt" "io" "io/ioutil" @@ -221,13 +222,16 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { // open all the downloaded dbs for _, object := range objects { - dbName, err := getDBNameFromObjectKey(object.Key) if err != nil { return err } filePath := path.Join(folderPath, dbName) + if _, err := os.Stat(filePath); os.IsNotExist(err) { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping opening of non-existent file %s, possibly not downloaded due to it being removed during compaction.", filePath)) + continue + } boltdb, err := shipper_util.SafeOpenBoltdbFile(filePath) if err != nil { return err @@ -454,6 +458,10 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true) if err != nil { + if errors.Is(err, chunk.ErrStorageObjectNotFound) { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", storageObject.Key)) + return nil + } return err } @@ -529,7 +537,12 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO filePath := path.Join(folderPathForTable, dbName) err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true) if err != nil { - break + if errors.Is(err, chunk.ErrStorageObjectNotFound) { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", object.Key)) + err = nil + } else { + break + } } } diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index 033ed6ebffe6..7aa35dcd5efc 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -22,9 +22,33 @@ const ( objectsStorageDirName = "objects" ) +// storageClientWithFakeObjectsInList adds a fake object in the list call response which +// helps with testing the case where objects gets deleted in the middle of a Sync/Download operation due to compaction. +type storageClientWithFakeObjectsInList struct { + StorageClient +} + +func newStorageClientWithFakeObjectsInList(storageClient StorageClient) StorageClient { + return storageClientWithFakeObjectsInList{storageClient} +} + +func (o storageClientWithFakeObjectsInList) List(ctx context.Context, prefix string, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { + objects, commonPrefixes, err := o.StorageClient.List(ctx, prefix, delimiter) + if err != nil { + return nil, nil, err + } + + objects = append(objects, chunk.StorageObject{ + Key: fmt.Sprintf(prefix, "fake-object"), + ModifiedAt: time.Now(), + }) + + return objects, commonPrefixes, nil +} + type stopFunc func() -func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) { +func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, StorageClient) { cachePath := filepath.Join(path, cacheDirName) boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: cachePath}) @@ -38,10 +62,10 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local } func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIndexClient, stopFunc) { - boltDBIndexClient, fsObjectClient := buildTestClients(t, path) + boltDBIndexClient, storageClient := buildTestClients(t, path) cachePath := filepath.Join(path, cacheDirName) - table := NewTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil)) + table := NewTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)) // wait for either table to get ready or a timeout hits select { @@ -138,6 +162,9 @@ func TestTable_Sync(t *testing.T) { stopFunc() }() + // replace the storage client with the one that adds fake objects in the list call + table.storageClient = newStorageClientWithFakeObjectsInList(table.storageClient) + // query table to see it has expected records setup testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20) @@ -304,11 +331,13 @@ func TestLoadTable(t *testing.T) { // setup the table in storage with some records testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false) - boltDBIndexClient, fsObjectClient := buildTestClients(t, tempDir) + boltDBIndexClient, storageClient := buildTestClients(t, tempDir) cachePath := filepath.Join(tempDir, cacheDirName) + storageClient = newStorageClientWithFakeObjectsInList(storageClient) + // try loading the table. - table, err := LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil)) + table, err := LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)) require.NoError(t, err) require.NotNil(t, table) @@ -338,7 +367,7 @@ func TestLoadTable(t *testing.T) { testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false) // try loading the table, it should skip loading corrupt file and reload it from storage. - table, err = LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil)) + table, err = LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)) require.NoError(t, err) require.NotNil(t, table)