Skip to content

Commit

Permalink
*: add tiflash replica sync progress pingcap#14713
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed Feb 13, 2020
1 parent 501334f commit 23410f5
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 9 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ var defaultConf = Config{
HistorySize: 24,
},
IsolationRead: IsolationRead{
Engines: []string{"tikv", "tiflash", "tidb"},
Engines: []string{"tikv", "tiflash"},
},
}

Expand Down Expand Up @@ -640,7 +640,7 @@ func (c *Config) Valid() error {
return fmt.Errorf("the number of [isolation-read]engines for isolation read should be at least 1")
}
for _, engine := range c.IsolationRead.Engines {
if engine != "tidb" && engine != "tikv" && engine != "tiflash" {
if engine != "tikv" && engine != "tiflash" {
return fmt.Errorf("type of [isolation-read]engines can't be %v should be one of tidb or tikv or tiflash", engine)
}
}
Expand Down
64 changes: 64 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync/atomic"
"time"

Expand All @@ -37,6 +38,8 @@ import (
const (
// ServerInformationPath store server information such as IP, port and so on.
ServerInformationPath = "/tidb/server/info"
// TiFlashTableSyncProgressPath store the tiflash table replica sync progress.
TiFlashTableSyncProgressPath = "/tiflash/table/sync"
// keyOpDefaultRetryCnt is the default retry count for etcd store.
keyOpDefaultRetryCnt = 2
// keyOpDefaultTimeout is the default time out for etcd store.
Expand Down Expand Up @@ -127,6 +130,67 @@ func GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
return is.getServerInfoByID(ctx, id)
}

// UpdateTiFlashTableSyncProgress is used to update the tiflash table replica sync progress.
func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress float64) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
if is.etcdCli == nil {
return nil
}
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, strconv.FormatFloat(progress, 'f', 2, 64))
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
func DeleteTiFlashTableSyncProgress(tid int64) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
if is.etcdCli == nil {
return nil
}
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
}

// GetTiFlashTableSyncProgress uses to get all the tiflash table replica sync progress.
func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}
progressMap := make(map[int64]float64)
if is.etcdCli == nil {
return progressMap, nil
}
for i := 0; i < keyOpDefaultRetryCnt; i++ {
resp, err := is.etcdCli.Get(ctx, TiFlashTableSyncProgressPath+"/", clientv3.WithPrefix())
if err != nil {
logutil.Logger(context.Background()).Info("get tiflash table replica sync progress failed, continue checking.", zap.Error(err))
continue
}
for _, kv := range resp.Kvs {
tid, err := strconv.ParseInt(string(kv.Key[len(TiFlashTableSyncProgressPath)+1:]), 10, 64)
if err != nil {
logutil.Logger(context.Background()).Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key)))
continue
}
progress, err := strconv.ParseFloat(string(kv.Value), 64)
if err != nil {
logutil.Logger(context.Background()).Info("invalid tiflash table replica sync progress value.",
zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
continue
}
progressMap[tid] = progress
}
break
}
return progressMap, nil
}

func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) {
if is.etcdCli == nil || id == is.info.ID {
return is.info, nil
Expand Down
25 changes: 23 additions & 2 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package infoschema

import (
"context"
"encoding/json"
"fmt"
"sort"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/privilege"
Expand Down Expand Up @@ -934,6 +936,7 @@ var tableTableTiFlashReplicaCols = []columnInfo{
{"REPLICA_COUNT", mysql.TypeLonglong, 64, 0, nil, nil},
{"LOCATION_LABELS", mysql.TypeVarchar, 64, 0, nil, nil},
{"AVAILABLE", mysql.TypeTiny, 1, 0, nil, nil},
{"PROGRESS", mysql.TypeDouble, 22, 0, nil, nil},
}

func dataForSchemata(schemas []*model.DBInfo) [][]types.Datum {
Expand Down Expand Up @@ -1731,20 +1734,38 @@ func DataForAnalyzeStatus() (rows [][]types.Datum) {
}

// dataForTableTiFlashReplica constructs data for table tiflash replica info.
func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum {
func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
var rows [][]types.Datum
progressMap, err := infosync.GetTiFlashTableSyncProgress(context.Background())
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
for _, schema := range schemas {
for _, tbl := range schema.Tables {
if tbl.TiFlashReplica == nil {
continue
}
progress := 1.0
if !tbl.TiFlashReplica.Available {
if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 {
progress = 0
for _, p := range pi.Definitions {
// TODO: need check partition replica available.
progress += progressMap[p.ID]
}
progress = progress / float64(len(pi.Definitions))
} else {
progress = progressMap[tbl.ID]
}
}
record := types.MakeDatums(
schema.Name.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
tbl.ID, // TABLE_ID
int64(tbl.TiFlashReplica.Count), // REPLICA_COUNT
strings.Join(tbl.TiFlashReplica.LocationLabels, ","), // LOCATION_LABELS
tbl.TiFlashReplica.Available, // AVAILABLE
progress, // PROGRESS
)
rows = append(rows, record)
}
Expand Down Expand Up @@ -1896,7 +1917,7 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
case tableTiKVRegionPeers:
fullRows, err = dataForTikVRegionPeers(ctx)
case tableTiFlashReplica:
fullRows = dataForTableTiFlashReplica(dbs)
fullRows = dataForTableTiFlashReplica(ctx, dbs)
}
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@ func (s *testTableSuite) TestForTableTiFlashReplica(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, index idx(a))")
tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0"))
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0 0"))
tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tbl.Meta().TiFlashReplica.Available = true
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1"))
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1 1"))
}
17 changes: 14 additions & 3 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,10 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

type tableFlashReplicaStatus struct {
// Modifying the field name needs to negotiate with TiFlash colleague.
ID int64 `json:"id"`
RegionCount uint64 `json:"region_count"`
ID int64 `json:"id"`
// RegionCount is the number of regions that need sync.
RegionCount uint64 `json:"region_count"`
// FlashRegionCount is the number of regions that already sync completed.
FlashRegionCount uint64 `json:"flash_region_count"`
}

Expand All @@ -765,7 +767,16 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http
writeError(w, err)
return
}
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable())
available := status.checkTableFlashReplicaAvailable()
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, available)
if err != nil {
writeError(w, err)
}
if available {
err = infosync.DeleteTiFlashTableSyncProgress(status.ID)
} else {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, float64(status.FlashRegionCount)/float64(status.RegionCount))
}
if err != nil {
writeError(w, err)
}
Expand Down

0 comments on commit 23410f5

Please sign in to comment.