diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 24d269c1f6244..04a95637cb138 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -1090,7 +1090,7 @@ func (s *testIntegrationSuite5) TestBackwardCompatibility(c *C) { ticker := time.NewTicker(s.lease) defer ticker.Stop() for range ticker.C { - historyJob, err := s.getHistoryDDLJob(job.ID) + historyJob, err := getHistoryDDLJob(s.store, job.ID) c.Assert(err, IsNil) if historyJob == nil { @@ -1173,10 +1173,10 @@ func checkGetMaxTableRowID(ctx *testMaxTableRowIDContext, store kv.Storage, expe c.Assert(maxID, Equals, expectMaxID) } -func (s *testIntegrationSuite) getHistoryDDLJob(id int64) (*model.Job, error) { +func getHistoryDDLJob(store kv.Storage, id int64) (*model.Job, error) { var job *model.Job - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error job, err1 = t.GetHistoryDDLJob(id) diff --git a/ddl/db_test.go b/ddl/db_test.go index d470a78a7d127..d2e8d9cd27429 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" @@ -4478,6 +4479,61 @@ func (s *testDBSuite6) TestAlterOrderBy(c *C) { s.tk.MustExec("drop table if exists ob") } +func (s *testSerialDBSuite) TestDDLJobErrorCount(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists ddl_error_table, new_ddl_error_table") + tk.MustExec("create table ddl_error_table(a int)") + is := s.dom.InfoSchema() + schemaName := model.NewCIStr("test") + tableName := model.NewCIStr("ddl_error_table") + schema, ok := is.SchemaByName(schemaName) + c.Assert(ok, IsTrue) + tbl, err := is.TableByName(schemaName, tableName) + c.Assert(err, IsNil) + + newTableName := model.NewCIStr("new_ddl_error_table") + job := &model.Job{ + SchemaID: schema.ID, + TableID: tbl.Meta().ID, + SchemaName: schema.Name.L, + Type: model.ActionRenameTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{schema.ID, newTableName}, + } + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockErrEntrySizeTooLarge", `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockErrEntrySizeTooLarge"), IsNil) + }() + + txn, err := s.store.Begin() + c.Assert(err, IsNil) + t := meta.NewMeta(txn) + job.ID, err = t.GenGlobalID() + c.Assert(err, IsNil) + job.Version = 1 + job.StartTS = txn.StartTS() + + err = t.EnQueueDDLJob(job) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + + ticker := time.NewTicker(s.lease) + defer ticker.Stop() + for range ticker.C { + historyJob, err := getHistoryDDLJob(s.store, job.ID) + c.Assert(err, IsNil) + if historyJob == nil { + continue + } + c.Assert(historyJob.ErrorCount, Equals, int64(1)) + kv.ErrEntryTooLarge.Equal(historyJob.Error) + break + } +} + func init() { // Make sure it will only be executed once. domain.SchemaOutOfDateRetryInterval = int64(50 * time.Millisecond) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 83af5c404f70a..6e9b0b1dec514 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -283,6 +283,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e // Reduce this txn entry size. job.BinlogInfo.Clean() job.Error = toTError(err) + job.ErrorCount++ job.SchemaState = model.StateNone job.State = model.JobStateCancelled err = w.finishDDLJob(t, job) @@ -293,6 +294,11 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e // updateDDLJob updates the DDL job information. // Every time we enter another state except final state, we must call this function. func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error { + failpoint.Inject("mockErrEntrySizeTooLarge", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(kv.ErrEntryTooLarge) + } + }) updateRawArgs := true // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, // so we shouldn't replace RawArgs with the marshaling Args.