client: use ServiceClient for discovery #7611

Merged
merged 4 commits on Dec 26, 2023
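This PR removes the client's hand-rolled leader health checking and follower fallback (leaderCheckLoop, checkLeaderHealth, leaderClient, backupClientConn) and instead routes requests through the ServiceClient exposed by pdSvcDiscovery. The sketch below shows the ServiceClient surface as inferred from the two calls visible in this diff; the actual interface in the pd client's service-discovery code carries more methods.

package pd // sketch only, not part of this diff

import (
	"context"

	"google.golang.org/grpc"
)

// ServiceClient as used by this diff (inferred; not the full interface).
type ServiceClient interface {
	// GetClientConn returns the gRPC connection to the selected endpoint.
	GetClientConn() *grpc.ClientConn
	// BuildGRPCTargetContext attaches forwarding metadata to ctx when the
	// selected endpoint cannot serve the request directly; mustLeader reports
	// whether the request must ultimately be served by the leader.
	BuildGRPCTargetContext(ctx context.Context, mustLeader bool) context.Context
}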
103 changes: 6 additions & 97 deletions client/client.go
@@ -17,29 +17,22 @@ package pd
import (
"context"
"fmt"
"math/rand"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

const (
@@ -217,9 +210,6 @@ func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// LeaderHealthCheckInterval may be changed in unit tests to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

var (
// errUnmatchedClusterID is returned when a PD with a different cluster ID is found.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
@@ -316,7 +306,6 @@ type client struct {

// For internal usage.
updateTokenConnectionCh chan struct{}
leaderNetworkFailure int32

ctx context.Context
cancel context.CancelFunc
@@ -575,10 +564,6 @@ func (c *client) setup() error {

// Create dispatchers
c.createTokenDispatcher()

// Start the daemons.
c.wg.Add(1)
go c.leaderCheckLoop()
return nil
}

@@ -719,46 +704,6 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
return nil
}

func (c *client) leaderCheckLoop() {
defer c.wg.Done()

leaderCheckLoopCtx, leaderCheckLoopCancel := context.WithCancel(c.ctx)
defer leaderCheckLoopCancel()

ticker := time.NewTicker(LeaderHealthCheckInterval)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.checkLeaderHealth(leaderCheckLoopCtx)
}
}
}
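
Aside: the exported LeaderHealthCheckInterval deleted above existed, per its comment, so unit tests could shorten this loop's period. Before this removal, a test could have overridden it as in the hypothetical sketch below (TestLeaderFailoverFast is illustrative, not from this PR):

package pd_test // illustrative only

import (
	"testing"
	"time"

	pd "github.com/tikv/pd/client"
)

func TestLeaderFailoverFast(t *testing.T) {
	// Hypothetical: shorten the health-check tick so the test observes a
	// leader failure quickly, restoring the default on exit.
	old := pd.LeaderHealthCheckInterval
	pd.LeaderHealthCheckInterval = 100 * time.Millisecond
	defer func() { pd.LeaderHealthCheckInterval = old }()
	// ... start a test cluster, stop the leader, assert the client recovers ...
}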

func (c *client) checkLeaderHealth(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
healthCli := healthpb.NewHealthClient(client)
resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
failpoint.Inject("unreachableNetwork1", func() {
resp = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
rpcErr, ok := status.FromError(err)
if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
}
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
}
}
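
For readers unfamiliar with the deleted pattern: the probe above speaks the standard gRPC health-checking protocol. A minimal standalone sketch follows, with probeLeader as a hypothetical free function, assuming isNetworkError (a helper not shown in this diff) treats Unavailable and DeadlineExceeded as network failures:

package probe // illustrative only

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)

// probeLeader asks the standard gRPC health service whether the server
// behind cc is SERVING, mirroring the removed checkLeaderHealth.
func probeLeader(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	// An empty Service name queries the server's overall health.
	resp, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if code := status.Code(err); code == codes.Unavailable || code == codes.DeadlineExceeded {
		// Assumed to match isNetworkError in the removed code.
		return false
	}
	// resp.GetStatus() is nil-safe, so any other error also reads as not SERVING.
	return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING
}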

func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
@@ -778,50 +723,14 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
return resp.GetMembers(), nil
}
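
The start/defer pair in GetAllMembers is the latency-observation idiom used throughout this file. Shown standalone below, with cmdDurationExample as a hypothetical stand-in for the file's cmdDuration* metrics:

package metrics // illustrative only

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// cmdDurationExample stands in for metrics like cmdDurationGetAllMembers.
var cmdDurationExample = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "cmd_duration_seconds",
	Help: "Latency of one client command.",
})

func doCommand() {
	start := time.Now()
	// Observe on every return path via defer, exactly as GetAllMembers does.
	defer func() { cmdDurationExample.Observe(time.Since(start).Seconds()) }()
	// ... issue the RPC ...
}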

// leaderClient gets the client of the current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
return pdpb.NewPDClient(client)
}
return nil
}

// backupClientConn returns a gRPC client connection to a randomly selected
// reachable and healthy backup service endpoint. Backup service endpoints are
// followers in a quorum-based cluster or secondaries in a primary/secondary
// configured cluster.
func (c *client) backupClientConn() (*grpc.ClientConn, string) {
addrs := c.pdSvcDiscovery.GetBackupAddrs()
if len(addrs) < 1 {
return nil, ""
}
var (
cc *grpc.ClientConn
err error
)
for i := 0; i < len(addrs); i++ {
addr := addrs[rand.Intn(len(addrs))]
if cc, err = c.pdSvcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
continue
}
healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
return cc, addr
}
}
return nil, ""
}

 // getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
 // follower pd client and the context which holds forward information.
 func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
-	if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
-		backupClientConn, addr := c.backupClientConn()
-		if backupClientConn != nil {
-			log.Debug("[pd] use follower client", zap.String("addr", addr))
-			return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
-		}
-	}
-	return c.leaderClient(), ctx
+	serviceClient := c.pdSvcDiscovery.GetServiceClient()
+	if serviceClient == nil {
+		return nil, ctx
+	}
+	return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
 }
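
A hypothetical call site showing how the rewritten helper is consumed; getMembersExample only mirrors the shape of this file's real RPC wrappers (cf. GetAllMembers above), and requestHeader plus errs.ErrClientGetProtoClient are assumed from the surrounding package:

// In package pd, alongside the code above; illustrative only.
func (c *client) getMembersExample(ctx context.Context) (*pdpb.GetMembersResponse, error) {
	// Resolve a PD client and a context that may carry forwarding metadata.
	protoClient, ctx := c.getClientAndContext(ctx)
	if protoClient == nil {
		return nil, errs.ErrClientGetProtoClient // assumed error value from client/errs
	}
	return protoClient.GetMembers(ctx, &pdpb.GetMembersRequest{Header: c.requestHeader()})
}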

func (c *client) GetTSAsync(ctx context.Context) TSFuture {