Skip to content

Commit

Permalink
Merge branch 'master' into support_hb_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 29, 2024
2 parents 37d30f6 + 11da622 commit 1dfcbfc
Show file tree
Hide file tree
Showing 306 changed files with 7,401 additions and 3,880 deletions.
23 changes: 17 additions & 6 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,23 @@ jobs:
- worker_id: 2
name: 'Unit Test(2)'
- worker_id: 3
name: 'Tools Test'
name: 'Unit Test(3)'
- worker_id: 4
name: 'Client Integration Test'
name: 'Tests(1)'
- worker_id: 5
name: 'TSO Integration Test'
name: 'Tests(2)'
- worker_id: 6
name: 'MicroService Integration Test'
name: 'Tools Test'
- worker_id: 7
name: 'Client Integration Test'
- worker_id: 8
name: 'TSO Integration Test'
- worker_id: 9
name: 'MicroService Integration(!TSO)'
- worker_id: 10
name: 'MicroService Integration(TSO)'
outputs:
job-total: 6
job-total: 10
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -52,6 +60,9 @@ jobs:
run: |
make ci-test-job JOB_INDEX=$WORKER_ID
mv covprofile covprofile_$WORKER_ID
if [ -f junitfile ]; then
cat junitfile
fi
- name: Upload coverage result ${{ matrix.worker_id }}
uses: actions/upload-artifact@v4
with:
Expand All @@ -77,7 +88,7 @@ jobs:
# only keep the first line(`mode: aomic`) of the coverage profile
sed -i '2,${/mode: atomic/d;}' covprofile
- name: Send coverage
uses: codecov/codecov-action@v4.2.0
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV }}
file: ./covprofile
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ docs/swagger/*
*.before
covprofile
coverage.tmp
junitfile
package.list
report.xml
coverage.xml
coverage
*.txt
go.work*
embedded_assets_handler.go
*.log
19 changes: 17 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ linters:
- testifylint
- gofmt
- revive
disable:
- errcheck
linters-settings:
gocritic:
Expand All @@ -30,7 +29,6 @@ linters-settings:
excludes:
- G402
- G404
- G601
testifylint:
enable:
- bool-compare
Expand Down Expand Up @@ -199,3 +197,20 @@ linters-settings:
severity: warning
disabled: false
exclude: [""]
errcheck:
exclude-functions:
- (*github.com/unrolled/render.Render).JSON
- (*github.com/unrolled/render.Render).Data
- (*github.com/unrolled/render.Render).Text
- (net/http.ResponseWriter).Write
- github.com/pingcap/log.Sync
- (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
linters:
- errcheck
# following path will enable in the future
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pd-analysis:
pd-heartbeat-bench:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench pd-heartbeat-bench/main.go
simulator:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_CGO_ENABLED) go build $(BUILD_FLAGS) -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
regions-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
stores-dump:
Expand Down Expand Up @@ -228,7 +228,7 @@ failpoint-disable: install-tools
ut: pd-ut
@$(FAILPOINT_ENABLE)
# only run unit tests
./bin/pd-ut run --ignore tests --race
./bin/pd-ut run --ignore tests --race --junitfile ./junitfile
@$(CLEAN_UT_BINARY)
@$(FAILPOINT_DISABLE)

Expand All @@ -254,7 +254,7 @@ basic-test: install-tools

ci-test-job: install-tools dashboard-ui pd-ut
@$(FAILPOINT_ENABLE)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
./scripts/ci-subtask.sh $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
Expand All @@ -280,6 +280,7 @@ test-tso-consistency: install-tools
REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster

test-real-cluster:
@ rm -rf ~/.tiup/data/pd_real_cluster_test
# testing with the real cluster...
cd $(REAL_CLUSTER_TEST_PATH) && $(MAKE) check

Expand Down
6 changes: 6 additions & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Sort the member alphabetically.
aliases:
sig-critical-approvers-config:
- easonn7
- kevin-xianliu
- niubell
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ If you're interested in contributing to PD, see [CONTRIBUTING.md](./CONTRIBUTING

## Build

1. Make sure [*Go*](https://golang.org/) (version 1.20) is installed.
2. Use `make` to install PD. PD is installed in the `bin` directory.
1. Make sure [*Go*](https://golang.org/) (version 1.21) is installed.
2. Use `make` to install PD. `pd-server` will be installed in the `bin` directory.

## Usage

Expand Down
150 changes: 136 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package pd
import (
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand Down Expand Up @@ -85,11 +87,18 @@ type RPCClient interface {
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -205,8 +214,9 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -222,6 +232,11 @@ func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
Expand Down Expand Up @@ -259,6 +274,15 @@ func WithForwardingOption(enableForwarding bool) ClientOption {
}
}

// WithTSOServerProxyOption configures the client to use TSO server proxy,
// i.e., the client will send TSO requests to the API leader (the TSO server
// proxy) which will forward the requests to the TSO servers.
func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
return func(c *client) {
c.option.useTSOServerProxy = useTSOServerProxy
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
Expand Down Expand Up @@ -337,6 +361,38 @@ type SecurityOption struct {
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
svrAddrs []string, security SecurityOption, opts ...ClientOption,
Expand Down Expand Up @@ -607,6 +663,11 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if c.option.useTSOServerProxy {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}

if newMode == c.serviceMode {
return
}
Expand Down Expand Up @@ -1094,6 +1155,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
Expand All @@ -1103,6 +1165,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err = protoClient.ScanRegions(cctx, req)
}

Expand All @@ -1113,6 +1176,75 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
return handleRegionsResponse(resp), nil
}

func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
if _, ok := ctx.Deadline(); !ok {
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
pbRanges := make([]*pdpb.KeyRange, 0, len(ranges))
for _, r := range ranges {
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
ContainAllKeyRange: options.outputMustContainAllKeyRange,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.BatchScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
}
regions = append(regions, region)
}
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
if len(resp.GetRegions()) == 0 {
Expand All @@ -1131,6 +1263,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
Expand Down Expand Up @@ -1431,17 +1564,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
Loading

0 comments on commit 1dfcbfc

Please sign in to comment.