Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Nov 21, 2019
1 parent dbb87d0 commit 80ae3d2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 49 deletions.
6 changes: 6 additions & 0 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Error("get leader info", zap.Error(err2))
}
}
case err := <-s.election.ErrorNotify():
// handle errors here, we do no meaningful things now.
// but maybe:
// 1. trigger an alert
// 2. shutdown the DM-master process
log.L().Error("receive error from election", zap.Error(err))
}
}
}
9 changes: 6 additions & 3 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,13 @@ func (s *Server) Start(ctx context.Context) (err error) {
return
}

s.closed.Set(false) // the server started now.

// start leader election
s.election = election.NewElection(ctx, s.etcdClient, electionTTL, electionKey, s.cfg.Name)
s.election, err = election.NewElection(ctx, s.etcdClient, electionTTL, electionKey, s.cfg.Name)
if err != nil {
return
}

s.closed.Set(false) // the server started now.

s.bgFunWg.Add(1)
go func() {
Expand Down
71 changes: 51 additions & 20 deletions pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package election

import (
"context"
"math"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
Expand All @@ -28,6 +30,17 @@ import (
"github.com/pingcap/dm/pkg/terror"
)

const (
// newSessionDefaultRetryCnt is the default retry times when creating new session.
newSessionDefaultRetryCnt = 3
// newSessionRetryUnlimited is the unlimited retry times when creating new session.
newSessionRetryUnlimited = math.MaxInt64
// newSessionRetryInterval is the interval time when retrying to create a new session.
newSessionRetryInterval = 200 * time.Millisecond
// logNewSessionIntervalCnt is the interval count when logging errors for creating session
logNewSessionIntervalCnt = int(3 * time.Second / newSessionRetryInterval)
)

// Election implements the leader election based on etcd.
type Election struct {
// the Election instance does not own the client instance,
Expand All @@ -48,7 +61,7 @@ type Election struct {
}

// NewElection creates a new etcd leader Election instance and starts the campaign loop.
func NewElection(ctx context.Context, cli *clientv3.Client, sessionTTL int, key, id string) *Election {
func NewElection(ctx context.Context, cli *clientv3.Client, sessionTTL int, key, id string) (*Election, error) {
ctx2, cancel2 := context.WithCancel(ctx)
e := &Election{
cli: cli,
Expand All @@ -60,12 +73,21 @@ func NewElection(ctx context.Context, cli *clientv3.Client, sessionTTL int, key,
cancel: cancel2,
l: log.With(zap.String("component", "election")),
}

// try create a session before enter the campaign loop.
// so we can detect potential error earlier.
session, err := e.newSession(ctx, newSessionDefaultRetryCnt)
if err != nil {
cancel2()
return nil, terror.ErrElectionCampaignFail.Delegate(err, "create the initial session")
}

e.bgWg.Add(1)
go func() {
defer e.bgWg.Done()
e.campaignLoop(ctx2)
e.campaignLoop(ctx2, session)
}()
return e
return e, nil
}

// IsLeader returns whether this member is the leader.
Expand Down Expand Up @@ -115,19 +137,15 @@ func (e *Election) Close() {
e.l.Info("election is closed")
}

func (e *Election) campaignLoop(ctx context.Context) {
var (
session *concurrency.Session
err error
)

func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Session) {
closeSession := func(se *concurrency.Session) {
err2 := se.Close() // only log this error
if err2 != nil {
e.l.Error("fail to close etcd session", zap.Int64("lease", int64(se.Lease())), zap.Error(err2))
}
}

var err error
defer func() {
if session != nil {
closeSession(session) // close the latest session.
Expand All @@ -138,20 +156,13 @@ func (e *Election) campaignLoop(ctx context.Context) {
}
}()

// create the initial session.
session, err = newSession(e.cli, e.sessionTTL)
if err != nil {
err = terror.ErrElectionCampaignFail.Delegate(err, "create the initial session")
return
}

for {
// check context canceled/timeout
select {
case <-session.Done():
e.l.Info("etcd session is done, will try to create a new one", zap.Int64("old lease", int64(session.Lease())))
closeSession(session)
session, err = newSession(e.cli, e.sessionTTL)
session, err = e.newSession(ctx, newSessionRetryUnlimited) // retry until context is done
if err != nil {
err = terror.ErrElectionCampaignFail.Delegate(err, "create a new session")
return
Expand Down Expand Up @@ -236,9 +247,29 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session
}
}

func newSession(cli *clientv3.Client, ttl int) (*concurrency.Session, error) {
// add more options if needed.
return concurrency.NewSession(cli, concurrency.WithTTL(ttl))
func (e *Election) newSession(ctx context.Context, retryCnt int) (*concurrency.Session, error) {
var (
err error
session *concurrency.Session
)
for i := 0; i < retryCnt; i++ {
// add more options if needed.
// NOTE: I think use the client's context is better than something like `concurrency.WithContext(ctx)`,
// so we can close the session when the client is still valid.
session, err = concurrency.NewSession(e.cli, concurrency.WithTTL(e.sessionTTL))
if err == nil || errors.Cause(err) == ctx.Err() {
break
} else if i%logNewSessionIntervalCnt == 0 {
e.l.Warn("fail to create a new session", zap.Error(err))
}

select {
case <-time.After(newSessionRetryInterval):
case <-ctx.Done():
break
}
}
return session, err
}

// getLeaderInfo get the current leader's information (if exists).
Expand Down
43 changes: 17 additions & 26 deletions pkg/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (t *testElectionSuite) TestElection2After1(c *C) {

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
e1 := NewElection(ctx1, cli, sessionTTL, key, ID1)
e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1)
c.Assert(err, IsNil)
defer e1.Close()

// e1 should become the leader
Expand All @@ -111,7 +112,8 @@ func (t *testElectionSuite) TestElection2After1(c *C) {
// start e2
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
e2 := NewElection(ctx2, cli, sessionTTL, key, ID2)
e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2)
c.Assert(err, IsNil)
defer e2.Close()
select {
case leader := <-e2.leaderCh:
Expand Down Expand Up @@ -151,17 +153,16 @@ func (t *testElectionSuite) TestElection2After1(c *C) {
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e2.ID())

// if closing the client when campaigning, we should get an error
// if closing the client when campaigning,
// we should get no error because new session will retry until context is done,
// and we ignore the context done error.
wg.Add(1)
go func() {
defer wg.Done()
select {
case err2 := <-e2.ErrorNotify():
c.Assert(terror.ErrElectionCampaignFail.Equal(err2), IsTrue)
// the old session is done, but we can't create a new one.
c.Assert(err2, ErrorMatches, ".*fail to campaign leader: create a new session.*")
c.Fatalf("should not recieve error %v", err2)
case <-time.After(time.Second):
c.Fatal("do not receive error for e2")
}
}()
cli.Close() // close the client
Expand All @@ -170,22 +171,9 @@ func (t *testElectionSuite) TestElection2After1(c *C) {
// can not elect with closed client.
ctx3, cancel3 := context.WithCancel(context.Background())
defer cancel3()
e3 := NewElection(ctx3, cli, sessionTTL, key, ID3)
defer e3.Close()
select {
case err3 := <-e3.ErrorNotify():
c.Assert(terror.ErrElectionCampaignFail.Equal(err3), IsTrue)
c.Assert(err3, ErrorMatches, ".*fail to campaign leader: create the initial session.*")
case <-time.After(time.Second):
c.Fatal("should fail to create session with closed client")
}

// can not get leader info with closed client.
leaderKey, leaderID, err := e3.LeaderInfo(ctx3)
c.Assert(terror.ErrElectionGetLeaderIDFail.Equal(err), IsTrue)
c.Assert(err, ErrorMatches, ".*fail to get leader ID: rpc error: code = Canceled.*")
c.Assert(leaderKey, Equals, "")
c.Assert(leaderID, Equals, "")
_, err = NewElection(ctx3, cli, sessionTTL, key, ID3)
c.Assert(terror.ErrElectionCampaignFail.Equal(err), IsTrue)
c.Assert(err, ErrorMatches, ".*fail to campaign leader: create the initial session: context canceled.*")
}

func (t *testElectionSuite) TestElectionAlways1(c *C) {
Expand All @@ -201,7 +189,8 @@ func (t *testElectionSuite) TestElectionAlways1(c *C) {

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
e1 := NewElection(ctx1, cli, sessionTTL, key, ID1)
e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1)
c.Assert(err, IsNil)
defer e1.Close()

// e1 should become the leader
Expand All @@ -219,7 +208,8 @@ func (t *testElectionSuite) TestElectionAlways1(c *C) {
// start e2
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
e2 := NewElection(ctx2, cli, sessionTTL, key, ID2)
e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2)
c.Assert(err, IsNil)
defer e2.Close()
time.Sleep(100 * time.Millisecond) // wait 100ms to start the campaign
// but the leader should still be e1
Expand Down Expand Up @@ -262,7 +252,8 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
e := NewElection(ctx, cli, sessionTTL, key, ID)
e, err := NewElection(ctx, cli, sessionTTL, key, ID)
c.Assert(err, IsNil)
defer e.Close()

// should become the leader
Expand Down

0 comments on commit 80ae3d2

Please sign in to comment.