From a7bc4b1c037ab2e1b30f5b18e2b57d9ea723c574 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 9 May 2019 11:35:25 +0800 Subject: [PATCH 1/2] server: set timeout for MoveLeader Signed-off-by: disksing --- server/leader.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/server/leader.go b/server/leader.go index fcc03d401eb..def7b8bfcb7 100644 --- a/server/leader.go +++ b/server/leader.go @@ -30,6 +30,9 @@ import ( "go.uber.org/zap" ) +// The timeout to wait transfer etcd leader to complete. +const moveLeaderTimeout = 5 * time.Second + // IsLeader returns whether the server is leader or not. func (s *Server) IsLeader() bool { // If server is not started. Both leaderID and ID could be 0. @@ -144,7 +147,7 @@ func (s *Server) etcdLeaderLoop() { break } if myPriority > leaderPriority { - err := s.etcd.Server.MoveLeader(ctx, etcdLeader, s.ID()) + err := s.MoveEtcdLeader(ctx, etcdLeader, s.ID()) if err != nil { log.Error("failed to transfer etcd leader", zap.Error(err)) } else { @@ -160,6 +163,13 @@ func (s *Server) etcdLeaderLoop() { } } +// MoveEtcdLeader tries to transfer etcd leader. +func (s *Server) MoveEtcdLeader(ctx context.Context, old, new uint64) error { + moveCtx, cancel := context.WithTimeout(ctx, moveLeaderTimeout) + defer cancel() + return errors.WithStack(s.etcd.Server.MoveLeader(moveCtx, old, new)) +} + // getLeader gets server leader from etcd. func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) { leader := &pdpb.Member{} @@ -373,8 +383,7 @@ func (s *Server) ResignLeader(nextLeader string) error { return errors.New("no valid pd to transfer leader") } nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))] - err = s.etcd.Server.MoveLeader(s.serverLoopCtx, s.ID(), nextLeaderID) - return errors.WithStack(err) + return s.MoveEtcdLeader(s.serverLoopCtx, s.ID(), nextLeaderID) } func (s *Server) deleteLeaderKey() error { From 9abaabba793af966cb5cf6cdc2b5498b152de8d8 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 9 May 2019 12:10:44 +0800 Subject: [PATCH 2/2] add test Signed-off-by: disksing --- tests/cluster.go | 19 ++++++++++ tests/server/move_leader_test.go | 60 ++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 tests/server/move_leader_test.go diff --git a/tests/cluster.go b/tests/cluster.go index dee9369feec..051f3f7d52f 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -201,6 +201,25 @@ func (s *TestServer) GetEtcdLeader() (string, error) { return members.GetEtcdLeader().GetName(), nil } +// GetEtcdLeaderID returns the builtin etcd leader ID. +func (s *TestServer) GetEtcdLeaderID() (uint64, error) { + s.RLock() + defer s.RUnlock() + req := &pdpb.GetMembersRequest{Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}} + members, err := s.server.GetMembers(context.TODO(), req) + if err != nil { + return 0, errors.WithStack(err) + } + return members.GetEtcdLeader().GetMemberId(), nil +} + +// MoveEtcdLeader moves etcd leader from old to new. +func (s *TestServer) MoveEtcdLeader(old, new uint64) error { + s.RLock() + defer s.RUnlock() + return s.server.MoveEtcdLeader(context.Background(), old, new) +} + // GetEtcdClient returns the builtin etcd client. func (s *TestServer) GetEtcdClient() *clientv3.Client { s.RLock() diff --git a/tests/server/move_leader_test.go b/tests/server/move_leader_test.go new file mode 100644 index 00000000000..b6549fbe3b6 --- /dev/null +++ b/tests/server/move_leader_test.go @@ -0,0 +1,60 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server_test + +import ( + "sync" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/pd/tests" +) + +func (s *serverTestSuite) TestMoveLeader(c *C) { + c.Parallel() + + cluster, err := tests.NewTestCluster(5) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + + var wg sync.WaitGroup + wg.Add(5) + for _, s := range cluster.GetServers() { + go func(s *tests.TestServer) { + defer wg.Done() + if s.IsLeader() { + s.ResignLeader() + } else { + old, _ := s.GetEtcdLeaderID() + s.MoveEtcdLeader(old, s.GetServerID()) + } + }(s) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + c.Fatal("move etcd leader does not return in 10 seconds") + } +}