Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7471
Browse files Browse the repository at this point in the history
close tikv#7469

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB authored and ti-chi-bot committed Nov 29, 2023
1 parent 85f1525 commit 207bf2a
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 61 deletions.
145 changes: 86 additions & 59 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -586,6 +585,17 @@ func (c *client) getClient() pdpb.PDClient {
return c.leaderClient()
}

func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
backupClientConn, addr := c.backupClientConn()
if backupClientConn != nil {
log.Debug("[pd] use follower client", zap.String("addr", addr))
return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
}
}
return c.leaderClient(), ctx
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
Expand Down Expand Up @@ -646,39 +656,6 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
return r
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func isNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}
Expand Down Expand Up @@ -721,6 +698,38 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
Expand All @@ -739,8 +748,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -772,8 +780,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -807,8 +814,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
EndKey: endKey,
Limit: int32(limit),
}
scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -863,8 +869,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
Header: c.requestHeader(),
StoreId: storeID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -908,8 +913,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
Header: c.requestHeader(),
ExcludeTombstoneStores: options.excludeTombstone,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -936,8 +940,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
Header: c.requestHeader(),
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -971,8 +974,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
TTL: ttl,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1004,8 +1006,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
RegionId: regionID,
Group: group,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1049,8 +1050,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
RetryLimit: options.retryLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1072,8 +1072,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
Header: c.requestHeader(),
RegionId: regionID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1100,8 +1099,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
SplitKeys: splitKeys,
RetryLimit: options.retryLimit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1130,8 +1128,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
RetryLimit: options.retryLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1179,7 +1176,13 @@ func trimHTTPPrefix(str string) string {
}

func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
<<<<<<< HEAD
protoClient := c.getClient()
=======
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1208,7 +1211,13 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad}
}
<<<<<<< HEAD
protoClient := c.getClient()
=======
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand All @@ -1223,7 +1232,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
// TODO: Add retry mechanism
// register watch components there
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
<<<<<<< HEAD
protoClient := c.getClient()
=======
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1269,7 +1284,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
}

func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
<<<<<<< HEAD
protoClient := c.getClient()
=======
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
Expand All @@ -1287,7 +1308,13 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
}

func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
<<<<<<< HEAD
protoClient := c.getClient()
=======
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand Down
Loading

0 comments on commit 207bf2a

Please sign in to comment.