Skip to content

Commit

Permalink
Merge branch 'master' into simulator_add_node
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jun 21, 2024
2 parents 8cd1cdd + 3b051d7 commit 5c83eae
Show file tree
Hide file tree
Showing 79 changed files with 1,608 additions and 944 deletions.
13 changes: 12 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ linters:
- testifylint
- gofmt
- revive
disable:
- errcheck
linters-settings:
gocritic:
Expand Down Expand Up @@ -199,3 +198,15 @@ linters-settings:
severity: warning
disabled: false
exclude: [""]
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
linters:
- errcheck
# following path will enable in the future
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|server/api/.*\.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/schedule/schedulers/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
114 changes: 113 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package pd
import (
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand Down Expand Up @@ -85,11 +87,18 @@ type RPCClient interface {
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -337,6 +346,38 @@ type SecurityOption struct {
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
svrAddrs []string, security SecurityOption, opts ...ClientOption,
Expand Down Expand Up @@ -1094,6 +1135,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
Expand All @@ -1103,6 +1145,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err = protoClient.ScanRegions(cctx, req)
}

Expand All @@ -1113,6 +1156,74 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
return handleRegionsResponse(resp), nil
}

func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
if _, ok := ctx.Deadline(); !ok {
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
pbRanges := make([]*pdpb.KeyRange, 0, len(ranges))
for _, r := range ranges {
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.BatchScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
}
regions = append(regions, region)
}
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
if len(resp.GetRegions()) == 0 {
Expand All @@ -1131,6 +1242,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
39 changes: 6 additions & 33 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
package http

import (
"encoding/hex"
"encoding/json"
"fmt"
"net/url"
"time"

"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
pd "github.com/tikv/pd/client"
)

// ClusterState saves some cluster state information.
Expand All @@ -43,37 +42,11 @@ type State struct {
StartTimestamp int64 `json:"start_timestamp"`
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
startKey []byte
endKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}
// KeyRange alias pd.KeyRange to avoid break client compatibility.
type KeyRange = pd.KeyRange

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.startKey))
endKeyStr = url.QueryEscape(string(r.endKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.startKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.endKey))
return
}
// NewKeyRange alias pd.NewKeyRange to avoid break client compatibility.
var NewKeyRange = pd.NewKeyRange

// NOTICE: the structures below are copied from the PD API definitions.
// Please make sure the consistency if any change happens to the PD API.
Expand Down Expand Up @@ -366,7 +339,7 @@ func (r *Rule) String() string {
// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
_ = json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
Expand Down
27 changes: 27 additions & 0 deletions client/http/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,30 @@ func mustMarshalAndUnmarshalRuleOp(re *require.Assertions, ruleOp *RuleOp) *Rule
re.NoError(err)
return newRuleOp
}

// startKey and endKey are json:"-" which means cannot be Unmarshal from json
// We need to take care of `Clone` method.
func TestRuleKeyClone(t *testing.T) {
re := require.New(t)
r := &Rule{
StartKey: []byte{1, 2, 3},
EndKey: []byte{4, 5, 6},
}

clone := r.Clone()
// Modify the original rule
r.StartKey[0] = 9
r.EndKey[0] = 9

// The clone should not be affected
re.Equal([]byte{1, 2, 3}, clone.StartKey)
re.Equal([]byte{4, 5, 6}, clone.EndKey)

// Modify the clone
clone.StartKey[0] = 8
clone.EndKey[0] = 8

// The original rule should not be affected
re.Equal([]byte{9, 2, 3}, r.StartKey)
re.Equal([]byte{9, 5, 6}, r.EndKey)
}
4 changes: 4 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ var (
cmdDurationGetPrevRegion prometheus.Observer
cmdDurationGetRegionByID prometheus.Observer
cmdDurationScanRegions prometheus.Observer
cmdDurationBatchScanRegions prometheus.Observer
cmdDurationGetStore prometheus.Observer
cmdDurationGetAllStores prometheus.Observer
cmdDurationUpdateGCSafePoint prometheus.Observer
Expand All @@ -151,6 +152,7 @@ var (
cmdFailDurationGetPrevRegion prometheus.Observer
cmdFailedDurationGetRegionByID prometheus.Observer
cmdFailedDurationScanRegions prometheus.Observer
cmdFailedDurationBatchScanRegions prometheus.Observer
cmdFailedDurationGetStore prometheus.Observer
cmdFailedDurationGetAllStores prometheus.Observer
cmdFailedDurationUpdateGCSafePoint prometheus.Observer
Expand All @@ -174,6 +176,7 @@ func initCmdDurations() {
cmdDurationGetPrevRegion = cmdDuration.WithLabelValues("get_prev_region")
cmdDurationGetRegionByID = cmdDuration.WithLabelValues("get_region_byid")
cmdDurationScanRegions = cmdDuration.WithLabelValues("scan_regions")
cmdDurationBatchScanRegions = cmdDuration.WithLabelValues("batch_scan_regions")
cmdDurationGetStore = cmdDuration.WithLabelValues("get_store")
cmdDurationGetAllStores = cmdDuration.WithLabelValues("get_all_stores")
cmdDurationUpdateGCSafePoint = cmdDuration.WithLabelValues("update_gc_safe_point")
Expand All @@ -197,6 +200,7 @@ func initCmdDurations() {
cmdFailDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region")
cmdFailedDurationGetRegionByID = cmdFailedDuration.WithLabelValues("get_region_byid")
cmdFailedDurationScanRegions = cmdFailedDuration.WithLabelValues("scan_regions")
cmdFailedDurationBatchScanRegions = cmdFailedDuration.WithLabelValues("batch_scan_regions")
cmdFailedDurationGetStore = cmdFailedDuration.WithLabelValues("get_store")
cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
Expand Down
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,9 @@ func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLoca
func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
callback(url)
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
}
}
c.tsoGlobalAllocLeaderUpdatedCb = callback
}
Expand Down
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
// If the stream is nil or the leader has changed, try to reconnect.
if toReconnect {
connection.reset()
c.tryResourceManagerConnect(dispatcherCtx, &connection)
if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
log.Error("[resource_manager] try to connect token leader failed", errs.ZapError(err))
}
log.Info("[resource_manager] token leader may change, try to reconnect the stream")
stream, streamCtx = connection.stream, connection.ctx
}
Expand Down
4 changes: 3 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (c *tsoClient) getOption() *option { return c.option }
func (c *tsoClient) getServiceDiscovery() ServiceDiscovery { return c.svcDiscovery }

func (c *tsoClient) setup() {
c.svcDiscovery.CheckMemberChanged()
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
log.Warn("[tso] failed to check member changed", errs.ZapError(err))
}
c.updateTSODispatcher()

// Start the daemons.
Expand Down
8 changes: 6 additions & 2 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,9 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
// CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in
// a primary/secondary configured cluster.
func (c *tsoServiceDiscovery) CheckMemberChanged() error {
c.apiSvcDiscovery.CheckMemberChanged()
if err := c.apiSvcDiscovery.CheckMemberChanged(); err != nil {
log.Warn("[tso] failed to check member changed", errs.ZapError(err))
}
if err := c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember); err != nil {
log.Error("[tso] failed to update member", errs.ZapError(err))
return err
Expand All @@ -366,7 +368,9 @@ func (c *tsoServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLoc
func (c *tsoServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
url := c.getPrimaryURL()
if len(url) > 0 {
callback(url)
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
}
}
c.globalAllocPrimariesUpdatedCb = callback
}
Expand Down
Loading

0 comments on commit 5c83eae

Please sign in to comment.