Commit 886b3bf
Merge branch 'master' into update-hb-bench
ti-chi-bot[bot] authored Jan 24, 2024
2 parents 0634288 + 1a38582 commit 886b3bf
Showing 22 changed files with 383 additions and 167 deletions.
6 changes: 6 additions & 0 deletions client/client.go
@@ -396,6 +396,9 @@ func createClientWithKeyspace(
nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
+ if c.pdSvcDiscovery != nil {
+ c.pdSvcDiscovery.Close()
+ }
return nil, err
}

@@ -522,6 +525,9 @@ func newClientWithKeyspaceName(
clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
+ if c.pdSvcDiscovery != nil {
+ c.pdSvcDiscovery.Close()
+ }
return nil, err
}
log.Info("[pd] create pd client with endpoints and keyspace",
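Both hunks above apply the same fix: if setup() fails after the service discovery has already been started, the client now closes it before returning the error instead of leaking its background goroutines. A minimal, self-contained sketch of this close-what-you-started-on-error pattern (all names below are illustrative, not PD's actual API):

```go
package main

import "fmt"

type discovery struct{ done chan struct{} }

func newDiscovery() *discovery {
	d := &discovery{done: make(chan struct{})}
	go func() { <-d.done }() // background worker that exits only when closed
	return d
}

func (d *discovery) Close() { close(d.done) }

type service struct{ sd *discovery }

func newService(failSetup bool) (*service, error) {
	s := &service{sd: newDiscovery()}
	if failSetup { // stands in for c.setup() returning an error
		if s.sd != nil {
			s.sd.Close() // release what was already started before bailing out
		}
		return nil, fmt.Errorf("setup failed")
	}
	return s, nil
}

func main() {
	if _, err := newService(true); err != nil {
		fmt.Println(err) // the discovery goroutine has been stopped, not leaked
	}
}
```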
46 changes: 30 additions & 16 deletions client/http/client.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"io"
"net/http"
"os"
"time"

"github.com/pingcap/errors"
@@ -67,6 +66,8 @@ type clientInner struct {

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
+ // defaultSD indicates whether the client is created with the default service discovery.
+ defaultSD bool
}

func newClientInner(ctx context.Context, cancel context.CancelFunc, source string) *clientInner {
@@ -91,6 +92,10 @@ func (ci *clientInner) close() {
if ci.cli != nil {
ci.cli.CloseIdleConnections()
}
+ // only close the service discovery if it was created by the client itself.
+ if ci.defaultSD && ci.sd != nil {
+ ci.sd.Close()
+ }
}

func (ci *clientInner) reqCounter(name, status string) {
@@ -270,21 +275,6 @@ func WithMetrics(
}
}

- // WithLoggerRedirection configures the client with the given logger redirection.
- func WithLoggerRedirection(logLevel, fileName string) ClientOption {
- cfg := &log.Config{}
- cfg.Level = logLevel
- if fileName != "" {
- f, _ := os.CreateTemp(".", fileName)
- fname := f.Name()
- f.Close()
- cfg.File.Filename = fname
- }
- lg, p, _ := log.InitLogger(cfg)
- log.ReplaceGlobals(lg, p)
- return func(c *client) {}
- }

// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
func NewClientWithServiceDiscovery(
source string,
@@ -314,7 +304,12 @@ func NewClient(
opt(c)
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
+ if err := sd.Init(); err != nil {
+ log.Error("[pd] init service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
+ return nil
+ }
c.inner.init(sd)
+ c.inner.defaultSD = true
return c
}

@@ -371,6 +366,7 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
headerOpts...)
}

+ /* The following functions are only for test */
// requestChecker is used to check the HTTP request sent by the client.
type requestChecker func(req *http.Request) error

@@ -385,3 +381,21 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
Transport: checker,
}
}

+ // newClientWithoutInitServiceDiscovery creates a PD HTTP client
+ // with the given PD addresses and TLS config, without initializing the service discovery.
+ func newClientWithoutInitServiceDiscovery(
+ source string,
+ pdAddrs []string,
+ opts ...ClientOption,
+ ) Client {
+ ctx, cancel := context.WithCancel(context.Background())
+ c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID}
+ // Apply the options first.
+ for _, opt := range opts {
+ opt(c)
+ }
+ sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
+ c.inner.init(sd)
+ return c
+ }
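The defaultSD flag added above encodes an ownership rule: the HTTP client closes its service discovery on Close() only when it created that discovery itself (NewClient); a discovery injected via NewClientWithServiceDiscovery stays under the caller's control. A hedged sketch of that rule with simplified stand-in types, not the PD client's real ones:

```go
package main

import "fmt"

type serviceDiscovery interface{ Close() }

type sd struct{ name string }

func (s *sd) Close() { fmt.Println("closed", s.name) }

type httpClient struct {
	sd        serviceDiscovery
	defaultSD bool // set only when the client constructed sd itself
}

func newWithInjectedSD(s serviceDiscovery) *httpClient {
	return &httpClient{sd: s} // caller keeps ownership of s
}

func newWithDefaultSD() *httpClient {
	return &httpClient{sd: &sd{name: "default"}, defaultSD: true}
}

func (c *httpClient) Close() {
	// mirrors clientInner.close(): only close what we created
	if c.defaultSD && c.sd != nil {
		c.sd.Close()
	}
}

func main() {
	shared := &sd{name: "shared"}
	newWithInjectedSD(shared).Close() // prints nothing: shared stays usable
	newWithDefaultSD().Close()        // prints "closed default"
	shared.Close()                    // the caller closes its own dependency
}
```

This also explains the test changes below: NewClient now fails fast when sd.Init() cannot reach a PD, so unit tests that never contact a server construct the client via newClientWithoutInitServiceDiscovery instead.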
6 changes: 3 additions & 3 deletions client/http/client_test.go
@@ -40,7 +40,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
}
return nil
})
c := NewClient("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c := newClientWithoutInitServiceDiscovery("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c.GetRegions(context.Background())
c.GetHistoryHotRegions(context.Background(), &HistoryHotRegionsRequest{})
c.Close()
@@ -58,7 +58,7 @@ func TestCallerID(t *testing.T) {
}
return nil
})
c := NewClient("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c := newClientWithoutInitServiceDiscovery("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c.GetRegions(context.Background())
expectedVal.Store("test")
c.WithCallerID(expectedVal.Load()).GetRegions(context.Background())
@@ -69,7 +69,7 @@ func TestWithBackoffer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := NewClient("test-with-backoffer", []string{"http://127.0.0.1"})
c := newClientWithoutInitServiceDiscovery("test-with-backoffer", []string{"http://127.0.0.1"})

base := 100 * time.Millisecond
max := 500 * time.Millisecond
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
@@ -911,7 +911,9 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
return errors.WithStack(errNoServiceModeReturned)
}
- c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
+ if c.serviceModeUpdateCb != nil {
+ c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
+ }
return nil
}

2 changes: 2 additions & 0 deletions client/pd_service_discovery_test.go
@@ -158,6 +158,8 @@ func (suite *serviceClientTestSuite) TearDownTest() {
func (suite *serviceClientTestSuite) TearDownSuite() {
suite.leaderServer.grpcServer.GracefulStop()
suite.followerServer.grpcServer.GracefulStop()
+ suite.leaderClient.GetClientConn().Close()
+ suite.followerClient.GetClientConn().Close()
suite.clean()
}

5 changes: 0 additions & 5 deletions client/testutil/leak.go
@@ -23,9 +23,4 @@ var LeakOptions = []goleak.Option{
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).createTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*Server).handleRawConn"),
- // TODO: remove the below options once we fixed the http connection leak problems
- goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
- goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
- goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Server).keepalive"),
- goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
}
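Removing these ignore options tightens the leak check: goroutines that were previously tolerated (the HTTP connection leaks noted in the removed TODO) will make tests fail again. For context, a short sketch of how such an ignore list is typically consumed with go.uber.org/goleak; the Check helper is illustrative, not necessarily PD's actual helper:

```go
package testutil

import (
	"testing"

	"go.uber.org/goleak"
)

// Check fails the test if any unexpected goroutine is still running.
// Every entry removed from LeakOptions widens what counts as a leak.
func Check(t *testing.T) {
	goleak.VerifyNone(t, LeakOptions...)
}
```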
14 changes: 9 additions & 5 deletions pkg/election/lease.go
@@ -135,7 +135,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
// https://pkg.go.dev/time@master#Timer.Reset
timer.Reset(l.leaseTimeout)
case <-timer.C:
log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
log.Info("keep alive lease too slow", zap.Duration("timeout-duration", l.leaseTimeout), zap.Time("actual-expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
return
case <-ctx.Done():
return
@@ -154,11 +154,14 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c

log.Info("start lease keep alive worker", zap.Duration("interval", interval), zap.String("purpose", l.Purpose))
defer log.Info("stop lease keep alive worker", zap.String("purpose", l.Purpose))

+ lastTime := time.Now()
for {
- go func() {
+ start := time.Now()
+ if start.Sub(lastTime) > interval*2 {
+ log.Warn("the interval between keeping alive lease is too long", zap.Time("last-time", lastTime))
+ }
+ go func(start time.Time) {
defer logutil.LogPanic()
- start := time.Now()
ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout)
defer cancel()
var leaseID clientv3.LeaseID
@@ -180,12 +183,13 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
} else {
log.Error("keep alive response ttl is zero", zap.String("purpose", l.Purpose))
}
- }()
+ }(start)

select {
case <-ctx.Done():
return
case <-ticker.C:
+ lastTime = start
}
}
}()
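Two things change in keepAliveWorker: each iteration now timestamps start before spawning the keep-alive goroutine and warns when a round begins more than two intervals after the previous one, and the goroutine receives start as an explicit parameter, so the value it works with is unambiguously the spawn-time snapshot while the outer loop records the same value as lastTime. A runnable sketch of the pattern (timings and messages are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 10 * time.Millisecond
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	lastTime := time.Now()
	for i := 0; i < 3; i++ {
		start := time.Now()
		if start.Sub(lastTime) > interval*2 {
			fmt.Println("keep-alive loop is running slow")
		}
		go func(start time.Time) {
			// the parameter pins the spawn-time snapshot for this round
			fmt.Println("round started at", start.Format(time.StampMilli))
		}(start)
		<-ticker.C
		lastTime = start // the next iteration's slow-loop check measures from here
	}
	time.Sleep(50 * time.Millisecond) // demo only: let the goroutines print
}
```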
13 changes: 11 additions & 2 deletions pkg/schedule/labeler/labeler.go
@@ -266,21 +266,30 @@ func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {

// Patch updates multiple region rules in a batch.
func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
+ // setRulesMap is used to resolve duplicate entries between DeleteRules and SetRules.
+ // Note: we maintain compatibility with the previous behavior, which is to process DeleteRules before SetRules.
+ // If there are duplicate rules, SetRules takes priority and the last entry in SetRules is selected.
+ setRulesMap := make(map[string]*LabelRule)
+
for _, rule := range patch.SetRules {
if err := rule.checkAndAdjust(); err != nil {
return err
}
+ setRulesMap[rule.ID] = rule
}

// save to storage
var batch []func(kv.Txn) error
for _, key := range patch.DeleteRules {
+ if _, ok := setRulesMap[key]; ok {
+ continue
+ }
localKey := key
batch = append(batch, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, localKey)
})
}
- for _, rule := range patch.SetRules {
+ for _, rule := range setRulesMap {
localID, localRule := rule.ID, rule
batch = append(batch, func(txn kv.Txn) error {
return l.storage.SaveRegionRule(txn, localID, localRule)
@@ -297,7 +306,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
for _, key := range patch.DeleteRules {
delete(l.labelRules, key)
}
- for _, rule := range patch.SetRules {
+ for _, rule := range setRulesMap {
l.labelRules[rule.ID] = rule
}
l.BuildRangeListLocked()
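With setRulesMap in place, Patch resolves ID collisions deterministically: an ID present in both DeleteRules and SetRules is kept rather than deleted, and when SetRules repeats an ID the last occurrence wins, both in storage and in memory. A self-contained sketch of those semantics over plain maps (the types are simplified stand-ins for LabelRulePatch and LabelRule):

```go
package main

import "fmt"

type rule struct{ ID, Label string }

type patch struct {
	SetRules    []rule
	DeleteRules []string
}

func apply(rules map[string]rule, p patch) {
	setRulesMap := make(map[string]rule, len(p.SetRules))
	for _, r := range p.SetRules {
		setRulesMap[r.ID] = r // a later duplicate overwrites an earlier one
	}
	for _, key := range p.DeleteRules {
		if _, ok := setRulesMap[key]; ok {
			continue // SetRules takes priority over deleting the same ID
		}
		delete(rules, key)
	}
	for id, r := range setRulesMap {
		rules[id] = r
	}
}

func main() {
	rules := map[string]rule{"rule_1": {ID: "rule_1", Label: "old"}}
	apply(rules, patch{
		SetRules:    []rule{{ID: "rule_1", Label: "k_0"}, {ID: "rule_1", Label: "k_3"}},
		DeleteRules: []string{"rule_1"},
	})
	fmt.Println(rules["rule_1"].Label) // "k_3": set wins over delete, last duplicate wins
}
```

The duplicated-ID case in the test below exercises exactly this: four SetRules entries for rule_1 plus a delete of rule_1 leave a single rule carrying k_3.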
37 changes: 34 additions & 3 deletions pkg/schedule/labeler/labeler_test.go
@@ -27,8 +27,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
)

func TestAdjustRule(t *testing.T) {
@@ -138,11 +140,18 @@ func TestGetSetRule(t *testing.T) {
labeler.DeleteLabelRule(r.ID)
}
re.Empty(labeler.GetAllLabelRules())
+ }
+
+ func TestTxnWithEtcd(t *testing.T) {
+ re := require.New(t)
+ _, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
+ defer clean()
+ store := storage.NewStorageWithEtcdBackend(client, "")
+ labeler, err := NewRegionLabeler(context.Background(), store, time.Millisecond*10)
+ re.NoError(err)
// test patch rules in batch
rulesNum := 200
- patch.SetRules = patch.SetRules[:0]
- patch.DeleteRules = patch.DeleteRules[:0]
+ patch := LabelRulePatch{}
for i := 1; i <= rulesNum; i++ {
patch.SetRules = append(patch.SetRules, &LabelRule{
ID: fmt.Sprintf("rule_%d", i),
@@ -155,7 +164,7 @@ func TestGetSetRule(t *testing.T) {
}
err = labeler.Patch(patch)
re.NoError(err)
- allRules = labeler.GetAllLabelRules()
+ allRules := labeler.GetAllLabelRules()
re.Len(allRules, rulesNum)
sort.Slice(allRules, func(i, j int) bool {
i1, err := strconv.Atoi(allRules[i].ID[5:])
@@ -176,6 +185,28 @@ func TestGetSetRule(t *testing.T) {
re.NoError(err)
allRules = labeler.GetAllLabelRules()
re.Empty(allRules)

+ // test patch rules in batch with duplicated rule id
+ patch.SetRules = patch.SetRules[:0]
+ patch.DeleteRules = patch.DeleteRules[:0]
+ for i := 0; i <= 3; i++ {
+ patch.SetRules = append(patch.SetRules, &LabelRule{
+ ID: "rule_1",
+ Labels: []RegionLabel{
+ {Key: fmt.Sprintf("k_%d", i), Value: fmt.Sprintf("v_%d", i)},
+ },
+ RuleType: "key-range",
+ Data: MakeKeyRanges("", ""),
+ })
+ }
+ patch.DeleteRules = append(patch.DeleteRules, "rule_1")
+ err = labeler.Patch(patch)
+ re.NoError(err)
+ allRules = labeler.GetAllLabelRules()
+ re.Len(allRules, 1)
+ re.Equal("rule_1", allRules[0].ID)
+ re.Len(allRules[0].Labels, 1)
+ re.Equal("k_3", allRules[0].Labels[0].Key)
}

func TestIndex(t *testing.T) {
