Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: upgrade the PD client to adopt the latest refactor #58440

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -103,7 +103,7 @@ type CliEnv struct {
}

func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := c.Cache.PDClient().GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/engine"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
Expand All @@ -34,7 +34,7 @@ type StoreMeta interface {
// GetAllStores gets all stores from pd.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
Expand All @@ -45,7 +45,7 @@ func GetAllTiKVStores(
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -154,11 +156,11 @@ func NewPdController(
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, pdAddrs, securityOption,
pd.WithGRPCDialOptions(maxCallMsgSize...),
ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
opt.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
opt.WithCustomTimeoutOption(60*time.Second),
)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/internal/rawkv/rawkv_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// RawkvClient is the interface for rawkv.client
Expand All @@ -29,7 +29,7 @@ func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Secur
ctx,
pdAddrs,
security,
pd.WithCustomTimeoutOption(10*time.Second))
opt.WithCustomTimeoutOption(10*time.Second))
}

type KVPair struct {
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/snap_client/placement_rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
)

func generateTables() []*snapclient.CreatedTable {
Expand Down Expand Up @@ -104,8 +104,8 @@ func TestContextManagerOnlineNoStores(t *testing.T) {
require.NoError(t, err)
}

func generateRegions() []*pd.Region {
return []*pd.Region{
func generateRegions() []*router.Region {
return []*router.Region{
{
Meta: &metapb.Region{
Id: 0,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/util/intest"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
logutil.Key("end", v.Region.EndKey),
zap.Uint64("id", v.Region.Id))
}
resp, err := c.client.ScatterRegions(ctx, regionsID, pd.WithSkipStoreLimit())
resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit())
if err != nil {
return err
}
Expand Down
48 changes: 25 additions & 23 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -248,8 +250,8 @@ func (c *MockPDClientForSplit) ScanRegions(
_ context.Context,
key, endKey []byte,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -264,9 +266,9 @@ func (c *MockPDClientForSplit) ScanRegions(
}

regions := c.Regions.ScanRange(key, endKey, limit)
ret := make([]*pd.Region, 0, len(regions))
ret := make([]*router.Region, 0, len(regions))
for _, r := range regions {
ret = append(ret, &pd.Region{
ret = append(ret, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -276,10 +278,10 @@ func (c *MockPDClientForSplit) ScanRegions(

func (c *MockPDClientForSplit) BatchScanRegions(
_ context.Context,
keyRanges []pd.KeyRange,
keyRanges []router.KeyRange,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -293,7 +295,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
c.scanRegions.beforeHook()
}

regions := make([]*pd.Region, 0, len(keyRanges))
regions := make([]*router.Region, 0, len(keyRanges))
var lastRegion *pdtypes.Region
for _, keyRange := range keyRanges {
if lastRegion != nil {
Expand All @@ -307,7 +309,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
rs := c.Regions.ScanRange(keyRange.StartKey, keyRange.EndKey, limit)
for _, r := range rs {
lastRegion = r
regions = append(regions, &pd.Region{
regions = append(regions, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -316,13 +318,13 @@ func (c *MockPDClientForSplit) BatchScanRegions(
return regions, nil
}

func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) {
func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

for _, r := range c.Regions.Regions {
if r.Meta.Id == regionID {
return &pd.Region{
return &router.Region{
Meta: r.Meta,
Leader: r.Leader,
}, nil
Expand Down Expand Up @@ -370,7 +372,7 @@ func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64)
return newRegionNotFullyReplicatedErr(regionID)
}

func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -517,7 +519,7 @@ func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID str
type FakePDClient struct {
pd.Client
stores []*metapb.Store
regions []*pd.Region
regions []*router.Region

notLeader bool
retryTimes *int
Expand All @@ -540,21 +542,21 @@ func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *Fa
}
}

func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) {
func (fpdc *FakePDClient) SetRegions(regions []*router.Region) {
fpdc.regions = regions
}

func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (fpdc *FakePDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (fpdc *FakePDClient) ScanRegions(
ctx context.Context,
key, endKey []byte,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand All @@ -572,11 +574,11 @@ func (fpdc *FakePDClient) ScanRegions(

func (fpdc *FakePDClient) BatchScanRegions(
ctx context.Context,
ranges []pd.KeyRange,
ranges []router.KeyRange,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand Down Expand Up @@ -633,7 +635,7 @@ func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) {
})
}

func (f *FakeSplitClient) AppendPdRegion(region *pd.Region) {
func (f *FakeSplitClient) AppendPdRegion(region *router.Region) {
f.regions = append(f.regions, &RegionInfo{
Region: region.Meta,
Leader: region.Leader,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, lim
}

func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error) {
res, err := c.Client.GetAllStores(ctx, pd.WithExcludeTombstone())
res, err := c.Client.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -855,12 +857,12 @@ type mockPDClient struct {
fakeRegions []*region
}

func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) {
func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*router.Region, error) {
sort.Slice(p.fakeRegions, func(i, j int) bool {
return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0
})

result := make([]*pd.Region, 0, len(p.fakeRegions))
result := make([]*router.Region, 0, len(p.fakeRegions))
for _, region := range p.fakeRegions {
if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit {
regionInfo := newMockRegion(region.id, region.rng.StartKey, region.rng.EndKey)
Expand All @@ -879,7 +881,7 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor
}, nil
}

func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
// only used for GetRegionCache once in resolve lock
return []*metapb.Store{
{
Expand All @@ -893,14 +895,14 @@ func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
return 1
}

func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region {
func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *router.Region {
leader := &metapb.Peer{
Id: regionID,
StoreId: 1,
Role: metapb.PeerRole_Voter,
}

return &pd.Region{
return &router.Region{
Meta: &metapb.Region{
Id: regionID,
StartKey: startKey,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/task/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/keepalive"
)

Expand All @@ -56,7 +57,7 @@ func (m mockPDClient) GetClusterID(_ context.Context) uint64 {
return 1
}

func (m mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (m mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
return []*metapb.Store{}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/utils/storewatch/watching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

type SequentialReturningStoreMeta struct {
Expand All @@ -20,7 +20,7 @@ func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) util.StoreMeta
return &SequentialReturningStoreMeta{sequence: sequence}
}

func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
if len(s.sequence) == 0 {
return nil, fmt.Errorf("too many call to `GetAllStores` in test")
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/engine"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ type VerChecker func(store *metapb.Store, ver *semver.Version) error

// CheckClusterVersion check TiKV version.
func CheckClusterVersion(ctx context.Context, client pd.Client, checker VerChecker) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := client.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading
Loading