Skip to content

Commit

Permalink
more love
Browse files Browse the repository at this point in the history
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
otan committed Oct 5, 2021
1 parent a713131 commit 3d4e8c9
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 113 deletions.
8 changes: 8 additions & 0 deletions pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type TTLMetrics struct {
DeletionSelectNanos *metric.Histogram
DeletionDeleteNanos *metric.Histogram
RowDeletions *metric.Counter
NumWorkers *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -96,6 +97,13 @@ func makeTTLMetrics(histogramWindowInterval time.Duration) TTLMetrics {
MetricType: io_prometheus_client.MetricType_COUNTER,
},
),
NumWorkers: metric.NewGauge(
metric.Metadata{
Name: "jobs.ttl.num_workers",
Measurement: "num_workers",
Unit: metric.Unit_COUNT,
},
),
}
}

Expand Down
304 changes: 194 additions & 110 deletions pkg/sql/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand All @@ -45,149 +47,231 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error {
db := p.ExecCfg().DB
details := t.job.Details().(jobspb.TTLDetails)
cn := tree.Name(details.ColumnName)
var lastRows []interface{}
var pks []string
var pkStr string
const batchSize = 1000
var pkTypes []string
const batchSize = 500

metrics := p.ExecCfg().JobRegistry.MetricsStruct().TTL

// TODO(XXX): get dynamic table names.
type rangeTarget struct {
startKey []string
endKey []string
}
var rangeTargets []rangeTarget

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(ctx, txn, details.TableID, tree.ObjectLookupFlagsWithRequired())
if err != nil {
return err
}
pks = desc.GetPrimaryIndex().IndexDesc().KeyColumnNames
colMap := make(map[descpb.ColumnID]catalog.Column)
for _, col := range desc.AllColumns() {
colMap[col.GetID()] = col
}
for _, id := range desc.GetPrimaryIndex().IndexDesc().KeyColumnIDs {
pkTypes = append(pkTypes, colMap[id].GetType().SQLString())
}
pkStr = strings.Join(pks, ", ")

rows, err := ie.QueryIterator(
ctx,
"ttl-range",
txn,
fmt.Sprintf(`select start_pretty, end_pretty from crdb_internal.ranges where table_id = %d`, details.TableID),
)
if err != nil {
return err
}
for {
hasNext, err := rows.Next(ctx)
if err != nil {
return err
}
if !hasNext {
break
}
// TODO(XXX): handle key values that themselves contain "/"; splitting the pretty-printed key on "/" mis-parses them.
processKey := func(s tree.Datum) []string {
start := tree.MustBeDString(s)
k := strings.Split(string(start), "/")
if len(k) > 4 {
return k[4:]
}
return nil
}
rangeTargets = append(rangeTargets, rangeTarget{
startKey: processKey(rows.Cur()[0]),
endKey: processKey(rows.Cur()[1]),
})
}
return nil
}); err != nil {
return err
}

untilTS := timeutil.Unix(details.UntilUnix, 0)
for {
var rows []tree.Datums
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
startTime := timeutil.Now()
var wg sync.WaitGroup
var errs error
wg.Add(len(rangeTargets))
for _, rangeTarget := range rangeTargets {
rangeTarget := rangeTarget
go func() {
metrics.NumWorkers.Inc(1)
defer func() {
metrics.DeletionTotalNanos.RecordValue(timeutil.Now().Sub(startTime).Nanoseconds())
wg.Done()
metrics.NumWorkers.Dec(1)
}()
var lastRows []interface{}
if err := func() error {
untilTS := timeutil.Unix(details.UntilUnix, 0)
for {
var rows []tree.Datums
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
startTime := timeutil.Now()
defer func() {
metrics.DeletionTotalNanos.RecordValue(timeutil.Now().Sub(startTime).Nanoseconds())
}()

// TODO(XXX): order by directions.
// TODO(XXX): prevent full table scans?
_, err := ie.Exec(ctx, "ttl-begin", txn, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
if err != nil {
return err
}

// TODO(XXX): order by directions.
// TODO(XXX): prevent full table scans?
_, err := ie.Exec(ctx, "ttl-begin", txn, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
if err != nil {
return err
}
placeholderPos := 2
generatePKComparison := func(sign string, until int, includeType bool) string {
// (pk1, pk2, pk3) > ($2, $3, $4)
s := fmt.Sprintf("(%s) %s (", strings.Join(pks[:until], ", "), sign)
for i := range pks {
if i >= until {
break
}
if i > 0 {
s += ", "
}
s += fmt.Sprintf("$%d", placeholderPos)
if includeType {
s += fmt.Sprintf("::%s", pkTypes[i])
}
placeholderPos++
}
s += ")"
return s
}

// ((i = 10 AND j = 100 AND k > 1000) OR (i = 10 AND j > 10) OR (i > 0))
var filterClause string
if len(lastRows) > 0 {
filterClause += " AND ("
for i := 0; i < len(lastRows); i++ {
if i > 0 {
filterClause += " OR "
}
filterClause += "("
until := len(lastRows) - i
for j := 0; j < until; j++ {
if j > 0 {
filterClause += " AND "
var filterClause string
preparedRows := lastRows
if len(lastRows) > 0 {
filterClause += " AND " + generatePKComparison(">", len(pks), false)
}
sign := "="
if j == until-1 {
sign = ">"
// range clause
// TODO(XXX): multi keys
if len(lastRows) == 0 && len(rangeTarget.startKey) > 0 {
filterClause += " AND " + generatePKComparison(">=", len(rangeTarget.startKey), true)
for _, k := range rangeTarget.startKey {
preparedRows = append(preparedRows, strings.Trim(k, `"`))
}
}
filterClause += fmt.Sprintf("%s %s $%d", pks[j], sign, j+2)
if len(rangeTarget.endKey) > 0 {
filterClause += " AND " + generatePKComparison("<=", len(rangeTarget.endKey), true)
for _, k := range rangeTarget.endKey {
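// HACK: the pretty-printed range key parts may be wrapped in double quotes; strip them so each value can be bound as a placeholder and cast to its PK column type.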
preparedRows = append(preparedRows, strings.Trim(k, `"`))
}
}

q := fmt.Sprintf(
`SELECT %[1]s FROM [%[2]d AS tbl_name] WHERE %[3]s < $1%[5]s ORDER BY %[6]s LIMIT %[4]d`,
pkStr,
details.TableID,
cn.String(),
batchSize,
filterClause,
pkStr,
)
args := append(
[]interface{}{untilTS},
preparedRows...,
)
//fmt.Printf("initial query:%s\nargs: %#v\nrange: %s\n----\n", q, args, rangeTarget)
selectStartTime := timeutil.Now()
rows, err = ie.QueryBuffered(
ctx,
"ttl",
txn,
q,
args...,
)
metrics.DeletionSelectNanos.RecordValue(timeutil.Now().Sub(selectStartTime).Nanoseconds())
return err
}); err != nil {
return err
}
filterClause += ")"
}
filterClause += ")"
}
q := fmt.Sprintf(
`SELECT %[1]s FROM [%[2]d AS t] WHERE %[3]s < $1%[5]s ORDER BY %[6]s LIMIT %[4]d`,
pkStr,
details.TableID,
cn.String(),
batchSize,
filterClause,
pkStr,
)
args := append(
[]interface{}{untilTS},
lastRows...,
)
//fmt.Printf("initial query:%s\nargs: %#v\n", q, args)
selectStartTime := timeutil.Now()
rows, err = ie.QueryBuffered(
ctx,
"ttl",
txn,
q,
args...,
)
metrics.DeletionSelectNanos.RecordValue(timeutil.Now().Sub(selectStartTime).Nanoseconds())
return err
}); err != nil {
return err
}

// If we have no rows found, we're done.
if len(rows) == 0 {
break
}
// If we have no rows found, we're done.
if len(rows) == 0 {
break
}

lastRowIdx := len(rows) - 1
lastRows = make([]interface{}, len(pks))
for i := 0; i < len(pks); i++ {
lastRows[i] = rows[lastRowIdx][i]
}
lastRowIdx := len(rows) - 1
lastRows = make([]interface{}, len(pks))
for i := 0; i < len(pks); i++ {
lastRows[i] = rows[lastRowIdx][i]
}

// TODO(XXX): account for schema changes.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
placeholderVals := make([]interface{}, len(pks)*len(rows))
placeholderStr := ""
for i, row := range rows {
if i > 0 {
placeholderStr += ", "
}
placeholderStr += "("
for j := 0; j < len(pks); j++ {
if j > 0 {
placeholderStr += ", "
// TODO(XXX): account for schema changes.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
placeholderVals := make([]interface{}, len(pks)*len(rows))
placeholderStr := ""
for i, row := range rows {
if i > 0 {
placeholderStr += ", "
}
placeholderStr += "("
for j := 0; j < len(pks); j++ {
if j > 0 {
placeholderStr += ", "
}
placeholderStr += fmt.Sprintf("$%d", 1+i*len(pks)+j)
placeholderVals[i*len(pks)+j] = row[j]
}
placeholderStr += ")"
}
// TODO(XXX): we should probably do a secondary check here if we decide against strict TTL.
q := fmt.Sprintf(`DELETE FROM [%d AS tbl_name] WHERE (%s) IN (%s)`, details.TableID, pkStr, placeholderStr)
// fmt.Printf("%s\n", q)
deletionStartTime := timeutil.Now()
if _, err := ie.Exec(
ctx,
"ttl_delete",
txn,
q,
placeholderVals...,
); err != nil {
return err
}
metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds())
metrics.RowDeletions.Inc(int64(len(rows)))
return nil
}); err != nil {
return err
}

if len(rows) < batchSize {
break
}
placeholderStr += fmt.Sprintf("$%d", 1+i*len(pks)+j)
placeholderVals[i*len(pks)+j] = row[j]
}
placeholderStr += ")"
}
// TODO(XXX): we should probably do a secondary check here if we decide against strict TTL.
q := fmt.Sprintf(`DELETE FROM [%d AS t] WHERE (%s) IN (%s)`, details.TableID, pkStr, placeholderStr)
// fmt.Printf("%s\n", q)
deletionStartTime := timeutil.Now()
if _, err := ie.Exec(
ctx,
"ttl_delete",
txn,
q,
placeholderVals...,
); err != nil {
return err
return nil
}(); err != nil {
errs = errors.CombineErrors(errs, err)
}
metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds())
metrics.RowDeletions.Inc(int64(len(rows)))
return nil
}); err != nil {
return err
}

if len(rows) < batchSize {
break
}
}()
}
return nil
wg.Wait()
return errs
}

func (t ttlResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
Expand Down
Loading

0 comments on commit 3d4e8c9

Please sign in to comment.