Skip to content

Commit

Permalink
Merge branch 'master' into fix-tso-job3
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 26, 2024
2 parents 97163fb + 52b42b2 commit 7905290
Show file tree
Hide file tree
Showing 56 changed files with 1,101 additions and 2,265 deletions.
112 changes: 38 additions & 74 deletions client/client.go

Large diffs are not rendered by default.

51 changes: 0 additions & 51 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/testutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)

func TestMain(m *testing.M) {
Expand All @@ -43,36 +41,6 @@ func TestTSLessEqual(t *testing.T) {
re.True(tsoutil.TSLessEqual(9, 6, 9, 8))
}

func TestUpdateURLs(t *testing.T) {
re := require.New(t)
members := []*pdpb.Member{
{Name: "pd4", ClientUrls: []string{"tmp://pd4"}},
{Name: "pd1", ClientUrls: []string{"tmp://pd1"}},
{Name: "pd3", ClientUrls: []string{"tmp://pd3"}},
{Name: "pd2", ClientUrls: []string{"tmp://pd2"}},
}
getURLs := func(ms []*pdpb.Member) (urls []string) {
for _, m := range ms {
urls = append(urls, m.GetClientUrls()[0])
}
return
}
cli := &pdServiceDiscovery{option: opt.NewOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[2:])
re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[3:])
re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"

func TestClientCtx(t *testing.T) {
Expand All @@ -95,25 +63,6 @@ func TestClientWithRetry(t *testing.T) {
re.Less(time.Since(start), time.Second*10)
}

func TestGRPCDialOption(t *testing.T) {
re := require.New(t)
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
tlsCfg: nil,
option: opt.NewOption(),
}
cli.urls.Store([]string{testClientURL})
cli.option.GRPCDialOptions = []grpc.DialOption{grpc.WithBlock()}
err := cli.updateMember()
re.Error(err)
re.Greater(time.Since(start), 500*time.Millisecond)
}

func TestTsoRequestWait(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
32 changes: 32 additions & 0 deletions client/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package constants

const (
// DefaultKeyspaceID is the default keyspace ID.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized
// when PD bootstrap and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)
// MaxKeyspaceID is the maximum keyspace ID.
MaxKeyspaceID = uint32(0xFFFFFF)
// NullKeyspaceID is used for API v1 or legacy path where is keyspace agnostic.
NullKeyspaceID = uint32(0xFFFFFFFF)
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)
// DefaultKeyspaceName is the default keyspace name.
DefaultKeyspaceName = "DEFAULT"
)
14 changes: 14 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ const (
NotPrimaryErr = "not primary"
)

// internal errors
var (
// ErrUnmatchedClusterID is returned when found a PD with a different cluster ID.
ErrUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
// ErrFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
ErrFailInitClusterID = errors.New("[pd] failed to get cluster id")
// ErrClosing is returned when request is canceled when client is closing.
ErrClosing = errors.New("[pd] closing")
// ErrTSOLength is returned when the number of response timestamps is inconsistent with request.
ErrTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// ErrNoServiceModeReturned is returned when the response doesn't contain service mode info unexpectedly.
ErrNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
Expand Down
6 changes: 6 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// IsLeaderChange will determine whether there is a leader/primary change.
Expand All @@ -38,6 +39,11 @@ func IsLeaderChange(err error) bool {
strings.Contains(errMsg, NotPrimaryErr)
}

// IsNetworkError returns true if the error is a network error.
func IsNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
9 changes: 5 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"go.uber.org/zap"
)

Expand All @@ -39,7 +40,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Expand All @@ -55,7 +56,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
Expand All @@ -68,7 +69,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Expand All @@ -85,7 +86,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
Expand Down
12 changes: 6 additions & 6 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ type clientInner struct {
ctx context.Context
cancel context.CancelFunc

sd pd.ServiceDiscovery
sd sd.ServiceDiscovery

// source is used to mark the source of the client creation,
// it will also be used in the caller ID of the inner client.
Expand All @@ -74,7 +74,7 @@ func newClientInner(ctx context.Context, cancel context.CancelFunc, source strin
return &clientInner{ctx: ctx, cancel: cancel, source: source}
}

func (ci *clientInner) init(sd pd.ServiceDiscovery) {
func (ci *clientInner) init(sd sd.ServiceDiscovery) {
// Init the HTTP client if it's not configured.
if ci.cli == nil {
ci.cli = &http.Client{Timeout: defaultTimeout}
Expand Down Expand Up @@ -305,7 +305,7 @@ func WithMetrics(
// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
func NewClientWithServiceDiscovery(
source string,
sd pd.ServiceDiscovery,
sd sd.ServiceDiscovery,
opts ...ClientOption,
) Client {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -330,7 +330,7 @@ func NewClient(
for _, opt := range opts {
opt(c)
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := sd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down Expand Up @@ -430,7 +430,7 @@ func newClientWithMockServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := pd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
sd := sd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down
24 changes: 13 additions & 11 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand All @@ -23,7 +25,7 @@ const (
type innerClient struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery *pdServiceDiscovery
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
Expand All @@ -39,8 +41,8 @@ type innerClient struct {
option *opt.Option
}

func (c *innerClient) init(updateKeyspaceIDCb updateKeyspaceIDFunc) error {
c.pdSvcDiscovery = newPDServiceDiscovery(
func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
c.pdSvcDiscovery = sd.NewPDServiceDiscovery(
c.ctx, c.cancel, &c.wg, c.setServiceMode,
updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
Expand Down Expand Up @@ -82,14 +84,14 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
newTSOSvcDiscovery sd.ServiceDiscovery
)
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
c.ctx, c, c.pdSvcDiscovery,
c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
Expand Down Expand Up @@ -151,7 +153,7 @@ func (c *innerClient) close() {
c.pdSvcDiscovery.Close()

if c.tokenDispatcher != nil {
tokenErr := errors.WithStack(errClosing)
tokenErr := errors.WithStack(errs.ErrClosing)
c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr)
c.tokenDispatcher.dispatcherCancel()
}
Expand All @@ -160,7 +162,7 @@ func (c *innerClient) close() {
func (c *innerClient) setup() error {
// Init the metrics.
if c.option.InitMetrics {
initAndRegisterMetrics(c.option.MetricsLabels)
metrics.InitAndRegisterMetrics(c.option.MetricsLabels)
}

// Init the client base.
Expand All @@ -178,10 +180,10 @@ func (c *innerClient) setup() error {

// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) {
var serviceClient ServiceClient
func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (sd.ServiceClient, context.Context) {
var serviceClient sd.ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind)
serviceClient = c.pdSvcDiscovery.GetServiceClientByKind(sd.UniversalAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
Expand All @@ -201,7 +203,7 @@ func (c *innerClient) gRPCErrorHandler(err error) {
}

func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.getLeaderURL())
cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.GetServingURL())
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 7905290

Please sign in to comment.