diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index de41f6a5c6e2..12a1f5ba60fe 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -6,7 +6,6 @@ import (
 	"flag"
 	"path/filepath"
 	"reflect"
-	"strconv"
 	"sync"
 	"time"
 
@@ -198,7 +197,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 		return err
 	}
 
-	interval := extractIntervalFromTableName(tableName)
+	interval := retention.ExtractIntervalFromTableName(tableName)
 	intervalHasExpiredChunks := false
 	if c.cfg.RetentionEnabled {
 		intervalHasExpiredChunks = c.expirationChecker.IntervalHasExpiredChunks(interval)
@@ -338,17 +337,6 @@ func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bo
 	return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval)
 }
 
-func extractIntervalFromTableName(tableName string) model.Interval {
-	interval := model.Interval{
-		Start: 0,
-		End:   model.Now(),
-	}
-	tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
-	if err != nil {
-		return interval
-	}
-
-	interval.Start = model.TimeFromUnix(tableNumber * 86400)
-	interval.End = interval.Start.Add(24 * time.Hour)
-	return interval
+func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
+	return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
 }
diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go
index f508a0beaaa1..d00b6a24e47e 100644
--- a/pkg/storage/stores/shipper/compactor/compactor_test.go
+++ b/pkg/storage/stores/shipper/compactor/compactor_test.go
@@ -12,11 +12,9 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 
-	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
 	loki_storage "github.com/grafana/loki/pkg/storage"
-	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/storage/chunk/local"
 	"github.com/grafana/loki/pkg/storage/chunk/storage"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
@@ -59,43 +57,6 @@ func TestIsDefaults(t *testing.T) {
 	}
 }
 
-func TestExtractIntervalFromTableName(t *testing.T) {
-	periodicTableConfig := chunk.PeriodicTableConfig{
-		Prefix: "dummy",
-		Period: 24 * time.Hour,
-	}
-
-	const millisecondsInDay = model.Time(24 * time.Hour / time.Millisecond)
-
-	calculateInterval := func(tm model.Time) (m model.Interval) {
-		m.Start = tm - tm%millisecondsInDay
-		m.End = m.Start + millisecondsInDay
-		return
-	}
-
-	for i, tc := range []struct {
-		tableName        string
-		expectedInterval model.Interval
-	}{
-		{
-			tableName:        periodicTableConfig.TableFor(model.Now()),
-			expectedInterval: calculateInterval(model.Now()),
-		},
-		{
-			tableName:        periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour)),
-			expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour)),
-		},
-		{
-			tableName:        periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
-			expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
-		},
-	} {
-		t.Run(fmt.Sprint(i), func(t *testing.T) {
-			require.Equal(t, tc.expectedInterval, extractIntervalFromTableName(tc.tableName))
-		})
-	}
-}
-
 func TestCompactor_RunCompaction(t *testing.T) {
"compactor-run-compaction") require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index ed1b202b2470..bb66d08fe85e 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -214,3 +214,7 @@ func (d *DeleteRequestsManager) IntervalHasExpiredChunks(interval model.Interval return false } + +func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool { + return false +} diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 2d508a2c51af..42716db7dfc6 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -19,6 +19,7 @@ type ExpirationChecker interface { MarkPhaseStarted() MarkPhaseFailed() MarkPhaseFinished() + DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool } type expirationChecker struct { @@ -46,6 +47,15 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod return now.Sub(ref.Through) > period, nil } +// DropFromIndex tells if it is okay to drop the chunk entry from index table. +// We check if tableEndTime is out of retention period, calculated using the labels from the chunk. +// If the tableEndTime is out of retention then we can drop the chunk entry without removing the chunk from the store. +func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { + userID := unsafeGetString(ref.UserID) + period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) + return now.Sub(tableEndTime) > period +} + func (e *expirationChecker) MarkPhaseStarted() { smallestRetentionPeriod := findSmallestRetentionPeriod(e.tenantsRetention.limits) e.latestRetentionStartTime = model.Now().Add(-smallestRetentionPeriod) diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator.go b/pkg/storage/stores/shipper/compactor/retention/iterator.go index 7a2f7e25a364..a0b1df5521a4 100644 --- a/pkg/storage/stores/shipper/compactor/retention/iterator.go +++ b/pkg/storage/stores/shipper/compactor/retention/iterator.go @@ -1,11 +1,9 @@ package retention import ( - "bytes" - "encoding/binary" "fmt" - "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "go.etcd.io/bbolt" @@ -91,93 +89,60 @@ func (b *chunkIndexIterator) Next() bool { } type SeriesCleaner interface { - Cleanup(seriesID []byte, userID []byte) error + Cleanup(userID []byte, lbls labels.Labels) error } type seriesCleaner struct { - bucketTimestamps []string - shards map[uint32]string - cursor *bbolt.Cursor - config chunk.PeriodConfig + tableInterval model.Interval + shards map[uint32]string + bucket *bbolt.Bucket + config chunk.PeriodConfig + schema chunk.SeriesStoreSchema buf []byte } -func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig) *seriesCleaner { - var ( - fromDay = config.From.Time.Unix() / int64(config.IndexTables.Period/time.Second) - throughDay = config.From.Add(config.IndexTables.Period).Unix() / int64(config.IndexTables.Period/time.Second) - bucketTimestamps = []string{} - ) - for i := fromDay; i <= throughDay; i++ { - bucketTimestamps = append(bucketTimestamps, fmt.Sprintf("d%d", i)) - } +func newSeriesCleaner(bucket 
diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator.go b/pkg/storage/stores/shipper/compactor/retention/iterator.go
index 7a2f7e25a364..a0b1df5521a4 100644
--- a/pkg/storage/stores/shipper/compactor/retention/iterator.go
+++ b/pkg/storage/stores/shipper/compactor/retention/iterator.go
@@ -1,11 +1,9 @@
 package retention
 
 import (
-	"bytes"
-	"encoding/binary"
 	"fmt"
-	"time"
 
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"go.etcd.io/bbolt"
 
@@ -91,93 +89,60 @@ func (b *chunkIndexIterator) Next() bool {
 }
 
 type SeriesCleaner interface {
-	Cleanup(seriesID []byte, userID []byte) error
+	Cleanup(userID []byte, lbls labels.Labels) error
 }
 
 type seriesCleaner struct {
-	bucketTimestamps []string
-	shards           map[uint32]string
-	cursor           *bbolt.Cursor
-	config           chunk.PeriodConfig
+	tableInterval model.Interval
+	shards        map[uint32]string
+	bucket        *bbolt.Bucket
+	config        chunk.PeriodConfig
+	schema        chunk.SeriesStoreSchema
 
 	buf []byte
 }
 
-func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig) *seriesCleaner {
-	var (
-		fromDay          = config.From.Time.Unix() / int64(config.IndexTables.Period/time.Second)
-		throughDay       = config.From.Add(config.IndexTables.Period).Unix() / int64(config.IndexTables.Period/time.Second)
-		bucketTimestamps = []string{}
-	)
-	for i := fromDay; i <= throughDay; i++ {
-		bucketTimestamps = append(bucketTimestamps, fmt.Sprintf("d%d", i))
-	}
+func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName string) *seriesCleaner {
+	baseSchema, _ := config.CreateSchema()
+	schema := baseSchema.(chunk.SeriesStoreSchema)
 	var shards map[uint32]string
+
 	if config.RowShards != 0 {
 		shards = map[uint32]string{}
 		for s := uint32(0); s <= config.RowShards; s++ {
 			shards[s] = fmt.Sprintf("%02d", s)
 		}
 	}
+
 	return &seriesCleaner{
-		bucketTimestamps: bucketTimestamps,
-		cursor:           bucket.Cursor(),
-		buf:              make([]byte, 0, 1024),
-		config:           config,
-		shards:           shards,
+		tableInterval: ExtractIntervalFromTableName(tableName),
+		schema:        schema,
+		bucket:        bucket,
+		buf:           make([]byte, 0, 1024),
+		config:        config,
+		shards:        shards,
 	}
 }
 
-func (s *seriesCleaner) Cleanup(seriesID []byte, userID []byte) error {
-	for _, timestamp := range s.bucketTimestamps {
-		// build the chunk ref prefix
-		s.buf = s.buf[:0]
-		if s.config.Schema != "v9" {
-			shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards
-			s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...)
-			s.buf = append(s.buf, ':')
-		}
-		s.buf = append(s.buf, userID...)
-		s.buf = append(s.buf, ':')
-		s.buf = append(s.buf, unsafeGetBytes(timestamp)...)
-		s.buf = append(s.buf, ':')
-		s.buf = append(s.buf, seriesID...)
-
-		if key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf) {
-			// this series still have chunk entries we can't cleanup
-			continue
-		}
-		// we don't have any chunk ref for that series let's delete all label index entries
-		s.buf = s.buf[:0]
-		if s.config.Schema != "v9" {
-			shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards
-			s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...)
-			s.buf = append(s.buf, ':')
-		}
-		s.buf = append(s.buf, userID...)
-		s.buf = append(s.buf, ':')
-		s.buf = append(s.buf, unsafeGetBytes(timestamp)...)
-		s.buf = append(s.buf, ':')
-		s.buf = append(s.buf, unsafeGetBytes(logMetricName)...)
+func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error {
+	_, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "")
+	if err != nil {
+		return err
+	}
 
-		// delete all seriesRangeKeyV1 and labelSeriesRangeKeyV1 via prefix
-		// todo(cyriltovena) we might be able to encode index key instead of parsing all label entries for faster delete.
-		for key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf); key, _ = s.cursor.Next() {
+	for i := range indexEntries {
+		for _, indexEntry := range indexEntries[i] {
+			key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue))
+			key = append(key, []byte(indexEntry.HashValue)...)
+			key = append(key, []byte(separator)...)
+			key = append(key, indexEntry.RangeValue...)
 
-			parsedSeriesID, ok, err := parseLabelIndexSeriesID(decodeKey(key))
+			err := s.bucket.Delete(key)
 			if err != nil {
 				return err
 			}
-			if !ok {
-				continue
-			}
-			if !bytes.Equal(seriesID, parsedSeriesID) {
-				continue
-			}
-			if err := s.cursor.Delete(); err != nil {
-				return err
-			}
 		}
 	}
+
 	return nil
}
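The rewritten `Cleanup` no longer walks the bucket with a cursor looking for matching label entries; it asks the schema for the exact label index entries that would have been written for the series over the table's interval and deletes them by key. A small self-contained sketch of the key layout it assumes (the `separator` constant is redeclared here as an assumption; the real one lives in the retention package):

```go
package sketch

// separator is assumed to match the package-level constant used above.
const separator = "\000"

// buildIndexKey reproduces how the new Cleanup assembles a boltdb key from a
// schema-generated index entry: hash value and range value joined by the
// separator, presumed to match the layout the shipper wrote.
func buildIndexKey(hashValue string, rangeValue []byte) []byte {
	key := make([]byte, 0, len(hashValue)+len(separator)+len(rangeValue))
	key = append(key, []byte(hashValue)...)
	key = append(key, []byte(separator)...)
	key = append(key, rangeValue...)
	return key
}
```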
diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go
index 7bf7ead4a2b7..b7454f662614 100644
--- a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go
+++ b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go
@@ -100,11 +100,11 @@ func Test_SeriesCleaner(t *testing.T) {
 		require.NoError(t, err)
 
 		err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
-			cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config)
-			if err := cleaner.Cleanup(entryFromChunk(c2).SeriesID, entryFromChunk(c2).UserID); err != nil {
+			cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config, tables[0].name)
+			if err := cleaner.Cleanup(entryFromChunk(c2).UserID, c2.Metric); err != nil {
 				return err
 			}
-			if err := cleaner.Cleanup(entryFromChunk(c1).SeriesID, entryFromChunk(c1).UserID); err != nil {
+			if err := cleaner.Cleanup(entryFromChunk(c1).UserID, c1.Metric); err != nil {
 				return err
 			}
 			return nil
diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go
index 347434f773a3..0af991d69019 100644
--- a/pkg/storage/stores/shipper/compactor/retention/retention.go
+++ b/pkg/storage/stores/shipper/compactor/retention/retention.go
@@ -104,7 +104,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
 			return err
 		}
 
-		empty, err = markforDelete(ctx, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg), t.expiration, chunkRewriter)
+		empty, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter)
 		if err != nil {
 			return err
 		}
@@ -129,38 +129,65 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
 	return empty, markerWriter.Count(), nil
 }
 
-func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) {
+func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) {
 	seriesMap := newUserSeriesMap()
+	// tableInterval holds the interval for which the table is expected to have the chunks indexed
+	tableInterval := ExtractIntervalFromTableName(tableName)
 	empty := true
 	now := model.Now()
+
 	for chunkIt.Next() {
 		if chunkIt.Err() != nil {
 			return false, chunkIt.Err()
 		}
 		c := chunkIt.Entry()
+		seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
+
+		// see if the chunk is deleted completely or partially
 		if expired, nonDeletedIntervals := expiration.Expired(c, now); expired {
-			if len(nonDeletedIntervals) == 0 {
-				seriesMap.Add(c.SeriesID, c.UserID)
-			} else {
+			if len(nonDeletedIntervals) > 0 {
 				wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals)
 				if err != nil {
 					return false, err
 				}
 
-				if !wroteChunks {
-					seriesMap.Add(c.SeriesID, c.UserID)
+				if wroteChunks {
+					// we have re-written the chunk to storage, so the table won't be empty and the series is still referenced.
+					empty = false
+					seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
 				}
 			}
 
 			if err := chunkIt.Delete(); err != nil {
 				return false, err
 			}
-			if err := marker.Put(c.ChunkID); err != nil {
-				return false, err
+
+			// Mark the chunk for deletion only if it is completely deleted, or if this is the last table that the chunk is indexed in.
+			// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed,
+			// retention would fail because it would fail to find the chunk in storage.
+			if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End {
+				if err := marker.Put(c.ChunkID); err != nil {
+					return false, err
+				}
 			}
 			continue
 		}
+
+		// The chunk is not deleted; now see if we can drop its index entry, based on the end time of tableInterval.
+		// If the chunk end time is after the end time of tableInterval, the chunk is also indexed in the next table.
+		// Check whether the end of tableInterval is out of the retention period, in which case
+		// we can drop the chunk entry from this table without removing the chunk from the store.
+		if c.Through.After(tableInterval.End) {
+			if expiration.DropFromIndex(c, tableInterval.End, now) {
+				if err := chunkIt.Delete(); err != nil {
+					return false, err
+				}
+				continue
+			}
+		}
+
 		empty = false
+		seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
 	}
 	if empty {
 		return true, nil
@@ -168,8 +195,13 @@ func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt Chun
 
 	if ctx.Err() != nil {
 		return false, ctx.Err()
 	}
-	return false, seriesMap.ForEach(func(seriesID, userID []byte) error {
-		return seriesCleaner.Cleanup(seriesID, userID)
+
+	return false, seriesMap.ForEach(func(info userSeriesInfo) error {
+		if !info.isDeleted {
+			return nil
+		}
+
+		return seriesCleaner.Cleanup(info.UserID(), info.lbls)
 	})
 }
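The reworked loop makes three separate decisions per chunk entry: rewrite retained sub-ranges, drop the index entry, and mark the chunk object for deletion. The sketch below condenses that branching; `rewriteChunk`, `deleteIndexEntry`, and `markChunkForDeletion` are hypothetical stand-ins for `chunkRewriter.rewriteChunk`, `chunkIt.Delete`, and `marker.Put`:

```go
package sketch

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// Hypothetical stand-ins so the branching reads on its own.
func rewriteChunk(id string, keep []model.Interval) { fmt.Println("rewrite", id, keep) }
func deleteIndexEntry(id string)                    { fmt.Println("drop index entry", id) }
func markChunkForDeletion(id string)                { fmt.Println("mark chunk", id) }

// decideChunk condenses the per-chunk branching of markforDelete above for a
// chunk ending at through, seen while processing a table ending at tableEnd.
func decideChunk(id string, through model.Time, expired bool, nonDeleted []model.Interval,
	tableEnd model.Time, dropFromIndex bool) {
	if expired {
		if len(nonDeleted) > 0 {
			rewriteChunk(id, nonDeleted) // retained sub-ranges become a new chunk
		}
		deleteIndexEntry(id)
		// Mark the source chunk only when it is fully deleted or this is the
		// last table indexing it; earlier tables still need it for rewrites.
		if len(nonDeleted) == 0 || through <= tableEnd {
			markChunkForDeletion(id)
		}
		return
	}
	// Live chunk spilling into the next table: if this table's whole interval
	// is already out of retention, drop only the index entry here.
	if through.After(tableEnd) && dropFromIndex {
		deleteIndexEntry(id)
	}
}
```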
return nil +} + +type chunkExpiry struct { + isExpired bool + nonDeletedIntervals []model.Interval +} + +type mockExpirationChecker struct { + ExpirationChecker + chunksExpiry map[string]chunkExpiry +} + +func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpirationChecker { + return mockExpirationChecker{chunksExpiry: chunksExpiry} +} + +func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { + ce := m.chunksExpiry[string(ref.ChunkID)] + return ce.isExpired, ce.nonDeletedIntervals +} + +func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { + return false +} + +func TestMarkForDelete_SeriesCleanup(t *testing.T) { + now := model.Now() + schema := allSchemas[2] + userID := "1" + todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now)) + + for _, tc := range []struct { + name string + chunks []chunk.Chunk + expiry []chunkExpiry + expectedDeletedSeries []map[uint64]struct{} + expectedEmpty []bool + }{ + { + name: "no chunk and series deleted", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now), + }, + expiry: []chunkExpiry{ + { + isExpired: false, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, + }, + expectedEmpty: []bool{ + false, + }, + }, + { + name: "only one chunk in store which gets deleted", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now), + }, + expiry: []chunkExpiry{ + { + isExpired: true, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, + }, + expectedEmpty: []bool{ + true, + }, + }, + { + name: "only one chunk in store which gets partially deleted", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now), + }, + expiry: []chunkExpiry{ + { + isExpired: true, + nonDeletedIntervals: []model.Interval{{ + Start: now.Add(-15 * time.Minute), + End: now, + }}, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, + }, + expectedEmpty: []bool{ + false, + }, + }, + { + name: "one of two chunks deleted", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now), + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, now.Add(-30*time.Minute), now), + }, + expiry: []chunkExpiry{ + { + isExpired: false, + }, + { + isExpired: true, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + {labels.Labels{labels.Label{Name: "foo", Value: "2"}}.Hash(): struct{}{}}, + }, + expectedEmpty: []bool{ + false, + }, + }, + { + name: "one of two chunks partially deleted", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now), + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, now.Add(-30*time.Minute), now), + }, + expiry: []chunkExpiry{ + { + isExpired: false, + }, + { + isExpired: true, + nonDeletedIntervals: []model.Interval{{ + Start: now.Add(-15 * time.Minute), + End: now, + }}, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, + }, + expectedEmpty: []bool{ + false, + }, + }, + { + name: "one big chunk partially deleted for yesterdays table without rewrite", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: 
"foo", Value: "1"}}, todaysTableInterval.Start.Add(-time.Hour), now), + }, + expiry: []chunkExpiry{ + { + isExpired: true, + nonDeletedIntervals: []model.Interval{{ + Start: todaysTableInterval.Start, + End: now, + }}, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, nil, + }, + expectedEmpty: []bool{ + true, false, + }, + }, + { + name: "one big chunk partially deleted for yesterdays table with rewrite", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start.Add(-time.Hour), now), + }, + expiry: []chunkExpiry{ + { + isExpired: true, + nonDeletedIntervals: []model.Interval{{ + Start: todaysTableInterval.Start.Add(-30 * time.Minute), + End: now, + }}, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, nil, + }, + expectedEmpty: []bool{ + false, false, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + store := newTestStore(t) + + require.NoError(t, store.Put(context.TODO(), tc.chunks)) + chunksExpiry := map[string]chunkExpiry{} + for i, chunk := range tc.chunks { + chunksExpiry[chunk.ExternalKey()] = tc.expiry[i] + } + + expirationChecker := newMockExpirationChecker(chunksExpiry) + + store.Stop() + + tables := store.indexTables() + require.Len(t, tables, len(tc.expectedDeletedSeries)) + + chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder) + + for i, table := range tables { + seriesCleanRecorder := newSeriesCleanRecorder() + err := table.DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + require.NoError(t, err) + + cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(bucketName)) + require.NoError(t, err) + empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder, + expirationChecker, cr) + require.NoError(t, err) + require.Equal(t, tc.expectedEmpty[i], empty) + return nil + }) + require.NoError(t, err) + + require.EqualValues(t, tc.expectedDeletedSeries[i], seriesCleanRecorder.deletedSeries[userID]) + } + }) + } +} + +func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { + schema := allSchemas[2] + store := newTestStore(t) + now := model.Now() + todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now)) + retentionPeriod := now.Sub(todaysTableInterval.Start) / 2 + + // chunks in retention + c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, now) + c2 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "2"}}, todaysTableInterval.Start.Add(-7*24*time.Hour), now) + + // chunks out of retention + c3 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, now.Add(-retentionPeriod)) + c4 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "3"}}, todaysTableInterval.Start.Add(-12*time.Hour), todaysTableInterval.Start.Add(-10*time.Hour)) + c5 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "4"}}, todaysTableInterval.Start, now.Add(-retentionPeriod)) + + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{ + c1, c2, c3, c4, c5, + })) + + store.Stop() + + tables := store.indexTables() + require.Len(t, tables, 8) + + for i, table := range tables { + err := table.DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + require.NoError(t, err) + empty, err := 
+			empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{},
+				NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil)
+			require.NoError(t, err)
+			if i == 7 {
+				require.False(t, empty)
+			} else {
+				require.True(t, empty, "table %s must be empty", table.name)
+			}
+			return nil
+		})
+		require.NoError(t, err)
+		require.NoError(t, table.Close())
+	}
+
+	store.open()
+
+	// verify the chunks which were not supposed to be deleted are still there
+	require.True(t, store.HasChunk(c1))
+	require.True(t, store.HasChunk(c2))
+
+	// verify the chunks which were supposed to be deleted are gone
+	require.False(t, store.HasChunk(c3))
+	require.False(t, store.HasChunk(c4))
+	require.False(t, store.HasChunk(c5))
+}
diff --git a/pkg/storage/stores/shipper/compactor/retention/series.go b/pkg/storage/stores/shipper/compactor/retention/series.go
index 690018d32ec5..bd447773265c 100644
--- a/pkg/storage/stores/shipper/compactor/retention/series.go
+++ b/pkg/storage/stores/shipper/compactor/retention/series.go
@@ -44,20 +44,42 @@ func (us *userSeries) Reset(seriesID []byte, userID []byte) {
 	us.seriesIDLen = len(seriesID)
 }
 
-type userSeriesMap map[string]userSeries
+type userSeriesInfo struct {
+	userSeries
+	isDeleted bool
+	lbls      labels.Labels
+}
+
+type userSeriesMap map[string]userSeriesInfo
 
 func newUserSeriesMap() userSeriesMap {
 	return make(userSeriesMap)
 }
 
-func (u userSeriesMap) Add(seriesID []byte, userID []byte) {
+func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) {
+	us := newUserSeries(seriesID, userID)
+	if _, ok := u[us.Key()]; ok {
+		return
+	}
+
+	u[us.Key()] = userSeriesInfo{
+		userSeries: us,
+		isDeleted:  true,
+		lbls:       lbls,
+	}
+}
+
+// MarkSeriesNotDeleted is used to mark a series as not deleted when it still has some chunks left in the store
+func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) {
 	us := newUserSeries(seriesID, userID)
-	u[us.Key()] = us
+	usi := u[us.Key()]
+	usi.isDeleted = false
+	u[us.Key()] = usi
 }
 
-func (u userSeriesMap) ForEach(callback func(seriesID []byte, userID []byte) error) error {
+func (u userSeriesMap) ForEach(callback func(info userSeriesInfo) error) error {
 	for _, v := range u {
-		if err := callback(v.SeriesID(), v.UserID()); err != nil {
+		if err := callback(v); err != nil {
 			return err
 		}
 	}
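With this change a series starts out presumed deletable the first time it is seen, and flips to kept as soon as any live or rewritten chunk still references it; only fully orphaned series reach the cleaner. A usage sketch mirroring the call sites in retention.go (not standalone: `seriesID`, `userID`, `lbls`, and `cleaner` are assumed to be in scope inside the package):

```go
m := newUserSeriesMap()
m.Add(seriesID, userID, lbls)            // first sight: presumed deletable
m.MarkSeriesNotDeleted(seriesID, userID) // a chunk survived: keep the series

_ = m.ForEach(func(info userSeriesInfo) error {
	if !info.isDeleted {
		return nil // still referenced, leave its label index entries alone
	}
	return cleaner.Cleanup(info.UserID(), info.lbls)
})
```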
diff --git a/pkg/storage/stores/shipper/compactor/retention/series_test.go b/pkg/storage/stores/shipper/compactor/retention/series_test.go
index dc978055c4f3..04577d540b3d 100644
--- a/pkg/storage/stores/shipper/compactor/retention/series_test.go
+++ b/pkg/storage/stores/shipper/compactor/retention/series_test.go
@@ -10,17 +10,17 @@ import (
 
 func Test_UserSeries(t *testing.T) {
 	m := newUserSeriesMap()
-	m.Add([]byte(`series1`), []byte(`user1`))
-	m.Add([]byte(`series1`), []byte(`user1`))
-	m.Add([]byte(`series1`), []byte(`user2`))
-	m.Add([]byte(`series2`), []byte(`user1`))
-	m.Add([]byte(`series2`), []byte(`user1`))
-	m.Add([]byte(`series2`), []byte(`user2`))
+	m.Add([]byte(`series1`), []byte(`user1`), nil)
+	m.Add([]byte(`series1`), []byte(`user1`), nil)
+	m.Add([]byte(`series1`), []byte(`user2`), nil)
+	m.Add([]byte(`series2`), []byte(`user1`), nil)
+	m.Add([]byte(`series2`), []byte(`user1`), nil)
+	m.Add([]byte(`series2`), []byte(`user2`), nil)
 
 	keys := []string{}
-	err := m.ForEach(func(seriesID, userID []byte) error {
-		keys = append(keys, string(seriesID)+":"+string(userID))
+	err := m.ForEach(func(info userSeriesInfo) error {
+		keys = append(keys, string(info.SeriesID())+":"+string(info.UserID()))
 		return nil
 	})
 	require.NoError(t, err)
 
diff --git a/pkg/storage/stores/shipper/compactor/retention/util.go b/pkg/storage/stores/shipper/compactor/retention/util.go
index 4802cbe9534f..b5d3630fe1d1 100644
--- a/pkg/storage/stores/shipper/compactor/retention/util.go
+++ b/pkg/storage/stores/shipper/compactor/retention/util.go
@@ -4,8 +4,11 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"reflect"
+	"strconv"
+	"time"
 	"unsafe"
+
+	"github.com/prometheus/common/model"
 )
 
 // unsafeGetString is like yolostring but with a meaningful name
@@ -13,14 +16,6 @@ func unsafeGetString(buf []byte) string {
 	return *((*string)(unsafe.Pointer(&buf)))
 }
 
-func unsafeGetBytes(s string) []byte {
-	var buf []byte
-	p := unsafe.Pointer(&buf)
-	*(*string)(p) = s
-	(*reflect.SliceHeader)(p).Cap = len(s)
-	return buf
-}
-
 func copyFile(src, dst string) (int64, error) {
 	sourceFileStat, err := os.Stat(src)
 	if err != nil {
@@ -45,3 +40,19 @@ func copyFile(src, dst string) (int64, error) {
 	nBytes, err := io.Copy(destination, source)
 	return nBytes, err
 }
+
+// ExtractIntervalFromTableName gives back the time interval for which the table is expected to hold the chunks index.
+func ExtractIntervalFromTableName(tableName string) model.Interval {
+	interval := model.Interval{
+		Start: 0,
+		End:   model.Now(),
+	}
+	tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
+	if err != nil {
+		return interval
+	}
+
+	interval.Start = model.TimeFromUnix(tableNumber * 86400)
+	interval.End = interval.Start.Add(24 * time.Hour)
+	return interval
+}
diff --git a/pkg/storage/stores/shipper/compactor/retention/util_test.go b/pkg/storage/stores/shipper/compactor/retention/util_test.go
index 4244edc04c16..9ba7d2bfec68 100644
--- a/pkg/storage/stores/shipper/compactor/retention/util_test.go
+++ b/pkg/storage/stores/shipper/compactor/retention/util_test.go
@@ -2,6 +2,7 @@ package retention
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"path/filepath"
 	"testing"
@@ -262,3 +263,40 @@ func newTestStore(t testing.TB) *testStore {
 		limits:     limits,
 	}
 }
+
+func TestExtractIntervalFromTableName(t *testing.T) {
+	periodicTableConfig := chunk.PeriodicTableConfig{
+		Prefix: "dummy",
+		Period: 24 * time.Hour,
+	}
+
+	const millisecondsInDay = model.Time(24 * time.Hour / time.Millisecond)
+
+	calculateInterval := func(tm model.Time) (m model.Interval) {
+		m.Start = tm - tm%millisecondsInDay
+		m.End = m.Start + millisecondsInDay
+		return
+	}
+
+	for i, tc := range []struct {
+		tableName        string
+		expectedInterval model.Interval
+	}{
+		{
+			tableName:        periodicTableConfig.TableFor(model.Now()),
+			expectedInterval: calculateInterval(model.Now()),
+		},
+		{
+			tableName:        periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour)),
+			expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour)),
+		},
+		{
+			tableName:        periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
+			expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
+		},
+	} {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, tc.expectedInterval, ExtractIntervalFromTableName(tc.tableName))
+		})
+	}
+}
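For reference, the arithmetic in `ExtractIntervalFromTableName` assumes daily tables whose numeric suffix is the Unix day number; the function parses only the last five characters of the name. A worked example with a made-up table name:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	// "loki_index_18950": the parser reads only the trailing "18950".
	tableNumber := int64(18950)
	start := model.TimeFromUnix(tableNumber * 86400)
	end := start.Add(24 * time.Hour)
	// 2021-11-19 00:00:00 UTC through the following midnight.
	fmt.Println(start.Time().UTC(), end.Time().UTC())
}
```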