diff --git a/pkg/chunk/deleter.go b/pkg/chunk/deleter.go index 65189d0a5..b6723d0ce 100644 --- a/pkg/chunk/deleter.go +++ b/pkg/chunk/deleter.go @@ -7,6 +7,6 @@ import ( ) type Deleter interface { - DeleteEntry(context.Context, chunk.IndexEntry) error + DeleteEntry(context.Context, chunk.IndexEntry, bool) error DeleteSeries(context.Context, chunk.IndexQuery) ([]error, error) } diff --git a/pkg/chunk/filter/filter.go b/pkg/chunk/filter/filter.go index 3185c8bde..beb978085 100644 --- a/pkg/chunk/filter/filter.go +++ b/pkg/chunk/filter/filter.go @@ -2,6 +2,7 @@ package filter import ( "math" + "strings" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -11,10 +12,11 @@ import ( ) type Config struct { - Name string - User string - From int64 - To int64 + Name string + User string + From int64 + To int64 + Labels string } func (c *Config) Register(cmd *kingpin.CmdClause) { @@ -22,27 +24,33 @@ func (c *Config) Register(cmd *kingpin.CmdClause) { cmd.Flag("filter.user", "option to filter metrics by user").StringVar(&c.User) cmd.Flag("filter.from", "option to filter only metrics after specific time point").Int64Var(&c.From) cmd.Flag("filter.to", "option to filter only metrics after specific time point").Int64Var(&c.To) + cmd.Flag("filter.labels", "option to filter metrics with the corresponding labels, provide a comma separated list e.g. ,").StringVar(&c.Labels) } // MetricFilter provides a set of matchers to determine whether a chunk should be returned type MetricFilter struct { - User string - Name string - From model.Time - To model.Time + User string + Name string + From model.Time + To model.Time + Labels []string } +// NewMetricFilter returns a metric filter func NewMetricFilter(cfg Config) MetricFilter { // By default the maximum time point is chosen if no point is specified if cfg.To == 0 { cfg.To = math.MaxInt64 } + labellist := strings.Split(cfg.Labels, ",") + return MetricFilter{ - User: cfg.User, - Name: cfg.Name, - From: model.Time(cfg.From), - To: model.Time(cfg.To), + User: cfg.User, + Name: cfg.Name, + From: model.Time(cfg.From), + To: model.Time(cfg.To), + Labels: labellist, } } diff --git a/pkg/chunk/gcp/bigtable_delete.go b/pkg/chunk/gcp/bigtable_delete.go index 45274b859..5c25230d9 100644 --- a/pkg/chunk/gcp/bigtable_delete.go +++ b/pkg/chunk/gcp/bigtable_delete.go @@ -64,7 +64,7 @@ func newstorageIndexDeleter(cfg gcp.Config, client *bigtable.Client) *storageInd } } -func (s *storageIndexDeleter) DeleteEntry(ctx context.Context, entry chunk.IndexEntry) error { +func (s *storageIndexDeleter) DeleteEntry(ctx context.Context, entry chunk.IndexEntry, deleteSeries bool) error { sp, ctx := ot.StartSpanFromContext(ctx, "DeleteEntry") defer sp.Finish() @@ -72,9 +72,17 @@ func (s *storageIndexDeleter) DeleteEntry(ctx context.Context, entry chunk.Index rowKey, columnKey := s.keysFn(entry.HashValue, entry.RangeValue) mut := bigtable.NewMutation() - mut.DeleteCellsInColumn(columnFamily, columnKey) + if deleteSeries { + mut.DeleteRow() + } else { + mut.DeleteCellsInColumn(columnFamily, columnKey) + } - return table.Apply(ctx, rowKey, mut) + err := table.Apply(ctx, rowKey, mut) + if err != nil { + return err + } + return nil } func (s *storageIndexDeleter) DeleteSeries(ctx context.Context, series chunk.IndexQuery) ([]error, error) { diff --git a/pkg/chunk/tool/tool.go b/pkg/chunk/tool/tool.go deleted file mode 100644 index 81cfaffaa..000000000 --- a/pkg/chunk/tool/tool.go +++ /dev/null @@ -1,18 +0,0 @@ -package tool - -import ( - "context" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/grafana/cortex-tool/pkg/chunk/filter" -) - -// Scanner scans an -type Scanner interface { - Scan(ctx context.Context, table string, fltr filter.MetricFilter, out chan chunk.Chunk) error -} - -type Deleter interface { - DeleteEntry(context.Context, chunk.IndexEntry) error - DeleteSeries(context.Context, chunk.IndexQuery) ([]error, error) -} diff --git a/pkg/commands/chunks.go b/pkg/commands/chunks.go index 4bebfaaf3..fc223b67c 100644 --- a/pkg/commands/chunks.go +++ b/pkg/commands/chunks.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "sync" + "time" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/gcp" @@ -12,12 +13,39 @@ import ( "github.com/grafana/cortex-tool/pkg/chunk/filter" toolGCP "github.com/grafana/cortex-tool/pkg/chunk/gcp" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/sirupsen/logrus" "gopkg.in/alecthomas/kingpin.v2" "gopkg.in/yaml.v2" ) +var ( + chunkRefsDeleted = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "chunk_entries_deleted_total", + Help: "Total count of entries deleted from the cortex index", + }) + + seriesEntriesDeleted = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "series_entries_deleted_total", + Help: "Total count of entries deleted from the cortex index", + }) + + labelEntriesDeleted = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "series_label_entries_deleted_total", + Help: "Total count of label entries deleted from the cortex index", + }) + + deletionDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "delete_operation_seconds", + Help: "The duration of the chunk deletion operation.", + }) +) + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []*chunk.PeriodConfig `yaml:"configs"` @@ -49,7 +77,8 @@ type chunkCommandOptions struct { type deleteChunkCommandOptions struct { chunkCommandOptions - GCS gcp.GCSConfig + GCS gcp.GCSConfig + DeleteSeries bool } type deleteSeriesCommandOptions struct { @@ -60,6 +89,7 @@ func registerDeleteChunkCommandOptions(cmd *kingpin.CmdClause) { deleteChunkCommandOptions := &deleteChunkCommandOptions{} deleteChunkCommand := cmd.Command("delete", "Deletes the specified chunk references from the index").Action(deleteChunkCommandOptions.run) deleteChunkCommand.Flag("dryrun", "if enabled, no delete action will be taken").BoolVar(&deleteChunkCommandOptions.DryRun) + deleteChunkCommand.Flag("delete-series", "if enabled, the entire series will be deleted, not just the chunkID column").BoolVar(&deleteChunkCommandOptions.DeleteSeries) deleteChunkCommand.Flag("bigtable.project", "bigtable project to use").StringVar(&deleteChunkCommandOptions.Bigtable.Project) deleteChunkCommand.Flag("bigtable.instance", "bigtable instance to use").StringVar(&deleteChunkCommandOptions.Bigtable.Instance) deleteChunkCommand.Flag("chunk.gcs.bucketname", "specify gcs bucket to scan for chunks").StringVar(&deleteChunkCommandOptions.GCS.BucketName) @@ -79,11 +109,21 @@ func registerDeleteSeriesCommandOptions(cmd *kingpin.CmdClause) { // RegisterChunkCommands registers the ChunkCommand flags with the kingpin applicattion func RegisterChunkCommands(app *kingpin.Application) { - chunkCommand := app.Command("chunk", "Chunk related operations") + chunkCommand := app.Command("chunk", "Chunk related operations").PreAction(setup) registerDeleteChunkCommandOptions(chunkCommand) registerDeleteSeriesCommandOptions(chunkCommand) } +func setup(k *kingpin.ParseContext) error { + prometheus.MustRegister( + chunkRefsDeleted, + seriesEntriesDeleted, + labelEntriesDeleted, + ) + + return nil +} + func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error { err := c.Schema.Load() if err != nil { @@ -169,9 +209,11 @@ func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error { } for _, e := range entries { if !c.DryRun { - err := deleter.DeleteEntry(ctx, e) + err := deleter.DeleteEntry(ctx, e, c.DeleteSeries) if err != nil { logrus.Errorln(err) + } else { + chunkRefsDeleted.Inc() } } } @@ -181,13 +223,15 @@ func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error { }() table := schemaConfig.ChunkTables.TableFor(fltr.From) + + start := time.Now() err = scanner.Scan(ctx, table, fltr, outChan) close(outChan) if err != nil { return errors.Wrap(err, "scan failed") } - wg.Wait() + deletionDuration.Set(time.Since(start).Seconds()) return nil } @@ -239,6 +283,8 @@ func (c *deleteSeriesCommandOptions) run(k *kingpin.ParseContext) error { logrus.Errorln(err) } + start := time.Now() + for _, query := range deleteMetricNameRows { logrus.WithFields(logrus.Fields{ "table": query.TableName, @@ -253,8 +299,35 @@ func (c *deleteSeriesCommandOptions) run(k *kingpin.ParseContext) error { if err != nil { return err } + seriesEntriesDeleted.Inc() } } + for _, lbl := range fltr.Labels { + deleteMetricNameRows, err := schema.GetReadQueriesForMetricLabel(fltr.From, fltr.To, fltr.User, fltr.Name, lbl) + if err != nil { + logrus.Errorln(err) + } + for _, query := range deleteMetricNameRows { + logrus.WithFields(logrus.Fields{ + "table": query.TableName, + "hashvalue": query.HashValue, + "dryrun": c.DryRun, + }).Debugln("deleting series from index") + if !c.DryRun { + errs, err := deleter.DeleteSeries(ctx, query) + for _, e := range errs { + logrus.WithError(e).Errorln("series deletion error") + } + if err != nil { + return err + } + labelEntriesDeleted.Inc() + } + } + } + + deletionDuration.Set(time.Since(start).Seconds()) + return nil }