Skip to content

Commit

Permalink
ddl: add ddl job error count limit, exceed the limit should cancel th…
Browse files Browse the repository at this point in the history
…e ddl job (#9295)
  • Loading branch information
crazycs520 authored Feb 27, 2019
1 parent 839772b commit 4449eb0
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 12 deletions.
23 changes: 23 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/admin"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -542,10 +545,30 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

job.Error = toTError(err)
job.ErrorCount++
// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
log.Errorf("[ddl-%s] load ddl global variable error: %v", w, err1)
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
log.Warnf("[ddl-%s] the job id %v error count exceed the limit: %v, cancelling it now", w, job.ID, variable.GetDDLErrorCountLimit())
job.State = model.JobStateCancelling
}
}
return
}

// loadDDLVars loads the DDL variables through util.LoadDDLVars, using a
// session borrowed from the worker's session pool.
//
// A failure to obtain a session is returned wrapped with errors.Trace. The
// borrowed session is always handed back to the pool before returning.
func loadDDLVars(w *worker) error {
	// Keep the explicit declaration so the borrowed session is held as a
	// sessionctx.Context.
	var sctx sessionctx.Context
	sctx, getErr := w.sessPool.get()
	if getErr != nil {
		return errors.Trace(getErr)
	}
	defer w.sessPool.put(sctx)

	return util.LoadDDLVars(sctx)
}

func toTError(err error) *terror.Error {
originErr := errors.Cause(err)
tErr, ok := originErr.(*terror.Error)
Expand Down
11 changes: 11 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,14 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) {
tk.MustExec("insert into t_recover values (4),(5),(6)")
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
}

// TestCancelJobByErrorCountLimit checks that a DDL job which keeps failing is
// cancelled instead of being retried forever.
//
// The mockExceedErrorLimit failpoint makes onCreateTable return an error on
// every run, so the job's error count grows until it exceeds the configured
// limit; the statement is then expected to fail with the "cancelled DDL job"
// error.
func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) {
	const failPath = "github.com/pingcap/tidb/ddl/mockExceedErrorLimit"

	tk := testkit.NewTestKit(c, s.store)

	// Fail fast when the failpoint cannot be armed: without it the CREATE TABLE
	// below would succeed and the later assertions would report a misleading
	// failure.
	c.Assert(gofail.Enable(failPath, `return(true)`), IsNil)
	defer func() {
		// Report (rather than silently drop) a failure to disarm the failpoint,
		// since a leaked failpoint would break every later CREATE TABLE.
		if err := gofail.Disable(failPath); err != nil {
			c.Errorf("disable failpoint %s: %v", failPath, err)
		}
	}()

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	_, err := tk.Exec("create table t (a int)")
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// gofail: var mockExceedErrorLimit bool
// if mockExceedErrorLimit {
// return ver, errors.New("mock do job error")
// }

schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
Expand Down
26 changes: 21 additions & 5 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,30 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old
return errors.Trace(err)
}

const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" +
variable.TiDBDDLReorgWorkerCount + "', '" +
variable.TiDBDDLReorgBatchSize + "')"

// LoadDDLReorgVars loads the DDL reorg variables (reorg worker count and reorg
// batch size) from mysql.global_variables by delegating to LoadGlobalVars.
func LoadDDLReorgVars(ctx sessionctx.Context) error {
	reorgVarNames := []string{
		variable.TiDBDDLReorgWorkerCount,
		variable.TiDBDDLReorgBatchSize,
	}
	return LoadGlobalVars(ctx, reorgVarNames)
}

// LoadDDLVars loads the general DDL variables (currently only the DDL error
// count limit) from mysql.global_variables by delegating to LoadGlobalVars.
func LoadDDLVars(ctx sessionctx.Context) error {
	ddlVarNames := []string{variable.TiDBDDLErrorCountLimit}
	return LoadGlobalVars(ctx, ddlVarNames)
}

const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)"

// LoadGlobalVars loads global variable from mysql.global_variables.
func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error {
if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL)
nameList := ""
for i, name := range varNames {
if i > 0 {
nameList += ", "
}
nameList += fmt.Sprintf("'%s'", name)
}
sql := fmt.Sprintf(loadGlobalVarsSQL, nameList)
rows, _, err := sctx.ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down
27 changes: 27 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,33 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) {
res.Check(testkit.Rows("1000"))
}

// TestSetDDLErrorCountLimit exercises setting the global
// tidb_ddl_error_count_limit variable: the default value, truncation of
// out-of-range values (with the matching warning), rejection of a non-numeric
// value, and a regular in-range value.
func (s *testSuite3) TestSetDDLErrorCountLimit(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")

	// reloadAndExpect reloads the DDL variables through the test session and
	// asserts the limit then reported by the variable package.
	reloadAndExpect := func(want int64) {
		loadErr := ddlutil.LoadDDLVars(tk.Se)
		c.Assert(loadErr, IsNil)
		c.Assert(variable.GetDDLErrorCountLimit(), Equals, want)
	}

	// Nothing has been set yet, so the default applies.
	reloadAndExpect(int64(variable.DefTiDBDDLErrorCountLimit))

	// A negative value is truncated to 0 with a warning.
	tk.MustExec("set @@global.tidb_ddl_error_count_limit = -1")
	tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '-1'"))
	reloadAndExpect(int64(0))

	// A value just above math.MaxInt64 is truncated to math.MaxInt64 with a warning.
	tooLarge := uint64(math.MaxInt64) + 1
	tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_error_count_limit = %v", tooLarge))
	tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '%d'", tooLarge)))
	reloadAndExpect(int64(math.MaxInt64))

	// A non-numeric value is rejected outright.
	_, setErr := tk.Exec("set @@global.tidb_ddl_error_count_limit = invalid_val")
	c.Assert(terror.ErrorEqual(setErr, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", setErr))

	// A regular value is stored and is visible both in memory and through SQL.
	tk.MustExec("set @@global.tidb_ddl_error_count_limit = 100")
	reloadAndExpect(int64(100))
	tk.MustQuery("select @@global.tidb_ddl_error_count_limit").Check(testkit.Rows("100"))
}

// Test issue #9205, fix the precision problem for time type default values
// See https://github.com/pingcap/tidb/issues/9205 for details
func (s *testSuite3) TestIssue9205(c *C) {
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,7 @@ var builtinGlobalVariable = []string{
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
variable.TiDBDDLErrorCountLimit,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func SetLocalSystemVar(name string, val string) {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLErrorCountLimit:
SetDDLErrorCountLimit(tidbOptInt64(val, DefTiDBDDLErrorCountLimit))
}
}

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ const (
// tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers.
TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size"

// tidb_ddl_error_count_limit defines the maximum number of errors a DDL job may hit before the job is cancelled.
TiDBDDLErrorCountLimit = "tidb_ddl_error_count_limit"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
Expand Down Expand Up @@ -276,6 +279,7 @@ const (
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBDDLReorgBatchSize = 1024
DefTiDBDDLErrorCountLimit = 512
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
Expand All @@ -289,6 +293,7 @@ var (
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func GetDDLReorgBatchSize() int32 {
return atomic.LoadInt32(&ddlReorgBatchSize)
}

// SetDDLErrorCountLimit atomically stores cnt as the DDL error count limit.
func SetDDLErrorCountLimit(cnt int64) {
	limit := &ddlErrorCountlimit
	atomic.StoreInt64(limit, cnt)
}

// GetDDLErrorCountLimit atomically reads and returns the current DDL error
// count limit.
func GetDDLErrorCountLimit() int64 {
	limit := atomic.LoadInt64(&ddlErrorCountlimit)
	return limit
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -363,6 +373,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBDDLReorgBatchSize:
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBDDLErrorCountLimit:
return checkUInt64SystemVar(name, value, uint64(0), math.MaxInt64, vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
Expand Down
15 changes: 8 additions & 7 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,31 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
// IsJobRollbackable checks whether the job can be rolled back.
func IsJobRollbackable(job *model.Job) bool {
switch job.Type {
case model.ActionDropIndex:
// We can't cancel if the index's current state is StateDeleteOnly or StateDeleteReorganization; otherwise records and indexes would become inconsistent.
if job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropColumn, model.ActionModifyColumn,
model.ActionDropTablePartition, model.ActionAddTablePartition,
model.ActionRebaseAutoID, model.ActionShardRowID,
model.ActionTruncateTable, model.ActionAddForeignKey,
model.ActionDropForeignKey:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
}
return nil
return true
}

// CancelJobs cancels the DDL jobs.
Expand Down Expand Up @@ -139,8 +140,8 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
if errs[i] != nil {
if !IsJobRollbackable(job) {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}

Expand Down

0 comments on commit 4449eb0

Please sign in to comment.