Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft.LeaderCh() always deliver latest transition #384

Merged
merged 1 commit into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
leaderCh: make(chan bool, 1),
localID: localID,
localAddr: localAddr,
logger: logger,
Expand Down Expand Up @@ -959,10 +959,17 @@ func (r *Raft) State() RaftState {
return r.getState()
}

// LeaderCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. The channel is not buffered,
// and does not block on writes.
// LeaderCh is used to get a channel which delivers signals on acquiring or
// losing leadership. It sends true if we become the leader, and false if we
// lose it.
//
// Receivers can expect to receive a notification only if leadership
// transition has occured.
//
// If receivers aren't ready for the signal, signals may drop and only the
// latest leadership transition. For example, if a receiver receives subsequent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// latest leadership transition. For example, if a receiver receives subsequent
// latest leadership transition will be received. For example, if a receiver receives consecutive

// `true` values, they may deduce that leadership was lost and regained while
// the the receiver was processing first leadership transition.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}
Expand Down
4 changes: 2 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (r *Raft) runLeader() {
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)

// Notify that we are the leader
asyncNotifyBool(r.leaderCh, true)
overrideNotifyBool(r.leaderCh, true)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
Expand Down Expand Up @@ -420,7 +420,7 @@ func (r *Raft) runLeader() {
r.leaderLock.Unlock()

// Notify that we are not the leader
asyncNotifyBool(r.leaderCh, false)
overrideNotifyBool(r.leaderCh, false)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
Expand Down
19 changes: 19 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) {
}
}

// overrideNotifyBool is used to notify on a bool channel
// but override existing value if value is present.
// ch must be 1-item buffered channel.
//
// This method does not support multiple concurrent calls.
func overrideNotifyBool(ch chan bool, v bool) {
select {
case ch <- v:
// value sent, all done
case <-ch:
// channel had an old value
select {
case ch <- v:
default:
panic("race: channel was sent concurrently")
}
}
}

// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)
Expand Down
45 changes: 45 additions & 0 deletions util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,48 @@ func TestBackoff(t *testing.T) {
t.Fatalf("bad: %v", b)
}
}

func TestOverrideNotifyBool(t *testing.T) {
ch := make(chan bool, 1)

// sanity check - buffered channel don't have any values
select {
case v := <-ch:
t.Fatalf("unexpected receive: %v", v)
default:
}

// simple case of a single push
overrideNotifyBool(ch, false)
select {
case v := <-ch:
if v != false {
t.Fatalf("expected false but got %v", v)
}
default:
t.Fatalf("expected a value but is not ready")
}

// assert that function never blocks and only last item is received
overrideNotifyBool(ch, false)
overrideNotifyBool(ch, false)
overrideNotifyBool(ch, false)
overrideNotifyBool(ch, false)
overrideNotifyBool(ch, true)

select {
case v := <-ch:
if v != true {
t.Fatalf("expected true but got %v", v)
}
default:
t.Fatalf("expected a value but is not ready")
}

// no further value is available
select {
case v := <-ch:
t.Fatalf("unexpected receive: %v", v)
default:
}
}