Skip to content

Commit

Permalink
client: avoid to add redundant grpc metadata (#7471)
Browse files Browse the repository at this point in the history
close #7469

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CabinfeverB and ti-chi-bot[bot] authored Nov 29, 2023
1 parent 54bf70e commit 180ff57
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 80 deletions.
130 changes: 61 additions & 69 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -825,6 +824,17 @@ func (c *client) getClient() pdpb.PDClient {
return c.leaderClient()
}

func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
backupClientConn, addr := c.backupClientConn()
if backupClientConn != nil {
log.Debug("[pd] use follower client", zap.String("addr", addr))
return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
}
}
return c.leaderClient(), ctx
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
Expand Down Expand Up @@ -929,39 +939,6 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
return r
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func isNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}
Expand Down Expand Up @@ -1004,6 +981,38 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
Expand All @@ -1022,8 +1031,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1055,8 +1063,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1090,8 +1097,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
EndKey: endKey,
Limit: int32(limit),
}
scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1146,8 +1152,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
Header: c.requestHeader(),
StoreId: storeID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1191,8 +1196,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
Header: c.requestHeader(),
ExcludeTombstoneStores: options.excludeTombstone,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1219,8 +1223,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
Header: c.requestHeader(),
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1254,8 +1257,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
TTL: ttl,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1287,8 +1289,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
RegionId: regionID,
Group: group,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1332,8 +1333,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
RetryLimit: options.retryLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1355,8 +1355,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
Header: c.requestHeader(),
RegionId: regionID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1383,8 +1382,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
SplitKeys: splitKeys,
RetryLimit: options.retryLimit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1414,8 +1412,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
SkipStoreLimit: options.skipStoreLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1465,8 +1462,7 @@ func trimHTTPPrefix(str string) string {
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1497,8 +1493,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand All @@ -1515,8 +1510,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1564,8 +1558,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
Expand All @@ -1585,8 +1578,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand Down
10 changes: 3 additions & 7 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

Expand All @@ -48,8 +47,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
KeyspaceId: keyspaceID,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -80,8 +78,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
SafePoint: safePoint,
Ttl: ttl,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand All @@ -104,8 +101,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
Expand Down
4 changes: 0 additions & 4 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/client/grpcutil"
)

// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -57,7 +56,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
Header: c.requestHeader(),
Name: name,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().LoadKeyspace(ctx, req)
cancel()

Expand Down Expand Up @@ -98,7 +96,6 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp
Id: id,
State: state,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().UpdateKeyspaceState(ctx, req)
cancel()

Expand Down Expand Up @@ -138,7 +135,6 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint
StartId: startID,
Limit: limit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req)
cancel()

Expand Down

0 comments on commit 180ff57

Please sign in to comment.