Skip to content

Commit

Permalink
fix(blooms): Delete outdated metas during planning (#13363)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jul 3, 2024
1 parent d8cc1ce commit 11e1976
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 54 deletions.
15 changes: 9 additions & 6 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (

statusSuccess = "success"
statusFailure = "failure"

phasePlanning = "planning"
phaseBuilding = "building"
)

type Metrics struct {
Expand All @@ -33,8 +36,8 @@ type Metrics struct {
buildTime *prometheus.HistogramVec
buildLastSuccess prometheus.Gauge

blocksDeleted prometheus.Counter
metasDeleted prometheus.Counter
blocksDeleted *prometheus.CounterVec
metasDeleted *prometheus.CounterVec

tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
Expand Down Expand Up @@ -127,18 +130,18 @@ func NewMetrics(
Help: "Unix timestamp of the last successful build cycle.",
}),

blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
blocksDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "blocks_deleted_total",
Help: "Number of blocks deleted",
}),
metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
}, []string{"phase"}),
metasDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "metas_deleted_total",
Help: "Number of metas deleted",
}),
}, []string{"phase"}),

tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down
55 changes: 35 additions & 20 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to get metas: %w", err)
}

// In case the planner restarted before deleting outdated metas in the previous iteration,
// we delete them during the planning phase to avoid reprocessing them.
metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, metas, phasePlanning)
if err != nil {
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

Expand All @@ -351,9 +358,6 @@ func (p *Planner) computeTasks(
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}
if len(gaps) == 0 {
continue
}

for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
Expand Down Expand Up @@ -424,51 +428,62 @@ func (p *Planner) processTenantTaskResults(
}

combined := append(originalMetas, newMetas...)
outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log("msg", "no outdated metas found")
return tasksSucceed, nil
}

level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, combined, phaseBuilding); err != nil {
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

return tasksSucceed, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
// It returns the up-to-date metas from the `metas` argument.
func (p *Planner) deleteOutdatedMetasAndBlocks(
ctx context.Context,
table config.DayTable,
tenant string,
metas []bloomshipper.Meta,
) error {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
phase string,
) ([]bloomshipper.Meta, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase)

upToDate, outdated := outdatedMetas(metas)
if len(outdated) == 0 {
level.Debug(logger).Log(
"msg", "no outdated metas found",
"upToDate", len(upToDate),
)
return upToDate, nil
}

level.Debug(logger).Log(
"msg", "found outdated metas",
"outdated", len(outdated),
"upToDate", len(upToDate),
)

client, err := p.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
return nil, errors.Wrap(err, "failed to get client")
}

var (
deletedMetas int
deletedBlocks int
)
defer func() {
p.metrics.metasDeleted.Add(float64(deletedMetas))
p.metrics.blocksDeleted.Add(float64(deletedBlocks))
p.metrics.metasDeleted.WithLabelValues(phase).Add(float64(deletedMetas))
p.metrics.blocksDeleted.WithLabelValues(phase).Add(float64(deletedBlocks))
}()

for _, meta := range metas {
for _, meta := range outdated {
for _, block := range meta.Blocks {
if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
return errors.Wrap(err, "failed to delete block")
return nil, errors.Wrap(err, "failed to delete block")
}
}

Expand All @@ -482,7 +497,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
return errors.Wrap(err, "failed to delete meta")
return nil, errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
Expand All @@ -495,7 +510,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
"blocks", deletedBlocks,
)

return nil
return upToDate, nil
}

func (p *Planner) tables(ts time.Time) *dayRangeIterator {
Expand Down
113 changes: 100 additions & 13 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func createPlanner(
HardLimit: flagext.Bytes(20 << 20),
TTL: time.Hour,
},
CacheListOps: false,
},
FSConfig: local.FSConfig{
Directory: t.TempDir(),
Expand Down Expand Up @@ -796,19 +797,7 @@ func Test_processTenantTaskResults(t *testing.T) {
},
)
require.NoError(t, err)

// TODO(salvacorts): Fix this
// For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB.
// As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the
// comparison fails. As a workaround to fix the issue, we will manually reset the TS of the sources to the
// fetched metas
for i := range metas {
for j := range metas[i].Sources {
sec := metas[i].Sources[j].TS.Unix()
nsec := metas[i].Sources[j].TS.Nanosecond()
metas[i].Sources[j].TS = time.Unix(sec, int64(nsec))
}
}
removeLocFromMetasSources(metas)

// Compare metas
require.Equal(t, len(tc.expectedMetas), len(metas))
Expand All @@ -817,6 +806,104 @@ func Test_processTenantTaskResults(t *testing.T) {
}
}

// removeLocFromMetasSources normalizes the TS of every TSDB source in the given
// metas, working around a CI quirk: the `loc` of a source's TS is not encoded
// when metas are stored, so fetched metas come back with an empty loc while the
// originals carry one, and equality comparisons fail. Rebuilding each TS through
// time.Unix strips the loc so both sides compare equal.
// The slice is modified in place and also returned for call-site convenience.
func removeLocFromMetasSources(metas []bloomshipper.Meta) []bloomshipper.Meta {
	for i := range metas {
		srcs := metas[i].Sources
		for j := range srcs {
			ts := srcs[j].TS
			// time.Unix always yields a time with the local (non-encoded) loc,
			// matching what we get back from the store.
			srcs[j].TS = time.Unix(ts.Unix(), int64(ts.Nanosecond()))
		}
	}

	return metas
}

// Test_deleteOutdatedMetas exercises Planner.deleteOutdatedMetasAndBlocks end to end:
// it seeds the bloom store with metas, invokes the deletion with the planning phase
// label, and checks both the returned up-to-date metas and what remains in the store.
func Test_deleteOutdatedMetas(t *testing.T) {
for _, tc := range []struct {
name string
// originalMetas are written to the store before the call under test.
originalMetas []bloomshipper.Meta
// expectedUpToDateMetas is both the expected return value and the
// expected store contents after outdated metas/blocks are deleted.
expectedUpToDateMetas []bloomshipper.Meta
}{
{
name: "no metas",
},
{
name: "only up to date metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
},
{
// The two source-0 metas are superseded by the source-1 meta covering
// the same bounds, so only the latter should survive.
name: "outdated metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}),
genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}),
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

bloomClient, err := planner.bloomStore.Client(testDay.ModelTime())
require.NoError(t, err)

// Create original metas and blocks
err = putMetas(bloomClient, tc.originalMetas)
require.NoError(t, err)

// Get all metas
metas, err := planner.bloomStore.FetchMetas(
context.Background(),
bloomshipper.MetaSearchParams{
TenantID: "fakeTenant",
Interval: bloomshipper.NewInterval(testTable.Bounds()),
Keyspace: v1.NewBounds(0, math.MaxUint64),
},
)
require.NoError(t, err)
// Normalize source timestamps so they compare equal to the fixtures
// (fetched metas lose the TS loc — see removeLocFromMetasSources).
removeLocFromMetasSources(metas)
// Sanity check: the store holds exactly what we wrote.
require.ElementsMatch(t, tc.originalMetas, metas)

// Call under test: should delete outdated metas/blocks and return the rest.
upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning)
require.NoError(t, err)
require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate)

// Get all metas
// Re-fetch to confirm the deletions were actually persisted to the store.
metas, err = planner.bloomStore.FetchMetas(
context.Background(),
bloomshipper.MetaSearchParams{
TenantID: "fakeTenant",
Interval: bloomshipper.NewInterval(testTable.Bounds()),
Keyspace: v1.NewBounds(0, math.MaxUint64),
},
)
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.expectedUpToDateMetas, metas)
})
}
}

type fakeBuilder struct {
mx sync.Mutex // Protects tasks and currTaskIdx.
id string
Expand Down
14 changes: 12 additions & 2 deletions pkg/bloombuild/planner/versioned_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ func (t tsdbTokenRange) reassemble(from int) tsdbTokenRange {
return t[:len(t)-(reassembleTo-from)]
}

func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta {
func outdatedMetas(metas []bloomshipper.Meta) ([]bloomshipper.Meta, []bloomshipper.Meta) {
var outdated []bloomshipper.Meta
var upToDate []bloomshipper.Meta

// Sort metas descending by most recent source when checking
// for outdated metas (older metas are discarded if they don't change the range).
Expand Down Expand Up @@ -254,8 +255,17 @@ func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta {
tokenRange, added = tokenRange.Add(version, meta.Bounds)
if !added {
outdated = append(outdated, meta)
continue
}

upToDate = append(upToDate, meta)
}

return outdated
// We previously sorted the input metas by their TSDB source TS, therefore, they may not be sorted by FP anymore.
// We need to re-sort them by their FP to match the original order.
sort.Slice(upToDate, func(i, j int) bool {
return upToDate[i].Bounds.Less(upToDate[j].Bounds)
})

return upToDate, outdated
}
Loading

0 comments on commit 11e1976

Please sign in to comment.