diff --git a/config/config.go b/config/config.go
index 7c216c85db53e..265ef8e089253 100644
--- a/config/config.go
+++ b/config/config.go
@@ -455,7 +455,7 @@ var defaultConf = Config{
 		HistorySize: 24,
 	},
 	IsolationRead: IsolationRead{
-		Engines: []string{"tikv", "tiflash", "tidb"},
+		Engines: []string{"tikv", "tiflash"},
 	},
 }
 
@@ -640,7 +640,7 @@ func (c *Config) Valid() error {
 		return fmt.Errorf("the number of [isolation-read]engines for isolation read should be at least 1")
 	}
 	for _, engine := range c.IsolationRead.Engines {
-		if engine != "tidb" && engine != "tikv" && engine != "tiflash" {
-			return fmt.Errorf("type of [isolation-read]engines can't be %v should be one of tidb or tikv or tiflash", engine)
+		if engine != "tikv" && engine != "tiflash" {
+			return fmt.Errorf("type of [isolation-read]engines can't be %v should be one of tikv or tiflash", engine)
 		}
 	}
diff --git a/domain/infosync/info.go b/domain/infosync/info.go
index a66df24117558..244f53c334066 100644
--- a/domain/infosync/info.go
+++ b/domain/infosync/info.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strconv"
 	"sync/atomic"
 	"time"
 
@@ -37,6 +38,8 @@ import (
 const (
 	// ServerInformationPath store server information such as IP, port and so on.
 	ServerInformationPath = "/tidb/server/info"
+	// TiFlashTableSyncProgressPath store the tiflash table replica sync progress.
+	TiFlashTableSyncProgressPath = "/tiflash/table/sync"
 	// keyOpDefaultRetryCnt is the default retry count for etcd store.
 	keyOpDefaultRetryCnt = 2
 	// keyOpDefaultTimeout is the default time out for etcd store.
@@ -127,6 +130,77 @@ func GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
 	return is.getServerInfoByID(ctx, id)
 }
 
+// UpdateTiFlashTableSyncProgress is used to update the tiflash table replica sync progress.
+// The progress is written through util.PutKVToEtcd under TiFlashTableSyncProgressPath/<tid>,
+// formatted with two decimal places. It is a no-op when no etcd client is configured.
+func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress float64) error {
+	is, err := getGlobalInfoSyncer()
+	if err != nil {
+		return err
+	}
+	if is.etcdCli == nil {
+		return nil
+	}
+	key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
+	return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, strconv.FormatFloat(progress, 'f', 2, 64))
+}
+
+// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
+// It removes the key TiFlashTableSyncProgressPath/<tid> through util.DeleteKeyFromEtcd and
+// is a no-op when no etcd client is configured.
+func DeleteTiFlashTableSyncProgress(tid int64) error {
+	is, err := getGlobalInfoSyncer()
+	if err != nil {
+		return err
+	}
+	if is.etcdCli == nil {
+		return nil
+	}
+	key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
+	return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
+}
+
+// GetTiFlashTableSyncProgress is used to get all the tiflash table replica sync progress.
+// It returns a map from the ID parsed out of each etcd key to the stored progress value;
+// the map is empty when no etcd client is configured. Entries with an unparsable key or
+// value are logged and skipped. If every read attempt fails, the last etcd error is
+// returned together with the empty map instead of being silently dropped.
+func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) {
+	is, err := getGlobalInfoSyncer()
+	if err != nil {
+		return nil, err
+	}
+	progressMap := make(map[int64]float64)
+	if is.etcdCli == nil {
+		return progressMap, nil
+	}
+	for i := 0; i < keyOpDefaultRetryCnt; i++ {
+		resp, getErr := is.etcdCli.Get(ctx, TiFlashTableSyncProgressPath+"/", clientv3.WithPrefix())
+		if getErr != nil {
+			// Remember the failure so it can be reported if no attempt succeeds.
+			err = getErr
+			logutil.Logger(context.Background()).Info("get tiflash table replica sync progress failed, continue checking.", zap.Error(getErr))
+			continue
+		}
+		for _, kv := range resp.Kvs {
+			tid, parseErr := strconv.ParseInt(string(kv.Key[len(TiFlashTableSyncProgressPath)+1:]), 10, 64)
+			if parseErr != nil {
+				logutil.Logger(context.Background()).Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key)))
+				continue
+			}
+			progress, parseErr := strconv.ParseFloat(string(kv.Value), 64)
+			if parseErr != nil {
+				logutil.Logger(context.Background()).Info("invalid tiflash table replica sync progress value.",
+					zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
+				continue
+			}
+			progressMap[tid] = progress
+		}
+		return progressMap, nil
+	}
+	return progressMap, err
+}
+
 func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
 	if is.etcdCli == nil || id == is.info.ID {
 		return is.info, nil
diff --git a/infoschema/tables.go b/infoschema/tables.go
index 80aba3ab96761..643f9398e3107 100644
--- a/infoschema/tables.go
+++ b/infoschema/tables.go
@@ -14,6 +14,7 @@
 package infoschema
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"sort"
@@ -25,6 +26,7 @@ import (
 	"github.com/pingcap/parser/charset"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/privilege"
@@ -934,6 +936,7 @@ var tableTableTiFlashReplicaCols = []columnInfo{
 	{"REPLICA_COUNT", mysql.TypeLonglong, 64, 0, nil, nil},
 	{"LOCATION_LABELS", mysql.TypeVarchar, 64, 0, nil, nil},
 	{"AVAILABLE", mysql.TypeTiny, 1, 0, nil, nil},
+	{"PROGRESS", mysql.TypeDouble, 22, 0, nil, nil},
 }
 
 func dataForSchemata(schemas []*model.DBInfo) [][]types.Datum {
@@ -1731,13 +1734,30 @@ func DataForAnalyzeStatus() (rows [][]types.Datum) {
 }
 
 // dataForTableTiFlashReplica constructs data for table tiflash replica info.
-func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum {
+func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
 	var rows [][]types.Datum
+	progressMap, err := infosync.GetTiFlashTableSyncProgress(context.Background())
+	if err != nil {
+		ctx.GetSessionVars().StmtCtx.AppendWarning(err)
+	}
 	for _, schema := range schemas {
 		for _, tbl := range schema.Tables {
 			if tbl.TiFlashReplica == nil {
 				continue
 			}
+			progress := 1.0
+			if !tbl.TiFlashReplica.Available {
+				if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 {
+					progress = 0
+					for _, p := range pi.Definitions {
+						// TODO: need check partition replica available.
+						progress += progressMap[p.ID]
+					}
+					progress = progress / float64(len(pi.Definitions))
+				} else {
+					progress = progressMap[tbl.ID]
+				}
+			}
 			record := types.MakeDatums(
 				schema.Name.O,                   // TABLE_SCHEMA
 				tbl.Name.O,                      // TABLE_NAME
@@ -1745,6 +1765,7 @@ func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum {
 				int64(tbl.TiFlashReplica.Count), // REPLICA_COUNT
 				strings.Join(tbl.TiFlashReplica.LocationLabels, ","), // LOCATION_LABELS
 				tbl.TiFlashReplica.Available,                         // AVAILABLE
+				progress,                                             // PROGRESS
 			)
 			rows = append(rows, record)
 		}
@@ -1896,7 +1917,7 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
 	case tableTiKVRegionPeers:
 		fullRows, err = dataForTikVRegionPeers(ctx)
 	case tableTiFlashReplica:
-		fullRows = dataForTableTiFlashReplica(dbs)
+		fullRows = dataForTableTiFlashReplica(ctx, dbs)
 	}
 	if err != nil {
 		return nil, err
diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go
index e0d82b849206a..c34825a7ae4db 100644
--- a/infoschema/tables_test.go
+++ b/infoschema/tables_test.go
@@ -567,9 +567,9 @@ func (s *testTableSuite) TestForTableTiFlashReplica(c *C) {
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("create table t (a int, b int, index idx(a))")
 	tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")
-	tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0"))
+	tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0 0"))
 	tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	tbl.Meta().TiFlashReplica.Available = true
-	tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1"))
+	tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1 1"))
 }
diff --git a/server/http_handler.go b/server/http_handler.go
index 94f08b60e4cb9..8c1964757f1f5 100644
--- a/server/http_handler.go
+++ b/server/http_handler.go
@@ -738,8 +738,10 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
 
 type tableFlashReplicaStatus struct {
 	// Modifying the field name needs to negotiate with TiFlash colleague.
-	ID               int64  `json:"id"`
-	RegionCount      uint64 `json:"region_count"`
+	ID int64 `json:"id"`
+	// RegionCount is the number of regions that need sync.
+	RegionCount uint64 `json:"region_count"`
+	// FlashRegionCount is the number of regions that have already completed sync.
 	FlashRegionCount uint64 `json:"flash_region_count"`
 }
 
@@ -765,7 +767,23 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http
 		writeError(w, err)
 		return
 	}
-	err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable())
+	available := status.checkTableFlashReplicaAvailable()
+	err = do.DDL().UpdateTableReplicaInfo(s, status.ID, available)
+	if err != nil {
+		writeError(w, err)
+		// Stop here so a second error is never written to the same response.
+		return
+	}
+	if available {
+		err = infosync.DeleteTiFlashTableSyncProgress(status.ID)
+	} else {
+		// Guard against a zero RegionCount so the stored progress is never Inf.
+		progress := 0.0
+		if status.RegionCount > 0 {
+			progress = float64(status.FlashRegionCount) / float64(status.RegionCount)
+		}
+		err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, progress)
+	}
 	if err != nil {
 		writeError(w, err)
 	}