From 767126993b69610188944116b2a58e58f6f41d82 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 6 Feb 2023 19:39:23 +0800 Subject: [PATCH 1/7] flashback: retry getMinSafeTS during flashback --- ddl/cluster.go | 28 ++++++- ddl/cluster_test.go | 28 ++----- ddl/tiflashtest/ddl_tiflash_test.go | 7 +- executor/recover_test.go | 63 +++++++++++---- .../flashbacktest/flashback_test.go | 81 ++++++------------- 5 files changed, 106 insertions(+), 101 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 32695ff01b819..44e90f4c2e53b 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" - "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -54,6 +53,8 @@ var pdScheduleKey = []string{ "merge-schedule-limit", } +var FlashbackGetMinSafeTimeTimeout = time.Minute // 1min + const ( flashbackMaxBackoff = 1800000 // 1800s flashbackTimeout = 3 * time.Minute // 3min @@ -99,6 +100,16 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error { return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam) } +func getMinSafeTS(s kv.Storage) time.Time { + minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope) + // Inject mocked SafeTS for test. + failpoint.Inject("injectSafeTS", func(val failpoint.Value) { + injectTS := val.(int) + minSafeTS = uint64(injectTS) + }) + return oracle.GetTimeFromTS(minSafeTS) +} + // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) @@ -111,12 +122,21 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack } currentTS = currentVer.Ver } - if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) { + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) + if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") } - if oracle.GetTimeFromTS(flashBackTS).After(expression.GetMinSafeTime(sctx)) { - return errors.Errorf("cannot set flashback timestamp to too close to present time") + + start := time.Now() + minSafeTime := getMinSafeTS(sctx.GetStore()) + for oracleFlashbackTS.After(minSafeTime) { + if time.Since(start) >= FlashbackGetMinSafeTimeTimeout { + return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) + } + time.Sleep(time.Second) + minSafeTime = getMinSafeTS(sctx.GetStore()) } + gcSafePoint, err := gcutil.GetGCSafePoint(sctx) if err != nil { return err diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 55c780d55e536..bff7a0af9a75f 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -71,9 +71,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) oldValue := map[string]interface{}{ @@ -111,8 +109,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { require.EqualValues(t, finishValue["merge-schedule-limit"], 1) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestAddDDLDuringFlashback(t *testing.T) { @@ -128,9 +125,7 @@ func TestAddDDLDuringFlashback(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -150,8 +145,7 @@ func TestAddDDLDuringFlashback(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestGlobalVariablesOnFlashback(t *testing.T) { @@ -167,9 +161,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -232,8 +224,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestCancelFlashbackCluster(t *testing.T) { @@ -247,9 +238,7 @@ func TestCancelFlashbackCluster(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -284,6 +273,5 @@ func TestCancelFlashbackCluster(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go index c3ec3a1d2b0fb..6526dbc01553d 100644 --- a/ddl/tiflashtest/ddl_tiflash_test.go +++ b/ddl/tiflashtest/ddl_tiflash_test.go @@ -458,9 +458,7 @@ func TestTiFlashFlashbackCluster(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s") @@ -473,8 +471,7 @@ func TestTiFlashFlashbackCluster(t *testing.T) { tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errorMsg) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) { diff --git a/executor/recover_test.go b/executor/recover_test.go index cbe3ec2c6799b..847d603eac059 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" @@ -307,9 +308,7 @@ func TestRecoverClusterMeetError(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) // Get GC safe point error. @@ -346,8 +345,7 @@ func TestRecoverClusterMeetError(t *testing.T) { errorMsg = fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String()) tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } @@ -355,6 +353,12 @@ func TestFlashbackWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + originTimeout := ddl.FlashbackGetMinSafeTimeTimeout + ddl.FlashbackGetMinSafeTimeTimeout = 0 + defer func() { + ddl.FlashbackGetMinSafeTimeTimeout = originTimeout + }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -380,9 +384,8 @@ func TestFlashbackWithSafeTs(t *testing.T) { compareWithSafeTS: 0, }, { - name: "10 seconds ago to now, safeTS 5 secs ago", - // Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped. - sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)), + name: "10 seconds ago to now, safeTS 5 secs ago", + sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)), compareWithSafeTS: -1, }, @@ -395,19 +398,51 @@ func TestFlashbackWithSafeTs(t *testing.T) { } for _, testcase := range testcases { t.Log(testcase.name) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", testcase.injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS))) if testcase.compareWithSafeTS == 1 { tk.MustContainErrMsg(testcase.sql, - "cannot set flashback timestamp to too close to present time") + "cannot set flashback timestamp after min-resolved-ts") } else { tk.MustExec(testcase.sql) } } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) +} + +func TestFlashbackRetryGetMinSafeTime(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + // Set GC safe point. + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + time.Sleep(time.Second) + ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + flashbackTs := oracle.GetTimeFromTS(ts) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(-10*time.Minute))))) + + go func() { + time.Sleep(2 * time.Second) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(10*time.Minute))))) + }() + + start := time.Now() + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs)) + duration := time.Since(start) + require.Greater(t, duration, 2*time.Second) + require.Less(t, duration, 5*time.Second) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } diff --git a/tests/realtikvtest/flashbacktest/flashback_test.go b/tests/realtikvtest/flashbacktest/flashback_test.go index 809750548a4e4..e3350f4a1774a 100644 --- a/tests/realtikvtest/flashbacktest/flashback_test.go +++ b/tests/realtikvtest/flashbacktest/flashback_test.go @@ -76,9 +76,7 @@ func TestFlashback(t *testing.T) { require.NoError(t, err) injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec("insert t values (4), (5), (6)") @@ -88,8 +86,7 @@ func TestFlashback(t *testing.T) { require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -114,9 +111,7 @@ func TestPrepareFlashbackFailed(t *testing.T) { require.NoError(t, err) injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) @@ -132,8 +127,7 @@ func TestPrepareFlashbackFailed(t *testing.T) { require.NoError(t, job.Decode([]byte(jobMeta))) require.Equal(t, job.ErrorCount, int64(0)) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) } } @@ -166,9 +160,7 @@ func TestFlashbackAddDropIndex(t *testing.T) { require.Greater(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec("insert t values (4), (5), (6)") @@ -179,8 +171,7 @@ func TestFlashbackAddDropIndex(t *testing.T) { tk.MustGetErrCode("select max(a) from t use index(k)", errno.ErrKeyDoesNotExist) require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -214,9 +205,7 @@ func TestFlashbackAddDropModifyColumn(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec("insert t values (4, 4), (5, 5), (6, 6)") @@ -230,8 +219,7 @@ func TestFlashbackAddDropModifyColumn(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") require.Equal(t, tk.MustQuery("select max(b) from t").Rows()[0][0], "3") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -270,9 +258,7 @@ func TestFlashbackBasicRenameDropCreateTable(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -282,8 +268,7 @@ func TestFlashbackBasicRenameDropCreateTable(t *testing.T) { require.Equal(t, tk.MustQuery("select max(a) from t1").Rows()[0][0], "6") require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -311,17 +296,14 @@ func TestFlashbackCreateDropTableWithData(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) tk.MustExec("admin check table t") require.Equal(t, tk.MustQuery("select count(a) from t").Rows()[0][0], "0") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -353,9 +335,7 @@ func TestFlashbackCreateDropSchema(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -365,8 +345,7 @@ func TestFlashbackCreateDropSchema(t *testing.T) { tk.MustGetErrCode("use test1", errno.ErrBadDB) tk.MustGetErrCode("use test2", errno.ErrBadDB) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -395,9 +374,7 @@ func TestFlashbackAutoID(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -408,8 +385,7 @@ func TestFlashbackAutoID(t *testing.T) { res = tk.MustQuery("select max(a) from t").Rows() require.Equal(t, res[0][0], "101") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -438,9 +414,7 @@ func TestFlashbackSequence(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -448,8 +422,7 @@ func TestFlashbackSequence(t *testing.T) { res = tk.MustQuery("select nextval(seq)").Rows() require.Equal(t, res[0][0], "101") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -485,9 +458,7 @@ func TestFlashbackPartitionTable(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -502,8 +473,7 @@ func TestFlashbackPartitionTable(t *testing.T) { require.Equal(t, res[0][1], "-1") require.Equal(t, res[0][2], "102") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } @@ -529,9 +499,7 @@ func TestFlashbackTmpTable(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) @@ -547,15 +515,12 @@ func TestFlashbackTmpTable(t *testing.T) { injectSafeTS = oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) tk.MustGetErrCode("select * from t", errno.ErrNoSuchTable) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } } From 2c04223c7c07eea4eac3dd268d918e5ee5a63db0 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 6 Feb 2023 19:40:36 +0800 Subject: [PATCH 2/7] update name --- ddl/cluster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 44e90f4c2e53b..df4b9ab70cf18 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -100,7 +100,7 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error { return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam) } -func getMinSafeTS(s kv.Storage) time.Time { +func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope) // Inject mocked SafeTS for test. failpoint.Inject("injectSafeTS", func(val failpoint.Value) { @@ -128,13 +128,13 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack } start := time.Now() - minSafeTime := getMinSafeTS(sctx.GetStore()) + minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore()) for oracleFlashbackTS.After(minSafeTime) { if time.Since(start) >= FlashbackGetMinSafeTimeTimeout { return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) } time.Sleep(time.Second) - minSafeTime = getMinSafeTS(sctx.GetStore()) + minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore()) } gcSafePoint, err := gcutil.GetGCSafePoint(sctx) From 984a16b6e007ec6eb4911cb804b6470ffd491727 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 6 Feb 2023 19:47:32 +0800 Subject: [PATCH 3/7] fix test --- ddl/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/cluster.go b/ddl/cluster.go index df4b9ab70cf18..7104f7ef85922 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -53,6 +53,7 @@ var pdScheduleKey = []string{ "merge-schedule-limit", } +// FlashbackGetMinSafeTimeTimeout, only for test, don't change it. var FlashbackGetMinSafeTimeTimeout = time.Minute // 1min const ( From da9722c12c4e2ec0334cf00411a95270c53ff839 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 6 Feb 2023 20:05:13 +0800 Subject: [PATCH 4/7] fix ci --- ddl/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 7104f7ef85922..0ef318e6c3215 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -53,7 +53,7 @@ var pdScheduleKey = []string{ "merge-schedule-limit", } -// FlashbackGetMinSafeTimeTimeout, only for test, don't change it. +// FlashbackGetMinSafeTimeTimeout only for test, don't change it. var FlashbackGetMinSafeTimeTimeout = time.Minute // 1min const ( From 750be6e4cde10900cc1e1cc83e9857f7252dd212 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 7 Feb 2023 11:00:36 +0800 Subject: [PATCH 5/7] follow comments --- ddl/cluster.go | 10 ++++++++-- executor/recover_test.go | 3 +++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 0ef318e6c3215..c99f36b75e26b 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -130,12 +130,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack start := time.Now() minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore()) + ticker := time.NewTicker(time.Second) for oracleFlashbackTS.After(minSafeTime) { if time.Since(start) >= FlashbackGetMinSafeTimeTimeout { return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) } - time.Sleep(time.Second) - minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore()) + select { + case <-ticker.C: + minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore()) + break + case <-ctx.Done(): + return ctx.Err() + } } gcSafePoint, err := gcutil.GetGCSafePoint(sctx) diff --git a/executor/recover_test.go b/executor/recover_test.go index 847d603eac059..ebb998f53a766 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -401,8 +401,11 @@ func TestFlashbackWithSafeTs(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS))) if testcase.compareWithSafeTS == 1 { + start := time.Now() tk.MustContainErrMsg(testcase.sql, "cannot set flashback timestamp after min-resolved-ts") + // When set `FlashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`. + require.Less(t, time.Since(start), time.Second) } else { tk.MustExec(testcase.sql) } From 18149f618d863a94fcdb5c09f2d3f5b6cf25d3d9 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 7 Feb 2023 11:02:37 +0800 Subject: [PATCH 6/7] add defer --- ddl/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/cluster.go b/ddl/cluster.go index c99f36b75e26b..ac4defb13ef35 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -131,6 +131,7 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack start := time.Now() minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore()) ticker := time.NewTicker(time.Second) + defer ticker.Stop() for oracleFlashbackTS.After(minSafeTime) { if time.Since(start) >= FlashbackGetMinSafeTimeTimeout { return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) From d9898f9e6bec16763b3d06595550fe34924000b8 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 7 Feb 2023 11:30:22 +0800 Subject: [PATCH 7/7] follow comments --- ddl/cluster.go | 11 +++++++---- executor/recover_test.go | 11 +++-------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index ac4defb13ef35..de16b0e60e8b2 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -53,9 +53,6 @@ var pdScheduleKey = []string{ "merge-schedule-limit", } -// FlashbackGetMinSafeTimeTimeout only for test, don't change it. -var FlashbackGetMinSafeTimeTimeout = time.Minute // 1min - const ( flashbackMaxBackoff = 1800000 // 1800s flashbackTimeout = 3 * time.Minute // 3min @@ -128,12 +125,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack return errors.Errorf("cannot set flashback timestamp to future time") } + flashbackGetMinSafeTimeTimeout := time.Minute + failpoint.Inject("changeFlashbackGetMinSafeTimeTimeout", func(val failpoint.Value) { + t := val.(int) + flashbackGetMinSafeTimeTimeout = time.Duration(t) + }) + start := time.Now() minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore()) ticker := time.NewTicker(time.Second) defer ticker.Stop() for oracleFlashbackTS.After(minSafeTime) { - if time.Since(start) >= FlashbackGetMinSafeTimeTimeout { + if time.Since(start) >= flashbackGetMinSafeTimeTimeout { return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) } select { diff --git a/executor/recover_test.go b/executor/recover_test.go index ebb998f53a766..806e251f17769 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/ddl" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" @@ -353,13 +352,8 @@ func TestFlashbackWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - originTimeout := ddl.FlashbackGetMinSafeTimeTimeout - ddl.FlashbackGetMinSafeTimeTimeout = 0 - defer func() { - ddl.FlashbackGetMinSafeTimeTimeout = originTimeout - }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -404,7 +398,7 @@ func TestFlashbackWithSafeTs(t *testing.T) { start := time.Now() tk.MustContainErrMsg(testcase.sql, "cannot set flashback timestamp after min-resolved-ts") - // When set `FlashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`. + // When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`. require.Less(t, time.Since(start), time.Second) } else { tk.MustExec(testcase.sql) @@ -412,6 +406,7 @@ func TestFlashbackWithSafeTs(t *testing.T) { } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout")) } func TestFlashbackRetryGetMinSafeTime(t *testing.T) {