diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index 65ff4288987a1..9f8e01f96007e 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -29,7 +29,6 @@ go_library(
         "//distsql",
         "//kv",
         "//meta",
-        "//meta/autoid",
         "//parser/model",
         "//statistics/handle",
         "//util",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index daac9609ae5f4..49bba26612578 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -39,7 +39,6 @@ import (
     "github.com/pingcap/tidb/distsql"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/meta"
-    "github.com/pingcap/tidb/meta/autoid"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/util"
     "github.com/pingcap/tidb/util/codec"
@@ -561,24 +560,21 @@ func BuildBackupRangeAndSchema(
             zap.String("table", tableInfo.Name.O),
         )

-        tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
-        idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
-        seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
-        randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
+        autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)

         var globalAutoID int64
         switch {
         case tableInfo.IsSequence():
-            globalAutoID, err = seqAlloc.NextGlobalAutoID()
+            globalAutoID, err = autoIDAccess.SequenceCycle().Get()
         case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
             // no auto ID for views or table without either rowID nor auto_increment ID.
         default:
-            globalAutoID, err = idAlloc.NextGlobalAutoID()
+            globalAutoID, err = autoIDAccess.RowID().Get()
         }
         if err != nil {
             return nil, nil, nil, errors.Trace(err)
         }
-        tableInfo.AutoIncID = globalAutoID
+        tableInfo.AutoIncID = globalAutoID + 1
         if !isFullBackup {
             // according to https://github.com/pingcap/tidb/issues/32290.
             // ignore placement policy when not in full backup
@@ -591,11 +587,11 @@ func BuildBackupRangeAndSchema(
         if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
             // this table has auto_random id, we need backup and rebase in restoration
             var globalAutoRandID int64
-            globalAutoRandID, err = randAlloc.NextGlobalAutoID()
+            globalAutoRandID, err = autoIDAccess.RandomID().Get()
             if err != nil {
                 return nil, nil, nil, errors.Trace(err)
             }
-            tableInfo.AutoRandID = globalAutoRandID
+            tableInfo.AutoRandID = globalAutoRandID + 1
             logger.Debug("change table AutoRandID",
                 zap.Int64("AutoRandID", globalAutoRandID))
         }
diff --git a/ddl/cluster.go b/ddl/cluster.go
index de16b0e60e8b2..ea0567dfb9c73 100644
--- a/ddl/cluster.go
+++ b/ddl/cluster.go
@@ -623,8 +623,8 @@ func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) {
 // A Flashback has 4 different stages.
 // 1. before lock flashbackClusterJobID, check clusterJobID and lock it.
-// 2. before flashback start, check timestamp, disable GC and close PD schedule.
-// 3. phase 1, get key ranges, lock all regions.
+// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
+// 3. phase 1, lock flashback key ranges.
 // 4. phase 2, send flashback RPC, do flashback jobs.
 func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
     inFlashbackTest := false
@@ -692,7 +692,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
         job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
         job.SchemaState = model.StateDeleteOnly
         return ver, nil
-    // Stage 2, check flashbackTS, close GC and PD schedule.
+    // Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
     case model.StateDeleteOnly:
         if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
             job.State = model.JobStateCancelled
@@ -711,8 +711,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
         }
         job.Args[keyRangesOffset] = keyRanges
         job.SchemaState = model.StateWriteOnly
-        return ver, nil
-    // Stage 3, get key ranges and get locks.
+        return updateSchemaVersion(d, t, job)
+    // Stage 3, lock related key ranges.
     case model.StateWriteOnly:
         // TODO: Support flashback in unistore.
         if inFlashbackTest {
@@ -742,7 +742,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
         }
         job.Args[commitTSOffset] = commitTS
         job.SchemaState = model.StateWriteReorganization
-        return updateSchemaVersion(d, t, job)
+        return ver, nil
     // Stage 4, get key ranges and send flashback RPC.
     case model.StateWriteReorganization:
         // TODO: Support flashback in unistore.
diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go
index bff7a0af9a75f..29c237270c5f9 100644
--- a/ddl/cluster_test.go
+++ b/ddl/cluster_test.go
@@ -136,7 +136,8 @@ func TestAddDDLDuringFlashback(t *testing.T) {
     hook.OnJobRunBeforeExported = func(job *model.Job) {
         assert.Equal(t, model.ActionFlashbackCluster, job.Type)
         if job.SchemaState == model.StateWriteOnly {
-            _, err := tk.Exec("alter table t add column b int")
+            tk1 := testkit.NewTestKit(t, store)
+            _, err := tk1.Exec("alter table test.t add column b int")
             assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
         }
     }
diff --git a/domain/domain.go b/domain/domain.go
index 20bd33ca3f794..07b69fb365f4f 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -471,6 +471,18 @@ func (do *Domain) GetScope(status string) variable.ScopeFlag {
     return variable.DefaultStatusVarScopeFlag
 }

+func getFlashbackStartTSFromErrorMsg(err error) uint64 {
+    slices := strings.Split(err.Error(), "is in flashback progress, FlashbackStartTS is ")
+    if len(slices) != 2 {
+        return 0
+    }
+    version, err := strconv.ParseUint(slices[1], 10, 0)
+    if err != nil {
+        return 0
+    }
+    return version
+}
+
 // Reload reloads InfoSchema.
 // It's public in order to do the test.
 func (do *Domain) Reload() error {
@@ -490,7 +502,15 @@ func (do *Domain) Reload() error {
         return err
     }

-    is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(ver.Ver)
+    version := ver.Ver
+    is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(version)
+    if err != nil {
+        if version = getFlashbackStartTSFromErrorMsg(err); version != 0 {
+            // use the latest available version to create domain
+            version -= 1
+            is, hitCache, oldSchemaVersion, changes, err = do.loadInfoSchema(version)
+        }
+    }
     metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
     if err != nil {
         metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
@@ -519,7 +539,7 @@ func (do *Domain) Reload() error {
     }

     // lease renew, so it must be executed despite it is cache or not
-    do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes)
+    do.SchemaValidator.Update(version, oldSchemaVersion, is.SchemaMetaVersion(), changes)
     lease := do.DDL().GetLease()
     sub := time.Since(startTime)
     // Reload interval is lease / 2, if load schema time elapses more than this interval,
diff --git a/domain/schema_validator.go b/domain/schema_validator.go
index 592f558f0b27c..20018e9c82b19 100644
--- a/domain/schema_validator.go
+++ b/domain/schema_validator.go
@@ -153,7 +153,9 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
             s.do.Store().GetMemCache().Delete(tblIDs[idx])
         }
         if ac == uint64(model.ActionFlashbackCluster) {
-            s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
+            if s.do != nil && s.do.InfoSyncer() != nil && s.do.InfoSyncer().GetSessionManager() != nil {
+                s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
+            }
         }
     }
     logutil.BgLogger().Debug("update schema validator", zap.Int64("oldVer", oldVer),
diff --git a/tests/realtikvtest/flashbacktest/BUILD.bazel b/tests/realtikvtest/flashbacktest/BUILD.bazel
index e2cf7c91cc7c4..a3d81277e6e06 100644
--- a/tests/realtikvtest/flashbacktest/BUILD.bazel
+++ b/tests/realtikvtest/flashbacktest/BUILD.bazel
@@ -9,13 +9,17 @@ go_test(
     flaky = True,
     race = "on",
     deps = [
+        "//ddl",
         "//ddl/util",
+        "//domain",
         "//errno",
+        "//meta",
         "//parser/model",
         "//testkit",
         "//testkit/testsetup",
         "//tests/realtikvtest",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_client_go_v2//util",
diff --git a/tests/realtikvtest/flashbacktest/flashback_test.go b/tests/realtikvtest/flashbacktest/flashback_test.go
index e3350f4a1774a..f40632484c4d5 100644
--- a/tests/realtikvtest/flashbacktest/flashback_test.go
+++ b/tests/realtikvtest/flashbacktest/flashback_test.go
@@ -17,15 +17,20 @@ package flashbacktest
 import (
     "context"
     "fmt"
+    "strings"
     "testing"
     "time"

     "github.com/pingcap/failpoint"
+    "github.com/pingcap/tidb/ddl"
     ddlutil "github.com/pingcap/tidb/ddl/util"
+    "github.com/pingcap/tidb/domain"
     "github.com/pingcap/tidb/errno"
+    "github.com/pingcap/tidb/meta"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/testkit"
     "github.com/pingcap/tidb/tests/realtikvtest"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "github.com/tikv/client-go/v2/oracle"
     tikvutil "github.com/tikv/client-go/v2/util"
@@ -524,3 +529,71 @@ func TestFlashbackTmpTable(t *testing.T) {
         require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
     }
 }
+
+func TestFlashbackInProcessErrorMsg(t *testing.T) {
+    if *realtikvtest.WithRealTiKV {
+        store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
+
+        originHook := dom.DDL().GetHook()
+
+        tk := testkit.NewTestKit(t, store)
+        timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
+        defer resetGC()
+
+        tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
+        tk.MustExec("use test")
+        tk.MustExec("drop table if exists t")
+        tk.MustExec("create table t(a int)")
+
+        time.Sleep(1 * time.Second)
+        ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
+        require.NoError(t, err)
+
+        // do some ddl and dml
+        tk.MustExec("alter table t add index k(a)")
+        tk.MustExec("insert into t values (1), (2), (3)")
+
+        injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
+        require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
+            fmt.Sprintf("return(%v)", injectSafeTS)))
+
+        hook := newTestCallBack(t, dom)
+        hook.OnJobRunBeforeExported = func(job *model.Job) {
+            if job.Type == model.ActionFlashbackCluster && job.SchemaState == model.StateWriteReorganization {
+                txn, err := store.Begin()
+                assert.NoError(t, err)
+                _, err = meta.NewMeta(txn).ListDatabases()
+                errorMsg := err.Error()
+                assert.Contains(t, errorMsg, "is in flashback progress, FlashbackStartTS is ")
+                slices := strings.Split(errorMsg, "is in flashback progress, FlashbackStartTS is ")
+                assert.Equal(t, len(slices), 2)
+                assert.NotEqual(t, slices[1], "0")
+                txn.Rollback()
+            }
+        }
+        dom.DDL().SetHook(hook)
+        tk.Exec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
+        dom.DDL().SetHook(originHook)
+
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
+    }
+}
+
+type testCallback struct {
+    ddl.Callback
+    OnJobRunBeforeExported func(job *model.Job)
+}
+
+func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback {
+    defHookFactory, err := ddl.GetCustomizedHook("default_hook")
+    require.NoError(t, err)
+    return &testCallback{
+        Callback: defHookFactory(dom),
+    }
+}
+
+func (c *testCallback) OnJobRunBefore(job *model.Job) {
+    if c.OnJobRunBeforeExported != nil {
+        c.OnJobRunBeforeExported(job)
+    }
+}
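
For reference, the BR hunk above replaces autoid.NewAllocator(...).NextGlobalAutoID() with reads through meta.Meta.GetAutoIDAccessors. Below is a minimal sketch of that read path, not part of the diff: the function name readNextRowID and the ctx/store/dbID/tableID parameters are illustrative placeholders, and the "+ 1" mirrors the patch on the assumption that the accessor's Get() returns the last allocated ID whereas the removed NextGlobalAutoID returned the next one.

package example

import (
    "context"

    "github.com/pingcap/errors"
    "github.com/pingcap/tidb/kv"
    "github.com/pingcap/tidb/meta"
)

// readNextRowID reads the stored global row ID for (dbID, tableID) inside a
// fresh transaction and returns the next usable value, mirroring
// tableInfo.AutoIncID = globalAutoID + 1 in the patch above.
func readNextRowID(ctx context.Context, store kv.Storage, dbID, tableID int64) (int64, error) {
    var next int64
    err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
        // Assumption: RowID().Get() returns the last allocated row ID for the table.
        curr, err := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tableID).RowID().Get()
        if err != nil {
            return errors.Trace(err)
        }
        next = curr + 1
        return nil
    })
    return next, err
}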