Skip to content

Commit

Permalink
some more knobs for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
otan committed Oct 19, 2021
1 parent 303d7f6 commit 92ac08f
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion pkg/sql/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/ttlpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
Expand All @@ -54,6 +55,18 @@ var ttlDeleteBatchSize = settings.RegisterIntSetting(
100,
)

var ttlActive = settings.RegisterBoolSetting(
"job.ttl.enabled",
"whether the TTL job is enabled",
true,
)

var ttlRateLimit = settings.RegisterIntSetting(
"job.ttl.rate_limit",
"maximum deletions per second",
5000,
)

func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error {
p := execCtx.(JobExecContext)
ie := p.ExecCfg().InternalExecutor
Expand All @@ -64,8 +77,15 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error {
var pkStr string
var pkTypes []string

if !ttlActive.Get(p.ExecCfg().SV()) {
return nil
}

metrics := p.ExecCfg().JobRegistry.MetricsStruct().TTL

limit := ttlRateLimit.Get(p.ExecCfg().SV())
rl := quotapool.NewRateLimiter("ttl", quotapool.Limit(limit), limit)

// TODO(XXX): get dynamic table names.
type rangeTarget struct {
startKey []string
Expand Down Expand Up @@ -244,8 +264,21 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error {
until = len(rows)
}
deleteBatch := rows[i:until]
a, err := rl.Acquire(ctx, int64(len(deleteBatch)))
if err != nil {
return err
}

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
defer a.Consume()
if _, err := ie.Exec(
ctx,
"ttl_delete_low_pri",
txn,
"SET TRANSACTION PRIORITY LOW",
); err != nil {
return err
}
placeholderVals := make([]interface{}, len(pks)*len(deleteBatch))
placeholderStr := ""
for i, row := range deleteBatch {
Expand Down Expand Up @@ -276,7 +309,7 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}
metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds())
metrics.RowDeletions.Inc(int64(len(rows)))
metrics.RowDeletions.Inc(int64(len(deleteBatch)))
return nil
}); err != nil {
return err
Expand Down

0 comments on commit 92ac08f

Please sign in to comment.