Skip to content

Commit

Permalink
perf(bloom): Compute chunkrefs for series right before sending task t…
Browse files Browse the repository at this point in the history
…o builder (#14808)
  • Loading branch information
salvacorts authored Nov 7, 2024
1 parent 3b20cb0 commit 66e6b1c
Show file tree
Hide file tree
Showing 13 changed files with 304 additions and 187 deletions.
24 changes: 7 additions & 17 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ const (
gzipExtension = ".gz"
)

type ForSeries = sharding.ForSeries

type ClosableForSeries interface {
sharding.ForSeries
ForSeries
Close() error
}

Expand Down Expand Up @@ -124,9 +126,9 @@ func (b *BloomTSDBStore) LoadTSDB(
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)
series := make([]model.Fingerprint, 0, 100)

if err := f.ForSeries(
ctx,
Expand All @@ -138,19 +140,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return true
default:
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
series = append(series, fp)
return false
}
},
Expand All @@ -161,7 +151,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
default:
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloombuild/common/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators(
v1.CompareIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
require.Equal(t, a, b.Fingerprint)
},
itr,
srcItr,
Expand Down
80 changes: 49 additions & 31 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ func (p *Planner) runOne(ctx context.Context) error {
}

var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
wg sync.WaitGroup
start = time.Now()
status = statusFailure
openTSDBs strategies.TSDBSet
)
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
Expand All @@ -238,6 +239,15 @@ func (p *Planner) runOne(ctx context.Context) error {
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs.
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
}
}
}()

p.metrics.buildStarted.Inc()
Expand Down Expand Up @@ -275,7 +285,19 @@ func (p *Planner) runOne(ctx context.Context) error {
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
continue
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
Expand All @@ -286,7 +308,7 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
Expand Down Expand Up @@ -374,7 +396,8 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
tsdbs strategies.TSDBSet,
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
Expand Down Expand Up @@ -402,29 +425,11 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
Expand Down Expand Up @@ -506,18 +511,26 @@ func openAllTSDBs(
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
alreadyOpen strategies.TSDBSet,
) (strategies.TSDBSet, error) {
if len(alreadyOpen) == 0 {
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
}

for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if _, ok := alreadyOpen[idx]; ok {
continue
}

reader, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
alreadyOpen[idx] = reader
}

return openTSDBs, nil
return alreadyOpen, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
Expand Down Expand Up @@ -847,8 +860,13 @@ func (p *Planner) forwardTaskToBuilder(
builderID string,
task *QueueTask,
) (*protos.TaskResult, error) {
protoTask, err := task.ToProtoTask(builder.Context())
if err != nil {
return nil, fmt.Errorf("error converting task to proto task: %w", err)
}

msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: protoTask,
}

if err := builder.Send(msg); err != nil {
Expand Down
11 changes: 10 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,12 +713,21 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))

tasks := make([]*QueueTask, 0, n)
// Enqueue tasks
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
strategies.NewTask(
config.NewDayTable(plannertest.TestDay, "fake"),
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
nil,
),
forSeries,
resultsCh,
)
tasks = append(tasks, task)
Expand Down
58 changes: 56 additions & 2 deletions pkg/bloombuild/planner/plannertest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var TestDay = ParseDayTime("2023-09-01")
Expand Down Expand Up @@ -87,11 +89,23 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}

func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
// GenSeries returns the fingerprints produced by GenSeriesWithStep for bounds
// using a step of 1, i.e. without skipping any fingerprint in the range.
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
	const step = 1
	return GenSeriesWithStep(bounds, step)
}

func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
// GenSeriesWithStep returns the fingerprints bounds.Min, bounds.Min+step,
// bounds.Min+2*step, ... up to and including the last value that does not
// exceed bounds.Max.
//
// step must be positive; any other value panics with a descriptive message
// (a zero step previously failed with an integer divide-by-zero panic).
// When bounds.Max is smaller than bounds.Min an empty, non-nil slice is
// returned.
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
	if step <= 0 {
		panic("plannertest: GenSeriesWithStep requires a positive step")
	}
	if bounds.Max < bounds.Min {
		// Guard the unsigned subtraction below against underflow.
		return []model.Fingerprint{}
	}

	stride := model.Fingerprint(step)
	span := bounds.Max - bounds.Min

	// span/stride+1 is the exact number of generated fingerprints, so the
	// slice never has to grow.
	series := make([]model.Fingerprint, 0, int(span/stride)+1)
	for fp := bounds.Min; ; fp += stride {
		series = append(series, fp)
		// Stop before fp+stride could exceed bounds.Max. Comparing the
		// remaining distance instead of the next value keeps the loop from
		// wrapping around when bounds.Max is close to the largest fingerprint.
		if bounds.Max-fp < stride {
			break
		}
	}
	return series
}

// GenV1Series returns the series produced by GenV1SeriesWithStep for bounds
// using a step of 1.
func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
	const step = 1
	return GenV1SeriesWithStep(bounds, step)
}

func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
Expand Down Expand Up @@ -139,3 +153,43 @@ func ParseDayTime(s string) config.DayTime {
Time: model.TimeFromUnix(t.Unix()),
}
}

// FakeForSeries is an in-memory test double that serves a fixed list of
// series through its ForSeries method. Its Close method is a no-op.
type FakeForSeries struct {
	// series holds the series handed to NewFakeForSeries; ForSeries walks
	// them in slice order.
	series []*v1.Series
}

// NewFakeForSeries returns a FakeForSeries backed by the given series.
// The slice is stored as-is and is not copied.
func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
	fake := new(FakeForSeries)
	fake.series = series
	return fake
}

// ForSeries calls fn once for every stored series whose fingerprint is
// accepted by ff, in the order the series were passed to NewFakeForSeries.
//
// For each matching series, every chunk reference is converted into an
// index.ChunkMeta with the same time range and checksum and a fixed KB value
// of 100. fn always receives empty labels. Iteration stops as soon as fn
// returns true.
//
// The context, the string argument, both time arguments and the matchers are
// ignored. A nil ff accepts every series instead of panicking. The returned
// error is always nil.
func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
	for _, s := range f.series {
		// Filter and visit in a single pass; no intermediate slice is needed.
		if ff != nil && !ff.Match(s.Fingerprint) {
			continue
		}

		chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
		for _, c := range s.Chunks {
			chunks = append(chunks, index.ChunkMeta{
				MinTime:  int64(c.From),
				MaxTime:  int64(c.Through),
				Checksum: c.Checksum,
				KB:       100,
			})
		}

		if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
			return nil
		}
	}
	return nil
}

// Close is a no-op: the fake holds nothing that needs releasing, so it
// always returns a nil error.
func (FakeForSeries) Close() error {
	var err error
	return err
}
Loading

0 comments on commit 66e6b1c

Please sign in to comment.