Skip to content

Commit

Permalink
FAB-1930 Subscription-like API to leader election
Browse files Browse the repository at this point in the history
Peer leadership status change callback signature
This callback invoked once peer become leader or give up on leadership
And it passed as argument to NewLeaderElectionService, can be nil

Change-Id: Ib0ef831f84428e3505cd4a69feb6135b57660e1d
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Jan 29, 2017
1 parent 36bbeb6 commit 1c0ecbd
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
16 changes: 15 additions & 1 deletion gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type LeaderElectionAdapter interface {
Peers() []Peer
}

type leadershipCallback func(isLeader bool)

// LeaderElectionService is the object that runs the leader election algorithm
type LeaderElectionService interface {
// IsLeader returns whether this peer is a leader or not
Expand All @@ -127,8 +129,11 @@ type Msg interface {
IsDeclaration() bool
}

func noopCallback(_ bool) {
}

// NewLeaderElectionService returns a new LeaderElectionService
func NewLeaderElectionService(adapter LeaderElectionAdapter, id string) LeaderElectionService {
func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
if len(id) == 0 {
panic(fmt.Errorf("Empty id"))
}
Expand All @@ -139,7 +144,13 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string) LeaderEl
stopChan: make(chan struct{}, 1),
interruptChan: make(chan struct{}, 1),
logger: logging.MustGetLogger("LeaderElection"),
callback: noopCallback,
}

if callback != nil {
le.callback = callback
}

// TODO: This will be configured using the core.yaml when FAB-1217 (Integrate peer logging with gossip logging) is done
logging.SetLevel(logging.WARNING, "LeaderElection")
go le.start()
Expand All @@ -160,6 +171,7 @@ type leaderElectionSvcImpl struct {
sleeping bool
adapter LeaderElectionAdapter
logger *logging.Logger
callback leadershipCallback
}

func (le *leaderElectionSvcImpl) start() {
Expand Down Expand Up @@ -357,11 +369,13 @@ func (le *leaderElectionSvcImpl) IsLeader() bool {
func (le *leaderElectionSvcImpl) beLeader() {
le.logger.Info(le.id, ": Becoming a leader")
atomic.StoreInt32(&le.isLeader, int32(1))
le.callback(true)
}

func (le *leaderElectionSvcImpl) stopBeingLeader() {
le.logger.Info(le.id, "Stopped being a leader")
atomic.StoreInt32(&le.isLeader, int32(0))
le.callback(false)
}

func (le *leaderElectionSvcImpl) shouldStop() bool {
Expand Down
60 changes: 50 additions & 10 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ func (m *msg) IsDeclaration() bool {
type peer struct {
mockedMethods map[string]struct{}
mock.Mock
id string
peers map[string]*peer
sharedLock *sync.RWMutex
msgChan chan Msg
id string
peers map[string]*peer
sharedLock *sync.RWMutex
msgChan chan Msg
isLeaderFromCallback bool
callbackInvoked bool
LeaderElectionService
}

Expand Down Expand Up @@ -126,6 +128,11 @@ func (p *peer) Peers() []Peer {
return peers
}

func (p *peer) leaderCallback(isLeader bool) {
p.isLeaderFromCallback = isLeader
p.callbackInvoked = true
}

func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
peers := make([]*peer, len(ids))
peerMap := make(map[string]*peer)
Expand All @@ -143,16 +150,16 @@ func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
func createPeer(id int, peerMap map[string]*peer, l *sync.RWMutex) *peer {
idStr := fmt.Sprintf("p%d", id)
c := make(chan Msg, 100)
p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{})}
p.LeaderElectionService = NewLeaderElectionService(p, idStr)
p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), isLeaderFromCallback: false, callbackInvoked: false}
p.LeaderElectionService = NewLeaderElectionService(p, idStr, p.leaderCallback)
l.Lock()
peerMap[idStr] = p
l.Unlock()
return p

}

func waitForLeaderElection(t *testing.T, peers []*peer) []string {
func waitForMultipleLeadersElection(t *testing.T, peers []*peer, leadersNum int) []string {
end := time.Now().Add(testTimeout)
for time.Now().Before(end) {
var leaders []string
Expand All @@ -161,7 +168,7 @@ func waitForLeaderElection(t *testing.T, peers []*peer) []string {
leaders = append(leaders, p.id)
}
}
if len(leaders) > 0 {
if len(leaders) >= leadersNum {
return leaders
}
time.Sleep(testPollInterval)
Expand All @@ -170,6 +177,10 @@ func waitForLeaderElection(t *testing.T, peers []*peer) []string {
return nil
}

func waitForLeaderElection(t *testing.T, peers []*peer) []string {
return waitForMultipleLeadersElection(t, peers, 1)
}

func TestInitPeersAtSameTime(t *testing.T) {
t.Parallel()
// Scenario: Peers are spawned at the same time
Expand All @@ -179,6 +190,7 @@ func TestInitPeersAtSameTime(t *testing.T) {
leaders := waitForLeaderElection(t, peers)
isP0leader := peers[len(peers)-1].IsLeader()
assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders)
assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "p0 didn't got leaderhip change callback invoked")
assert.Len(t, leaders, 1, "More than 1 leader elected")
}

Expand Down Expand Up @@ -257,6 +269,18 @@ func TestConvergence(t *testing.T) {
finalLeaders := waitForLeaderElection(t, combinedPeers)
assert.Len(t, finalLeaders, 1, "Combined peer group was suppose to have 1 leader exactly")
assert.Equal(t, leaders1[0], finalLeaders[0], "Combined peer group has different leader than expected:")

for _, p := range combinedPeers {
if p.id == finalLeaders[0] {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
} else {
assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
if p.id == leaders2[0] {
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
}
}
}
}

func TestLeadershipTakeover(t *testing.T) {
Expand Down Expand Up @@ -286,20 +310,36 @@ func TestPartition(t *testing.T) {
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "Leadership callback result is wrong for %s", peers[len(peers)-1].id)

for _, p := range peers {
p.On("Peers").Return([]Peer{})
p.On("Gossip", mock.Anything)
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, len(leaders))
leaders = waitForMultipleLeadersElection(t, peers, 6)
assert.Len(t, leaders, 6)
for _, p := range peers {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
}

for _, p := range peers {
p.sharedLock.Lock()
p.mockedMethods = make(map[string]struct{})
p.callbackInvoked = false
p.sharedLock.Unlock()
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
for _, p := range peers {
if p.id == leaders[0] {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %", p.id)
} else {
assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for %s", p.id)
}
}

}

0 comments on commit 1c0ecbd

Please sign in to comment.