Skip to content

Commit

Permalink
fix(blooms): Ship chunkrefs in task payload (#13677)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Jul 29, 2024
1 parent a77457f commit 450bbce
Show file tree
Hide file tree
Showing 7 changed files with 644 additions and 117 deletions.
12 changes: 3 additions & 9 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (b *Builder) processTask(
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation.
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
Expand Down Expand Up @@ -454,15 +454,9 @@ func (b *Builder) processTask(
func (b *Builder) loadWorkForGap(
ctx context.Context,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))

// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
Expand Down
24 changes: 10 additions & 14 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -30,6 +29,11 @@ const (
gzipExtension = ".gz"
)

type ClosableForSeries interface {
sharding.ForSeries
Close() error
}

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
Expand All @@ -38,8 +42,7 @@ type TSDBStore interface {
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error)
) (ClosableForSeries, error)
}

// BloomTSDBStore is a wrapper around the storage.Client interface which
Expand Down Expand Up @@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
Expand All @@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB(
}

idx := tsdb.NewTSDBIndex(reader)
defer func() {
if err := idx.Close(); err != nil {
level.Error(b.logger).Log("msg", "failed to close index", "err", err)
}
}()

return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
Expand Down Expand Up @@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}

return store.LoadTSDB(ctx, table, tenant, id, bounds)
return store.LoadTSDB(ctx, table, tenant, id)
}
99 changes: 73 additions & 26 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,37 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)

// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
Expand Down Expand Up @@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults(
return tasksSucceed, nil
}

func openAllTSDBs(
ctx context.Context,
table config.DayTable,
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
}

return openTSDBs, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
// It returns the up-to-date metas from the `metas` argument.
func (p *Planner) deleteOutdatedMetasAndBlocks(
Expand Down Expand Up @@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.GapWithBlocks
gaps []protos.Gap
}

func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTable,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, nil
}

// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
Expand All @@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps(
return nil, nil
}

work, err := blockPlansForGaps(tsdbsWithGaps, metas)
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
Expand All @@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps(

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []v1.FingerprintBounds
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}

// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting
// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for _, db := range tsdbs {
for db, tsdb := range tsdbs {
id := db.Name()

relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
Expand All @@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas(

if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdb: db,
gaps: gaps,
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}
Expand All @@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas(
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := protos.GapWithBlocks{
planGap := protos.Gap{
Bounds: gap,
}

for _, meta := range metas {
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}

for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
Expand Down
Loading

0 comments on commit 450bbce

Please sign in to comment.