diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index efb13797819..051856abf81 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" @@ -99,9 +98,9 @@ func (mc *Cluster) GetStorage() storage.Storage { return mc.Storage } -// GetAllocator returns the ID allocator. -func (mc *Cluster) GetAllocator() id.Allocator { - return mc.IDAllocator +// AllocID returns a new unique ID. +func (mc *Cluster) AllocID() (uint64, error) { + return mc.IDAllocator.Alloc() } // GetPersistOptions returns the persist options. @@ -185,7 +184,7 @@ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics // AllocPeer allocs a new peer on a store. func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { - peerID, err := mc.GetAllocator().Alloc() + peerID, err := mc.AllocID() if err != nil { log.Error("failed to alloc peer", errs.ZapError(err)) return nil, err @@ -358,7 +357,7 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea } mc.AddRegionStore(storeID, regionCount) for i := 0; i < leaderCount; i++ { - id, _ := mc.GetAllocator().Alloc() + id, _ := mc.AllocID() mc.AddLeaderRegion(id, storeID) } } diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 1aa01b6a243..1a94abb09fe 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -234,7 +234,7 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error { m.Lock() defer m.Unlock() - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to async wait state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -257,7 +257,7 @@ func (m *ModeManager) drSwitchToAsync(availableStores []uint64) error { } func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -280,7 +280,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error { } func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -301,7 +301,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { func (m *ModeManager) drSwitchToSync() error { m.Lock() defer m.Unlock() - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 658a476766b..c7eb0d7581c 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -16,7 +16,6 @@ package core import ( "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/id" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -49,7 +48,7 @@ type ScheduleCluster interface { GetRegionLabeler() *labeler.RegionLabeler GetBasicCluster() *core.BasicCluster GetStoreConfig() sc.StoreConfig - GetAllocator() id.Allocator + AllocID() (uint64, error) } // BasicCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index 2cc5a6e7102..06e8628cc6a 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -483,7 +483,7 @@ func (b *Builder) prepareBuild() (string, error) { if o == nil || (!b.useJointConsensus && !core.IsLearner(o) && core.IsLearner(n)) { if n.GetId() == 0 { // Allocate peer ID if need. - id, err := b.GetAllocator().Alloc() + id, err := b.AllocID() if err != nil { return "", err } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 42e552cb050..ac409f90115 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -18,7 +18,7 @@ import ( "context" "time" - "github.com/pingcap/errors" + "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -38,6 +38,7 @@ import ( const ( keepaliveTime = 10 * time.Second keepaliveTimeout = 3 * time.Second + msgSize = 8 * units.MiB ) // StopSyncWithLeader stop to sync the region with leader. @@ -56,38 +57,6 @@ func (s *RegionSyncer) reset() { s.mu.clientCancel, s.mu.clientCtx = nil, nil } -func (s *RegionSyncer) establish(ctx context.Context, addr string) (*grpc.ClientConn, error) { - tlsCfg, err := s.tlsConfig.ToTLSConfig() - if err != nil { - return nil, err - } - cc, err := grpcutil.GetClientConn( - ctx, - addr, - tlsCfg, - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: keepaliveTime, - Timeout: keepaliveTimeout, - }), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, // Default was 1s. - Multiplier: 1.6, // Default - Jitter: 0.2, // Default - MaxDelay: 3 * time.Second, // Default was 120s. - }, - MinConnectTimeout: 5 * time.Second, - }), - // WithBlock will block the dial step until success or cancel the context. - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.WithStack(err) - } - return cc, nil -} - func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) { cli := pdpb.NewPDClient(conn) syncStream, err := cli.SyncRegions(ctx) @@ -131,19 +100,26 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Warn("failed to load regions", errs.ZapError(err)) } // establish client. - var conn *grpc.ClientConn - for { - select { - case <-ctx.Done(): - return - default: - } - conn, err = s.establish(ctx, addr) - if err != nil { - log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) - continue - } - break + conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: keepaliveTime, + Timeout: keepaliveTimeout, + }), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, // Default was 1s. + Multiplier: 1.6, // Default + Jitter: 0.2, // Default + MaxDelay: 3 * time.Second, // Default was 120s. + }, + MinConnectTimeout: 5 * time.Second, + }), + // WithBlock will block the dial step until success or cancel the context. + grpc.WithBlock()) + // it means the context is canceled. + if conn == nil { + return } defer conn.Close() diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index fe28434ddb1..7d339e75dbe 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -39,7 +39,6 @@ import ( ) const ( - msgSize = 8 * units.MiB defaultBucketRate = 20 * units.MiB // 20MB/s defaultBucketCapacity = 20 * units.MiB // 20MB maxSyncRegionBatchSize = 100 diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index 0c7d087aa72..5db1168c4ce 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -31,7 +31,6 @@ import ( "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server/config" @@ -108,7 +107,7 @@ type cluster interface { core.StoreSetInformer DropCacheAllRegion() - GetAllocator() id.Allocator + AllocID() (uint64, error) BuryStore(storeID uint64, forceBury bool) error GetPersistOptions() *config.PersistOptions } @@ -1135,11 +1134,11 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree, hasPlan := false createRegion := func(startKey, endKey []byte, storeID uint64) (*metapb.Region, error) { - regionID, err := u.cluster.GetAllocator().Alloc() + regionID, err := u.cluster.AllocID() if err != nil { return nil, err } - peerID, err := u.cluster.GetAllocator().Alloc() + peerID, err := u.cluster.AllocID() if err != nil { return nil, err } diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 5aae6d97e34..78895a07a08 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -21,15 +21,19 @@ import ( "net/url" "github.com/pingcap/log" + "github.com/pkg/errors" "github.com/tikv/pd/pkg/errs" "go.etcd.io/etcd/pkg/transport" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" ) -// ForwardMetadataKey is used to record the forwarded host of PD. -const ForwardMetadataKey = "pd-forwarded-host" +const ( + // ForwardMetadataKey is used to record the forwarded host of PD. + ForwardMetadataKey = "pd-forwarded-host" +) // TLSConfig is the configuration for supporting tls. type TLSConfig struct { @@ -160,3 +164,42 @@ func GetForwardedHost(ctx context.Context) string { } return "" } + +func establish(ctx context.Context, addr string, tlsConfig *TLSConfig, do ...grpc.DialOption) (*grpc.ClientConn, error) { + tlsCfg, err := tlsConfig.ToTLSConfig() + if err != nil { + return nil, err + } + cc, err := GetClientConn( + ctx, + addr, + tlsCfg, + do..., + ) + if err != nil { + return nil, errors.WithStack(err) + } + return cc, nil +} + +// CreateClientConn creates a client connection to the given target. +func CreateClientConn(ctx context.Context, addr string, tlsConfig *TLSConfig, do ...grpc.DialOption) *grpc.ClientConn { + var ( + conn *grpc.ClientConn + err error + ) + for { + select { + case <-ctx.Done(): + return nil + default: + } + conn, err = establish(ctx, addr, tlsConfig, do...) + if err != nil { + log.Error("cannot establish connection", zap.String("addr", addr), errs.ZapError(err)) + continue + } + break + } + return conn +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7238b121076..67e4e3c70cf 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -707,9 +707,9 @@ func (c *RaftCluster) PauseOrResumeChecker(name string, t int64) error { return c.coordinator.PauseOrResumeChecker(name, t) } -// GetAllocator returns cluster's id allocator. -func (c *RaftCluster) GetAllocator() id.Allocator { - return c.id +// AllocID returns a global unique ID. +func (c *RaftCluster) AllocID() (uint64, error) { + return c.id.Alloc() } // GetRegionSyncer returns the region syncer. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 4ca04add818..a722fed9bda 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2157,7 +2157,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind oper } func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { - id, err := c.GetAllocator().Alloc() + id, err := c.AllocID() if err != nil { return nil, err } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 4b9f608d47f..cba7584efd9 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -294,15 +294,15 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) - rc.GetAllocator().Alloc() - id, err := rc.GetAllocator().Alloc() + rc.AllocID() + id, err := rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is up will fail. resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) re.NoError(err) re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is offline will fail. resetStoreState(re, rc, store.GetId(), metapb.StoreState_Offline) @@ -310,7 +310,7 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is tombstone is OK. resetStoreState(re, rc, store.GetId(), metapb.StoreState_Tombstone) @@ -319,7 +319,7 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) deployPath := getTestDeployPath(id) // Put a new store.