From 207bf2a3ffd65af4d296fb32a90c71bf7a366d26 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Nov 2023 18:05:48 +0800 Subject: [PATCH] This is an automated cherry-pick of #7471 close tikv/pd#7469 Signed-off-by: ti-chi-bot --- client/client.go | 145 ++++++++++++++++++++++---------------- client/gc_client.go | 136 +++++++++++++++++++++++++++++++++++ client/keyspace_client.go | 47 +++++++++++- 3 files changed, 267 insertions(+), 61 deletions(-) create mode 100644 client/gc_client.go diff --git a/client/client.go b/client/client.go index 249caa4d149..35a3c3acd22 100644 --- a/client/client.go +++ b/client/client.go @@ -526,8 +526,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) req := &pdpb.GetMembersRequest{Header: c.requestHeader()} - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -586,6 +585,17 @@ func (c *client) getClient() pdpb.PDClient { return c.leaderClient() } +func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) { + if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 { + backupClientConn, addr := c.backupClientConn() + if backupClientConn != nil { + log.Debug("[pd] use follower client", zap.String("addr", addr)) + return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + } + } + return c.leaderClient(), ctx +} + func (c *client) GetTSAsync(ctx context.Context) TSFuture { return c.GetLocalTSAsync(ctx, globalDCLocation) } @@ -646,39 +656,6 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { return r } -func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = 
opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - start := time.Now() - defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) - - options := &GetRegionOp{} - for _, opt := range opts { - opt(options) - } - req := &pdpb.GetRegionRequest{ - Header: c.requestHeader(), - RegionKey: key, - NeedBuckets: options.needBuckets, - } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() - if protoClient == nil { - cancel() - return nil, errs.ErrClientGetProtoClient - } - resp, err := protoClient.GetRegion(ctx, req) - cancel() - - if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { - return nil, err - } - return handleRegionResponse(resp), nil -} - func isNetworkError(code codes.Code) bool { return code == codes.Unavailable || code == codes.DeadlineExceeded } @@ -721,6 +698,38 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs return handleRegionResponse(resp), nil } +func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + + options := &GetRegionOp{} + for _, opt := range opts { + opt(options) + } + req := &pdpb.GetRegionRequest{ + Header: c.requestHeader(), + RegionKey: key, + NeedBuckets: options.needBuckets, + } + protoClient, ctx := c.getClientAndContext(ctx) + if protoClient == nil { + cancel() + return nil, errs.ErrClientGetProtoClient + } + resp, err := protoClient.GetRegion(ctx, req) + cancel() + + if err = 
c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { + return nil, err + } + return handleRegionResponse(resp), nil +} + func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) @@ -739,8 +748,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio RegionKey: key, NeedBuckets: options.needBuckets, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -772,8 +780,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get RegionId: regionID, NeedBuckets: options.needBuckets, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -807,8 +814,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) EndKey: endKey, Limit: int32(limit), } - scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, scanCtx := c.getClientAndContext(scanCtx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -863,8 +869,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e Header: c.requestHeader(), StoreId: storeID, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -908,8 +913,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m Header: 
c.requestHeader(), ExcludeTombstoneStores: options.excludeTombstone, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -936,8 +940,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 Header: c.requestHeader(), SafePoint: safePoint, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -971,8 +974,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, TTL: ttl, SafePoint: safePoint, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -1004,8 +1006,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g RegionId: regionID, Group: group, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return errs.ErrClientGetProtoClient @@ -1049,8 +1050,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, RetryLimit: options.retryLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1072,8 +1072,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe Header: c.requestHeader(), RegionId: regionID, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if 
protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1100,8 +1099,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R SplitKeys: splitKeys, RetryLimit: options.retryLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1130,8 +1128,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint RetryLimit: options.retryLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1179,7 +1176,13 @@ func trimHTTPPrefix(str string) string { } func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { +<<<<<<< HEAD protoClient := c.getClient() +======= + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) if protoClient == nil { return nil, 0, errs.ErrClientGetProtoClient } @@ -1208,7 +1211,13 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items for i, it := range items { resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad} } +<<<<<<< HEAD protoClient := c.getClient() +======= + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) if protoClient == nil { return errs.ErrClientGetProtoClient } @@ -1223,7 +1232,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // TODO: Add retry mechanism 
// register watch components there globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) +<<<<<<< HEAD protoClient := c.getClient() +======= + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) if protoClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1269,7 +1284,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis } func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { +<<<<<<< HEAD protoClient := c.getClient() +======= + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) if protoClient == nil { return 0, errs.ErrClientGetProtoClient } @@ -1287,7 +1308,13 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { } func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { +<<<<<<< HEAD protoClient := c.getClient() +======= + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) if protoClient == nil { return errs.ErrClientGetProtoClient } diff --git a/client/gc_client.go b/client/gc_client.go new file mode 100644 index 00000000000..fff292405c2 --- /dev/null +++ b/client/gc_client.go @@ -0,0 +1,136 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "go.uber.org/zap" +) + +// GCClient is a client for doing GC. +type GCClient interface { + UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) + UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) + WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) +} + +// UpdateGCSafePointV2 updates the GC safe point for the given keyspace. 
+func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &pdpb.UpdateGCSafePointV2Request{ + Header: c.requestHeader(), + KeyspaceId: keyspaceID, + SafePoint: safePoint, + } + protoClient, ctx := c.getClientAndContext(ctx) + if protoClient == nil { + cancel() + return 0, errs.ErrClientGetProtoClient + } + resp, err := protoClient.UpdateGCSafePointV2(ctx, req) + cancel() + + if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil { + return 0, err + } + return resp.GetNewSafePoint(), nil +} + +// UpdateServiceSafePointV2 updates the service safe point for the given keyspace. 
+func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &pdpb.UpdateServiceSafePointV2Request{ + Header: c.requestHeader(), + KeyspaceId: keyspaceID, + ServiceId: []byte(serviceID), + SafePoint: safePoint, + Ttl: ttl, + } + protoClient, ctx := c.getClientAndContext(ctx) + if protoClient == nil { + cancel() + return 0, errs.ErrClientGetProtoClient + } + resp, err := protoClient.UpdateServiceSafePointV2(ctx, req) + cancel() + if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil { + return 0, err + } + return resp.GetMinSafePoint(), nil +} + +// WatchGCSafePointV2 watches GC safe point changes. 
+func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) { + SafePointEventsChan := make(chan []*pdpb.SafePointEvent) + req := &pdpb.WatchGCSafePointV2Request{ + Header: c.requestHeader(), + Revision: revision, + } + + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + protoClient, ctx := c.getClientAndContext(ctx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + stream, err := protoClient.WatchGCSafePointV2(ctx, req) + if err != nil { + close(SafePointEventsChan) + return nil, err + } + go func() { + defer func() { + close(SafePointEventsChan) + if r := recover(); r != nil { + log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r)) + return + } + }() + for { + select { + case <-ctx.Done(): + return + default: + resp, err := stream.Recv() + if err != nil { + log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err)) + return + } + SafePointEventsChan <- resp.GetEvents() + } + } + }() + return SafePointEventsChan, err +} diff --git a/client/keyspace_client.go b/client/keyspace_client.go index 218ac404d2f..725a320734b 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -21,9 +21,12 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" +<<<<<<< HEAD "github.com/pingcap/log" "github.com/tikv/pd/client/grpcutil" "go.uber.org/zap" +======= +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471)) ) // KeyspaceClient manages keyspace metadata. 
@@ -57,7 +60,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key Header: c.requestHeader(), Name: name, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.keyspaceClient().LoadKeyspace(ctx, req) cancel() @@ -136,7 +138,6 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp Id: id, State: state, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.keyspaceClient().UpdateKeyspaceState(ctx, req) cancel() @@ -153,3 +154,45 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp return resp.Keyspace, nil } +<<<<<<< HEAD +======= + +// WatchKeyspaces watches keyspace meta changes. +// It returns a stream of slices of keyspace metadata. +// The first message in the stream contains all current keyspaceMeta, +// all subsequent messages contain new put events for all keyspaces. +func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) { + return nil, errors.Errorf("WatchKeyspaces unimplemented") +} + +// GetAllKeyspaces gets the metadata of all keyspaces. 
+func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &keyspacepb.GetAllKeyspacesRequest{ + Header: c.requestHeader(), + StartId: startID, + Limit: limit, + } + resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req) + cancel() + + if err != nil { + cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) + c.pdSvcDiscovery.ScheduleCheckMemberChanged() + return nil, err + } + + if resp.Header.GetError() != nil { + cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) + return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String()) + } + + return resp.Keyspaces, nil +} +>>>>>>> 180ff57af (client: avoid to add redundant grpc metadata (#7471))