Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: support GC runaway record #44784

Merged
merged 11 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name stri
}

func TestResourceGroupRunaway(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/FastRunawayGC"))
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))
Expand All @@ -229,8 +233,8 @@ func TestResourceGroupRunaway(t *testing.T) {

tk.MustExec("set global tidb_enable_resource_control='on'")
tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)")
tk.MustExec("create resource group rg2 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT duration '1m')")
tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED=50ms, ACTION=KILL, WATCH=EXACT[1m0s]"))
tk.MustExec("create resource group rg2 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT duration '1s')")
tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED=50ms, ACTION=KILL, WATCH=EXACT[1s]"))
tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1"))
tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1"))

Expand All @@ -241,6 +245,15 @@ func TestResourceGroupRunaway(t *testing.T) {
err := tk.QueryToErr("select /*+ resource_group(rg1) */ * from t")
require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")

// consifer the low speed of write in test, all check will exec after sleeping 1s.
delayDuration := time.Millisecond * 750
time.Sleep(time.Millisecond*10 + delayDuration)
tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries").
Check(testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify"))
// wait for GC, because of FROM_UNIXTIME is second level, so wait for 1s.
time.Sleep(time.Millisecond*1000 + delayDuration)
require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries").Rows(), 0)

tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=COOLDOWN)")
tk.MustQuery("select /*+ resource_group(rg1) */ * from t")

Expand All @@ -250,6 +263,21 @@ func TestResourceGroupRunaway(t *testing.T) {
err = tk.QueryToErr("select /*+ resource_group(rg2) */ * from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
tk.MustGetErrCode("select /*+ resource_group(rg2) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
time.Sleep(time.Millisecond*10 + delayDuration)
tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries").
Check(testkit.Rows(
"rg2 select /*+ resource_group(rg2) */ * from t identify",
"rg2 select /*+ resource_group(rg2) */ * from t watch",
))
tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch").
Check(testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t"))

time.Sleep(time.Millisecond*1000 + delayDuration)
require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries").Rows(), 0)
// watch duration is 1s
time.Sleep(time.Millisecond*1000 + delayDuration)
require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_quarantined_watch").Rows(), 0)

tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=COOLDOWN)")
tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1"))
tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=DRYRUN)")
Expand Down
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ go_library(
"//statistics/handle",
"//store/helper",
"//telemetry",
"//ttl/cache",
"//ttl/sqlbuilder",
"//ttl/ttlworker",
"//types",
"//util",
Expand Down
138 changes: 126 additions & 12 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/sqlbuilder"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -1248,11 +1251,116 @@ func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move it to resource group package?

runawayRecordFluashInterval = time.Second
quarantineRecordGCInterval = time.Minute * 10
runawayRecordGCInterval = time.Hour * 24
runawayRecordExpiredDuration = time.Hour * 24 * 7

runawayRecordGCBatchSize = 100
runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5
)

var systemSchemaCIStr = model.NewCIStr("mysql")

func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration time.Duration) {
if !do.DDL().OwnerManager().IsOwner() {
return
}
failpoint.Inject("FastRunawayGC", func() {
expiredDuration = time.Second * 1
})
expiredTime := time.Now().Add(-expiredDuration)
tbCIStr := model.NewCIStr(tableName)
tbl, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr)
if err != nil {
logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
return
}
tbInfo := tbl.Meta()
col := tbInfo.FindPublicColumnByName(colName)
if col == nil {
logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName))
return
}
tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col)
if err != nil {
logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
return
}
generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil)
if err != nil {
logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
return
}
var leftRows [][]types.Datum
for {
sql := ""
if sql, err = generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize); err != nil {
logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
return
}
// to remove
if len(sql) == 0 {
return
}

rows, sqlErr := do.execRestrictedSQL(sql, nil)
if sqlErr != nil {
logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
return
}
leftRows = make([][]types.Datum, len(rows))
for i, row := range rows {
leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes)
}

for len(leftRows) > 0 {
var delBatch [][]types.Datum
if len(leftRows) < runawayRecordGCBatchSize {
delBatch = leftRows
leftRows = nil
} else {
delBatch = leftRows[0:runawayRecordGCBatchSize]
leftRows = leftRows[runawayRecordGCBatchSize:]
}
sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime)
if err != nil {
logutil.BgLogger().Error(
"build delete SQL failed when deleting system table",
zap.Error(err),
zap.String("table", tb.Schema.O+"."+tb.Name.O),
)
return
}

_, err = do.execRestrictedSQL(sql, nil)
if err != nil {
logutil.BgLogger().Error(
"delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql),
)
}
}
}
}

func (do *Domain) runawayRecordFlushLoop() {
defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false)

// this times is used to batch flushing rocords, with 1s duration,
// we can guarantee a watch record can be seen by the user within 1s.
timer := time.NewTimer(time.Second)
runawayRecordFluashTimer := time.NewTimer(runawayRecordFluashInterval)
runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval)
quarantineRecordGCTicker := time.NewTicker(quarantineRecordGCInterval)
failpoint.Inject("FastRunawayGC", func() {
runawayRecordFluashTimer.Stop()
runawayRecordGCTicker.Stop()
quarantineRecordGCTicker.Stop()
runawayRecordFluashTimer = time.NewTimer(time.Millisecond * 50)
runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200)
quarantineRecordGCTicker = time.NewTicker(time.Millisecond * 200)
})

fired := false
recordCh := do.RunawayManager().RunawayRecordChan()
quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
Expand All @@ -1265,8 +1373,8 @@ func (do *Domain) runawayRecordFlushLoop() {
return
}
sql, params := genRunawayQueriesStmt(records)
if err := do.execFlushSQL(sql, params); err != nil {
logutil.BgLogger().Info("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
if _, err := do.execRestrictedSQL(sql, params); err != nil {
logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
}
records = records[:0]
}
Expand All @@ -1275,17 +1383,16 @@ func (do *Domain) runawayRecordFlushLoop() {
return
}
sql, params := genQuarantineQueriesStmt(quarantineRecords)
if err := do.execFlushSQL(sql, params); err != nil {
logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(records)))
if _, err := do.execRestrictedSQL(sql, params); err != nil {
logutil.BgLogger().Error("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords)))
}
quarantineRecords = quarantineRecords[:0]
}

for {
select {
case <-do.exit:
return
case <-timer.C:
case <-runawayRecordFluashTimer.C:
flushRunawayRecords()
fired = true
case r := <-quarantineRecordCh:
Expand All @@ -1297,31 +1404,38 @@ func (do *Domain) runawayRecordFlushLoop() {
}
case r := <-recordCh:
records = append(records, r)
failpoint.Inject("FastRunawayGC", func() {
flushRunawayRecords()
})
if len(records) >= flushThrehold {
flushRunawayRecords()
} else if fired {
fired = false
// meet a new record, reset the timer.
timer.Reset(time.Second)
runawayRecordFluashTimer.Reset(runawayRecordFluashInterval)
}
case <-runawayRecordGCTicker.C:
go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration)
case <-quarantineRecordGCTicker.C:
go do.deleteExpiredRows("tidb_runaway_quarantined_watch", "end_time", time.Duration(0))
}
}
}

func (do *Domain) execFlushSQL(sql string, params []interface{}) error {
func (do *Domain) execRestrictedSQL(sql string, params []interface{}) ([]chunk.Row, error) {
se, err := do.sysSessionPool.Get()
defer func() {
do.sysSessionPool.Put(se)
}()
if err != nil {
return errors.Annotate(err, "get session failed")
return nil, errors.Annotate(err, "get session failed")
}
exec := se.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
sql, params...,
)
return err
return r, err
}

func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) {
Expand Down
4 changes: 2 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ const (
original_sql TEXT NOT NULL,
plan_digest TEXT NOT NULL,
tidb_server varchar(64),
INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when add global binding query",
INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query",
INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch"
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`

Expand All @@ -625,7 +625,7 @@ const (
watch varchar(12) NOT NULL,
watch_text TEXT NOT NULL,
tidb_server varchar(64),
INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when add global binding query",
INDEX sql_index(watch_text(700)) COMMENT "accelerate the speed when select quarantined query",
INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch"
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`

Expand Down
33 changes: 21 additions & 12 deletions ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,16 @@ type PhysicalTable struct {
TimeColumn *model.ColumnInfo
}

// NewPhysicalTable create a new PhysicalTable
func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (*PhysicalTable, error) {
// NewBasePhysicalTable create a new PhysicalTable with specific timeColunm.
func NewBasePhysicalTable(schema model.CIStr,
tbl *model.TableInfo,
partition model.CIStr,
timeColumn *model.ColumnInfo,
) (*PhysicalTable, error) {
if tbl.State != model.StatePublic {
return nil, errors.Errorf("table '%s.%s' is not a public table", schema, tbl.Name)
}

ttlInfo := tbl.TTLInfo
if ttlInfo == nil {
return nil, errors.Errorf("table '%s.%s' is not a ttl table", schema, tbl.Name)
}

timeColumn := tbl.FindPublicColumnByName(ttlInfo.ColumnName.L)
if timeColumn == nil {
return nil, errors.Errorf("time column '%s' is not public in ttl table '%s.%s'", ttlInfo.ColumnName, schema, tbl.Name)
}

keyColumns, keyColumTypes, err := getTableKeyColumns(tbl)
if err != nil {
return nil, err
Expand Down Expand Up @@ -166,6 +160,21 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
}, nil
}

// NewPhysicalTable create a new PhysicalTable
func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (*PhysicalTable, error) {
ttlInfo := tbl.TTLInfo
if ttlInfo == nil {
return nil, errors.Errorf("table '%s.%s' is not a ttl table", schema, tbl.Name)
}

timeColumn := tbl.FindPublicColumnByName(ttlInfo.ColumnName.L)
if timeColumn == nil {
return nil, errors.Errorf("time column '%s' is not public in ttl table '%s.%s'", ttlInfo.ColumnName, schema, tbl.Name)
}

return NewBasePhysicalTable(schema, tbl, partition, timeColumn)
}

// ValidateKeyPrefix validates a key prefix
func (t *PhysicalTable) ValidateKeyPrefix(key []types.Datum) error {
if len(key) > len(t.KeyColumns) {
Expand Down