// Extracted from patch 9635a9597ce2 "fix(bft): correctly drain channels events
// (#1515)", file tm2/pkg/bft/consensus/common_test.go. The same patch adds
// "reflect" to that file's import block, which this helper requires.

// ensureDrainedChannels is a deferred test helper. When the calling test is
// panicking, it empties the given channels, logs how many events of each
// dynamic type were still pending, and then re-raises the original panic.
// When no panic is in flight it returns immediately and leaves the channels
// untouched.
//
// It has to be the deferred call itself:
//
//	defer ensureDrainedChannels(t, proposalCh, newRoundCh)
//
// because recover only intercepts a panic when it is called directly by the
// deferred function.
//
// Each element of channels is expected to be a channel that can be received
// from. Anything else (nil, a non-channel value, a send-only channel) is
// logged and skipped rather than panicking, so that a misuse of this helper
// can never hide the panic that made the test fail in the first place.
func ensureDrainedChannels(t *testing.T, channels ...any) {
	t.Helper()

	r := recover()
	if r == nil {
		return
	}

	const (
		// drainBudget caps the time spent draining a single channel.
		drainBudget = 5 * time.Second
		// settleDelay is the pause after each received event before the
		// channel is polled again.
		settleDelay = 500 * time.Millisecond
	)

	t.Logf("checking for drained channel")
	leaks := make(map[string]int)
	for i, ch := range channels {
		chVal := reflect.ValueOf(ch)
		// Kind is checked first: calling Type on the zero Value produced by
		// a nil argument would itself panic.
		if chVal.Kind() != reflect.Chan || chVal.Type().ChanDir()&reflect.RecvDir == 0 {
			t.Logf("argument %d (%T) is not a receivable channel, skipping", i, ch)
			continue
		}

		// Non-blocking receive: the default case is chosen as soon as no
		// event is immediately available.
		cases := []reflect.SelectCase{
			{Dir: reflect.SelectRecv, Chan: chVal},
			{Dir: reflect.SelectDefault},
		}

		// The budget is enforced with an explicit deadline instead of a timer
		// case inside the select: a ready timer case would only be picked at
		// random against a ready receive case.
		for deadline := time.Now().Add(drainBudget); time.Now().Before(deadline); {
			chosen, recv, recvOK := reflect.Select(cases)
			if chosen != 0 || !recvOK {
				// Nothing pending, or the channel is closed.
				break
			}

			// An interface-typed channel may deliver a nil element, for which
			// reflect.TypeOf returns a nil Type.
			name := "<nil>"
			if ev := recv.Interface(); ev != nil {
				name = reflect.TypeOf(ev).String()
			}
			leaks[name]++

			time.Sleep(settleDelay)
		}
	}

	// Report in a stable order; map iteration order is random.
	names := make([]string, 0, len(leaks))
	for name := range leaks {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		t.Logf("channel %q: %d events left\n", name, leaks[name])
	}

	// Hand the original failure back to the test runner.
	panic(r)
}
b/tm2/pkg/bft/consensus/state_test.go index 35877837ab3..201cf8906b3 100644 --- a/tm2/pkg/bft/consensus/state_test.go +++ b/tm2/pkg/bft/consensus/state_test.go @@ -76,10 +76,14 @@ func TestStateProposerSelection0(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, newRoundCh) // Wait for new round so proposer is set. ensureNewRound(newRoundCh, height, round) + // Wait for complete proposal. + ensureNewProposal(proposalCh, height, round) + // Commit a block and ensure proposer for the next height is correct. prop := cs1.GetRoundState().Validators.GetProposer() address := cs1.privValidator.GetPubKey().Address() @@ -87,9 +91,6 @@ func TestStateProposerSelection0(t *testing.T) { t.Fatalf("expected proposer to be validator %d. Got %X", 0, prop.Address) } - // Wait for complete proposal. - ensureNewProposal(proposalCh, height, round) - rs := cs1.GetRoundState() signAddVotes(cs1, types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:]...) @@ -121,6 +122,7 @@ func TestStateProposerSelection2(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, newRoundCh) ensureNewRound(newRoundCh, height, round) // wait for the new round @@ -156,7 +158,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) { cs.Stop() cs.Wait() }() - + defer ensureDrainedChannels(t, timeoutCh) // if we're not a validator, EnterPropose should timeout ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) @@ -183,6 +185,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) { cs.Stop() cs.Wait() }() + defer ensureDrainedChannels(t, proposalCh, newRoundCh, timeoutCh) // Wait for new round so proposer is set. 
ensureNewRound(newRoundCh, height, round) @@ -247,6 +250,7 @@ func TestStateBadProposal(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, voteCh) // wait for proposal ensureProposal(proposalCh, height, round, blockID) @@ -285,10 +289,12 @@ func TestStateFullRound1(t *testing.T) { cs.Stop() cs.Wait() }() + defer ensureDrainedChannels(t, newRoundCh, voteCh, propCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(propCh, height, round) + propBlockHash := cs.GetRoundState().ProposalBlock.Hash() ensurePrevote(voteCh, height, round) // wait for prevote @@ -319,6 +325,7 @@ func TestStateFullRoundNil(t *testing.T) { cs.Stop() cs.Wait() }() + defer ensureDrainedChannels(t, voteCh) ensurePrevote(voteCh, height, round) // prevote ensurePrecommit(voteCh, height, round) // precommit @@ -345,6 +352,7 @@ func TestStateFullRound2(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, voteCh, newBlockCh) ensurePrevote(voteCh, height, round) // prevote @@ -390,6 +398,8 @@ func TestStateLockNoPOL(t *testing.T) { proposalCh := subscribe(cs1.evsw, cstypes.EventCompleteProposal{}) newRoundCh := subscribe(cs1.evsw, cstypes.EventNewRound{}) + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, timeoutProposeCh, newRoundCh, voteCh) + /* Round1 (cs1, B) // B B // B B2 */ @@ -400,18 +410,18 @@ func TestStateLockNoPOL(t *testing.T) { ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) // prevote roundState := cs1.GetRoundState() theBlockHash := roundState.ProposalBlock.Hash() thePartSetHeader := roundState.ProposalBlockParts.Header() - - ensurePrevote(voteCh, height, round) // prevote + validatePrevote(cs1, round, vss[0], theBlockHash) // we should now be stuck in limbo forever, waiting for more prevotes // prevote arrives from vs2: signAddVotes(cs1, types.PrevoteType, theBlockHash, thePartSetHeader, vs2) - ensurePrevote(voteCh, height, round) // 
prevote - + ensurePrevote(voteCh, height, round) // prevote ensurePrecommit(voteCh, height, round) // precommit + // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, round, round, vss[0], theBlockHash, theBlockHash) @@ -441,14 +451,13 @@ func TestStateLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds()) + // wait to finish prevote + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() - if rs.ProposalBlock != nil { panic("Expected proposal block to be nil") } - // wait to finish prevote - ensurePrevote(voteCh, height, round) // we should have prevoted our locked block validatePrevote(cs1, round, vss[0], rs.LockedBlock.Hash()) @@ -481,8 +490,8 @@ func TestStateLockNoPOL(t *testing.T) { */ incrementRound(vs2) - ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) // prevote rs = cs1.GetRoundState() // now we're on a new round and are the proposer @@ -490,9 +499,7 @@ func TestStateLockNoPOL(t *testing.T) { panic(fmt.Sprintf("Expected proposal block to be locked block. 
Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock)) } - ensurePrevote(voteCh, height, round) // prevote validatePrevote(cs1, round, vss[0], rs.LockedBlock.Hash()) - signAddVotes(cs1, types.PrevoteType, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2) ensurePrevote(voteCh, height, round) @@ -579,14 +586,16 @@ func TestStateLockPOLRelock(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, newRoundCh, newBlockCh, voteCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) // prevote rs := cs1.GetRoundState() theBlockHash := rs.ProposalBlock.Hash() theBlockParts := rs.ProposalBlockParts.Header() - ensurePrevote(voteCh, height, round) // prevote + validatePrevote(cs1, round, vss[0], theBlockHash) signAddVotes(cs1, types.PrevoteType, theBlockHash, theBlockParts, vs2, vs3, vs4) @@ -676,14 +685,15 @@ func TestStateLockPOLUnlock(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, newRoundCh, voteCh, unlockCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() theBlockHash := rs.ProposalBlock.Hash() theBlockParts := rs.ProposalBlockParts.Header() - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], theBlockHash) signAddVotes(cs1, types.PrevoteType, theBlockHash, theBlockParts, vs2, vs3, vs4) @@ -770,13 +780,14 @@ func TestStateLockPOLSafety1(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, timeoutProposeCh, newRoundCh, voteCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() propBlock := rs.ProposalBlock - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], propBlock.Hash()) // the others sign 
a polka but we don't see it @@ -811,17 +822,15 @@ func TestStateLockPOLSafety1(t *testing.T) { // a polka happened but we didn't see it! */ + // go to prevote, prevote for proposal block ensureNewProposal(proposalCh, height, round) - + ensurePrevote(voteCh, height, round) rs = cs1.GetRoundState() - if rs.LockedBlock != nil { panic("we should not be locked!") } t.Logf("new prop hash %v", fmt.Sprintf("%X", propBlockHash)) - // go to prevote, prevote for proposal block - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], propBlockHash) // now we see the others prevote for it, so we should lock on it @@ -854,6 +863,7 @@ func TestStateLockPOLSafety1(t *testing.T) { validatePrevote(cs1, round, vss[0], propBlockHash) newStepCh := subscribe(cs1.evsw, cstypes.EventNewRoundStep{}) + defer ensureDrainedChannels(t, newStepCh) // before prevotes from the previous round are added // add prevotes from the earlier round @@ -912,6 +922,7 @@ func TestStateLockPOLSafety2(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, newRoundCh, unlockCh, voteCh) ensureNewRound(newRoundCh, height, round) if err := cs1.SetProposalAndBlock(prop1, propBlock1, propBlockParts1, "some peer"); err != nil { @@ -992,13 +1003,15 @@ func TestProposeValidBlock(t *testing.T) { cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, timeoutProposeCh, newRoundCh, unlockCh, voteCh) + ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() propBlock := rs.ProposalBlock propBlockHash := propBlock.Hash() - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], propBlockHash) // the others sign a polka @@ -1055,7 +1068,7 @@ func TestProposeValidBlock(t *testing.T) { t.Log("### ONTO ROUND 4") ensureNewProposal(proposalCh, height, round) - + ensurePrevote(voteCh, height, round) rs = cs1.GetRoundState() 
assert.True(t, bytes.Equal(rs.ProposalBlock.Hash(), propBlockHash)) assert.True(t, bytes.Equal(rs.ProposalBlock.Hash(), rs.ValidBlock.Hash())) @@ -1087,15 +1100,16 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, timeoutWaitCh, newRoundCh, validBlockCh, voteCh, proposalCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() propBlock := rs.ProposalBlock propBlockHash := propBlock.Hash() propBlockParts := propBlock.MakePartSet(partSize) - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], propBlockHash) // vs2 send prevote for propBlock @@ -1156,6 +1170,7 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, timeoutWaitCh, timeoutProposeCh, newRoundCh, validBlockCh, voteCh, proposalCh) ensureNewRound(newRoundCh, height, round) ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds()) @@ -1207,6 +1222,7 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, timeoutWaitCh, newRoundCh) ensureNewRound(newRoundCh, height, round) signAddVotes(cs1, types.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) @@ -1236,6 +1252,7 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, timeoutWaitCh, newRoundCh, voteCh) ensureNewRound(newRoundCh, height, round) ensurePrevote(voteCh, height, round) @@ -1276,6 +1293,7 @@ func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, timeoutWaitCh, newRoundCh, voteCh) ensureNewRound(newRoundCh, height, round) ensurePrevote(voteCh, height, round) @@ -1316,6 +1334,7 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer 
ensureDrainedChannels(t, timeoutProposeCh, newRoundCh, voteCh) ensureNewRound(newRoundCh, height, round) incrementRound(vss[1:]...) @@ -1353,6 +1372,7 @@ func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, newRoundCh, newRoundCh, validBlockCh) ensureNewRound(newRoundCh, height, round) // vs2, vs3 and vs4 send precommit for propBlock @@ -1391,6 +1411,7 @@ func TestCommitFromPreviousRound(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, newRoundCh, validBlockCh) ensureNewRound(newRoundCh, height, round) // vs2, vs3 and vs4 send precommit for propBlock for the previous round @@ -1446,14 +1467,15 @@ func TestStartNextHeightCorrectly(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, newRoundCh, voteCh, newBlockHeader) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) + ensurePrevote(voteCh, height, round) rs := cs1.GetRoundState() theBlockHash := rs.ProposalBlock.Hash() theBlockParts := rs.ProposalBlockParts.Header() - ensurePrevote(voteCh, height, round) validatePrevote(cs1, round, vss[0], theBlockHash) signAddVotes(cs1, types.PrevoteType, theBlockHash, theBlockParts, vs2, vs3, vs4) @@ -1478,6 +1500,7 @@ func TestStartNextHeightCorrectly(t *testing.T) { height, round = height+1, 0 ensureNewRound(newRoundCh, height, round) ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds()) + ensurePrevote(voteCh, height, round) rs = cs1.GetRoundState() assert.False(t, rs.TriggeredTimeoutPrecommit, "triggeredTimeoutPrecommit should be false at the beginning of each round") } @@ -1507,6 +1530,7 @@ func TestFlappyResetTimeoutPrecommitUponNewHeight(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, newRoundCh, voteCh, newBlockHeader) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round) @@ -1653,6 
+1677,7 @@ func TestFlappyStateHalt1(t *testing.T) { cs1.Stop() cs1.Wait() }() + defer ensureDrainedChannels(t, proposalCh, timeoutWaitCh, newRoundCh, voteCh, newBlockCh) ensureNewRound(newRoundCh, height, round) ensureNewProposal(proposalCh, height, round)