Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not retain span logger created with index set initialized at query time #14027

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions pkg/storage/stores/shipper/indexshipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

Expand All @@ -32,7 +31,7 @@ const (
var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")

type IndexSet interface {
Init(forQuerying bool) error
Init(forQuerying bool, logger log.Logger) error
Close()
ForEach(ctx context.Context, callback index.ForEachIndexCallback) error
ForEachConcurrent(ctx context.Context, callback index.ForEachIndexCallback) error
Expand Down Expand Up @@ -94,14 +93,12 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
}

// Init downloads all the db files for the table from object storage.
func (t *indexSet) Init(forQuerying bool) (err error) {
func (t *indexSet) Init(forQuerying bool, logger log.Logger) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would anyways need the files for serving next requests.
ctx := context.Background()
ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout)

logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init")

defer func() {
if err != nil {
level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err)
Expand Down Expand Up @@ -186,7 +183,7 @@ func (t *indexSet) ForEach(ctx context.Context, callback index.ForEachIndexCallb
}
defer t.indexMtx.rUnlock()

logger := util_log.WithContext(ctx, t.logger)
logger := spanlogger.FromContextWithFallback(ctx, t.logger)
level.Debug(logger).Log("index-files-count", len(t.index))

for _, idx := range t.index {
Expand All @@ -205,7 +202,7 @@ func (t *indexSet) ForEachConcurrent(ctx context.Context, callback index.ForEach
}
defer t.indexMtx.rUnlock()

logger := util_log.WithContext(ctx, t.logger)
logger := spanlogger.FromContextWithFallback(ctx, t.logger)
level.Debug(logger).Log("index-files-count", len(t.index))

if len(t.index) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
}, util_log.Logger)
require.NoError(t, err)

require.NoError(t, idxSet.Init(false))
require.NoError(t, idxSet.Init(false, util_log.Logger))

return idxSet.(*indexSet), idxSet.Close
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/stores/shipper/indexshipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
}

userID := entry.Name()
logger := loggerWithUserID(table.logger, userID)
userIndexSet, err := NewIndexSet(name, userID, filepath.Join(cacheLocation, userID),
table.baseUserIndexSet, openIndexFileFunc, loggerWithUserID(table.logger, userID))
table.baseUserIndexSet, openIndexFileFunc, logger)
if err != nil {
return nil, err
}

err = userIndexSet.Init(false)
err = userIndexSet.Init(false, logger)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +130,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = commonIndexSet.Init(false)
err = commonIndexSet.Init(false, table.logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func (t *table) Sync(ctx context.Context) error {
// forQuerying must be set to true only getting the index for querying since
// it captures the amount of time it takes to download the index at query time.
func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name))
logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))

t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[id]
Expand All @@ -311,7 +312,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}

// instantiate the index set, add it to the map
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger)
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, loggerWithUserID(t.logger, id))
if err != nil {
return nil, err
}
Expand All @@ -321,7 +322,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
// it is up to the caller to wait for its readiness using IndexSet.AwaitReady()
go func() {
start := time.Now()
err := indexSet.Init(forQuerying)
err := indexSet.Init(forQuerying, logger)
duration := time.Since(start)

level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil)
Expand Down
Loading