diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 6917589720e41..66d08cc739709 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -17,6 +17,8 @@ import ( "sync" "time" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -72,7 +74,7 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator { return &schemaValidator{ isStarted: true, lease: lease, - deltaSchemaInfos: make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad), + deltaSchemaInfos: make([]deltaSchemaInfo, 0, variable.DefTiDBMaxDeltaSchemaCount), } } @@ -85,14 +87,16 @@ func (s *schemaValidator) IsStarted() bool { func (s *schemaValidator) Stop() { logutil.BgLogger().Info("the schema validator stops") + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorStop).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = false s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Restart() { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorRestart).Inc() logutil.BgLogger().Info("the schema validator restarts") s.mux.Lock() defer s.mux.Unlock() @@ -100,11 +104,12 @@ func (s *schemaValidator) Restart() { } func (s *schemaValidator) Reset() { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorReset).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = true s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) { @@ -146,13 +151,15 @@ func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool { // NOTE, this function should be called under lock! func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool { if len(s.deltaSchemaInfos) == 0 { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc() logutil.BgLogger().Info("schema change history is empty", zap.Int64("currVer", currVer)) return true } newerDeltas := s.findNewerDeltas(currVer) if len(newerDeltas) == len(s.deltaSchemaInfos) { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc() logutil.BgLogger().Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer), - zap.Int64("latestSchemaVer", s.latestSchemaVer)) + zap.Int64("latestSchemaVer", s.latestSchemaVer), zap.Reflect("deltas", newerDeltas)) return true } for _, item := range newerDeltas { @@ -208,8 +215,54 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ } func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) { - s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs}) - if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad { + maxCnt := int(variable.GetMaxDeltaSchemaCount()) + if maxCnt <= 0 { + logutil.BgLogger().Info("the schema validator enqueue", zap.Int("delta max count", maxCnt)) + return + } + + delta := deltaSchemaInfo{schemaVersion, relatedTableIDs} + if len(s.deltaSchemaInfos) == 0 { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + return + } + + lastOffset := len(s.deltaSchemaInfos) - 1 + // The first item we needn't to merge, because we hope to cover more versions. + if lastOffset != 0 && ids(s.deltaSchemaInfos[lastOffset].relatedTableIDs).containIn(delta.relatedTableIDs) { + s.deltaSchemaInfos[lastOffset] = delta + } else { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + } + + if len(s.deltaSchemaInfos) > maxCnt { + logutil.BgLogger().Info("the schema validator enqueue, queue is too long", + zap.Int("delta max count", maxCnt), zap.Int64("remove schema version", s.deltaSchemaInfos[0].schemaVersion)) s.deltaSchemaInfos = s.deltaSchemaInfos[1:] } } + +type ids []int64 + +// containIn is checks if a is included in b. +func (a ids) containIn(b []int64) bool { + if len(a) > len(b) { + return false + } + + var isEqual bool + for _, i := range a { + isEqual = false + for _, j := range b { + if i == j { + isEqual = true + break + } + } + if !isEqual { + return false + } + } + + return true +} diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 7dd59b620de7e..08b6aae000e20 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testleak" ) @@ -143,3 +144,61 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh } } } + +func (*testSuite) TestEnqueue(c *C) { + lease := 10 * time.Millisecond + originalCnt := variable.GetMaxDeltaSchemaCount() + defer variable.SetMaxDeltaSchemaCount(originalCnt) + + validator := NewSchemaValidator(lease).(*schemaValidator) + c.Assert(validator.IsStarted(), IsTrue) + // maxCnt is 0. + variable.SetMaxDeltaSchemaCount(0) + validator.enqueue(1, []int64{11}) + c.Assert(validator.deltaSchemaInfos, HasLen, 0) + + // maxCnt is 10. + variable.SetMaxDeltaSchemaCount(10) + ds := []deltaSchemaInfo{ + {0, []int64{1}}, + {1, []int64{1}}, + {2, []int64{1}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {5, []int64{1, 4}}, + {6, []int64{1, 4}}, + {7, []int64{3, 1, 3}}, + {8, []int64{1, 2, 3}}, + {9, []int64{1, 2, 3}}, + } + for _, d := range ds { + validator.enqueue(d.schemaVersion, d.relatedTableIDs) + } + validator.enqueue(10, []int64{1}) + ret := []deltaSchemaInfo{ + {0, []int64{1}}, + {2, []int64{1}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {6, []int64{1, 4}}, + {9, []int64{1, 2, 3}}, + {10, []int64{1}}, + } + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // The Items' relatedTableIDs have different order. + validator.enqueue(11, []int64{1, 2, 3, 4}) + validator.enqueue(12, []int64{4, 1, 2, 3, 1}) + validator.enqueue(13, []int64{4, 1, 3, 2, 5}) + ret[len(ret)-1] = deltaSchemaInfo{13, []int64{4, 1, 3, 2, 5}} + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // The length of deltaSchemaInfos is greater then maxCnt. + validator.enqueue(14, []int64{1}) + validator.enqueue(15, []int64{2}) + validator.enqueue(16, []int64{3}) + validator.enqueue(17, []int64{4}) + ret = append(ret, deltaSchemaInfo{14, []int64{1}}) + ret = append(ret, deltaSchemaInfo{15, []int64{2}}) + ret = append(ret, deltaSchemaInfo{16, []int64{3}}) + ret = append(ret, deltaSchemaInfo{17, []int64{4}}) + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:]) +} diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 391efeb5d43da..b3cbb6d7bb460 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -36,6 +36,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" @@ -1053,3 +1054,31 @@ func (s *seqTestSuite) TestAutoIDInRetry(c *C) { tk.MustExec("insert into t values ()") tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2", "3", "4", "5")) } + +func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) + gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() + gvc.Disable() + + tk.MustExec("set @@global.tidb_max_delta_schema_count= -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'")) + // Make sure a new session will load global variables. + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(100)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_max_delta_schema_count= %v", uint64(math.MaxInt64))) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '%d'", uint64(math.MaxInt64)))) + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(16384)) + _, err := tk.Exec("set @@global.tidb_max_delta_schema_count= invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + + tk.MustExec("set @@global.tidb_max_delta_schema_count= 2048") + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(2048)) + tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) +} diff --git a/metrics/domain.go b/metrics/domain.go index 017e007e4fb98..a8ea4e3d4cbbf 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Metrics for the domain package. var ( // LoadSchemaCounter records the counter of load schema. LoadSchemaCounter = prometheus.NewCounterVec( @@ -45,4 +46,18 @@ var ( Name: "load_privilege_total", Help: "Counter of load privilege", }, []string{LblType}) + + SchemaValidatorStop = "stop" + SchemaValidatorRestart = "restart" + SchemaValidatorReset = "reset" + SchemaValidatorCacheEmpty = "cache_empty" + SchemaValidatorCacheMiss = "cache_miss" + // HandleSchemaValidate records the counter of handling schema validate. + HandleSchemaValidate = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "handle_schema_validate", + Help: "Counter of handle schema validate", + }, []string{LblType}) ) diff --git a/metrics/metrics.go b/metrics/metrics.go index fc5ecd3e55638..c9ebc168d8491 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -153,6 +153,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVBatchClientUnavailable) prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) + prometheus.MustRegister(HandleSchemaValidate) prometheus.MustRegister(TiKVTokenWaitDuration) prometheus.MustRegister(TiKVTxnHeartBeatHistogram) prometheus.MustRegister(GRPCConnTransientFailureCounter) diff --git a/session/session.go b/session/session.go index 3b6ed134716de..1a36431cc355d 100644 --- a/session/session.go +++ b/session/session.go @@ -1821,6 +1821,7 @@ var builtinGlobalVariable = []string{ variable.TiDBEnableIndexMerge, variable.TiDBTxnMode, variable.TiDBEnableStmtSummary, + variable.TiDBMaxDeltaSchemaCount, } var ( diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e8b59cf55df31..abc80587df603 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -897,6 +897,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { } case TiDBAllowRemoveAutoInc: s.AllowRemoveAutoInc = TiDBOptOn(val) + // It's a global variable, but it also wants to be cached in server. + case TiDBMaxDeltaSchemaCount: + SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 5b97fa5e7d168..3c9789b85480d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -702,6 +702,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, + {ScopeGlobal, TiDBMaxDeltaSchemaCount, strconv.Itoa(DefTiDBMaxDeltaSchemaCount)}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)}, {ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index b489bc56face2..cf1be91298687 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -252,6 +252,10 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" + // tidb_max_delta_schema_count defines the max length of deltaSchemaInfos. + // deltaSchemaInfos is a queue that maintains the history of schema changes. + TiDBMaxDeltaSchemaCount = "tidb_max_delta_schema_count" + // tidb_scatter_region will scatter the regions for DDLs when it is ON. TiDBScatterRegion = "tidb_scatter_region" @@ -350,6 +354,7 @@ const ( DefTiDBDDLReorgWorkerCount = 4 DefTiDBDDLReorgBatchSize = 256 DefTiDBDDLErrorCountLimit = 512 + DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -375,6 +380,7 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index e02efe3ed51d5..a622a0f8558ff 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -74,6 +74,16 @@ func GetDDLErrorCountLimit() int64 { return atomic.LoadInt64(&ddlErrorCountlimit) } +// SetMaxDeltaSchemaCount sets maxDeltaSchemaCount size. +func SetMaxDeltaSchemaCount(cnt int64) { + atomic.StoreInt64(&maxDeltaSchemaCount, cnt) +} + +// GetMaxDeltaSchemaCount gets maxDeltaSchemaCount size. +func GetMaxDeltaSchemaCount() int64 { + return atomic.LoadInt64(&maxDeltaSchemaCount) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -321,6 +331,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 0, 4294967295, vars) case OldPasswords: return checkUInt64SystemVar(name, value, 0, 2, vars) + case TiDBMaxDeltaSchemaCount: + return checkInt64SystemVar(name, value, 100, 16384, vars) case SessionTrackGtids: if strings.EqualFold(value, "OFF") || value == "0" { return "OFF", nil