copy boltdb-shipper cache changes from PR #6054 to generic index shipper #6316

Merged
26 changes: 18 additions & 8 deletions pkg/storage/stores/indexshipper/downloads/index_set.go
@@ -28,8 +28,10 @@ const (
gzipExtension = ".gz"
)

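// errIndexListCacheTooStale signals that the cached listing of index files no longer
// matches object storage, e.g. because compaction removed files after the last cache
// refresh; callers should refresh the list cache and re-run the sync (see table.Sync).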
var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")

type IndexSet interface {
Init() error
Init(forQuerying bool) error
Close()
ForEach(ctx context.Context, callback index.ForEachIndexCallback) error
DropAllDBs() error
@@ -87,7 +89,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
}

// Init downloads all the db files for the table from object storage.
func (t *indexSet) Init() (err error) {
func (t *indexSet) Init(forQuerying bool) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would need the files anyway to serve subsequent requests.
ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
@@ -142,7 +144,7 @@ func (t *indexSet) Init() (err error) {
level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.index)))

// sync the table to get new files and remove the deleted ones from storage.
err = t.sync(ctx, false)
err = t.sync(ctx, false, forQuerying)
if err != nil {
return
}
@@ -241,14 +243,14 @@ func (t *indexSet) cleanupDB(fileName string) error {
}

func (t *indexSet) Sync(ctx context.Context) (err error) {
return t.sync(ctx, true)
return t.sync(ctx, true, false)
}

// sync downloads new and updated files for the table from storage and removes local files that were deleted from storage
func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
func (t *indexSet) sync(ctx context.Context, lock bool, bypassListCache bool) (err error) {
level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))

toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock)
toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)
if err != nil {
return err
}
@@ -260,6 +262,14 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
return err
}

// If we did not bypass the list cache and ended up skipping all the new files because
// compaction removed them, the cache is no longer valid: compaction must have run after
// the last index list cache refresh. Return an error so the caller re-runs the sync
// after refreshing the list cache.
if !bypassListCache && len(downloadedFiles) == 0 && len(toDownload) > 0 {
level.Error(t.logger).Log("msg", "we skipped downloading all the new files, possibly removed by compaction", "files", toDownload)
return errIndexListCacheTooStale
}

if lock {
err = t.indexMtx.lock(ctx)
if err != nil {
@@ -289,11 +299,11 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
}

// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListCache bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
// listing tables from store
var files []storage.IndexFile

files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, false)
files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)
if err != nil {
return
}
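To summarize the plumbing above, here is a comment sketch of how the new bypass flag flows through index_set.go; it uses only signatures visible in this diff, with bodies elided:

// Call flow of the list-cache bypass within index_set.go (signatures from this diff):
//
//	Init(forQuerying)  -> sync(ctx, false /* lock */, forQuerying /* bypassListCache */)
//	Sync(ctx)          -> sync(ctx, true, false)  // periodic syncs keep trusting the cache
//	sync(ctx, lock, bypassListCache)
//	                   -> checkStorageForUpdates(ctx, lock, bypassListCache)
//	                   -> baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)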
pkg/storage/stores/indexshipper/downloads/index_set_test.go
@@ -25,7 +25,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
}, util_log.Logger)
require.NoError(t, err)

require.NoError(t, idxSet.Init())
require.NoError(t, idxSet.Init(false))

return idxSet.(*indexSet), idxSet.Close
}
45 changes: 33 additions & 12 deletions pkg/storage/stores/indexshipper/downloads/table.go
@@ -108,7 +108,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = userIndexSet.Init()
err = userIndexSet.Init(false)
if err != nil {
return nil, err
}
@@ -122,7 +122,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = commonIndexSet.Init()
err = commonIndexSet.Init(false)
if err != nil {
return nil, err
}
@@ -153,15 +153,7 @@ func (t *table) ForEach(ctx context.Context, userID string, callback index.ForEa
}

if indexSet.Err() != nil {
level.Error(util_log.WithContext(ctx, t.logger)).Log("msg", fmt.Sprintf("index set %s has some problem, cleaning it up", uid), "err", indexSet.Err())
if err := indexSet.DropAllDBs(); err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to cleanup broken index set %s", uid), "err", err)
}

t.indexSetsMtx.Lock()
delete(t.indexSets, userID)
t.indexSetsMtx.Unlock()

Contributor (author) commented on the removed line delete(t.indexSets, userID): This was the cause of the LokiBoltdbShipperTablesDownloadFailed page we saw yesterday; we were unexpectedly always cleaning up the user's index set instead of the one that was actually broken.
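To make the comment concrete, here is a minimal sketch of the bug, with the loop shape assumed for illustration rather than taken from this diff: ForEach receives the querying user's ID as userID but visits both that user's index set and the common one under the loop variable uid, so keying the cleanup on userID evicted the user's healthy set whenever the common set was the broken one.

// Hypothetical sketch of the old cleanup path; the loop and helper calls are assumed.
func (t *table) ForEach(ctx context.Context, userID string, callback index.ForEachIndexCallback) error {
	// both the tenant's own index set and the common one ("") are visited
	for _, uid := range []string{userID, ""} {
		indexSet, err := t.getOrCreateIndexSet(ctx, uid, true)
		if err != nil {
			return err
		}
		if indexSet.Err() != nil {
			t.indexSetsMtx.Lock()
			// bug: keyed on the parameter userID, so a broken common set
			// (uid == "") dropped the user's healthy set instead
			delete(t.indexSets, userID) // should have been uid
			t.indexSetsMtx.Unlock()
			return indexSet.Err()
		}
		if err := indexSet.ForEach(ctx, callback); err != nil {
			return err
		}
	}
	return nil
}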

t.cleanupBrokenIndexSet(ctx, uid)
return indexSet.Err()
}

Expand Down Expand Up @@ -243,6 +235,15 @@ func (t *table) Sync(ctx context.Context) error {

for userID, indexSet := range t.indexSets {
if err := indexSet.Sync(ctx); err != nil {
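// a stale list cache can make us skip files that compaction has already replaced;
// refresh the cache once and retry the sync before treating this as a failure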
if errors.Is(err, errIndexListCacheTooStale) {
level.Info(t.logger).Log("msg", "we have hit a stale list cache; refreshing it and running sync again")
t.storageClient.RefreshIndexListCache(ctx)

err = indexSet.Sync(ctx)
if err == nil {
continue
}
}
return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
}
}
@@ -297,15 +298,35 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}()
}

err := indexSet.Init()
err := indexSet.Init(forQuerying)
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
}

t.cleanupBrokenIndexSet(ctx, id)
}()

return indexSet, nil
}

func (t *table) cleanupBrokenIndexSet(ctx context.Context, id string) {
t.indexSetsMtx.Lock()
defer t.indexSetsMtx.Unlock()

// clean up the index set only if it exists and is actually broken, i.e. Err() returns a non-nil error
indexSet, ok := t.indexSets[id]
if !ok || indexSet.Err() == nil {
return
}

level.Error(util_log.WithContext(ctx, t.logger)).Log("msg", fmt.Sprintf("index set %s has some problem, cleaning it up", id), "err", indexSet.Err())
if err := indexSet.DropAllDBs(); err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to cleanup broken index set %s", id), "err", err)
}

delete(t.indexSets, id)
}

// EnsureQueryReadiness ensures that we have downloaded the common index set as well as the user index sets for the provided userIDs.
// When ensuring query readiness for a table, we always download the common index set because it can include index for any of the provided user IDs.
func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error {
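For context, a hypothetical caller showing how the readiness API is meant to be used; tableManager and its tables field are assumptions for illustration and not part of this diff:

// Hypothetical usage sketch; tableManager and its tables field are assumed.
func (tm *tableManager) ensureQueryReadiness(ctx context.Context, userIDs []string) error {
	for _, tbl := range tm.tables {
		// downloads the common index set plus the index sets of the given users
		if err := tbl.EnsureQueryReadiness(ctx, userIDs); err != nil {
			return err
		}
	}
	return nil
}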
27 changes: 27 additions & 0 deletions pkg/storage/stores/indexshipper/downloads/table_test.go
@@ -356,6 +356,33 @@ func TestTable_Sync(t *testing.T) {
_, ok := expectedFilesInDir[fileInfo.Name()]
require.True(t, ok)
}

// let us simulate a compaction to test stale index list cache handling

// first, let us add a new file and refresh the index list cache
oneMoreDB := "one-more-db"
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
table.storageClient.RefreshIndexListCache(context.Background())

// now, without syncing the table, let us compact the index in storage
compactedDBName := "compacted-db"
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, compactedDBName), []byte(compactedDBName), 0755))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, noUpdatesDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, newDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, oneMoreDB)))

// let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache
require.NoError(t, table.Sync(context.Background()))

// verify that the table now has only the compacted db
indexesFound = []string{}
err = table.ForEach(context.Background(), userID, func(idx index.Index) error {
indexesFound = append(indexesFound, idx.Name())
return nil
})
require.NoError(t, err)
sort.Strings(indexesFound)
require.Equal(t, []string{compactedDBName}, indexesFound)
}

func TestLoadTable(t *testing.T) {