Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: extract conn and replace GetAllocator #6557

Merged
merged 5 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mock/mockid"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
Expand Down Expand Up @@ -99,9 +98,9 @@ func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// GetAllocator returns the ID allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
// AllocID returns a new unique ID.
// It delegates to the mock cluster's IDAllocator; IDs are only unique
// within this mock instance, which is sufficient for tests.
func (mc *Cluster) AllocID() (uint64, error) {
return mc.IDAllocator.Alloc()
}

// GetPersistOptions returns the persist options.
Expand Down Expand Up @@ -185,7 +184,7 @@ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics

// AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.GetAllocator().Alloc()
peerID, err := mc.AllocID()
if err != nil {
log.Error("failed to alloc peer", errs.ZapError(err))
return nil, err
Expand Down Expand Up @@ -358,7 +357,7 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea
}
mc.AddRegionStore(storeID, regionCount)
for i := 0; i < leaderCount; i++ {
id, _ := mc.GetAllocator().Alloc()
id, _ := mc.AllocID()
mc.AddLeaderRegion(id, storeID)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error {
m.Lock()
defer m.Unlock()

id, err := m.cluster.GetAllocator().Alloc()
id, err := m.cluster.AllocID()
if err != nil {
log.Warn("failed to switch to async wait state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -257,7 +257,7 @@ func (m *ModeManager) drSwitchToAsync(availableStores []uint64) error {
}

func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
id, err := m.cluster.GetAllocator().Alloc()
id, err := m.cluster.AllocID()
if err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -280,7 +280,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error {
}

func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
id, err := m.cluster.GetAllocator().Alloc()
id, err := m.cluster.AllocID()
if err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -301,7 +301,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
func (m *ModeManager) drSwitchToSync() error {
m.Lock()
defer m.Unlock()
id, err := m.cluster.GetAllocator().Alloc()
id, err := m.cluster.AllocID()
if err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package core

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/id"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
Expand Down Expand Up @@ -49,7 +48,7 @@ type ScheduleCluster interface {
GetRegionLabeler() *labeler.RegionLabeler
GetBasicCluster() *core.BasicCluster
GetStoreConfig() sc.StoreConfig
GetAllocator() id.Allocator
AllocID() (uint64, error)
}

// BasicCluster is an aggregate interface that wraps multiple interfaces
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (b *Builder) prepareBuild() (string, error) {
if o == nil || (!b.useJointConsensus && !core.IsLearner(o) && core.IsLearner(n)) {
if n.GetId() == 0 {
// Allocate peer ID if need.
id, err := b.GetAllocator().Alloc()
id, err := b.AllocID()
if err != nil {
return "", err
}
Expand Down
57 changes: 4 additions & 53 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand All @@ -29,17 +28,10 @@ import (
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)

const (
keepaliveTime = 10 * time.Second
keepaliveTimeout = 3 * time.Second
)

// StopSyncWithLeader stop to sync the region with leader.
func (s *RegionSyncer) StopSyncWithLeader() {
s.reset()
Expand All @@ -56,38 +48,6 @@ func (s *RegionSyncer) reset() {
s.mu.clientCancel, s.mu.clientCtx = nil, nil
}

// establish dials a gRPC connection to the given leader address using the
// syncer's TLS configuration. The dial blocks (grpc.WithBlock) until the
// connection succeeds or ctx is canceled; reconnect backoff is capped at
// 3s so the syncer recovers quickly after a leader change.
func (s *RegionSyncer) establish(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	tlsCfg, err := s.tlsConfig.ToTLSConfig()
	if err != nil {
		return nil, err
	}
	cc, err := grpcutil.GetClientConn(
		ctx,
		addr,
		tlsCfg,
		// Region sync messages can be large; raise the receive limit.
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    keepaliveTime,
			Timeout: keepaliveTimeout,
		}),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  time.Second,     // Default was 1s.
				Multiplier: 1.6,             // Default
				Jitter:     0.2,             // Default
				MaxDelay:   3 * time.Second, // Default was 120s.
			},
			MinConnectTimeout: 5 * time.Second,
		}),
		// WithBlock will block the dial step until success or cancel the context.
		grpc.WithBlock(),
	)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return cc, nil
}

func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) {
cli := pdpb.NewPDClient(conn)
syncStream, err := cli.SyncRegions(ctx)
Expand Down Expand Up @@ -131,19 +91,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Warn("failed to load regions", errs.ZapError(err))
}
// establish client.
var conn *grpc.ClientConn
for {
select {
case <-ctx.Done():
return
default:
}
conn, err = s.establish(ctx, addr)
if err != nil {
log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
continue
}
break
conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig)
// it means the context is canceled.
if conn == nil {
return
}
defer conn.Close()

Expand Down
1 change: 0 additions & 1 deletion pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
)

const (
msgSize = 8 * units.MiB
defaultBucketRate = 20 * units.MiB // 20MB/s
defaultBucketCapacity = 20 * units.MiB // 20MB
maxSyncRegionBatchSize = 100
Expand Down
7 changes: 3 additions & 4 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
Expand Down Expand Up @@ -108,7 +107,7 @@ type cluster interface {
core.StoreSetInformer

DropCacheAllRegion()
GetAllocator() id.Allocator
AllocID() (uint64, error)
BuryStore(storeID uint64, forceBury bool) error
GetPersistOptions() *config.PersistOptions
}
Expand Down Expand Up @@ -1135,11 +1134,11 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree,
hasPlan := false

createRegion := func(startKey, endKey []byte, storeID uint64) (*metapb.Region, error) {
regionID, err := u.cluster.GetAllocator().Alloc()
regionID, err := u.cluster.AllocID()
if err != nil {
return nil, err
}
peerID, err := u.cluster.GetAllocator().Alloc()
peerID, err := u.cluster.AllocID()
if err != nil {
return nil, err
}
Expand Down
69 changes: 67 additions & 2 deletions pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,28 @@ import (
"crypto/tls"
"crypto/x509"
"net/url"
"time"

"github.com/docker/go-units"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

// ForwardMetadataKey is used to record the forwarded host of PD.
const ForwardMetadataKey = "pd-forwarded-host"
const (
// ForwardMetadataKey is used to record the forwarded host of PD.
ForwardMetadataKey = "pd-forwarded-host"
msgSize = 8 * units.MiB
keepaliveTime = 10 * time.Second
keepaliveTimeout = 3 * time.Second
)

// TLSConfig is the configuration for supporting tls.
type TLSConfig struct {
Expand Down Expand Up @@ -160,3 +171,57 @@ func GetForwardedHost(ctx context.Context) string {
}
return ""
}

func establish(ctx context.Context, addr string, tlsConfig *TLSConfig) (*grpc.ClientConn, error) {
tlsCfg, err := tlsConfig.ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := GetClientConn(
ctx,
addr,
tlsCfg,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
}),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second, // Default was 1s.
Multiplier: 1.6, // Default
Jitter: 0.2, // Default
MaxDelay: 3 * time.Second, // Default was 120s.
},
MinConnectTimeout: 5 * time.Second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we've extracted conn, how about extracting these parameters as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds great

}),
// WithBlock will block the dial step until success or cancel the context.
grpc.WithBlock(),
)
if err != nil {
return nil, errors.WithStack(err)
}
return cc, nil
}

// CreateClientConn creates a client connection to the given target.
// It keeps retrying until a connection is established; it returns nil
// only when ctx is canceled before a connection could be made.
func CreateClientConn(ctx context.Context, addr string, tlsConfig *TLSConfig) *grpc.ClientConn {
	for {
		select {
		case <-ctx.Done():
			// The caller canceled; signal that with a nil connection.
			return nil
		default:
		}
		conn, err := establish(ctx, addr, tlsConfig)
		if err == nil {
			return conn
		}
		log.Error("cannot establish connection", zap.String("addr", addr), errs.ZapError(err))
	}
}
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,9 +707,9 @@ func (c *RaftCluster) PauseOrResumeChecker(name string, t int64) error {
return c.coordinator.PauseOrResumeChecker(name, t)
}

// GetAllocator returns cluster's id allocator.
func (c *RaftCluster) GetAllocator() id.Allocator {
return c.id
// AllocID returns a global unique ID.
// It delegates to the cluster's ID allocator (c.id).
func (c *RaftCluster) AllocID() (uint64, error) {
return c.id.Alloc()
}

// GetRegionSyncer returns the region syncer.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2157,7 +2157,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind oper
}

func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
id, err := c.GetAllocator().Alloc()
id, err := c.AllocID()
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,23 +294,23 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus
re.NoError(err)
re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())

rc.GetAllocator().Alloc()
id, err := rc.GetAllocator().Alloc()
rc.AllocID()
id, err := rc.AllocID()
re.NoError(err)
// Put new store with a duplicated address when old store is up will fail.
resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
re.NoError(err)
re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())

id, err = rc.GetAllocator().Alloc()
id, err = rc.AllocID()
re.NoError(err)
// Put new store with a duplicated address when old store is offline will fail.
resetStoreState(re, rc, store.GetId(), metapb.StoreState_Offline)
resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
re.NoError(err)
re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())

id, err = rc.GetAllocator().Alloc()
id, err = rc.AllocID()
re.NoError(err)
// Put new store with a duplicated address when old store is tombstone is OK.
resetStoreState(re, rc, store.GetId(), metapb.StoreState_Tombstone)
Expand All @@ -319,7 +319,7 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus
re.NoError(err)
re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())

id, err = rc.GetAllocator().Alloc()
id, err = rc.AllocID()
re.NoError(err)
deployPath := getTestDeployPath(id)
// Put a new store.
Expand Down