Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Nov 21, 2019
1 parent 80ae3d2 commit 5c4c820
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
24 changes: 13 additions & 11 deletions pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ const (
newSessionRetryUnlimited = math.MaxInt64
// newSessionRetryInterval is the interval time when retrying to create a new session.
newSessionRetryInterval = 200 * time.Millisecond
// logNewSessionIntervalCnt is the interval count when logging errors for creating session
logNewSessionIntervalCnt = int(3 * time.Second / newSessionRetryInterval)
)

// Election implements the leader election based on etcd.
Expand Down Expand Up @@ -151,7 +149,6 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
closeSession(session) // close the latest session.
}
if err != nil && errors.Cause(err) != ctx.Err() { // only send non-ctx.Err() error
e.l.Error("", zap.Error(err))
e.ech <- err
}
}()
Expand Down Expand Up @@ -253,20 +250,25 @@ func (e *Election) newSession(ctx context.Context, retryCnt int) (*concurrency.S
session *concurrency.Session
)
for i := 0; i < retryCnt; i++ {
if i > 0 {
select {
case e.ech <- terror.ErrElectionCampaignFail.Delegate(err, "create a new session"):
default:
}

select {
case <-time.After(newSessionRetryInterval):
case <-ctx.Done():
break
}
}

// add more options if needed.
// NOTE: I think use the client's context is better than something like `concurrency.WithContext(ctx)`,
// so we can close the session when the client is still valid.
session, err = concurrency.NewSession(e.cli, concurrency.WithTTL(e.sessionTTL))
if err == nil || errors.Cause(err) == ctx.Err() {
break
} else if i%logNewSessionIntervalCnt == 0 {
e.l.Warn("fail to create a new session", zap.Error(err))
}

select {
case <-time.After(newSessionRetryInterval):
case <-ctx.Done():
break
}
}
return session, err
Expand Down
9 changes: 5 additions & 4 deletions pkg/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,17 @@ func (t *testElectionSuite) TestElection2After1(c *C) {
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e2.ID())

// if closing the client when campaigning,
// we should get no error because new session will retry until context is done,
// and we ignore the context done error.
// if closing the client when campaigning, we should get an error
wg.Add(1)
go func() {
defer wg.Done()
select {
case err2 := <-e2.ErrorNotify():
c.Fatalf("should not recieve error %v", err2)
c.Assert(terror.ErrElectionCampaignFail.Equal(err2), IsTrue)
// the old session is done, but we can't create a new one.
c.Assert(err2, ErrorMatches, ".*fail to campaign leader: create a new session.*")
case <-time.After(time.Second):
c.Fatal("do not receive error for e2")
}
}()
cli.Close() // close the client
Expand Down

0 comments on commit 5c4c820

Please sign in to comment.