Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: remove dead code from compactor #5069

Merged
merged 4 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type Syncer struct {
fetcher block.MetadataFetcher
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
metrics *syncerMetrics
deduplicateBlocksFilter DeduplicateFilter
}
Expand Down Expand Up @@ -105,23 +104,14 @@ func (s *Syncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

metas, partial, err := s.fetcher.Fetch(ctx)
metas, _, err := s.fetcher.Fetch(ctx)
if err != nil {
return err
}
s.blocks = metas
s.partial = partial
return nil
}

// Partial returns partial blocks since last sync.
func (s *Syncer) Partial() map[ulid.ULID]error {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.partial
}

// Metas returns loaded metadata blocks since last sync.
func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
s.mtx.Lock()
Expand Down Expand Up @@ -943,7 +933,7 @@ func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{}

// Filter finds blocks that should not be compacted, and fills f.noCompactMarkedMap. If f.removeNoCompactBlocks is true,
// blocks are also removed from metas. (Thanos version of the filter doesn't do removal).
func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
noCompactMarkedMap := make(map[ulid.ULID]struct{})

// Find all no-compact markers in the storage.
Expand Down Expand Up @@ -1007,7 +997,7 @@ func (f *ExcludeMarkedForDeletionFilter) DeletionMarkBlocks() map[ulid.ULID]stru

// Filter filters out blocks that are marked for deletion.
// It also builds the map returned by DeletionMarkBlocks() method.
func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
deletionMarkMap := make(map[ulid.ULID]struct{})

// Find all markers in the storage.
Expand Down
8 changes: 4 additions & 4 deletions pkg/compactor/bucket_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), false)
require.NoError(t, f.Filter(ctx, metas, synced, nil))
require.NoError(t, f.Filter(ctx, metas, synced))

require.Contains(t, metas, block1)
require.Contains(t, metas, block2)
Expand All @@ -206,7 +206,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
require.NoError(t, f.Filter(ctx, metas, synced, nil))
require.NoError(t, f.Filter(ctx, metas, synced))

require.Contains(t, metas, block1)
require.NotContains(t, metas, block2) // block2 was removed from metas.
Expand All @@ -232,7 +232,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
cancel()

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
require.Error(t, f.Filter(canceledCtx, metas, synced, nil))
require.Error(t, f.Filter(canceledCtx, metas, synced))

require.Contains(t, metas, block1)
require.Contains(t, metas, block2)
Expand All @@ -249,7 +249,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
err := f.Filter(ctx, metas, synced, nil)
err := f.Filter(ctx, metas, synced)
require.NoError(t, err)
require.Empty(t, metas)

Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/label_remover_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter {
}

// Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.
func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec, _ block.GaugeVec) error {
func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec) error {
for _, meta := range metas {
for _, l := range f.labels {
delete(meta.Thanos.Labels, l)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/label_remover_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestLabelRemoverFilter(t *testing.T) {
}

f := NewLabelRemoverFilter(testData.labels)
err := f.Filter(context.Background(), metas, nil, nil)
err := f.Filter(context.Background(), metas, nil)
require.NoError(t, err)
assert.Len(t, metas, len(testData.expected))

Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/shard_aware_deduplicate_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewShardAwareDeduplicateFilter() *ShardAwareDeduplicateFilter {

// Filter filters out from metas, the initial map of blocks, all the blocks that are contained in other, compacted, blocks.
// The removed blocks are source blocks of the blocks that remain in metas after the filtering is executed.
func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
f.duplicateIDs = f.duplicateIDs[:0]

metasByResolution := make(map[int64][]*metadata.Meta)
Expand Down
7 changes: 3 additions & 4 deletions pkg/compactor/shard_aware_deduplicate_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func TestShardAwareDeduplicateFilter_Filter(t *testing.T) {
expected[id] = m
}

require.NoError(t, f.Filter(context.Background(), metas, m.Synced, m.Modified))
require.NoError(t, f.Filter(context.Background(), metas, m.Synced))
require.Equal(t, expected, metas)
require.Equal(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(duplicateMeta)))

Expand All @@ -387,8 +387,7 @@ func TestShardAwareDeduplicateFilter_Filter(t *testing.T) {

func newTestFetcherMetrics() *block.FetcherMetrics {
return &block.FetcherMetrics{
Synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
Modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}),
Synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
}
}

Expand Down Expand Up @@ -453,7 +452,7 @@ func BenchmarkDeduplicateFilter_Filter(b *testing.B) {
b.ResetTimer()
b.Run("", func(b *testing.B) {
for n := 0; n <= b.N; n++ {
_ = dedupFilter.Filter(context.Background(), tcase, synced, nil)
_ = dedupFilter.Filter(context.Background(), tcase, synced)
require.Equal(b, 0, len(dedupFilter.DuplicateIDs()))
}
})
Expand Down
34 changes: 4 additions & 30 deletions pkg/storage/tsdb/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,17 @@ type FetcherMetrics struct {
SyncFailures prometheus.Counter
SyncDuration prometheus.Histogram

Synced *extprom.TxGaugeVec
Modified *extprom.TxGaugeVec
Synced *extprom.TxGaugeVec
}

// Submit applies new values for metrics tracked by transaction GaugeVec.
func (s *FetcherMetrics) Submit() {
s.Synced.Submit()
s.Modified.Submit()
}

// ResetTx starts new transaction for metrics tracked by transaction GaugeVec.
func (s *FetcherMetrics) ResetTx() {
s.Synced.ResetTx()
s.Modified.ResetTx()
}

const (
Expand All @@ -74,9 +71,6 @@ const (

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)

func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExtraLabels [][]string) *FetcherMetrics {
Expand Down Expand Up @@ -118,18 +112,6 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt
{MarkedForNoCompactionMeta},
}, syncedExtraLabels...)...,
)
m.Modified = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of blocks whose metadata changed",
},
[]string{"modified"},
append([][]string{
{replicaRemovedMeta},
}, modifiedExtraLabels...)...,
)
return &m
}

Expand All @@ -144,7 +126,7 @@ type GaugeVec interface {

// Filter allows filtering or modifying metas from the provided map or returns error.
type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) error
}

// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
Expand All @@ -158,7 +140,6 @@ type MetaFetcher struct {

// Optional local directory to cache meta.json files.
cacheDir string
syncs prometheus.Counter
g singleflight.Group

mtx sync.Mutex
Expand Down Expand Up @@ -187,11 +168,6 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
cached: map[ulid.ULID]*metadata.Meta{},
metrics: NewFetcherMetrics(reg, nil, nil),
filters: filters,
syncs: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "base_syncs_total",
Help: "Total blocks metadata synchronization attempts by meta fetcher",
}),
}, nil
}

Expand Down Expand Up @@ -286,8 +262,6 @@ type response struct {
}

func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
f.syncs.Inc()

var (
resp = response{
metas: make(map[ulid.ULID]*metadata.Meta),
Expand Down Expand Up @@ -438,7 +412,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*metadata.Meta

for _, filter := range f.filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if err := filter.Filter(ctx, metas, f.metrics.Synced, f.metrics.Modified); err != nil {
if err := filter.Filter(ctx, metas, f.metrics.Synced); err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}
Expand Down Expand Up @@ -503,7 +477,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) error {
deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark)

// Make a copy of block IDs to check, in order to avoid concurrency issues
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.
if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok {
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced)
} else {
err = filter.Filter(ctx, metas, f.metrics.Synced, f.metrics.Modified)
err = filter.Filter(ctx, metas, f.metrics.Synced)
}

if err != nil {
Expand Down
15 changes: 0 additions & 15 deletions pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
assert.Empty(t, logs)

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP blocks_meta_modified Number of blocks whose metadata changed
# TYPE blocks_meta_modified gauge
blocks_meta_modified{modified="replica-label-removed"} 0

# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
# TYPE blocks_meta_sync_failures_total counter
blocks_meta_sync_failures_total 0
Expand All @@ -99,7 +95,6 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
# TYPE blocks_meta_syncs_total counter
blocks_meta_syncs_total 1
`),
"blocks_meta_modified",
"blocks_meta_sync_failures_total",
"blocks_meta_synced",
"blocks_meta_syncs_total",
Expand All @@ -123,10 +118,6 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
assert.Empty(t, logs)

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP blocks_meta_modified Number of blocks whose metadata changed
# TYPE blocks_meta_modified gauge
blocks_meta_modified{modified="replica-label-removed"} 0

# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
# TYPE blocks_meta_sync_failures_total counter
blocks_meta_sync_failures_total 0
Expand All @@ -150,7 +141,6 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
# TYPE blocks_meta_syncs_total counter
blocks_meta_syncs_total 1
`),
"blocks_meta_modified",
"blocks_meta_sync_failures_total",
"blocks_meta_synced",
"blocks_meta_syncs_total",
Expand All @@ -177,10 +167,6 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
assert.Regexp(t, "corrupted bucket index found", logs)

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP blocks_meta_modified Number of blocks whose metadata changed
# TYPE blocks_meta_modified gauge
blocks_meta_modified{modified="replica-label-removed"} 0

# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
# TYPE blocks_meta_sync_failures_total counter
blocks_meta_sync_failures_total 0
Expand All @@ -204,7 +190,6 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
# TYPE blocks_meta_syncs_total counter
blocks_meta_syncs_total 1
`),
"blocks_meta_modified",
"blocks_meta_sync_failures_total",
"blocks_meta_synced",
"blocks_meta_syncs_total",
Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/metadata_fetcher_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.
}

// Filter implements block.MetadataFilter.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
return f.upstream.Filter(ctx, metas, synced, modified)
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
return f.upstream.Filter(ctx, metas, synced)
}

// FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
Expand Down Expand Up @@ -93,7 +93,7 @@ func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter {
return &minTimeMetaFilter{limit: limit}
}

func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
if f.limit <= 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/metadata_fetcher_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
if bucketIndexEnabled {
require.NoError(t, f.FilterWithBucketIndex(ctx, inputMetas, idx, synced))
} else {
require.NoError(t, f.Filter(ctx, inputMetas, synced, nil))
require.NoError(t, f.Filter(ctx, inputMetas, synced))
}

assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(block.MarkedForDeletionMeta)))
Expand Down Expand Up @@ -138,12 +138,12 @@ func TestTimeMetaFilter(t *testing.T) {

// Test negative limit.
f := newMinTimeMetaFilter(-10 * time.Minute)
require.NoError(t, f.Filter(context.Background(), inputMetas, synced, nil))
require.NoError(t, f.Filter(context.Background(), inputMetas, synced))
assert.Equal(t, inputMetas, inputMetas)
assert.Equal(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))

f = newMinTimeMetaFilter(limit)
require.NoError(t, f.Filter(context.Background(), inputMetas, synced, nil))
require.NoError(t, f.Filter(context.Background(), inputMetas, synced))

assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))
Expand Down
4 changes: 0 additions & 4 deletions pkg/storegateway/metadata_fetcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ type MetadataFetcherMetrics struct {
syncFailures *prometheus.Desc
syncDuration *prometheus.Desc
synced *prometheus.Desc

// Ignored:
// blocks_meta_modified
// blocks_meta_base_syncs_total
}

func NewMetadataFetcherMetrics() *MetadataFetcherMetrics {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy)

// Filter implements block.MetadataFilter.
// This function is NOT safe for use by multiple goroutines concurrently.
func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
if err := a.strategy.FilterBlocks(ctx, a.userID, metas, a.lastBlocks, synced); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion tools/compaction-planner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
compactor.NewNoCompactionMarkFilter(bucket.NewUserBucketClient(cfg.userID, bkt, nil), true),
} {
log.Printf("Filtering using %T\n", f)
err = f.Filter(ctx, metas, synced, nil)
err = f.Filter(ctx, metas, synced)
if err != nil {
log.Fatalln("filter failed:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/list-deduplicated-blocks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
[]string{"state"}, []string{"duplicate"})

log.Println("Running filter")
err = df.Filter(ctx, metasMap, s, nil)
err = df.Filter(ctx, metasMap, s)
if err != nil {
log.Fatalln("deduplication failed:", err)
}
Expand Down