Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boltdb shipper index list cache improvements #6054

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/stores/indexshipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDow
// listing tables from store
var files []storage.IndexFile

files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID)
files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, false)
if err != nil {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
}

// list the users that have dedicated index files for this table
_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName)
_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro
return m.tablesInStorage, nil
}

func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil
}
21 changes: 18 additions & 3 deletions pkg/storage/stores/indexshipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage
return storageClientWithFakeObjectsInList{storageClient}
}

func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
files, userIDs, err := o.Client.ListFiles(ctx, tableName)
func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
files, userIDs, err := o.Client.ListFiles(ctx, tableName, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -46,14 +46,28 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table
return files, userIDs, nil
}

func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) {
files, err := o.Client.ListUserFiles(ctx, tableName, userID, true)
if err != nil {
return nil, err
}

files = append(files, storage.IndexFile{
Name: "fake-object",
ModifiedAt: time.Now(),
})

return files, nil
}

func buildTestTable(t *testing.T, path string) (*table, stopFunc) {
storageClient := buildTestStorageClient(t, path)
cachePath := filepath.Join(path, cacheDirName)

table := NewTable(tableName, cachePath, storageClient, func(path string) (index.Index, error) {
return openMockIndexFile(t, path), nil
}).(*table)
_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName)
_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false)
require.NoError(t, err)
require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex))

Expand Down Expand Up @@ -301,6 +315,7 @@ func TestTable_Sync(t *testing.T) {
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, newDB), []byte(newDB), 0755))

// sync the table
table.storageClient.RefreshIndexListCache(context.Background())
require.NoError(t, table.Sync(context.Background()))

// check that table got the new index and dropped the deleted index
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) {
ctx, cancelFunc := context.WithTimeout(is.ctx, userIndexReadinessTimeout)
defer cancelFunc()

is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID)
is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID, false)
if is.err != nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
}

func (t *table) compact(applyRetention bool) error {
indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name)
indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (t *table) done() error {
continue
}

indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID)
indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID, false)
if err != nil {
return err
}
Expand Down
26 changes: 18 additions & 8 deletions pkg/storage/stores/shipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/grafana/loki/pkg/util/spanlogger"
)

var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")

type IndexSet interface {
Init() error
Init(forQuerying bool) error
Close()
MultiQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error
DropAllDBs() error
Expand Down Expand Up @@ -87,7 +89,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
}

// Init downloads all the db files for the table from object storage.
func (t *indexSet) Init() (err error) {
func (t *indexSet) Init(forQuerying bool) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would anyways need the files for serving next requests.
ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
Expand Down Expand Up @@ -142,7 +144,7 @@ func (t *indexSet) Init() (err error) {
level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.dbs)))

// sync the table to get new files and remove the deleted ones from storage.
err = t.sync(ctx, false)
err = t.sync(ctx, false, forQuerying)
if err != nil {
return
}
Expand Down Expand Up @@ -275,11 +277,11 @@ func (t *indexSet) cleanupDB(fileName string) error {
}

func (t *indexSet) Sync(ctx context.Context) (err error) {
return t.sync(ctx, true)
return t.sync(ctx, true, false)
}

// sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err error) {
level.Debug(t.logger).Log("msg", "syncing index files")

defer func() {
Expand All @@ -290,7 +292,7 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
t.metrics.tablesSyncOperationTotal.WithLabelValues(status).Inc()
}()

toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock)
toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)
if err != nil {
return err
}
Expand All @@ -302,6 +304,14 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
return err
}

// if we did not bypass list cache and skipped downloading all the new files due to them being removed by compaction,
// it means the cache is not valid anymore since compaction would have happened after last index list cache refresh.
// Let us return error to ask the caller to re-run the sync after the list cache refresh.
if !bypassListCache && len(downloadedFiles) == 0 && len(toDownload) > 0 {
level.Error(t.logger).Log("msg", "we skipped downloading all the new files, possibly removed by compaction", "files", toDownload)
return errIndexListCacheTooStale
}

if lock {
err = t.dbsMtx.lock(ctx)
if err != nil {
Expand Down Expand Up @@ -331,11 +341,11 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
}

// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListCache bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
// listing tables from store
var files []storage.IndexFile

files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID)
files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/index_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
boltDBIndexClient, util_log.Logger, newMetrics(nil))
require.NoError(t, err)

require.NoError(t, idxSet.Init())
require.NoError(t, idxSet.Init(false))

return idxSet.(*indexSet), func() {
idxSet.Close()
Expand Down
15 changes: 12 additions & 3 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
return nil, err
}

err = userIndexSet.Init()
err = userIndexSet.Init(false)
if err != nil {
return nil, err
}
Expand All @@ -128,7 +128,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
return nil, err
}

err = commonIndexSet.Init()
err = commonIndexSet.Init(false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,6 +254,15 @@ func (t *table) Sync(ctx context.Context) error {

for userID, indexSet := range t.indexSets {
if err := indexSet.Sync(ctx); err != nil {
if errors.Is(err, errIndexListCacheTooStale) {
level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it and running sync again")
t.storageClient.RefreshIndexListCache(ctx)

err = indexSet.Sync(ctx)
if err == nil {
continue
}
}
return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
}
}
Expand Down Expand Up @@ -309,7 +318,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}()
}

err := indexSet.Init()
err := indexSet.Init(forQuerying)
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error {
}

// list the users that have dedicated index files for this table
_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName)
_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro
return m.tablesInStorage, nil
}

func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil
}
42 changes: 39 additions & 3 deletions pkg/storage/stores/shipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage
return storageClientWithFakeObjectsInList{storageClient}
}

func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
files, userIDs, err := o.Client.ListFiles(ctx, tableName)
func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, _ bool) ([]storage.IndexFile, []string, error) {
files, userIDs, err := o.Client.ListFiles(ctx, tableName, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -52,6 +52,20 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table
return files, userIDs, nil
}

func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) {
files, err := o.Client.ListUserFiles(ctx, tableName, userID, true)
if err != nil {
return nil, err
}

files = append(files, storage.IndexFile{
Name: "fake-object",
ModifiedAt: time.Now(),
})

return files, nil
}

type stopFunc func()

func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, storage.Client) {
Expand All @@ -72,7 +86,7 @@ func buildTestTable(t *testing.T, path string) (*table, *local.BoltIndexClient,
cachePath := filepath.Join(path, cacheDirName)

table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)).(*table)
_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName)
_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false)
require.NoError(t, err)
require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex))

Expand Down Expand Up @@ -397,6 +411,7 @@ func TestTable_Sync(t *testing.T) {
testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, newDB), boltdbClient, 20, 10, nil)

// sync the table
table.storageClient.RefreshIndexListCache(context.Background())
require.NoError(t, table.Sync(context.Background()))

// query and verify table has expected records from new db and the records from deleted db are gone
Expand All @@ -416,6 +431,27 @@ func TestTable_Sync(t *testing.T) {
_, ok := expectedFilesInDir[fileInfo.Name()]
require.True(t, ok)
}

// let us simulate a compaction to test stale index list cache handling

// first, let us add a new file and refresh the index list cache
oneMoreDB := "one-more-db"
testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, oneMoreDB), boltdbClient, 30, 10, nil)
table.storageClient.RefreshIndexListCache(context.Background())

// now, without syncing the table, let us compact the index in storage
compactedDBName := "compacted-db"
testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, compactedDBName), boltdbClient, 10, 30, nil)
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, noUpdatesDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, newDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, oneMoreDB)))

// let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache
require.NoError(t, table.Sync(context.Background()))
// query and verify table has expected records
testutil.TestSingleTableQuery(t, userID, []index.Query{{}}, table, 10, 30)

require.Len(t, table.indexSets[""].(*indexSet).dbs, 1)
}

func TestTable_QueryResponse(t *testing.T) {
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/stores/shipper/storage/cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectCl
// We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
// We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
// we are doing read-through cache, and we do not want to serve stale results.
func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) {
func (c *cachedObjectClient) buildCacheOnce(ctx context.Context, forceRefresh bool) {
c.buildCacheWg.Add(1)
defer c.buildCacheWg.Done()

Expand All @@ -59,7 +59,7 @@ func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) {
select {
case c.buildCacheChan <- struct{}{}:
c.err = nil
c.err = c.buildCache(ctx)
c.err = c.buildCache(ctx, forceRefresh)
<-c.buildCacheChan
if c.err != nil {
level.Error(util_log.Logger).Log("msg", "failed to build cache", "err", c.err)
Expand All @@ -68,15 +68,24 @@ func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) {
}
}

func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
func (c *cachedObjectClient) RefreshIndexListCache(ctx context.Context) {
c.buildCacheOnce(ctx, true)
c.buildCacheWg.Wait()
}

func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
if bypassCache {
return c.ObjectClient.List(ctx, prefix, objectDelimiter)
}

prefix = strings.TrimSuffix(prefix, delimiter)
ss := strings.Split(prefix, delimiter)
if len(ss) > 2 {
return nil, nil, fmt.Errorf("invalid prefix %s", prefix)
}

if time.Since(c.cacheBuiltAt) >= cacheTimeout {
c.buildCacheOnce(ctx)
c.buildCacheOnce(ctx, false)
}

// wait for cache build operation to finish, if running
Expand Down Expand Up @@ -120,8 +129,8 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]clie
}

// buildCache builds the cache if expired
func (c *cachedObjectClient) buildCache(ctx context.Context) error {
if time.Since(c.cacheBuiltAt) < cacheTimeout {
func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) error {
if !forceRefresh && time.Since(c.cacheBuiltAt) < cacheTimeout {
return nil
}

Expand Down
Loading