diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 3d3740f09e..d2bf792ea7 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -110,6 +110,8 @@ ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium] ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed" ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid" ErrRelayLogGivenPosTooBig,[code=11106:class=functional:scope=internal:level=high],"the given relay log pos %s of meta config is too big, please check it again" +ErrElectionCampaignFail,[code=11107:class=functional:scope=internal:level=high],"fail to campaign leader: %s" +ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=medium],"fail to get leader ID" ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s" ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s" ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s" diff --git a/dm/master/election.go b/dm/master/election.go new file mode 100644 index 0000000000..06e26bd34a --- /dev/null +++ b/dm/master/election.go @@ -0,0 +1,49 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "context" + + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" +) + +func (s *Server) electionNotify(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case leader := <-s.election.LeaderNotify(): + // output the leader info. + if leader { + log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name)) + } else { + _, leaderID, err2 := s.election.LeaderInfo(ctx) + if err2 == nil { + log.L().Info("current member retire from the leader", zap.String("leader", leaderID), zap.String("current member", s.cfg.Name)) + } else { + log.L().Warn("get leader info", zap.Error(err2)) + } + } + case err := <-s.election.ErrorNotify(): + // handle errors here, we do no meaningful things now. + // but maybe: + // 1. trigger an alert + // 2. shutdown the DM-master process + log.L().Error("receive error from election", zap.Error(err)) + } + } +} diff --git a/dm/master/etcd.go b/dm/master/etcd.go index 6bd5429640..a45aa193b1 100644 --- a/dm/master/etcd.go +++ b/dm/master/etcd.go @@ -22,7 +22,6 @@ import ( "strings" "time" - "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.uber.org/zap" "google.golang.org/grpc" @@ -119,10 +118,7 @@ func prepareJoinEtcd(cfg *Config) error { } // if without previous data, we need a client to contact with the existing cluster. - client, err := clientv3.New(clientv3.Config{ - Endpoints: strings.Split(cfg.Join, ","), - DialTimeout: etcdutil.DefaultDialTimeout, - }) + client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ",")) if err != nil { return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join)) } diff --git a/dm/master/server.go b/dm/master/server.go index b4db47b1c8..36433b25d0 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go/sync2" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.uber.org/zap" "google.golang.org/grpc" @@ -34,12 +35,23 @@ import ( operator "github.com/pingcap/dm/dm/master/sql-operator" "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/election" + "github.com/pingcap/dm/pkg/etcdutil" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/tracing" "github.com/pingcap/dm/syncer" ) +const ( + // the session's TTL in seconds for leader election. + // NOTE: select this value carefully when adding a mechanism relying on leader election. + electionTTL = 60 + // the DM-master leader election key prefix + // DM-master cluster : etcd cluster = 1 : 1 now. + electionKey = "/dm-master/leader" +) + var ( fetchDDLInfoRetryTimeout = 5 * time.Second ) @@ -53,6 +65,9 @@ type Server struct { // the embed etcd server, and the gRPC/HTTP API server also attached to it. etcd *embed.Etcd + etcdClient *clientv3.Client + election *election.Election + // WaitGroup for background functions. bgFunWg sync.WaitGroup @@ -151,6 +166,19 @@ func (s *Server) Start(ctx context.Context) (err error) { return } + // create an etcd client used in the whole server instance. + // NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed. + s.etcdClient, err = etcdutil.CreateClient([]string{s.cfg.MasterAddr}) + if err != nil { + return + } + + // start leader election + s.election, err = election.NewElection(ctx, s.etcdClient, electionTTL, electionKey, s.cfg.Name) + if err != nil { + return + } + s.closed.Set(false) // the server started now. s.bgFunWg.Add(1) @@ -159,6 +187,12 @@ func (s *Server) Start(ctx context.Context) (err error) { s.ap.Start(ctx) }() + s.bgFunWg.Add(1) + go func() { + defer s.bgFunWg.Done() + s.electionNotify(ctx) + }() + s.bgFunWg.Add(1) go func() { defer s.bgFunWg.Done() @@ -194,6 +228,14 @@ func (s *Server) Close() { // wait for background functions returned s.bgFunWg.Wait() + if s.election != nil { + s.election.Close() + } + + if s.etcdClient != nil { + s.etcdClient.Close() + } + // close the etcd and other attached servers if s.etcd != nil { s.etcd.Close() diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 12d9c1fa14..0edaa46fa0 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/pd/pkg/tempurl" - "go.etcd.io/etcd/clientv3" "github.com/pingcap/dm/checker" "github.com/pingcap/dm/dm/config" @@ -1548,6 +1547,11 @@ func (t *testMaster) TestJoinMember(c *check.C) { c.Assert(s1.Start(ctx), check.IsNil) defer s1.Close() + // wait the first one become the leader + c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool { + return s1.election.IsLeader() + }), check.IsTrue) + // join to an existing cluster cfg2 := NewConfig() c.Assert(cfg2.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) @@ -1562,14 +1566,11 @@ func (t *testMaster) TestJoinMember(c *check.C) { c.Assert(s2.Start(ctx), check.IsNil) defer s2.Close() - client, err := clientv3.New(clientv3.Config{ - Endpoints: strings.Split(cfg1.AdvertisePeerUrls, ","), - DialTimeout: etcdutil.DefaultDialTimeout, - }) + client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ",")) c.Assert(err, check.IsNil) defer client.Close() - // verify membersm + // verify members listResp, err := etcdutil.ListMembers(client) c.Assert(err, check.IsNil) c.Assert(listResp.Members, check.HasLen, 2) @@ -1582,5 +1583,10 @@ func (t *testMaster) TestJoinMember(c *check.C) { _, ok = names[cfg2.Name] c.Assert(ok, check.IsTrue) + // s1 is still the leader + _, leaderID, err := s2.election.LeaderInfo(ctx) + c.Assert(err, check.IsNil) + c.Assert(leaderID, check.Equals, cfg1.Name) + cancel() } diff --git a/pkg/election/election.go b/pkg/election/election.go new file mode 100644 index 0000000000..86453397a2 --- /dev/null +++ b/pkg/election/election.go @@ -0,0 +1,285 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +import ( + "context" + "math" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/siddontang/go/sync2" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" +) + +const ( + // newSessionDefaultRetryCnt is the default retry times when creating new session. + newSessionDefaultRetryCnt = 3 + // newSessionRetryUnlimited is the unlimited retry times when creating new session. + newSessionRetryUnlimited = math.MaxInt64 + // newSessionRetryInterval is the interval time when retrying to create a new session. + newSessionRetryInterval = 200 * time.Millisecond +) + +// Election implements the leader election based on etcd. +type Election struct { + // the Election instance does not own the client instance, + // so do not close it in the methods of Election. + cli *clientv3.Client + sessionTTL int + key string + id string + ech chan error + leaderCh chan bool + isLeader sync2.AtomicBool + + closed sync2.AtomicInt32 + cancel context.CancelFunc + bgWg sync.WaitGroup + + l log.Logger +} + +// NewElection creates a new etcd leader Election instance and starts the campaign loop. +func NewElection(ctx context.Context, cli *clientv3.Client, sessionTTL int, key, id string) (*Election, error) { + ctx2, cancel2 := context.WithCancel(ctx) + e := &Election{ + cli: cli, + sessionTTL: sessionTTL, + key: key, + id: id, + leaderCh: make(chan bool, 1), + ech: make(chan error, 1), // size 1 is enough + cancel: cancel2, + l: log.With(zap.String("component", "election")), + } + + // try create a session before enter the campaign loop. + // so we can detect potential error earlier. + session, err := e.newSession(ctx, newSessionDefaultRetryCnt) + if err != nil { + cancel2() + return nil, terror.ErrElectionCampaignFail.Delegate(err, "create the initial session") + } + + e.bgWg.Add(1) + go func() { + defer e.bgWg.Done() + e.campaignLoop(ctx2, session) + }() + return e, nil +} + +// IsLeader returns whether this member is the leader. +func (e *Election) IsLeader() bool { + return e.isLeader.Get() +} + +// ID returns the current member's ID. +func (e *Election) ID() string { + return e.id +} + +// LeaderInfo returns the current leader's key and ID. +// it's similar with https://github.com/etcd-io/etcd/blob/v3.4.3/clientv3/concurrency/election.go#L147. +func (e *Election) LeaderInfo(ctx context.Context) (string, string, error) { + resp, err := e.cli.Get(ctx, e.key, clientv3.WithFirstCreate()...) + if err != nil { + return "", "", terror.ErrElectionGetLeaderIDFail.Delegate(err) + } else if len(resp.Kvs) == 0 { + // no leader currently elected + return "", "", terror.ErrElectionGetLeaderIDFail.Delegate(concurrency.ErrElectionNoLeader) + } + return string(resp.Kvs[0].Key), string(resp.Kvs[0].Value), nil +} + +// LeaderNotify returns a channel that can fetch notification when the member become the leader or retire from the leader. +// `true` means become the leader; `false` means retire from the leader. +func (e *Election) LeaderNotify() <-chan bool { + return e.leaderCh +} + +// ErrorNotify returns a channel that can fetch errors occurred for campaign. +func (e *Election) ErrorNotify() <-chan error { + return e.ech +} + +// Close closes the election instance and release the resources. +func (e *Election) Close() { + e.l.Info("election is closing") + if !e.closed.CompareAndSwap(0, 1) { + e.l.Info("election was already closed") + return + } + + e.cancel() + e.bgWg.Wait() + e.l.Info("election is closed") +} + +func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Session) { + closeSession := func(se *concurrency.Session) { + err2 := se.Close() // only log this error + if err2 != nil { + e.l.Error("fail to close etcd session", zap.Int64("lease", int64(se.Lease())), zap.Error(err2)) + } + } + + var err error + defer func() { + if session != nil { + closeSession(session) // close the latest session. + } + if err != nil && errors.Cause(err) != ctx.Err() { // only send non-ctx.Err() error + e.ech <- err + } + }() + + for { + // check context canceled/timeout + select { + case <-session.Done(): + e.l.Info("etcd session is done, will try to create a new one", zap.Int64("old lease", int64(session.Lease()))) + closeSession(session) + session, err = e.newSession(ctx, newSessionRetryUnlimited) // retry until context is done + if err != nil { + err = terror.ErrElectionCampaignFail.Delegate(err, "create a new session") + return + } + case <-ctx.Done(): + e.l.Info("break campaign loop, context is done", zap.Error(ctx.Err())) + return + default: + } + + // try to campaign + elec := concurrency.NewElection(session, e.key) + err = elec.Campaign(ctx, e.id) + if err != nil { + // err may be ctx.Err(), but this can be handled in `case <-ctx.Done()` + e.l.Warn("fail to campaign", zap.Error(err)) + continue + } + + // compare with the current leader + leaderKey, leaderID, err := getLeaderInfo(ctx, elec) + if err != nil { + // err may be ctx.Err(), but this can be handled in `case <-ctx.Done()` + e.l.Warn("fail to get leader ID", zap.Error(err)) + continue + } + if leaderID != e.id { + e.l.Info("current member is not the leader", zap.String("current member", e.id), zap.String("leader", leaderID)) + continue + } + + e.toBeLeader() // become the leader now + e.watchLeader(ctx, session, leaderKey) + e.retireLeader() // need to re-campaign + } +} + +func (e *Election) toBeLeader() { + e.isLeader.Set(true) + select { + case e.leaderCh <- true: + default: + } +} + +func (e *Election) retireLeader() { + e.isLeader.Set(false) + select { + case e.leaderCh <- false: + default: + } +} + +func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string) { + e.l.Debug("watch leader key", zap.String("key", key)) + wch := e.cli.Watch(ctx, key) + for { + select { + case resp, ok := <-wch: + if !ok { + e.l.Info("watch channel is closed") + return + } + if resp.Canceled { + e.l.Info("watch canceled") + return + } + + for _, ev := range resp.Events { + // user may use some etcd client (like etcdctl) to delete the leader key and trigger a new campaign. + if ev.Type == mvccpb.DELETE { + e.l.Info("fail to watch, the leader is deleted", zap.ByteString("key", ev.Kv.Key)) + return + } + } + case <-session.Done(): + return + case <-ctx.Done(): + return + } + } +} + +func (e *Election) newSession(ctx context.Context, retryCnt int) (*concurrency.Session, error) { + var ( + err error + session *concurrency.Session + ) + +forLoop: + for i := 0; i < retryCnt; i++ { + if i > 0 { + select { + case e.ech <- terror.ErrElectionCampaignFail.Delegate(err, "create a new session"): + default: + } + + select { + case <-time.After(newSessionRetryInterval): + case <-ctx.Done(): + break forLoop + } + } + + // add more options if needed. + // NOTE: I think use the client's context is better than something like `concurrency.WithContext(ctx)`, + // so we can close the session when the client is still valid. + session, err = concurrency.NewSession(e.cli, concurrency.WithTTL(e.sessionTTL)) + if err == nil || errors.Cause(err) == e.cli.Ctx().Err() { + break forLoop + } + } + return session, err +} + +// getLeaderInfo get the current leader's information (if exists). +func getLeaderInfo(ctx context.Context, elec *concurrency.Election) (key, ID string, err error) { + resp, err := elec.Leader(ctx) + if err != nil { + return + } + return string(resp.Kvs[0].Key), string(resp.Kvs[0].Value), nil +} diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go new file mode 100644 index 0000000000..d09f5ec037 --- /dev/null +++ b/pkg/election/election_test.go @@ -0,0 +1,287 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +import ( + "context" + "fmt" + "net/url" + "sync" + "testing" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/pd/pkg/tempurl" + "go.etcd.io/etcd/embed" + + "github.com/pingcap/dm/pkg/etcdutil" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" +) + +var _ = Suite(&testElectionSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testElectionSuite struct { + etcd *embed.Etcd + endPoint string +} + +func (t *testElectionSuite) SetUpTest(c *C) { + log.InitLogger(&log.Config{}) + + cfg := embed.NewConfig() + cfg.Name = "election-test" + cfg.Dir = c.MkDir() + cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(log.L().Logger, log.L().Core(), log.Props().Syncer) + cfg.Logger = "zap" + err := cfg.Validate() // verify & trigger the builder + c.Assert(err, IsNil) + + t.endPoint = tempurl.Alloc() + url2, err := url.Parse(t.endPoint) + c.Assert(err, IsNil) + cfg.LCUrls = []url.URL{*url2} + cfg.ACUrls = cfg.LCUrls + + url2, err = url.Parse(tempurl.Alloc()) + c.Assert(err, IsNil) + cfg.LPUrls = []url.URL{*url2} + cfg.APUrls = cfg.LPUrls + + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, url2) + cfg.ClusterState = embed.ClusterStateFlagNew + + t.etcd, err = embed.StartEtcd(cfg) + c.Assert(err, IsNil) + select { + case <-t.etcd.Server.ReadyNotify(): + case <-time.After(10 * time.Second): + c.Fatal("start embed etcd timeout") + } +} + +func (t *testElectionSuite) TearDownTest(c *C) { + t.etcd.Close() +} + +func (t *testElectionSuite) TestElection2After1(c *C) { + var ( + sessionTTL = 60 + key = "unit-test/election-2-after-1" + ID1 = "member1" + ID2 = "member2" + ID3 = "member3" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}) + c.Assert(err, IsNil) + defer cli.Close() + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1) + c.Assert(err, IsNil) + defer e1.Close() + + // e1 should become the leader + select { + case leader := <-e1.LeaderNotify(): + c.Assert(leader, IsTrue) + case <-time.After(3 * time.Second): + c.Fatal("leader campaign timeout") + } + c.Assert(e1.IsLeader(), IsTrue) + _, leaderID, err := e1.LeaderInfo(ctx1) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + + // start e2 + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2) + c.Assert(err, IsNil) + defer e2.Close() + select { + case leader := <-e2.leaderCh: + c.Fatalf("should not receive leader notify for e2, but got %v", leader) + case <-time.After(100 * time.Millisecond): // wait 100ms to start the campaign + } + // but the leader should still be e1 + _, leaderID, err = e2.LeaderInfo(ctx2) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(e2.IsLeader(), IsFalse) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case leader := <-e1.LeaderNotify(): // e1 should retire when closing + c.Assert(leader, IsFalse) + case <-time.After(time.Second): + c.Fatal("leader campaign timeout") + } + }() + e1.Close() // stop the campaign for e1 + c.Assert(e1.IsLeader(), IsFalse) + wg.Wait() + + // e2 should become the leader + select { + case leader := <-e2.LeaderNotify(): + c.Assert(leader, IsTrue) + case <-time.After(3 * time.Second): + c.Fatal("leader campaign timeout") + } + c.Assert(e2.IsLeader(), IsTrue) + _, leaderID, err = e2.LeaderInfo(ctx2) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e2.ID()) + + // if closing the client when campaigning, we should get an error + wg.Add(1) + go func() { + defer wg.Done() + select { + case err2 := <-e2.ErrorNotify(): + c.Assert(terror.ErrElectionCampaignFail.Equal(err2), IsTrue) + // the old session is done, but we can't create a new one. + c.Assert(err2, ErrorMatches, ".*fail to campaign leader: create a new session.*") + case <-time.After(time.Second): + c.Fatal("do not receive error for e2") + } + }() + cli.Close() // close the client + wg.Wait() + + // can not elect with closed client. + ctx3, cancel3 := context.WithCancel(context.Background()) + defer cancel3() + _, err = NewElection(ctx3, cli, sessionTTL, key, ID3) + c.Assert(terror.ErrElectionCampaignFail.Equal(err), IsTrue) + c.Assert(err, ErrorMatches, ".*fail to campaign leader: create the initial session: context canceled.*") +} + +func (t *testElectionSuite) TestElectionAlways1(c *C) { + var ( + sessionTTL = 60 + key = "unit-test/election-always-1" + ID1 = "member1" + ID2 = "member2" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}) + c.Assert(err, IsNil) + defer cli.Close() + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1) + c.Assert(err, IsNil) + defer e1.Close() + + // e1 should become the leader + select { + case leader := <-e1.LeaderNotify(): + c.Assert(leader, IsTrue) + case <-time.After(3 * time.Second): + c.Fatal("leader campaign timeout") + } + c.Assert(e1.IsLeader(), IsTrue) + _, leaderID, err := e1.LeaderInfo(ctx1) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + + // start e2 + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2) + c.Assert(err, IsNil) + defer e2.Close() + time.Sleep(100 * time.Millisecond) // wait 100ms to start the campaign + // but the leader should still be e1 + _, leaderID, err = e2.LeaderInfo(ctx2) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(e2.IsLeader(), IsFalse) + + // cancel the campaign for e2, should get no errors + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case err2 := <-e2.ErrorNotify(): + c.Fatalf("cancel the campaign should not get an error, %v", err2) + case <-time.After(time.Second): // wait 1s + } + }() + cancel2() + wg.Wait() + + // e1 is still the leader + c.Assert(e1.IsLeader(), IsTrue) + _, leaderID, err = e1.LeaderInfo(ctx1) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(e2.IsLeader(), IsFalse) +} + +func (t *testElectionSuite) TestElectionDeleteKey(c *C) { + var ( + sessionTTL = 60 + key = "unit-test/election-delete-key" + ID = "member" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}) + c.Assert(err, IsNil) + defer cli.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e, err := NewElection(ctx, cli, sessionTTL, key, ID) + c.Assert(err, IsNil) + defer e.Close() + + // should become the leader + select { + case leader := <-e.LeaderNotify(): + c.Assert(leader, IsTrue) + case <-time.After(3 * time.Second): + c.Fatal("leader campaign timeout") + } + c.Assert(e.IsLeader(), IsTrue) + leaderKey, leaderID, err := e.LeaderInfo(ctx) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e.ID()) + + // the leader retired after deleted the key + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Done() + select { + case err2 := <-e.ErrorNotify(): + c.Fatalf("delete the leader key should not get an error, %v", err2) + case leader := <-e.LeaderNotify(): + c.Assert(leader, IsFalse) + } + }() + _, err = cli.Delete(ctx, leaderKey) + c.Assert(err, IsNil) + wg.Wait() +} diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index f58a8a0801..c94b5da07f 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -31,6 +31,14 @@ const ( DefaultRequestTimeout = 10 * time.Second ) +// CreateClient creates an etcd client with some default config items. +func CreateClient(endpoints []string) (*clientv3.Client, error) { + return clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: DefaultDialTimeout, + }) +} + // ListMembers returns a list of internal etcd members. func ListMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) { ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) diff --git a/pkg/etcdutil/etcdutil_test.go b/pkg/etcdutil/etcdutil_test.go index afc593f321..67880113c1 100644 --- a/pkg/etcdutil/etcdutil_test.go +++ b/pkg/etcdutil/etcdutil_test.go @@ -88,7 +88,7 @@ func (t *testEtcdUtilSuite) startEtcd(c *C, cfg *embed.Config) *embed.Etcd { e, err := embed.StartEtcd(cfg) c.Assert(err, IsNil) - timeout := time.Second + timeout := 3 * time.Second select { case <-e.Server.ReadyNotify(): case <-time.After(timeout): @@ -99,7 +99,7 @@ func (t *testEtcdUtilSuite) startEtcd(c *C, cfg *embed.Config) *embed.Etcd { } func (t *testEtcdUtilSuite) createEtcdClient(c *C, cfg *embed.Config) *clientv3.Client { - cli, err := clientv3.New(clientv3.Config{Endpoints: t.urlsToStrings(cfg.LCUrls)}) + cli, err := CreateClient(t.urlsToStrings(cfg.LCUrls)) c.Assert(err, IsNil) return cli } diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 11a4743205..3969b7d1c7 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -140,6 +140,9 @@ const ( codeGTIDTruncateInvalid // pkg/streamer codeRelayLogGivenPosTooBig + // pkg/election + codeElectionCampaignFail + codeElectionGetLeaderIDFail ) // Config related error code list @@ -608,6 +611,9 @@ var ( ErrGTIDTruncateInvalid = New(codeGTIDTruncateInvalid, ClassFunctional, ScopeInternal, LevelHigh, "truncate GTID sets %v to %v not valid") // pkg/streamer ErrRelayLogGivenPosTooBig = New(codeRelayLogGivenPosTooBig, ClassFunctional, ScopeInternal, LevelHigh, "the given relay log pos %s of meta config is too big, please check it again") + // pkg/election + ErrElectionCampaignFail = New(codeElectionCampaignFail, ClassFunctional, ScopeInternal, LevelHigh, "fail to campaign leader: %s") + ErrElectionGetLeaderIDFail = New(codeElectionGetLeaderIDFail, ClassFunctional, ScopeInternal, LevelMedium, "fail to get leader ID") // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s") diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 64c0cde428..7af76a1b83 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -1,8 +1,2 @@ -all_mode full_mode -sequence_sharding -sequence_safe_mode -relay_interrupt start_task -initial_unit -http_apis