diff --git a/ddl/cluster.go b/ddl/cluster.go index 5e0ea17907e7b..fbcfa9cd8a49f 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" @@ -324,6 +325,14 @@ func SendPrepareFlashbackToVersionRPC( if err != nil { return taskStat, err } + failpoint.Inject("mockPrepareMeetsEpochNotMatch", func(val failpoint.Value) { + if val.(bool) && bo.ErrorsNum() == 0 { + regionErr = &errorpb.Error{ + Message: "stale epoch", + EpochNotMatch: &errorpb.EpochNotMatch{}, + } + } + }) if regionErr != nil { err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) if err != nil { diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go index 322359fff411a..470a62fb90d93 100644 --- a/tests/realtikvtest/brietest/flashback_test.go +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" @@ -90,3 +91,48 @@ func TestFlashback(t *testing.T) { require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } } + +func TestPrepareFlashbackFailed(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string) + job := model.Job{} + require.NoError(t, job.Decode([]byte(jobMeta))) + require.Equal(t, job.ErrorCount, int64(0)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) + } +}