From 5221b4bae2f627d2361c43d62f8113e960360405 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 15:45:57 +0800 Subject: [PATCH 01/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/domain.go | 34 ++++++++++++++++++++++++++++++---- session/bootstrap.go | 4 ++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index d00fb08fd879f..49ad729398d86 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1252,7 +1252,10 @@ func (do *Domain) runawayRecordFlushLoop() { defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) // this times is used to batch flushing rocords, with 1s duration, // we can guarantee a watch record can be seen by the user within 1s. - timer := time.NewTimer(time.Second) + runawayRecordFluashTimer := time.NewTimer(time.Second) + runawayRecordGCTimer := time.NewTimer(time.Hour * 12) + quarantineRecordGCTimer := time.NewTimer(time.Minute * 30) + fired := false recordCh := do.RunawayManager().RunawayRecordChan() quarantineRecordCh := do.RunawayManager().QuarantineRecordChan() @@ -1276,16 +1279,30 @@ func (do *Domain) runawayRecordFlushLoop() { } sql, params := genQuarantineQueriesStmt(quarantineRecords) if err := do.execFlushSQL(sql, params); err != nil { - logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(records))) + logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords))) } quarantineRecords = quarantineRecords[:0] } + gcQuarantineRecords := func() { + sql := "DELETE FROM mysql.tidb_runaway_quarantined_watch WHERE end_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)" + params := getFormatCurrentTimeParam() + if err := do.execFlushSQL(sql, params); err != nil { + logutil.BgLogger().Info("delete quarantine records failed", zap.Error(err)) + } + } + gcRunawayRecords := func() { + sql := "DELETE FROM mysql.tidb_runaway_queries WHERE time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)" + params := getFormatCurrentTimeParam() + if err := do.execFlushSQL(sql, params); err != nil { + logutil.BgLogger().Info("delete runaway records failed", zap.Error(err)) + } + } for { select { case <-do.exit: return - case <-timer.C: + case <-runawayRecordFluashTimer.C: flushRunawayRecords() fired = true case r := <-quarantineRecordCh: @@ -1294,6 +1311,7 @@ func (do *Domain) runawayRecordFlushLoop() { // flush as soon as possible. if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThrehold { flushQuarantineRecords() + gcQuarantineRecords() } case r := <-recordCh: records = append(records, r) @@ -1302,8 +1320,12 @@ func (do *Domain) runawayRecordFlushLoop() { } else if fired { fired = false // meet a new record, reset the timer. - timer.Reset(time.Second) + runawayRecordFluashTimer.Reset(time.Second) } + case <-runawayRecordGCTimer.C: + gcRunawayRecords() + case <-quarantineRecordGCTimer.C: + gcQuarantineRecords() } } } @@ -1324,6 +1346,10 @@ func (do *Domain) execFlushSQL(sql string, params []interface{}) error { return err } +func getFormatCurrentTimeParam() []interface{} { + return []interface{}{time.Now().UTC().Format(types.TimeFormat)} +} + func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) { var builder strings.Builder params := make([]interface{}, 0, len(records)*7) diff --git a/session/bootstrap.go b/session/bootstrap.go index 54bef3862a061..5dba7c2fc7231 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -613,7 +613,7 @@ const ( original_sql TEXT NOT NULL, plan_digest TEXT NOT NULL, tidb_server varchar(64), - INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when add global binding query", + INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query", INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` @@ -625,7 +625,7 @@ const ( watch varchar(12) NOT NULL, watch_text TEXT NOT NULL, tidb_server varchar(64), - INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when add global binding query", + INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when select quarantined query", INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` From 113625499cb2951c9545967dd772d94fa58b1608 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 21:34:01 +0800 Subject: [PATCH 02/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/domain.go | 135 ++++++++++++++++++++++++++++++++++--------- session/bootstrap.go | 4 +- 2 files changed, 109 insertions(+), 30 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 49ad729398d86..c6b7fc616bcb1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -66,9 +66,12 @@ import ( "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/sqlbuilder" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" disttaskutil "github.com/pingcap/tidb/util/disttask" "github.com/pingcap/tidb/util/domainutil" @@ -1248,13 +1251,96 @@ func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose } +var systemSchemaCIStr = model.NewCIStr("mysql") + +func (do *Domain) gcSystemTable(tableName string, expiredDuration time.Duration) { + expiredTime := time.Now().Add(-expiredDuration) + tbCIStr := model.NewCIStr(tableName) + tbInfo, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr) + if err != nil { + logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + return + } + tb, err := cache.NewPhysicalTable(systemSchemaCIStr, tbInfo.Meta(), model.NewCIStr("")) + if err != nil { + logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + return + } + generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil) + if err != nil { + logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + return + } + var leftRows [][]types.Datum + for { + sql := "" + limit := int(variable.TTLScanBatchSize.Load()) + if sql, err = generator.NextSQL(leftRows, limit); err != nil { + logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + return + } + // to remove + logutil.BgLogger().Info("gcRunawayRecords", zap.String("next sql", sql)) + if sql == "" { + return + } + + rows, sqlErr := do.execRestrictedSQL(sql, nil) + if sqlErr != nil { + logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + return + } + leftRows = make([][]types.Datum, len(rows)) + for i, row := range rows { + leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes) + } + if len(rows) == 0 { + return + } + + for len(leftRows) > 0 { + maxBatch := 100 + var delBatch [][]types.Datum + if len(leftRows) < maxBatch { + delBatch = leftRows + leftRows = nil + } else { + delBatch = leftRows[0:maxBatch] + leftRows = leftRows[maxBatch:] + } + + sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime) + if err != nil { + logutil.BgLogger().Warn( + "build delete SQL failed when deleting system table", + zap.Error(err), + zap.String("table", tb.Schema.O+"."+tb.Name.O), + ) + return + } + + _, err = do.execRestrictedSQL(sql, nil) + if err != nil { + logutil.BgLogger().Warn( + "delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql), + ) + } + } + } +} + func (do *Domain) runawayRecordFlushLoop() { defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) + + quarantineRecordGCInterval := time.Second * 30 + runawayRecordGCInterval := time.Hour * 24 + runawayRecordExpiredDuration := time.Hour * 24 * 7 + // this times is used to batch flushing rocords, with 1s duration, // we can guarantee a watch record can be seen by the user within 1s. runawayRecordFluashTimer := time.NewTimer(time.Second) - runawayRecordGCTimer := time.NewTimer(time.Hour * 12) - quarantineRecordGCTimer := time.NewTimer(time.Minute * 30) + runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval) + quarantineRecordGCTicker := time.NewTicker(quarantineRecordGCInterval) fired := false recordCh := do.RunawayManager().RunawayRecordChan() @@ -1268,7 +1354,7 @@ func (do *Domain) runawayRecordFlushLoop() { return } sql, params := genRunawayQueriesStmt(records) - if err := do.execFlushSQL(sql, params); err != nil { + if _, err := do.execRestrictedSQL(sql, params); err != nil { logutil.BgLogger().Info("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) } records = records[:0] @@ -1278,26 +1364,12 @@ func (do *Domain) runawayRecordFlushLoop() { return } sql, params := genQuarantineQueriesStmt(quarantineRecords) - if err := do.execFlushSQL(sql, params); err != nil { + if _, err := do.execRestrictedSQL(sql, params); err != nil { logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords))) } quarantineRecords = quarantineRecords[:0] } - gcQuarantineRecords := func() { - sql := "DELETE FROM mysql.tidb_runaway_quarantined_watch WHERE end_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)" - params := getFormatCurrentTimeParam() - if err := do.execFlushSQL(sql, params); err != nil { - logutil.BgLogger().Info("delete quarantine records failed", zap.Error(err)) - } - } - gcRunawayRecords := func() { - sql := "DELETE FROM mysql.tidb_runaway_queries WHERE time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)" - params := getFormatCurrentTimeParam() - if err := do.execFlushSQL(sql, params); err != nil { - logutil.BgLogger().Info("delete runaway records failed", zap.Error(err)) - } - } - + owner := do.DDL().OwnerManager() for { select { case <-do.exit: @@ -1311,7 +1383,10 @@ func (do *Domain) runawayRecordFlushLoop() { // flush as soon as possible. if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThrehold { flushQuarantineRecords() - gcQuarantineRecords() + if owner.IsOwner() { + do.gcSystemTable("tidb_runaway_quarantined_watch", time.Duration(0)) + quarantineRecordGCTicker.Reset(quarantineRecordGCInterval) + } } case r := <-recordCh: records = append(records, r) @@ -1322,28 +1397,32 @@ func (do *Domain) runawayRecordFlushLoop() { // meet a new record, reset the timer. runawayRecordFluashTimer.Reset(time.Second) } - case <-runawayRecordGCTimer.C: - gcRunawayRecords() - case <-quarantineRecordGCTimer.C: - gcQuarantineRecords() + case <-runawayRecordGCTicker.C: + if owner.IsOwner() { + do.gcSystemTable("tidb_runaway_queries", runawayRecordExpiredDuration) + } + case <-quarantineRecordGCTicker.C: + if owner.IsOwner() { + do.gcSystemTable("tidb_runaway_quarantined_watch", time.Duration(0)) + } } } } -func (do *Domain) execFlushSQL(sql string, params []interface{}) error { +func (do *Domain) execRestrictedSQL(sql string, params []interface{}) ([]chunk.Row, error) { se, err := do.sysSessionPool.Get() defer func() { do.sysSessionPool.Put(se) }() if err != nil { - return errors.Annotate(err, "get session failed") + return nil, errors.Annotate(err, "get session failed") } exec := se.(sqlexec.RestrictedSQLExecutor) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - _, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, + r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, params..., ) - return err + return r, err } func getFormatCurrentTimeParam() []interface{} { diff --git a/session/bootstrap.go b/session/bootstrap.go index 5dba7c2fc7231..49c044bd7fef9 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -615,7 +615,7 @@ const ( tidb_server varchar(64), INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query", INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch" - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` + ) TTL = ` + "time" + ` + INTERVAL 7 DAY TTL_ENABLE = 'OFF' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` // CreateRunawayQuarantineWatchTable stores the condition which is used to check whether query should be quarantined. CreateRunawayQuarantineWatchTable = `CREATE TABLE IF NOT EXISTS mysql.tidb_runaway_quarantined_watch ( @@ -627,7 +627,7 @@ const ( tidb_server varchar(64), INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when select quarantined query", INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch" - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` + ) TTL = ` + "end_time" + ` + INTERVAL 10 YEAR TTL_ENABLE = 'OFF' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` // CreateImportJobs is a table that IMPORT INTO uses. CreateImportJobs = `CREATE TABLE IF NOT EXISTS mysql.tidb_import_jobs ( From 92e0e25afe341c4aa2b065448a0c5e9ff871ae63 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 21:44:25 +0800 Subject: [PATCH 03/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/BUILD.bazel | 2 ++ 1 file changed, 2 insertions(+) diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 466e68b794222..306623c339690 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -54,6 +54,8 @@ go_library( "//statistics/handle", "//store/helper", "//telemetry", + "//ttl/cache", + "//ttl/sqlbuilder", "//ttl/ttlworker", "//types", "//util", From 971040a13c14c0b294a2e567508e7788f2f0b6ea Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 22:11:01 +0800 Subject: [PATCH 04/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/domain.go | 19 ++++++++++--------- session/bootstrap.go | 4 ++-- ttl/cache/table.go | 27 ++++++++++++++++++--------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index c6b7fc616bcb1..756f1997f880f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1253,15 +1253,16 @@ func (do *Domain) SetOnClose(onClose func()) { var systemSchemaCIStr = model.NewCIStr("mysql") -func (do *Domain) gcSystemTable(tableName string, expiredDuration time.Duration) { +func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time.Duration) { expiredTime := time.Now().Add(-expiredDuration) tbCIStr := model.NewCIStr(tableName) - tbInfo, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr) + tbl, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr) if err != nil { logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } - tb, err := cache.NewPhysicalTable(systemSchemaCIStr, tbInfo.Meta(), model.NewCIStr("")) + tbInfo := tbl.Meta() + tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), tbInfo.FindPublicColumnByName(colName)) if err != nil { logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) return @@ -1332,9 +1333,9 @@ func (do *Domain) gcSystemTable(tableName string, expiredDuration time.Duration) func (do *Domain) runawayRecordFlushLoop() { defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) - quarantineRecordGCInterval := time.Second * 30 - runawayRecordGCInterval := time.Hour * 24 - runawayRecordExpiredDuration := time.Hour * 24 * 7 + quarantineRecordGCInterval := time.Second * 15 //time.Minute * 15 + runawayRecordGCInterval := time.Second * 15 //time.Hour * 24 + runawayRecordExpiredDuration := time.Duration(0) //time.Hour * 24 * 7 // this times is used to batch flushing rocords, with 1s duration, // we can guarantee a watch record can be seen by the user within 1s. @@ -1384,7 +1385,7 @@ func (do *Domain) runawayRecordFlushLoop() { if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThrehold { flushQuarantineRecords() if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_quarantined_watch", time.Duration(0)) + do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) quarantineRecordGCTicker.Reset(quarantineRecordGCInterval) } } @@ -1399,11 +1400,11 @@ func (do *Domain) runawayRecordFlushLoop() { } case <-runawayRecordGCTicker.C: if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_queries", runawayRecordExpiredDuration) + do.gcSystemTable("tidb_runaway_queries", "time", runawayRecordExpiredDuration) } case <-quarantineRecordGCTicker.C: if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_quarantined_watch", time.Duration(0)) + do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) } } } diff --git a/session/bootstrap.go b/session/bootstrap.go index 49c044bd7fef9..5dba7c2fc7231 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -615,7 +615,7 @@ const ( tidb_server varchar(64), INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query", INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch" - ) TTL = ` + "time" + ` + INTERVAL 7 DAY TTL_ENABLE = 'OFF' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` // CreateRunawayQuarantineWatchTable stores the condition which is used to check whether query should be quarantined. CreateRunawayQuarantineWatchTable = `CREATE TABLE IF NOT EXISTS mysql.tidb_runaway_quarantined_watch ( @@ -627,7 +627,7 @@ const ( tidb_server varchar(64), INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when select quarantined query", INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch" - ) TTL = ` + "end_time" + ` + INTERVAL 10 YEAR TTL_ENABLE = 'OFF' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` // CreateImportJobs is a table that IMPORT INTO uses. CreateImportJobs = `CREATE TABLE IF NOT EXISTS mysql.tidb_import_jobs ( diff --git a/ttl/cache/table.go b/ttl/cache/table.go index 0184726ea9ac1..75d0ebd034a57 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -107,20 +107,14 @@ type PhysicalTable struct { TimeColumn *model.ColumnInfo } -// NewPhysicalTable create a new PhysicalTable -func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (*PhysicalTable, error) { +// NewBasePhysicalTable create a new PhysicalTable +func NewBasePhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr, timeColumn *model.ColumnInfo) (*PhysicalTable, error) { if tbl.State != model.StatePublic { return nil, errors.Errorf("table '%s.%s' is not a public table", schema, tbl.Name) } - ttlInfo := tbl.TTLInfo - if ttlInfo == nil { - return nil, errors.Errorf("table '%s.%s' is not a ttl table", schema, tbl.Name) - } - - timeColumn := tbl.FindPublicColumnByName(ttlInfo.ColumnName.L) if timeColumn == nil { - return nil, errors.Errorf("time column '%s' is not public in ttl table '%s.%s'", ttlInfo.ColumnName, schema, tbl.Name) + return nil, errors.Errorf("time column '%s' is not public in table '%s.%s'", timeColumn.Name, schema, tbl.Name) } keyColumns, keyColumTypes, err := getTableKeyColumns(tbl) @@ -166,6 +160,21 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model. }, nil } +// NewPhysicalTable create a new PhysicalTable +func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (*PhysicalTable, error) { + ttlInfo := tbl.TTLInfo + if ttlInfo == nil { + return nil, errors.Errorf("table '%s.%s' is not a ttl table", schema, tbl.Name) + } + + timeColumn := tbl.FindPublicColumnByName(ttlInfo.ColumnName.L) + if timeColumn == nil { + return nil, errors.Errorf("time column '%s' is not public in ttl table '%s.%s'", ttlInfo.ColumnName, schema, tbl.Name) + } + + return NewBasePhysicalTable(schema, tbl, partition, timeColumn) +} + // ValidateKeyPrefix validates a key prefix func (t *PhysicalTable) ValidateKeyPrefix(key []types.Datum) error { if len(key) > len(t.KeyColumns) { From 7d791279e0e0c0aaeef56861616f1730fdad0146 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 22:15:17 +0800 Subject: [PATCH 05/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/domain.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 756f1997f880f..9e7ba128f7e05 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1426,10 +1426,6 @@ func (do *Domain) execRestrictedSQL(sql string, params []interface{}) ([]chunk.R return r, err } -func getFormatCurrentTimeParam() []interface{} { - return []interface{}{time.Now().UTC().Format(types.TimeFormat)} -} - func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) { var builder strings.Builder params := make([]interface{}, 0, len(records)*7) From ee61a30bce37233e9720957bfa7311942b1257ca Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 22:24:36 +0800 Subject: [PATCH 06/10] gc runaway record Signed-off-by: Cabinfever_B --- domain/domain.go | 7 ++++++- ttl/cache/table.go | 4 ---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 9e7ba128f7e05..ebc9a87d7e5c1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1262,7 +1262,12 @@ func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time. return } tbInfo := tbl.Meta() - tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), tbInfo.FindPublicColumnByName(colName)) + col := tbInfo.FindPublicColumnByName(colName) + if col == nil { + logutil.BgLogger().Info("time column is not public in table", zap.String("table", tableName), zap.String("column", colName)) + return + } + tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col) if err != nil { logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) return diff --git a/ttl/cache/table.go b/ttl/cache/table.go index 75d0ebd034a57..3fa1580e4f494 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -113,10 +113,6 @@ func NewBasePhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition mo return nil, errors.Errorf("table '%s.%s' is not a public table", schema, tbl.Name) } - if timeColumn == nil { - return nil, errors.Errorf("time column '%s' is not public in table '%s.%s'", timeColumn.Name, schema, tbl.Name) - } - keyColumns, keyColumTypes, err := getTableKeyColumns(tbl) if err != nil { return nil, err From c0a73f805f2444a50b024c0f5526e8a65a23bb80 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 19 Jun 2023 22:32:06 +0800 Subject: [PATCH 07/10] gc runaway record Signed-off-by: Cabinfever_B --- ttl/cache/table.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ttl/cache/table.go b/ttl/cache/table.go index 3fa1580e4f494..1a79065d2c495 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -107,8 +107,12 @@ type PhysicalTable struct { TimeColumn *model.ColumnInfo } -// NewBasePhysicalTable create a new PhysicalTable -func NewBasePhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr, timeColumn *model.ColumnInfo) (*PhysicalTable, error) { +// NewBasePhysicalTable create a new PhysicalTable with specific timeColunm. +func NewBasePhysicalTable(schema model.CIStr, + tbl *model.TableInfo, + partition model.CIStr, + timeColumn *model.ColumnInfo, +) (*PhysicalTable, error) { if tbl.State != model.StatePublic { return nil, errors.Errorf("table '%s.%s' is not a public table", schema, tbl.Name) } From 11e8c9515fe6a4a3de405961b17e796ed2cff047 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 20 Jun 2023 16:30:39 +0800 Subject: [PATCH 08/10] add test Signed-off-by: Cabinfever_B --- .../resourcegroup/resource_group_test.go | 33 +++++++- domain/domain.go | 81 ++++++++++--------- 2 files changed, 75 insertions(+), 39 deletions(-) diff --git a/ddl/tests/resourcegroup/resource_group_test.go b/ddl/tests/resourcegroup/resource_group_test.go index 8c70eb4e0d73d..9f96ad766201c 100644 --- a/ddl/tests/resourcegroup/resource_group_test.go +++ b/ddl/tests/resourcegroup/resource_group_test.go @@ -219,6 +219,10 @@ func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name stri } func TestResourceGroupRunaway(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/FastRunawayGC", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/FastRunawayGC")) + }() store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil)) @@ -229,8 +233,8 @@ func TestResourceGroupRunaway(t *testing.T) { tk.MustExec("set global tidb_enable_resource_control='on'") tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)") - tk.MustExec("create resource group rg2 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT duration '1m')") - tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED=50ms, ACTION=KILL, WATCH=EXACT[1m0s]")) + tk.MustExec("create resource group rg2 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT duration '1s')") + tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED=50ms, ACTION=KILL, WATCH=EXACT[1s]")) tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1")) tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) @@ -241,6 +245,15 @@ func TestResourceGroupRunaway(t *testing.T) { err := tk.QueryToErr("select /*+ resource_group(rg1) */ * from t") require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query") + // consifer the low speed of write in test, all check will exec after sleeping 1s. + delayDuration := time.Millisecond * 750 + time.Sleep(time.Millisecond*10 + delayDuration) + tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries"). + Check(testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify")) + // wait for GC, because of FROM_UNIXTIME is second level, so wait for 1s. + time.Sleep(time.Millisecond*1000 + delayDuration) + require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries").Rows(), 0) + tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=COOLDOWN)") tk.MustQuery("select /*+ resource_group(rg1) */ * from t") @@ -250,10 +263,26 @@ func TestResourceGroupRunaway(t *testing.T) { err = tk.QueryToErr("select /*+ resource_group(rg2) */ * from t") require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") tk.MustGetErrCode("select /*+ resource_group(rg2) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine) + time.Sleep(time.Millisecond*10 + delayDuration) + tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries"). + Check(testkit.Rows( + "rg2 select /*+ resource_group(rg2) */ * from t identify", + "rg2 select /*+ resource_group(rg2) */ * from t watch", + )) + tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch"). + Check(testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t")) + + time.Sleep(time.Millisecond*1000 + delayDuration) + require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries").Rows(), 0) + // watch duration is 1s + time.Sleep(time.Millisecond*1000 + delayDuration) + require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_quarantined_watch").Rows(), 0) + tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=COOLDOWN)") tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=DRYRUN)") tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) + } func TestResourceGroupHint(t *testing.T) { diff --git a/domain/domain.go b/domain/domain.go index 9baa9bf308c7e..265a9dba358b5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1251,73 +1251,82 @@ func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose } +const ( + runawayRecordFluashInterval = time.Second + quarantineRecordGCInterval = time.Minute * 10 + runawayRecordGCInterval = time.Hour * 24 + runawayRecordExpiredDuration = time.Hour * 24 * 7 + + runawayRecordGCBatchSize = 100 + runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5 +) + var systemSchemaCIStr = model.NewCIStr("mysql") func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time.Duration) { + if !do.DDL().OwnerManager().IsOwner() { + return + } + failpoint.Inject("FastRunawayGC", func() { + expiredDuration = time.Second * 1 + }) expiredTime := time.Now().Add(-expiredDuration) tbCIStr := model.NewCIStr(tableName) tbl, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr) if err != nil { - logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } tbInfo := tbl.Meta() col := tbInfo.FindPublicColumnByName(colName) if col == nil { - logutil.BgLogger().Info("time column is not public in table", zap.String("table", tableName), zap.String("column", colName)) + logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName)) return } tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col) if err != nil { - logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil) if err != nil { - logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } var leftRows [][]types.Datum for { sql := "" - limit := int(variable.TTLScanBatchSize.Load()) - if sql, err = generator.NextSQL(leftRows, limit); err != nil { - logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + if sql, err = generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize); err != nil { + logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } // to remove - logutil.BgLogger().Info("gcRunawayRecords", zap.String("next sql", sql)) if sql == "" { return } rows, sqlErr := do.execRestrictedSQL(sql, nil) if sqlErr != nil { - logutil.BgLogger().Info("delete system table failed", zap.String("table", tableName), zap.Error(err)) + logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) return } leftRows = make([][]types.Datum, len(rows)) for i, row := range rows { leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes) } - if len(rows) == 0 { - return - } for len(leftRows) > 0 { - maxBatch := 100 var delBatch [][]types.Datum - if len(leftRows) < maxBatch { + if len(leftRows) < runawayRecordGCBatchSize { delBatch = leftRows leftRows = nil } else { - delBatch = leftRows[0:maxBatch] - leftRows = leftRows[maxBatch:] + delBatch = leftRows[0:runawayRecordGCBatchSize] + leftRows = leftRows[runawayRecordGCBatchSize:] } - sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime) if err != nil { - logutil.BgLogger().Warn( + logutil.BgLogger().Error( "build delete SQL failed when deleting system table", zap.Error(err), zap.String("table", tb.Schema.O+"."+tb.Name.O), @@ -1338,15 +1347,19 @@ func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time. func (do *Domain) runawayRecordFlushLoop() { defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) - quarantineRecordGCInterval := time.Second * 15 //time.Minute * 15 - runawayRecordGCInterval := time.Second * 15 //time.Hour * 24 - runawayRecordExpiredDuration := time.Duration(0) //time.Hour * 24 * 7 - // this times is used to batch flushing rocords, with 1s duration, // we can guarantee a watch record can be seen by the user within 1s. - runawayRecordFluashTimer := time.NewTimer(time.Second) + runawayRecordFluashTimer := time.NewTimer(runawayRecordFluashInterval) runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval) quarantineRecordGCTicker := time.NewTicker(quarantineRecordGCInterval) + failpoint.Inject("FastRunawayGC", func() { + runawayRecordFluashTimer.Stop() + runawayRecordGCTicker.Stop() + quarantineRecordGCTicker.Stop() + runawayRecordFluashTimer = time.NewTimer(time.Millisecond * 50) + runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200) + quarantineRecordGCTicker = time.NewTicker(time.Millisecond * 200) + }) fired := false recordCh := do.RunawayManager().RunawayRecordChan() @@ -1361,7 +1374,7 @@ func (do *Domain) runawayRecordFlushLoop() { } sql, params := genRunawayQueriesStmt(records) if _, err := do.execRestrictedSQL(sql, params); err != nil { - logutil.BgLogger().Info("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) + logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) } records = records[:0] } @@ -1371,11 +1384,10 @@ func (do *Domain) runawayRecordFlushLoop() { } sql, params := genQuarantineQueriesStmt(quarantineRecords) if _, err := do.execRestrictedSQL(sql, params); err != nil { - logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords))) + logutil.BgLogger().Error("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords))) } quarantineRecords = quarantineRecords[:0] } - owner := do.DDL().OwnerManager() for { select { case <-do.exit: @@ -1389,28 +1401,23 @@ func (do *Domain) runawayRecordFlushLoop() { // flush as soon as possible. if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThrehold { flushQuarantineRecords() - if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) - quarantineRecordGCTicker.Reset(quarantineRecordGCInterval) - } } case r := <-recordCh: records = append(records, r) + failpoint.Inject("FastRunawayGC", func() { + flushRunawayRecords() + }) if len(records) >= flushThrehold { flushRunawayRecords() } else if fired { fired = false // meet a new record, reset the timer. - runawayRecordFluashTimer.Reset(time.Second) + runawayRecordFluashTimer.Reset(runawayRecordFluashInterval) } case <-runawayRecordGCTicker.C: - if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_queries", "time", runawayRecordExpiredDuration) - } + go do.gcSystemTable("tidb_runaway_queries", "time", runawayRecordExpiredDuration) case <-quarantineRecordGCTicker.C: - if owner.IsOwner() { - do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) - } + go do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) } } } From c7effa207dfaa84cdbaa8d6513f6e35c3d10a5f5 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 20 Jun 2023 16:35:03 +0800 Subject: [PATCH 09/10] add test Signed-off-by: Cabinfever_B --- ddl/tests/resourcegroup/resource_group_test.go | 1 - domain/domain.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ddl/tests/resourcegroup/resource_group_test.go b/ddl/tests/resourcegroup/resource_group_test.go index 9f96ad766201c..70aa349e66237 100644 --- a/ddl/tests/resourcegroup/resource_group_test.go +++ b/ddl/tests/resourcegroup/resource_group_test.go @@ -282,7 +282,6 @@ func TestResourceGroupRunaway(t *testing.T) { tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=DRYRUN)") tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) - } func TestResourceGroupHint(t *testing.T) { diff --git a/domain/domain.go b/domain/domain.go index 265a9dba358b5..fc3036b98b52d 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1336,7 +1336,7 @@ func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time. _, err = do.execRestrictedSQL(sql, nil) if err != nil { - logutil.BgLogger().Warn( + logutil.BgLogger().Error( "delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql), ) } From 644ff7fb1f43cc6abd15ccd1352cf12586ac11d2 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 20 Jun 2023 18:01:48 +0800 Subject: [PATCH 10/10] address comment Signed-off-by: Cabinfever_B --- domain/domain.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index fc3036b98b52d..d9c7caf4ae1ca 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1263,7 +1263,7 @@ const ( var systemSchemaCIStr = model.NewCIStr("mysql") -func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time.Duration) { +func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration time.Duration) { if !do.DDL().OwnerManager().IsOwner() { return } @@ -1301,7 +1301,7 @@ func (do *Domain) gcSystemTable(tableName, colName string, expiredDuration time. return } // to remove - if sql == "" { + if len(sql) == 0 { return } @@ -1415,9 +1415,9 @@ func (do *Domain) runawayRecordFlushLoop() { runawayRecordFluashTimer.Reset(runawayRecordFluashInterval) } case <-runawayRecordGCTicker.C: - go do.gcSystemTable("tidb_runaway_queries", "time", runawayRecordExpiredDuration) + go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration) case <-quarantineRecordGCTicker.C: - go do.gcSystemTable("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) + go do.deleteExpiredRows("tidb_runaway_quarantined_watch", "end_time", time.Duration(0)) } } }