boltdb shipper index list cache improvements (#6054)
* bypass index list cache when doing query time downloading of index

* detect and refresh stale index list cache during sync

(cherry picked from commit 2758dc6)
sandeepsukhani authored and grafanabot committed May 3, 2022
1 parent fc8c4f0 commit fe2c07c
Showing 18 changed files with 149 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/stores/indexshipper/downloads/index_set.go
@@ -284,7 +284,7 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDow
 	// listing tables from store
 	var files []storage.IndexFile

-	files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID)
+	files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, false)
 	if err != nil {
 		return
 	}
2 changes: 1 addition & 1 deletion pkg/storage/stores/indexshipper/downloads/table_manager.go
@@ -263,7 +263,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
 	}

 	// list the users that have dedicated index files for this table
-	_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName)
+	_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false)
 	if err != nil {
 		return err
 	}
pkg/storage/stores/indexshipper/downloads/table_manager_test.go
@@ -322,6 +322,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro
 	return m.tablesInStorage, nil
 }

-func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
+func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
 	return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil
 }
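Note: this commit only touches call sites; the interface definitions themselves never appear in the diff. Pieced together from the signatures above, the updated listing methods presumably look roughly like this (a sketch for orientation, not the verbatim Loki source):

package storage // sketch; the real definitions live in pkg/storage/stores/shipper/storage

import (
	"context"
	"time"
)

// IndexFile as used by the mocks in this diff.
type IndexFile struct {
	Name       string
	ModifiedAt time.Time
}

// Client sketches the listing methods implied by the call sites in this commit;
// the real interface has more methods and may differ in detail.
type Client interface {
	ListTables(ctx context.Context) ([]string, error)
	// bypassCache forces a listing straight from object storage instead of
	// the cached listing.
	ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]IndexFile, []string, error)
	ListUserFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error)
	// RefreshIndexListCache rebuilds the cached listing, e.g. after a sync
	// detects it has gone stale.
	RefreshIndexListCache(ctx context.Context)
}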
21 changes: 18 additions & 3 deletions pkg/storage/stores/indexshipper/downloads/table_test.go
@@ -32,8 +32,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage
 	return storageClientWithFakeObjectsInList{storageClient}
 }

-func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
-	files, userIDs, err := o.Client.ListFiles(ctx, tableName)
+func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
+	files, userIDs, err := o.Client.ListFiles(ctx, tableName, true)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -46,14 +46,28 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table
 	return files, userIDs, nil
 }

+func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) {
+	files, err := o.Client.ListUserFiles(ctx, tableName, userID, true)
+	if err != nil {
+		return nil, err
+	}
+
+	files = append(files, storage.IndexFile{
+		Name:       "fake-object",
+		ModifiedAt: time.Now(),
+	})
+
+	return files, nil
+}
+
 func buildTestTable(t *testing.T, path string) (*table, stopFunc) {
 	storageClient := buildTestStorageClient(t, path)
 	cachePath := filepath.Join(path, cacheDirName)

 	table := NewTable(tableName, cachePath, storageClient, func(path string) (index.Index, error) {
 		return openMockIndexFile(t, path), nil
 	}).(*table)
-	_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName)
+	_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false)
 	require.NoError(t, err)
 	require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex))

@@ -301,6 +315,7 @@ func TestTable_Sync(t *testing.T) {
 	require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, newDB), []byte(newDB), 0755))

 	// sync the table
+	table.storageClient.RefreshIndexListCache(context.Background())
 	require.NoError(t, table.Sync(context.Background()))

 	// check that table got the new index and dropped the deleted index
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/index_set.go
@@ -94,7 +94,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) {
 	ctx, cancelFunc := context.WithTimeout(is.ctx, userIndexReadinessTimeout)
 	defer cancelFunc()

-	is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID)
+	is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID, false)
 	if is.err != nil {
 		return
 	}
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/compactor/table.go
@@ -123,7 +123,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
 }

 func (t *table) compact(applyRetention bool) error {
-	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name)
+	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
 	if err != nil {
 		return err
 	}
@@ -209,7 +209,7 @@ func (t *table) done() error {
 			continue
 		}

-		indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID)
+		indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID, false)
 		if err != nil {
 			return err
 		}
26 changes: 18 additions & 8 deletions pkg/storage/stores/shipper/downloads/index_set.go
@@ -25,8 +25,10 @@ import (
 	"github.com/grafana/loki/pkg/util/spanlogger"
 )

+var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")
+
 type IndexSet interface {
-	Init() error
+	Init(forQuerying bool) error
 	Close()
 	MultiQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error
 	DropAllDBs() error
@@ -87,7 +89,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
 }

 // Init downloads all the db files for the table from object storage.
-func (t *indexSet) Init() (err error) {
+func (t *indexSet) Init(forQuerying bool) (err error) {
 	// Using background context to avoid cancellation of download when request times out.
 	// We would anyways need the files for serving next requests.
 	ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
@@ -142,7 +144,7 @@ func (t *indexSet) Init() (err error) {
 	level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.dbs)))

 	// sync the table to get new files and remove the deleted ones from storage.
-	err = t.sync(ctx, false)
+	err = t.sync(ctx, false, forQuerying)
 	if err != nil {
 		return
 	}
@@ -275,11 +277,11 @@ func (t *indexSet) cleanupDB(fileName string) error {
 }

 func (t *indexSet) Sync(ctx context.Context) (err error) {
-	return t.sync(ctx, true)
+	return t.sync(ctx, true, false)
 }

 // sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
-func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
+func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err error) {
 	level.Debug(t.logger).Log("msg", "syncing index files")

 	defer func() {
@@ -290,7 +292,7 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
 		t.metrics.tablesSyncOperationTotal.WithLabelValues(status).Inc()
 	}()

-	toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock)
+	toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)
 	if err != nil {
 		return err
 	}
@@ -302,6 +304,14 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
 		return err
 	}

+	// If we did not bypass the list cache and skipped downloading all the new files because they were
+	// removed by compaction, the cache is no longer valid: compaction must have run after the last
+	// index list cache refresh. Return an error so the caller re-runs the sync after refreshing the cache.
+	if !bypassListCache && len(downloadedFiles) == 0 && len(toDownload) > 0 {
+		level.Error(t.logger).Log("msg", "skipped downloading all the new files, possibly removed by compaction", "files", toDownload)
+		return errIndexListCacheTooStale
+	}
+
 	if lock {
 		err = t.dbsMtx.lock(ctx)
 		if err != nil {
@@ -331,11 +341,11 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
 }

 // checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
-func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
+func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListCache bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
 	// listing tables from store
 	var files []storage.IndexFile

-	files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID)
+	files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)
 	if err != nil {
 		return
 	}
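The new staleness check leans on behavior this hunk does not show: the download step earlier in sync skips files that have disappeared from object storage between listing and download, which is exactly what happens when compaction deletes them. Under that assumption (and with hypothetical count parameters standing in for the real slices), the rule reduces to this sketch:

package downloads

import "errors"

// Matches the error declared in index_set.go above.
var errIndexListCacheTooStale = errors.New("index list cache too stale")

// staleListCache condenses the check added to (*indexSet).sync: the cached
// listing promised new files, yet every one of them had already vanished from
// storage, so the listing must predate a compaction run.
func staleListCache(bypassedListCache bool, toDownload, downloaded int) bool {
	return !bypassedListCache && toDownload > 0 && downloaded == 0
}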
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/index_set_test.go
@@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
 		boltDBIndexClient, util_log.Logger, newMetrics(nil))
 	require.NoError(t, err)

-	require.NoError(t, idxSet.Init())
+	require.NoError(t, idxSet.Init(false))

 	return idxSet.(*indexSet), func() {
 		idxSet.Close()
15 changes: 12 additions & 3 deletions pkg/storage/stores/shipper/downloads/table.go
@@ -114,7 +114,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
 		return nil, err
 	}

-	err = userIndexSet.Init()
+	err = userIndexSet.Init(false)
 	if err != nil {
 		return nil, err
 	}
@@ -128,7 +128,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
 		return nil, err
 	}

-	err = commonIndexSet.Init()
+	err = commonIndexSet.Init(false)
 	if err != nil {
 		return nil, err
 	}
@@ -254,6 +254,15 @@ func (t *table) Sync(ctx context.Context) error {

 	for userID, indexSet := range t.indexSets {
 		if err := indexSet.Sync(ctx); err != nil {
+			if errors.Is(err, errIndexListCacheTooStale) {
+				level.Info(t.logger).Log("msg", "hit a stale index list cache, refreshing it and re-running sync")
+				t.storageClient.RefreshIndexListCache(ctx)
+
+				err = indexSet.Sync(ctx)
+				if err == nil {
+					continue
+				}
+			}
 			return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
 		}
 	}
@@ -309,7 +318,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
 		}()
 	}

-	err := indexSet.Init()
+	err := indexSet.Init(forQuerying)
 	if err != nil {
 		level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
 	}
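Taken together with the previous file, the forQuerying flag added here threads from query-time index-set creation down to the storage listing; that is the implementation of the first commit-message bullet. A condensed view of the call chain across these hunks (comment form, not a compilable excerpt):

// Condensed from the hunks above; intermediate code elided.
//
//   table.getOrCreateIndexSet(ctx, id, forQuerying)          // true on the query path
//     -> indexSet.Init(forQuerying)
//       -> t.sync(ctx, false /* lock */, forQuerying /* bypassListCache */)
//         -> t.checkStorageForUpdates(ctx, lock, bypassListCache)
//           -> t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)
//
// Background syncs call sync(ctx, true, false) and keep using the cached
// listing; only index sets created to serve a live query go straight to storage.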
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager.go
@@ -291,7 +291,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error {
 	}

 	// list the users that have dedicated index files for this table
-	_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName)
+	_, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false)
 	if err != nil {
 		return err
 	}
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager_test.go
@@ -328,6 +328,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro
 	return m.tablesInStorage, nil
 }

-func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
+func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) {
 	return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil
 }
42 changes: 39 additions & 3 deletions pkg/storage/stores/shipper/downloads/table_test.go
@@ -38,8 +38,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage
 	return storageClientWithFakeObjectsInList{storageClient}
 }

-func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) {
-	files, userIDs, err := o.Client.ListFiles(ctx, tableName)
+func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, _ bool) ([]storage.IndexFile, []string, error) {
+	files, userIDs, err := o.Client.ListFiles(ctx, tableName, true)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -52,6 +52,20 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table
 	return files, userIDs, nil
 }

+func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) {
+	files, err := o.Client.ListUserFiles(ctx, tableName, userID, true)
+	if err != nil {
+		return nil, err
+	}
+
+	files = append(files, storage.IndexFile{
+		Name:       "fake-object",
+		ModifiedAt: time.Now(),
+	})
+
+	return files, nil
+}
+
 type stopFunc func()

 func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, storage.Client) {
@@ -72,7 +86,7 @@ func buildTestTable(t *testing.T, path string) (*table, *local.BoltIndexClient,
 	cachePath := filepath.Join(path, cacheDirName)

 	table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)).(*table)
-	_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName)
+	_, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false)
 	require.NoError(t, err)
 	require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex))

@@ -397,6 +411,7 @@ func TestTable_Sync(t *testing.T) {
 	testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, newDB), boltdbClient, 20, 10, nil)

 	// sync the table
+	table.storageClient.RefreshIndexListCache(context.Background())
 	require.NoError(t, table.Sync(context.Background()))

 	// query and verify table has expected records from new db and the records from deleted db are gone
@@ -416,6 +431,27 @@ func TestTable_Sync(t *testing.T) {
 		_, ok := expectedFilesInDir[fileInfo.Name()]
 		require.True(t, ok)
 	}
+
+	// let us simulate a compaction to test stale index list cache handling
+
+	// first, let us add a new file and refresh the index list cache
+	oneMoreDB := "one-more-db"
+	testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, oneMoreDB), boltdbClient, 30, 10, nil)
+	table.storageClient.RefreshIndexListCache(context.Background())
+
+	// now, without syncing the table, let us compact the index in storage
+	compactedDBName := "compacted-db"
+	testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, compactedDBName), boltdbClient, 10, 30, nil)
+	require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, noUpdatesDB)))
+	require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, newDB)))
+	require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, oneMoreDB)))
+
+	// let us run a sync, which should detect the stale index list cache and sync the table after refreshing the cache
+	require.NoError(t, table.Sync(context.Background()))
+	// query and verify table has expected records
+	testutil.TestSingleTableQuery(t, userID, []index.Query{{}}, table, 10, 30)
+
+	require.Len(t, table.indexSets[""].(*indexSet).dbs, 1)
 }

 func TestTable_QueryResponse(t *testing.T) {
21 changes: 15 additions & 6 deletions pkg/storage/stores/shipper/storage/cached_client.go
@@ -50,7 +50,7 @@ func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectCl
 // We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
 // We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
 // we are doing read-through cache, and we do not want to serve stale results.
-func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) {
+func (c *cachedObjectClient) buildCacheOnce(ctx context.Context, forceRefresh bool) {
 	c.buildCacheWg.Add(1)
 	defer c.buildCacheWg.Done()

@@ -59,7 +59,7 @@ func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) {
 	select {
 	case c.buildCacheChan <- struct{}{}:
 		c.err = nil
-		c.err = c.buildCache(ctx)
+		c.err = c.buildCache(ctx, forceRefresh)
 		<-c.buildCacheChan
 		if c.err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to build cache", "err", c.err)
@@ -68,15 +68,24 @@ }
 	}
 }

-func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
+func (c *cachedObjectClient) RefreshIndexListCache(ctx context.Context) {
+	c.buildCacheOnce(ctx, true)
+	c.buildCacheWg.Wait()
+}
+
+func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
+	if bypassCache {
+		return c.ObjectClient.List(ctx, prefix, objectDelimiter)
+	}
+
 	prefix = strings.TrimSuffix(prefix, delimiter)
 	ss := strings.Split(prefix, delimiter)
 	if len(ss) > 2 {
 		return nil, nil, fmt.Errorf("invalid prefix %s", prefix)
 	}

 	if time.Since(c.cacheBuiltAt) >= cacheTimeout {
-		c.buildCacheOnce(ctx)
+		c.buildCacheOnce(ctx, false)
 	}

 	// wait for cache build operation to finish, if running
@@ -120,8 +129,8 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]clie
 }

 // buildCache builds the cache if expired
-func (c *cachedObjectClient) buildCache(ctx context.Context) error {
-	if time.Since(c.cacheBuiltAt) < cacheTimeout {
+func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) error {
+	if !forceRefresh && time.Since(c.cacheBuiltAt) < cacheTimeout {
 		return nil
 	}

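The comment on buildCacheOnce describes a hand-rolled single-flight: a capacity-1 buffered channel admits exactly one builder, and a WaitGroup makes every caller wait out an in-flight rebuild instead of reading a half-built cache. A minimal standalone sketch of the same pattern (illustrative names and types, not Loki's; error handling and timeouts elided):

package main

import (
	"fmt"
	"sync"
	"time"
)

// singleFlightCache rebuilds an expensive value at most once at a time;
// callers that arrive mid-rebuild wait for the result instead of racing.
type singleFlightCache struct {
	buildChan chan struct{} // capacity 1: admits exactly one concurrent builder
	wg        sync.WaitGroup
	value     string
}

func newSingleFlightCache() *singleFlightCache {
	return &singleFlightCache{buildChan: make(chan struct{}, 1)}
}

// buildOnce lets exactly one concurrent caller run build; the others return
// immediately and rely on the WaitGroup to wait for the winner.
func (s *singleFlightCache) buildOnce(build func() string) {
	s.wg.Add(1)
	defer s.wg.Done()

	select {
	case s.buildChan <- struct{}{}: // won the slot: rebuild the value
		s.value = build()
		<-s.buildChan // free the slot
	default: // a rebuild is already in flight
	}
}

// get returns the cached value, waiting for any in-flight rebuild first.
func (s *singleFlightCache) get(build func() string) string {
	s.buildOnce(build)
	s.wg.Wait()
	return s.value
}

func main() {
	c := newSingleFlightCache()
	build := func() string { time.Sleep(10 * time.Millisecond); return "fresh listing" }

	var callers sync.WaitGroup
	for i := 0; i < 3; i++ {
		callers.Add(1)
		go func() {
			defer callers.Done()
			fmt.Println(c.get(build)) // all three observe the same freshly built value
		}()
	}
	callers.Wait()
}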