From 3d4e8c98c38ef1ac4d023eb2460e94e9493c74f9 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Wed, 6 Oct 2021 10:06:41 +1100 Subject: [PATCH] more love Release note (): --- pkg/jobs/metrics.go | 8 + pkg/sql/ttl.go | 304 +++++++++++++++++++----------- pkg/workload/ttl/logger/logger.go | 7 +- 3 files changed, 206 insertions(+), 113 deletions(-) diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 624e2fa63a0e..eca384bac6ee 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -46,6 +46,7 @@ type TTLMetrics struct { DeletionSelectNanos *metric.Histogram DeletionDeleteNanos *metric.Histogram RowDeletions *metric.Counter + NumWorkers *metric.Gauge } // MetricStruct implements the metric.Struct interface. @@ -96,6 +97,13 @@ func makeTTLMetrics(histogramWindowInterval time.Duration) TTLMetrics { MetricType: io_prometheus_client.MetricType_COUNTER, }, ), + NumWorkers: metric.NewGauge( + metric.Metadata{ + Name: "jobs.ttl.num_workers", + Measurement: "num_workers", + Unit: metric.Unit_COUNT, + }, + ), } } diff --git a/pkg/sql/ttl.go b/pkg/sql/ttl.go index bbd0ca48838c..9140f1fd7e14 100644 --- a/pkg/sql/ttl.go +++ b/pkg/sql/ttl.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -45,14 +47,19 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { db := p.ExecCfg().DB details := t.job.Details().(jobspb.TTLDetails) cn := tree.Name(details.ColumnName) - var lastRows []interface{} var pks []string var pkStr string - const batchSize = 1000 + var pkTypes []string + const batchSize = 500 metrics := p.ExecCfg().JobRegistry.MetricsStruct().TTL // TODO(XXX): get dynamic table names. + type rangeTarget struct { + startKey []string + endKey []string + } + var rangeTargets []rangeTarget if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(ctx, txn, details.TableID, tree.ObjectLookupFlagsWithRequired()) @@ -60,134 +67,211 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } pks = desc.GetPrimaryIndex().IndexDesc().KeyColumnNames + colMap := make(map[descpb.ColumnID]catalog.Column) + for _, col := range desc.AllColumns() { + colMap[col.GetID()] = col + } + for _, id := range desc.GetPrimaryIndex().IndexDesc().KeyColumnIDs { + pkTypes = append(pkTypes, colMap[id].GetType().SQLString()) + } pkStr = strings.Join(pks, ", ") + + rows, err := ie.QueryIterator( + ctx, + "ttl-range", + txn, + fmt.Sprintf(`select start_pretty, end_pretty from crdb_internal.ranges where table_id = %d`, details.TableID), + ) + if err != nil { + return err + } + for { + hasNext, err := rows.Next(ctx) + if err != nil { + return err + } + if !hasNext { + break + } + // TODO(XXX): "/" in key name. 
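			// Note: start_pretty/end_pretty from crdb_internal.ranges render keys in
			// their pretty-printed form, roughly
			//   /Table/<table id>/<index id>/<pk col 1>/<pk col 2>/...
			// Splitting on "/" therefore yields ["", "Table", "<table id>", "<index id>",
			// <pk values...>], so k[4:] below is the primary-key prefix of the range
			// boundary. A boundary that sits exactly at the table or index start has no
			// primary-key components; processKey returns nil for it and that bound is
			// simply dropped from the per-range filter later on. As the TODO above says,
			// this splitting breaks if a key value itself contains "/".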
+ processKey := func(s tree.Datum) []string { + start := tree.MustBeDString(s) + k := strings.Split(string(start), "/") + if len(k) > 4 { + return k[4:] + } + return nil + } + rangeTargets = append(rangeTargets, rangeTarget{ + startKey: processKey(rows.Cur()[0]), + endKey: processKey(rows.Cur()[1]), + }) + } return nil }); err != nil { return err } - untilTS := timeutil.Unix(details.UntilUnix, 0) - for { - var rows []tree.Datums - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - startTime := timeutil.Now() + var wg sync.WaitGroup + var errs error + wg.Add(len(rangeTargets)) + for _, rangeTarget := range rangeTargets { + rangeTarget := rangeTarget + go func() { + metrics.NumWorkers.Inc(1) defer func() { - metrics.DeletionTotalNanos.RecordValue(timeutil.Now().Sub(startTime).Nanoseconds()) + wg.Done() + metrics.NumWorkers.Dec(1) }() + var lastRows []interface{} + if err := func() error { + untilTS := timeutil.Unix(details.UntilUnix, 0) + for { + var rows []tree.Datums + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + startTime := timeutil.Now() + defer func() { + metrics.DeletionTotalNanos.RecordValue(timeutil.Now().Sub(startTime).Nanoseconds()) + }() + + // TODO(XXX): order by directions. + // TODO(XXX): prevent full table scans? + _, err := ie.Exec(ctx, "ttl-begin", txn, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") + if err != nil { + return err + } - // TODO(XXX): order by directions. - // TODO(XXX): prevent full table scans? - _, err := ie.Exec(ctx, "ttl-begin", txn, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") - if err != nil { - return err - } + placeholderPos := 2 + generatePKComparison := func(sign string, until int, includeType bool) string { + // (pk1, pk2, pk3) > ($2, $3, $4) + s := fmt.Sprintf("(%s) %s (", strings.Join(pks[:until], ", "), sign) + for i := range pks { + if i >= until { + break + } + if i > 0 { + s += ", " + } + s += fmt.Sprintf("$%d", placeholderPos) + if includeType { + s += fmt.Sprintf("::%s", pkTypes[i]) + } + placeholderPos++ + } + s += ")" + return s + } - // ((i = 10 AND j = 100 AND k > 1000)) OR (i = 10 AND j > 10) OR (i > 0)) - var filterClause string - if len(lastRows) > 0 { - filterClause += " AND (" - for i := 0; i < len(lastRows); i++ { - if i > 0 { - filterClause += " OR " - } - filterClause += "(" - until := len(lastRows) - i - for j := 0; j < until; j++ { - if j > 0 { - filterClause += " AND " + var filterClause string + preparedRows := lastRows + if len(lastRows) > 0 { + filterClause += " AND " + generatePKComparison(">", len(pks), false) } - sign := "=" - if j == until-1 { - sign = ">" + // range clause + // TODO(XXX): multi keys + if len(lastRows) == 0 && len(rangeTarget.startKey) > 0 { + filterClause += " AND " + generatePKComparison(">=", len(rangeTarget.startKey), true) + for _, k := range rangeTarget.startKey { + preparedRows = append(preparedRows, strings.Trim(k, `"`)) + } } - filterClause += fmt.Sprintf("%s %s $%d", pks[j], sign, j+2) + if len(rangeTarget.endKey) > 0 { + filterClause += " AND " + generatePKComparison("<=", len(rangeTarget.endKey), true) + for _, k := range rangeTarget.endKey { + // Total hack but whatever. 
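						// Note: the boundary components recovered here are the pretty-printed
						// column values (strings come back wrapped in double quotes, hence the
						// strings.Trim below), and they are bound as plain string arguments with
						// an explicit cast back to the column type via
						// generatePKComparison(..., true). For the ttl workload's logs table
						// (primary key (ts, id), ttl column crdb_internal_ttl_expiration), the
						// first query assembled just below works out to roughly:
						//
						//   SELECT ts, id FROM [<table id> AS tbl_name]
						//   WHERE crdb_internal_ttl_expiration < $1
						//     AND (ts, id) >= ($2::TIMESTAMPTZ, $3::STRING)
						//     AND (ts, id) <= ($4::TIMESTAMPTZ, $5::STRING)
						//   ORDER BY ts, id
						//   LIMIT 500
						//
						// with $1 = untilTS and $2..$5 the trimmed boundary strings, relying on
						// the pretty-printed text round-tripping through the cast back into the
						// column type (which is what the "total hack" comment above refers to).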
+ preparedRows = append(preparedRows, strings.Trim(k, `"`)) + } + } + + q := fmt.Sprintf( + `SELECT %[1]s FROM [%[2]d AS tbl_name] WHERE %[3]s < $1%[5]s ORDER BY %[6]s LIMIT %[4]d`, + pkStr, + details.TableID, + cn.String(), + batchSize, + filterClause, + pkStr, + ) + args := append( + []interface{}{untilTS}, + preparedRows..., + ) + //fmt.Printf("initial query:%s\nargs: %#v\nrange: %s\n----\n", q, args, rangeTarget) + selectStartTime := timeutil.Now() + rows, err = ie.QueryBuffered( + ctx, + "ttl", + txn, + q, + args..., + ) + metrics.DeletionSelectNanos.RecordValue(timeutil.Now().Sub(selectStartTime).Nanoseconds()) + return err + }); err != nil { + return err } - filterClause += ")" - } - filterClause += ")" - } - q := fmt.Sprintf( - `SELECT %[1]s FROM [%[2]d AS t] WHERE %[3]s < $1%[5]s ORDER BY %[6]s LIMIT %[4]d`, - pkStr, - details.TableID, - cn.String(), - batchSize, - filterClause, - pkStr, - ) - args := append( - []interface{}{untilTS}, - lastRows..., - ) - //fmt.Printf("initial query:%s\nargs: %#v\n", q, args) - selectStartTime := timeutil.Now() - rows, err = ie.QueryBuffered( - ctx, - "ttl", - txn, - q, - args..., - ) - metrics.DeletionSelectNanos.RecordValue(timeutil.Now().Sub(selectStartTime).Nanoseconds()) - return err - }); err != nil { - return err - } - // If we have no rows found, we're done. - if len(rows) == 0 { - break - } + // If we have no rows found, we're done. + if len(rows) == 0 { + break + } - lastRowIdx := len(rows) - 1 - lastRows = make([]interface{}, len(pks)) - for i := 0; i < len(pks); i++ { - lastRows[i] = rows[lastRowIdx][i] - } + lastRowIdx := len(rows) - 1 + lastRows = make([]interface{}, len(pks)) + for i := 0; i < len(pks); i++ { + lastRows[i] = rows[lastRowIdx][i] + } - // TODO(XXX): account for schema changes. - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - placeholderVals := make([]interface{}, len(pks)*len(rows)) - placeholderStr := "" - for i, row := range rows { - if i > 0 { - placeholderStr += ", " - } - placeholderStr += "(" - for j := 0; j < len(pks); j++ { - if j > 0 { - placeholderStr += ", " + // TODO(XXX): account for schema changes. + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + placeholderVals := make([]interface{}, len(pks)*len(rows)) + placeholderStr := "" + for i, row := range rows { + if i > 0 { + placeholderStr += ", " + } + placeholderStr += "(" + for j := 0; j < len(pks); j++ { + if j > 0 { + placeholderStr += ", " + } + placeholderStr += fmt.Sprintf("$%d", 1+i*len(pks)+j) + placeholderVals[i*len(pks)+j] = row[j] + } + placeholderStr += ")" + } + // TODO(XXX): we should probably do a secondary check here if we decide against strict TTL. + q := fmt.Sprintf(`DELETE FROM [%d AS tbl_name] WHERE (%s) IN (%s)`, details.TableID, pkStr, placeholderStr) + // fmt.Printf("%s\n", q) + deletionStartTime := timeutil.Now() + if _, err := ie.Exec( + ctx, + "ttl_delete", + txn, + q, + placeholderVals..., + ); err != nil { + return err + } + metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds()) + metrics.RowDeletions.Inc(int64(len(rows))) + return nil + }); err != nil { + return err + } + + if len(rows) < batchSize { + break } - placeholderStr += fmt.Sprintf("$%d", 1+i*len(pks)+j) - placeholderVals[i*len(pks)+j] = row[j] } - placeholderStr += ")" - } - // TODO(XXX): we should probably do a secondary check here if we decide against strict TTL. 
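// Note: for the same two-column primary key, the batched delete built above
// expands to roughly
//   DELETE FROM [<table id> AS tbl_name] WHERE (ts, id) IN (($1, $2), ($3, $4), ...)
// with one placeholder tuple per selected row. The "secondary check" the TODO
// refers to would presumably re-apply the expiration predicate inside the
// DELETE as well (e.g. appending AND <ttl column> < <untilTS>), so that rows
// whose expiration moved forward between the SELECT and the DELETE are left
// alone; that is only a sketch of the idea, not something this patch does.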
- q := fmt.Sprintf(`DELETE FROM [%d AS t] WHERE (%s) IN (%s)`, details.TableID, pkStr, placeholderStr) - // fmt.Printf("%s\n", q) - deletionStartTime := timeutil.Now() - if _, err := ie.Exec( - ctx, - "ttl_delete", - txn, - q, - placeholderVals..., - ); err != nil { - return err + return nil + }(); err != nil { + errs = errors.CombineErrors(errs, err) } - metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds()) - metrics.RowDeletions.Inc(int64(len(rows))) - return nil - }); err != nil { - return err - } - - if len(rows) < batchSize { - break - } + }() } - return nil + wg.Wait() + return errs } func (t ttlResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { diff --git a/pkg/workload/ttl/logger/logger.go b/pkg/workload/ttl/logger/logger.go index 9762cc135063..632c189aa28d 100644 --- a/pkg/workload/ttl/logger/logger.go +++ b/pkg/workload/ttl/logger/logger.go @@ -161,12 +161,12 @@ func (l *logger) Ops( l.setupMetrics(reg.Registerer()) ql.WorkerFns[0] = func(ctx context.Context) error { var numRows int64 - if err := db.QueryRow("SELECT count(1) FROM logs AS OF SYSTEM TIME follower_read_timestamp()").Scan(&numRows); err != nil { + if err := db.QueryRow("SELECT count(1) FROM logs AS OF SYSTEM TIME '-30s'").Scan(&numRows); err != nil { return err } l.prometheus.numRows.Set(float64(numRows)) var numExpiredRows int64 - if err := db.QueryRow("SELECT count(1) FROM logs AS OF SYSTEM TIME follower_read_timestamp() WHERE now() > crdb_internal_ttl_expiration").Scan(&numExpiredRows); err != nil { + if err := db.QueryRow("SELECT count(1) FROM logs AS OF SYSTEM TIME '-30s' WHERE now() > crdb_internal_ttl_expiration").Scan(&numExpiredRows); err != nil { return err } l.prometheus.numExpiredRows.Set(float64(numExpiredRows)) @@ -185,7 +185,8 @@ func (l logger) Tables() []workload.Table { Name: "logs", Schema: fmt.Sprintf(`( ts TIMESTAMPTZ NOT NULL DEFAULT current_timestamp(), - id UUID NOT NULL DEFAULT gen_random_uuid(), + -- total hack to get around pretty print but whatever + id TEXT NOT NULL DEFAULT gen_random_uuid()::string, message TEXT NOT NULL, PRIMARY KEY (ts, id) ) TTL '%d seconds'`, int(l.ttl.Seconds())),
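A note on the per-range worker loop in pkg/sql/ttl.go above: every worker
goroutine writes to the shared errs variable through errors.CombineErrors, so
that aggregation has to be serialized. A minimal sketch of one way to do it
with a mutex, reusing ctx, rangeTargets, and errors.CombineErrors from the
patch (runRange here is an illustrative stand-in for the per-range body, not a
function the patch defines):

	var (
		mu   sync.Mutex
		errs error
	)
	var wg sync.WaitGroup
	for _, rt := range rangeTargets {
		rt := rt
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := runRange(ctx, rt); err != nil {
				mu.Lock()
				errs = errors.CombineErrors(errs, err)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return errs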