Skip to content

Commit

Permalink
release 6.4 support tiflash multi-tenant (pingcap#159)
Browse files Browse the repository at this point in the history
* release 6.4 support tiflash multi-tenant

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix make check

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
  • Loading branch information
iosmanthus and AmoebaProtozoa authored Dec 20, 2022
1 parent 3580738 commit af7df70
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 51 deletions.
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export-20*/
# Files generated when testing
out
/_bazel
bazel-bin
bazel-out
bazel-testlogs
bazel-tidb-cse
/bazel-bin
/bazel-out
/bazel-testlogs
/bazel-tidb*
.ijwb/
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ go_test(
"//errno",
"//executor",
"//infoschema",
"//keyspace",
"//kv",
"//meta",
"//meta/autoid",
Expand Down
13 changes: 7 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/gcutil"
Expand Down Expand Up @@ -269,7 +270,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -327,7 +328,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -380,7 +381,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -444,7 +445,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -504,7 +505,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -530,7 +531,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), keyspace.CodecV1, true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
16 changes: 0 additions & 16 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,22 +355,6 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error {
return nil
}

func getTiFlashPeerWithoutLagCount(pollTiFlashContext *TiFlashManagementContext, tableID int64) (int, error) {
// storeIDs -> regionID, PD will not create two peer on the same store
var flashPeerCount int
for _, store := range pollTiFlashContext.TiFlashStores {
regionReplica := make(map[int64]int)
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
if err != nil {
logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
zap.Int64("tableID", tableID))
return 0, err
}
flashPeerCount += len(regionReplica)
}
return flashPeerCount, nil
}

func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) {
pollMaxCount := RefreshProgressMaxTableCount
failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) {
Expand Down
3 changes: 2 additions & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/testkit/testsetup"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, keyspace.CodecV1, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ go_test(
"//ddl",
"//domain/infosync",
"//errno",
"//keyspace",
"//kv",
"//metrics",
"//parser/ast",
Expand Down
5 changes: 3 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, keyspace.CodecV1, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, keyspace.CodecV1, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
8 changes: 7 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,13 @@ func (do *Domain) Init(

// step 1: prepare the info/schema syncer which domain reload needed.
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx,
do.ddl.GetID(),
do.ServerID,
do.etcdClient,
do.Store().GetCodec(),
skipRegisterToDashboard,
)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//ddl/placement",
"//ddl/util",
"//errno",
"//keyspace",
"//kv",
"//metrics",
"//parser/model",
Expand All @@ -40,8 +41,10 @@ go_library(
"@com_github_gorilla_mux//:mux",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
Expand All @@ -59,6 +62,7 @@ go_test(
deps = [
"//ddl/placement",
"//ddl/util",
"//keyspace",
"//parser/model",
"//testkit/testsetup",
"//util",
Expand Down
18 changes: 14 additions & 4 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/versioninfo"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -178,7 +179,12 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
func GlobalInfoSyncerInit(ctx context.Context,
id string,
serverIDGetter func() uint64,
etcdCli *clientv3.Client,
codec tikv.Codec,
skipRegisterToDashBoard bool) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
info: getServerInfo(id, serverIDGetter),
Expand All @@ -192,7 +198,7 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec)
setGlobalInfoSyncer(is)
return is, nil
}
Expand Down Expand Up @@ -237,13 +243,17 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
return &PDPlacementManager{etcdCli: etcdCli}
}

func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager {
func initTiFlashReplicaManager(etcdCli *clientv3.Client, codec tikv.Codec) TiFlashReplicaManager {
if etcdCli == nil {
m := mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)}
return &m
}
logutil.BgLogger().Warn("init TiFlashReplicaManager", zap.Strings("pd addrs", etcdCli.Endpoints()))
return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64)}
return &TiFlashReplicaManagerCtx{
etcdCli: etcdCli,
tiflashProgressCache: make(map[int64]float64),
codec: codec,
}
}

func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager {
Expand Down
7 changes: 4 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit/testsetup"
util2 "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, keyspace.CodecV1, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, keyspace.CodecV1, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +217,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, keyspace.CodecV1, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
Loading

0 comments on commit af7df70

Please sign in to comment.