diff --git a/DEPS.bzl b/DEPS.bzl index 1e0c6c11a8a32..bdaf8ccb2841a 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -6946,26 +6946,26 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "81065eb2f65e0874e7519d41580bdbe279bc93b1b9631bdc6c3092bc9f9458a1", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241023023120-691e80ae0ea9", + sha256 = "537e3204b8178e2ce0ce43c744fc2699883bb33e718e267da2f1dd6c389968c2", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241111090227-70049ae310bf", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip", ], ) go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "da76c552e8a9cfcc707d462d7cdc1ea6bea4b8e9017a60fc0a23200db500978d", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20241016064947-b70107ec31e6", + sha256 = "52a62b6f6247ce31ee9d0a5dbde941ba3be3db74a713fd79643d015d98a15c5f", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20241111073742-238d4d79ea31", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index abc3f53956022..48aa84d9f786a 100644 --- a/go.mod +++ b/go.mod @@ -110,8 +110,8 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tidwall/btree v1.7.0 - github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9 - github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6 + github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf + github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index d29d71107fd3a..65f1a263cc50f 100644 --- a/go.sum +++ b/go.sum @@ -826,10 +826,10 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= -github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9 h1:1Fgp6FqjgXEj/CKcegdXu3wLo77sx7JM9NPC7sF0io0= -github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9/go.mod h1:WAp0oZxDL+3GX+QhJdG0quubJUzEH8LrFofmIxleJhs= -github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6 h1:u0z6yR68sg0pextuabJv/bD4mvwBe8iFeQOdymBUy0E= -github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= +github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf h1:qCi6BiBUPk3Ky4f2CCgBxgUmi3ZpuQLYDLgxw1ilXPA= +github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf/go.mod h1:p9zPFlKBrxhp3b/cBmKBWL9M0X4HtJjgi1ThUtQYF7o= +github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk= +github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a/go.mod h1:mkjARE7Yr8qU23YcGMSALbIxTQ9r9QBVahQOBRfU460= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index 565b092172a22..e38326cd18d43 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/tablecodec" @@ -113,16 +112,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { - currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD. + currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate flashback timestamp: %v", err) - } - currentTS = currentVer.Ver + return errors.Errorf("fail to validate flashback timestamp: %v", err) } + currentTS := currentVer.Ver + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 8d88a2d782b45..d39ce5bffe43c 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -222,10 +222,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - if name == variable.TiDBTxnReadTS { - err = sessionctx.ValidateStaleReadTS(ctx, e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetStore(), newSnapshotTS) - } else { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx(), newSnapshotTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS) + if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts // Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor. diff --git a/pkg/executor/stale_txn_test.go b/pkg/executor/stale_txn_test.go index 0b5f959f09d40..3de2cdb16ea64 100644 --- a/pkg/executor/stale_txn_test.go +++ b/pkg/executor/stale_txn_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "strconv" "testing" "time" @@ -1372,14 +1373,30 @@ func TestStaleTSO(t *testing.T) { tk.MustExec("create table t (id int)") tk.MustExec("insert into t values(1)") + ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + + // Wait until the physical advances for 1s + var currentTS uint64 + for { + tk.MustExec("begin") + currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + tk.MustExec("rollback") + if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) { + break + } + time.Sleep(time.Millisecond * 100) + } asOfExprs := []string{ - "now(3) - interval 1 second", - "current_time() - interval 1 second", - "curtime() - interval 1 second", + "now(3) - interval 10 second", + "current_time() - interval 10 second", + "curtime() - interval 10 second", } - nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second)) + nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second)) + nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS)) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO))) defer failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO") for _, expr := range asOfExprs { diff --git a/pkg/planner/core/plan_cache_utils.go b/pkg/planner/core/plan_cache_utils.go index 6901f152b323a..1588da04ded82 100644 --- a/pkg/planner/core/plan_cache_utils.go +++ b/pkg/planner/core/plan_cache_utils.go @@ -558,7 +558,7 @@ type PlanCacheStmt struct { SQLDigest *parser.Digest PlanDigest *parser.Digest ForUpdateRead bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) BindingInfo bindinfo.BindingMatchInfo diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 1d39b6e546159..591c5420b7b51 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3643,7 +3643,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3657,7 +3657,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/pkg/planner/core/preprocess.go b/pkg/planner/core/preprocess.go index ad1bf32e066b8..209f540a7b25f 100644 --- a/pkg/planner/core/preprocess.go +++ b/pkg/planner/core/preprocess.go @@ -180,7 +180,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS type PreprocessorReturn struct { initedLastSnapshotTS bool IsStaleness bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) // LastSnapshotTS is the last evaluated snapshotTS if any // otherwise it defaults to zero LastSnapshotTS uint64 diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel index 723a23f8b0fc0..2536dfeb64937 100644 --- a/pkg/sessionctx/BUILD.bazel +++ b/pkg/sessionctx/BUILD.bazel @@ -13,11 +13,9 @@ go_library( "//pkg/kv", "//pkg/lock/context", "//pkg/meta/model", - "//pkg/metrics", "//pkg/planner/planctx", "//pkg/session/cursor", "//pkg/sessionctx/sessionstates", - "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics/handle/usage/indexusage", "//pkg/table/tblctx", @@ -27,7 +25,6 @@ go_library( "//pkg/util/sli", "//pkg/util/sqlexec", "//pkg/util/topsql/stmtstats", - "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index 6f10e4ac5c1f7..ddb8c6664a503 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -17,9 +17,7 @@ package sessionctx import ( "context" "sync" - "time" - "github.com/pingcap/errors" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/expression/exprctx" "github.com/pingcap/tidb/pkg/extension" @@ -27,11 +25,9 @@ import ( "github.com/pingcap/tidb/pkg/kv" tablelock "github.com/pingcap/tidb/pkg/lock/context" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/planner/planctx" "github.com/pingcap/tidb/pkg/session/cursor" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/table/tblctx" @@ -244,42 +240,6 @@ const ( ) // ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error { - latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check - if err != nil || readTS > latestTS { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - if readTS > currentVer.Ver { - return errors.Errorf("cannot set read timestamp to a future time") - } - } - return nil -} - -// How far future from now ValidateStaleReadTS allows at most -const allowedTimeFromNow = 100 * time.Millisecond - -// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly. -func ValidateStaleReadTS(ctx context.Context, sc *stmtctx.StatementContext, store kv.Storage, readTS uint64) error { - currentTS, err := sc.GetStaleTSO() - if currentTS == 0 || err != nil { - currentTS, err = store.GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - } - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD - if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - currentTS = currentVer.Ver - } - if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { - return errors.Errorf("cannot set read timestamp to a future time") - } - return nil +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { + return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } diff --git a/pkg/sessiontxn/staleread/processor.go b/pkg/sessiontxn/staleread/processor.go index 78620657d2840..e1e87b958c345 100644 --- a/pkg/sessiontxn/staleread/processor.go +++ b/pkg/sessiontxn/staleread/processor.go @@ -31,7 +31,7 @@ import ( var _ Processor = &staleReadProcessor{} // StalenessTSEvaluator is a function to get staleness ts -type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) +type StalenessTSEvaluator func(ctx context.Context, sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { @@ -101,7 +101,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { return err } - return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return p.setEvaluatedValues(ts, is, func(_ context.Context, sctx sessionctx.Context) (uint64, error) { return ts, nil }) } @@ -117,7 +117,7 @@ func (p *baseProcessor) setEvaluatedTSWithoutEvaluator(ts uint64) (err error) { } func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { - ts, err := evaluator(p.sctx) + ts, err := evaluator(p.ctx, p.sctx) if err != nil { return err } @@ -168,10 +168,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' - evaluateTS := func(sctx sessionctx.Context) (uint64, error) { - return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf) + evaluateTS := func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return parseAndValidateAsOf(ctx, p.sctx, tn.AsOf) } - stmtAsOfTS, err := evaluateTS(p.sctx) + stmtAsOfTS, err := evaluateTS(p.ctx, p.sctx) if err != nil { return err } @@ -201,7 +201,7 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness var stmtTS uint64 if preparedTSEvaluator != nil { // If the `preparedTSEvaluator` is not nil, it means the prepared statement is stale read - if stmtTS, err = preparedTSEvaluator(p.sctx); err != nil { + if stmtTS, err = preparedTSEvaluator(p.ctx, p.sctx); err != nil { return err } } @@ -286,7 +286,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateStaleReadTS(ctx, sctx.GetSessionVars().StmtCtx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { return 0, err } @@ -299,8 +299,8 @@ func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluat return nil } - return func(sctx sessionctx.Context) (uint64, error) { - return CalculateTsWithReadStaleness(sctx, readStaleness) + return func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(ctx, sctx, readStaleness) } } diff --git a/pkg/sessiontxn/staleread/processor_test.go b/pkg/sessiontxn/staleread/processor_test.go index 9f6adc817a46d..441528aca1ce8 100644 --- a/pkg/sessiontxn/staleread/processor_test.go +++ b/pkg/sessiontxn/staleread/processor_test.go @@ -52,7 +52,7 @@ func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.P evaluator := processor.GetStalenessTSEvaluatorForPrepare() if hasEvaluator { require.NotNil(t, evaluator) - ts, err := evaluator(p.tk.Session()) + ts, err := evaluator(context.Background(), p.tk.Session()) require.NoError(t, err) require.Equal(t, processor.GetStalenessReadTS(), ts) } else { @@ -109,6 +109,7 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { tn := astTableWithAsOf(t, "") p1 := genStaleReadPoint(t, tk) p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") @@ -158,19 +159,19 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator := processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err := evaluator(tk.Session()) + evaluatorTS, err := evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") tk.MustExec("do sleep(0.01)") - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) - expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS2, evaluatorTS) @@ -217,11 +218,11 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator = processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") @@ -234,13 +235,14 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { tk := testkit.NewTestKit(t, store) p1 := genStaleReadPoint(t, tk) //p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") // execute prepared stmt with ts evaluator processor := createProcessor(t, tk.Session()) - err := processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -248,7 +250,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // will get an error when ts evaluator fails processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return 0, errors.New("mock error") }) require.Error(t, err) @@ -273,7 +275,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // prepared ts is not allowed when @@txn_read_ts is set tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) @@ -286,7 +288,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -294,7 +296,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` tk.MustExec("set @@tidb_read_staleness=-100") processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -337,7 +339,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -377,7 +379,7 @@ func TestStaleReadProcessorInTxn(t *testing.T) { // return an error when execute prepared stmt with as of processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) diff --git a/pkg/sessiontxn/staleread/util.go b/pkg/sessiontxn/staleread/util.go index a02a6110e9d81..57320ddbbf274 100644 --- a/pkg/sessiontxn/staleread/util.go +++ b/pkg/sessiontxn/staleread/util.go @@ -67,15 +67,26 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx planctx.PlanContext, tsExpr a } // CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration -func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { +func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { nowVal, err := expression.GetStmtTimestamp(sctx.GetExprCtx().GetEvalCtx()) if err != nil { return 0, err } tsVal := nowVal.Add(readStaleness) sc := sctx.GetSessionVars().StmtCtx - minTsVal := expression.GetStmtMinSafeTime(sc, sctx.GetStore(), sc.TimeZone()) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil + minSafeTSVal := expression.GetStmtMinSafeTime(sc, sctx.GetStore(), sc.TimeZone()) + calculatedTime := expression.CalAppropriateTime(tsVal, nowVal, minSafeTSVal) + readTS := oracle.GoTimeToTS(calculatedTime) + if calculatedTime.After(minSafeTSVal) { + // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that + // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). + // So in this case, do an extra check on it. + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + if err != nil { + return 0, err + } + } + return readTS, nil } // IsStmtStaleness indicates whether the current statement is staleness or not