Skip to content

Commit

Permalink
Merge pull request grafana/cortex-tools#5 from grafana/20190812_label…
Browse files Browse the repository at this point in the history
…_series_deletion

20190812 label series deletion, merging to get instrumentation
  • Loading branch information
jtlisi authored Aug 14, 2019
2 parents 2eb63bc + 7142687 commit 796e841
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cmd/cortextool/pkg/chunk/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (
)

type Deleter interface {
DeleteEntry(context.Context, chunk.IndexEntry) error
DeleteEntry(context.Context, chunk.IndexEntry, bool) error
DeleteSeries(context.Context, chunk.IndexQuery) ([]error, error)
}
32 changes: 20 additions & 12 deletions cmd/cortextool/pkg/chunk/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filter

import (
"math"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
Expand All @@ -11,38 +12,45 @@ import (
)

type Config struct {
Name string
User string
From int64
To int64
Name string
User string
From int64
To int64
Labels string
}

func (c *Config) Register(cmd *kingpin.CmdClause) {
cmd.Flag("filter.name", "option to filter metrics by metric name").StringVar(&c.Name)
cmd.Flag("filter.user", "option to filter metrics by user").StringVar(&c.User)
cmd.Flag("filter.from", "option to filter only metrics after specific time point").Int64Var(&c.From)
cmd.Flag("filter.to", "option to filter only metrics after specific time point").Int64Var(&c.To)
cmd.Flag("filter.labels", "option to filter metrics with the corresponding labels, provide a comma separated list e.g. <label1>,<label2>").StringVar(&c.Labels)
}

// MetricFilter provides a set of matchers to determine whether a chunk should be returned
type MetricFilter struct {
User string
Name string
From model.Time
To model.Time
User string
Name string
From model.Time
To model.Time
Labels []string
}

// NewMetricFilter returns a metric filter
func NewMetricFilter(cfg Config) MetricFilter {
// By default the maximum time point is chosen if no point is specified
if cfg.To == 0 {
cfg.To = math.MaxInt64
}

labellist := strings.Split(cfg.Labels, ",")

return MetricFilter{
User: cfg.User,
Name: cfg.Name,
From: model.Time(cfg.From),
To: model.Time(cfg.To),
User: cfg.User,
Name: cfg.Name,
From: model.Time(cfg.From),
To: model.Time(cfg.To),
Labels: labellist,
}
}

Expand Down
14 changes: 11 additions & 3 deletions cmd/cortextool/pkg/chunk/gcp/bigtable_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,25 @@ func newstorageIndexDeleter(cfg gcp.Config, client *bigtable.Client) *storageInd
}
}

func (s *storageIndexDeleter) DeleteEntry(ctx context.Context, entry chunk.IndexEntry) error {
func (s *storageIndexDeleter) DeleteEntry(ctx context.Context, entry chunk.IndexEntry, deleteSeries bool) error {
sp, ctx := ot.StartSpanFromContext(ctx, "DeleteEntry")
defer sp.Finish()

table := s.client.Open(entry.TableName)
rowKey, columnKey := s.keysFn(entry.HashValue, entry.RangeValue)

mut := bigtable.NewMutation()
mut.DeleteCellsInColumn(columnFamily, columnKey)
if deleteSeries {
mut.DeleteRow()
} else {
mut.DeleteCellsInColumn(columnFamily, columnKey)
}

return table.Apply(ctx, rowKey, mut)
err := table.Apply(ctx, rowKey, mut)
if err != nil {
return err
}
return nil
}

func (s *storageIndexDeleter) DeleteSeries(ctx context.Context, series chunk.IndexQuery) ([]error, error) {
Expand Down
18 changes: 0 additions & 18 deletions cmd/cortextool/pkg/chunk/tool/tool.go

This file was deleted.

81 changes: 77 additions & 4 deletions cmd/cortextool/pkg/commands/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,47 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/gcp"
chunkTool "github.com/grafana/cortex-tool/pkg/chunk"
"github.com/grafana/cortex-tool/pkg/chunk/filter"
toolGCP "github.com/grafana/cortex-tool/pkg/chunk/gcp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/sirupsen/logrus"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
)

var (
chunkRefsDeleted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "chunk_entries_deleted_total",
Help: "Total count of entries deleted from the cortex index",
})

seriesEntriesDeleted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "series_entries_deleted_total",
Help: "Total count of entries deleted from the cortex index",
})

labelEntriesDeleted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "series_label_entries_deleted_total",
Help: "Total count of label entries deleted from the cortex index",
})

deletionDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "delete_operation_seconds",
Help: "The duration of the chunk deletion operation.",
})
)

// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
Configs []*chunk.PeriodConfig `yaml:"configs"`
Expand Down Expand Up @@ -49,7 +77,8 @@ type chunkCommandOptions struct {

type deleteChunkCommandOptions struct {
chunkCommandOptions
GCS gcp.GCSConfig
GCS gcp.GCSConfig
DeleteSeries bool
}

type deleteSeriesCommandOptions struct {
Expand All @@ -60,6 +89,7 @@ func registerDeleteChunkCommandOptions(cmd *kingpin.CmdClause) {
deleteChunkCommandOptions := &deleteChunkCommandOptions{}
deleteChunkCommand := cmd.Command("delete", "Deletes the specified chunk references from the index").Action(deleteChunkCommandOptions.run)
deleteChunkCommand.Flag("dryrun", "if enabled, no delete action will be taken").BoolVar(&deleteChunkCommandOptions.DryRun)
deleteChunkCommand.Flag("delete-series", "if enabled, the entire series will be deleted, not just the chunkID column").BoolVar(&deleteChunkCommandOptions.DeleteSeries)
deleteChunkCommand.Flag("bigtable.project", "bigtable project to use").StringVar(&deleteChunkCommandOptions.Bigtable.Project)
deleteChunkCommand.Flag("bigtable.instance", "bigtable instance to use").StringVar(&deleteChunkCommandOptions.Bigtable.Instance)
deleteChunkCommand.Flag("chunk.gcs.bucketname", "specify gcs bucket to scan for chunks").StringVar(&deleteChunkCommandOptions.GCS.BucketName)
Expand All @@ -79,11 +109,21 @@ func registerDeleteSeriesCommandOptions(cmd *kingpin.CmdClause) {

// RegisterChunkCommands registers the ChunkCommand flags with the kingpin applicattion
func RegisterChunkCommands(app *kingpin.Application) {
chunkCommand := app.Command("chunk", "Chunk related operations")
chunkCommand := app.Command("chunk", "Chunk related operations").PreAction(setup)
registerDeleteChunkCommandOptions(chunkCommand)
registerDeleteSeriesCommandOptions(chunkCommand)
}

func setup(k *kingpin.ParseContext) error {
prometheus.MustRegister(
chunkRefsDeleted,
seriesEntriesDeleted,
labelEntriesDeleted,
)

return nil
}

func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error {
err := c.Schema.Load()
if err != nil {
Expand Down Expand Up @@ -169,9 +209,11 @@ func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error {
}
for _, e := range entries {
if !c.DryRun {
err := deleter.DeleteEntry(ctx, e)
err := deleter.DeleteEntry(ctx, e, c.DeleteSeries)
if err != nil {
logrus.Errorln(err)
} else {
chunkRefsDeleted.Inc()
}
}
}
Expand All @@ -181,13 +223,15 @@ func (c *deleteChunkCommandOptions) run(k *kingpin.ParseContext) error {
}()

table := schemaConfig.ChunkTables.TableFor(fltr.From)

start := time.Now()
err = scanner.Scan(ctx, table, fltr, outChan)
close(outChan)
if err != nil {
return errors.Wrap(err, "scan failed")
}

wg.Wait()
deletionDuration.Set(time.Since(start).Seconds())
return nil
}

Expand Down Expand Up @@ -239,6 +283,8 @@ func (c *deleteSeriesCommandOptions) run(k *kingpin.ParseContext) error {
logrus.Errorln(err)
}

start := time.Now()

for _, query := range deleteMetricNameRows {
logrus.WithFields(logrus.Fields{
"table": query.TableName,
Expand All @@ -253,8 +299,35 @@ func (c *deleteSeriesCommandOptions) run(k *kingpin.ParseContext) error {
if err != nil {
return err
}
seriesEntriesDeleted.Inc()
}
}

for _, lbl := range fltr.Labels {
deleteMetricNameRows, err := schema.GetReadQueriesForMetricLabel(fltr.From, fltr.To, fltr.User, fltr.Name, lbl)
if err != nil {
logrus.Errorln(err)
}
for _, query := range deleteMetricNameRows {
logrus.WithFields(logrus.Fields{
"table": query.TableName,
"hashvalue": query.HashValue,
"dryrun": c.DryRun,
}).Debugln("deleting series from index")
if !c.DryRun {
errs, err := deleter.DeleteSeries(ctx, query)
for _, e := range errs {
logrus.WithError(e).Errorln("series deletion error")
}
if err != nil {
return err
}
labelEntriesDeleted.Inc()
}
}
}

deletionDuration.Set(time.Since(start).Seconds())

return nil
}

0 comments on commit 796e841

Please sign in to comment.