Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(lib/grandpa, dot/network): send CommitMessage directly to peer on round mismatch; cleanup grandpa receiveMessages #1684

Merged
merged 34 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
41b2226
don't reset justification map on each round
noot Jul 6, 2021
f364251
don't put prevotes in justification
noot Jul 6, 2021
819fcda
update babe log
noot Jul 7, 2021
fe5c1c5
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Jul 7, 2021
86cb466
add precommit/prevote blockState getters, add Set/GetLatestRound to g…
noot Jul 7, 2021
de75114
use grandpaState.Get/SetPrevotes/Precommits in grandpa
noot Jul 7, 2021
30dc258
update newCommitMessage to get precommits from db
noot Jul 7, 2021
9c317b4
store precommits on CommitMessage verification
noot Jul 7, 2021
8e741eb
move grandpa.Vote and grandpa.SignedVote to types package
noot Jul 7, 2021
21365e5
lint, fix tests
noot Jul 7, 2021
b93945b
store prevotes and precommits in db on catch up response handling
noot Jul 7, 2021
c40a58e
fix round setting on restart
noot Jul 7, 2021
8b13356
fix round calculation
noot Jul 7, 2021
e8f712b
address comments
noot Jul 8, 2021
3afb196
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Jul 8, 2021
0b59563
rename network SendMessage to GossipMessage
noot Jul 8, 2021
b80ed95
cleanup receiveMessages usage, update gssmr runtime to have c=1/2
noot Jul 8, 2021
11056ee
cleanup
noot Jul 8, 2021
0124355
update grandpa to store from peer.ID for vote messages
noot Jul 8, 2021
163ad5e
fix tests, lint
noot Jul 8, 2021
81c0ccd
potential fix to getBestFinalCandidate
noot Jul 9, 2021
b2b74bc
merge w development
noot Jul 12, 2021
0ca67fc
cleanup
noot Jul 12, 2021
1707689
fix tests
noot Jul 13, 2021
f8080f1
address comments
noot Jul 13, 2021
1d6f752
Merge branch 'development' into noot/grandpa-network
noot Jul 14, 2021
0b503db
Merge branch 'development' into noot/grandpa-network
noot Jul 14, 2021
27f6b4a
fix unit tests
noot Jul 14, 2021
61c4e51
Merge branch 'noot/grandpa-network' of github.com:ChainSafe/gossamer …
noot Jul 14, 2021
3fdf45d
attempt to revert and fix CI
noot Jul 15, 2021
92175ac
merge w development
noot Jul 15, 2021
9d5031e
re-add changes
noot Jul 15, 2021
3580f74
fix unit tests
noot Jul 16, 2021
f235f26
fix tests again??
noot Jul 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions chain/gssmr/genesis.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type TransactionState interface {

// Network is the interface for the network service
type Network interface {
SendMessage(network.NotificationsMessage)
GossipMessage(network.NotificationsMessage)
}

// EpochState is the interface for state.EpochState
Expand Down
5 changes: 2 additions & 3 deletions dot/core/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ func TestService_ProcessBlockAnnounceMessage(t *testing.T) {
BestBlock: true,
}

//setup the SendMessage function
net.On("SendMessage", expected)
net.On("GossipMessage", expected)

state, err := s.storageState.TrieState(nil)
require.NoError(t, err)
Expand All @@ -120,7 +119,7 @@ func TestService_ProcessBlockAnnounceMessage(t *testing.T) {
require.NoError(t, err)

time.Sleep(time.Second)
net.AssertCalled(t, "SendMessage", expected)
net.AssertCalled(t, "GossipMessage", expected)
}

func TestService_HandleTransactionMessage(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions dot/core/mocks/network.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions dot/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *Service) HandleBlockProduced(block *types.Block, state *rtstorage.TrieS
BestBlock: true,
}

s.net.SendMessage(msg)
s.net.GossipMessage(msg)
return s.handleBlock(block, state)
}

Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *Service) HandleSubmittedExtrinsic(ext types.Extrinsic) error {

// broadcast transaction
msg := &network.TransactionMessage{Extrinsics: []types.Extrinsic{ext}}
s.net.SendMessage(msg)
s.net.GossipMessage(msg)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func Test_HandshakeTimeout(t *testing.T) {
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
}
nodeA.SendMessage(testHandshakeMsg)
nodeA.GossipMessage(testHandshakeMsg)

go nodeA.sendData(nodeB.host.id(), testHandshakeMsg, info, nil)

Expand Down
21 changes: 19 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ func (s *Service) IsStopped() bool {
return s.ctx.Err() != nil
}

// SendMessage implementation of interface to handle receiving messages
func (s *Service) SendMessage(msg NotificationsMessage) {
// GossipMessage gossips a notifications protocol message to our peers
func (s *Service) GossipMessage(msg NotificationsMessage) {
if s.host == nil || msg == nil || s.IsStopped() {
return
}
Expand Down Expand Up @@ -509,6 +509,23 @@ func (s *Service) SendMessage(msg NotificationsMessage) {
logger.Error("message not supported by any notifications protocol", "msg type", msg.Type())
}

// SendMessage sends a message to the given peer
func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()
for msgID, prtl := range s.notificationsProtocols {
if msg.Type() != msgID || prtl == nil {
continue
}
hs, err := prtl.getHandshake()
if err != nil {
return err
}
s.sendData(to, hs, prtl, msg)
}
return errors.New("message not supported by any notifications protocol")
}

// handleLightStream handles streams with the <protocol-id>/light/2 protocol ID
func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg)
Expand Down
6 changes: 3 additions & 3 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestBroadcastMessages(t *testing.T) {
require.NoError(t, err)

// simulate message sent from core service
nodeA.SendMessage(testBlockAnnounceMessage)
nodeA.GossipMessage(testBlockAnnounceMessage)
time.Sleep(time.Second * 2)
require.NotNil(t, handler.messages[nodeA.host.id()])
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) {

// Only one message will be sent.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
nodeA.GossipMessage(testBlockAnnounceMessage)
time.Sleep(time.Millisecond * 10)
}

Expand All @@ -243,7 +243,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) {

// All 5 message will be sent since cache is disabled.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
nodeA.GossipMessage(testBlockAnnounceMessage)
time.Sleep(time.Millisecond * 10)
}
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func setupSystemModule(t *testing.T) *SystemModule {

type mockNetwork struct{}

func (n *mockNetwork) SendMessage(_ network.NotificationsMessage) {}
func (n *mockNetwork) GossipMessage(_ network.NotificationsMessage) {}

func newCoreService(t *testing.T, srvc *state.Service) *core.Service {
// setup service
Expand Down
15 changes: 5 additions & 10 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,31 +94,26 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
}

err := bs.setArrivalTime(header.Hash(), time.Now())
if err != nil {
if err := bs.setArrivalTime(header.Hash(), time.Now()); err != nil {
return nil, err
}

err = bs.SetHeader(header)
if err != nil {
if err := bs.SetHeader(header); err != nil {
return nil, err
}

err = bs.db.Put(headerHashKey(header.Number.Uint64()), header.Hash().ToBytes())
if err != nil {
if err := bs.db.Put(headerHashKey(header.Number.Uint64()), header.Hash().ToBytes()); err != nil {
return nil, err
}

err = bs.SetBlockBody(header.Hash(), types.NewBody([]byte{}))
if err != nil {
if err := bs.SetBlockBody(header.Hash(), types.NewBody([]byte{})); err != nil {
return nil, err
}

bs.genesisHash = header.Hash()

// set the latest finalised head to the genesis header
err = bs.SetFinalisedHash(bs.genesisHash, 0, 0)
if err != nil {
if err := bs.SetFinalisedHash(bs.genesisHash, 0, 0); err != nil {
return nil, err
}

Expand Down
94 changes: 10 additions & 84 deletions lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Service struct {
bestFinalCandidate map[uint64]*Vote // map of round number -> best final candidate

// channels for communication with other services
in chan GrandpaMessage // only used to receive *VoteMessage
in chan *networkVoteMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
finalisedChID byte
neighbourMessage *NeighbourMessage // cached neighbour message
Expand Down Expand Up @@ -166,7 +166,7 @@ func NewService(cfg *Config) (*Service, error) {
preVotedBlock: make(map[uint64]*Vote),
bestFinalCandidate: make(map[uint64]*Vote),
head: head,
in: make(chan GrandpaMessage, 128),
in: make(chan *networkVoteMessage, 128),
resumed: make(chan struct{}),
network: cfg.Network,
finalisedCh: finalisedCh,
Expand Down Expand Up @@ -421,7 +421,7 @@ func (s *Service) handleIsPrimary() (bool, error) {
return false, fmt.Errorf("failed to encode finalisation message: %w", err)
}

s.network.SendMessage(msg)
s.network.GossipMessage(msg)
return true, nil
}

Expand All @@ -439,42 +439,24 @@ func (s *Service) primaryBroadcastCommitMessage() {
logger.Warn("failed to encode finalisation message", "error", err)
}

s.network.SendMessage(msg)
s.network.GossipMessage(msg)
}

// playGrandpaRound executes a round of GRANDPA
// at the end of this round, a block will be finalised.
func (s *Service) playGrandpaRound() error {
logger.Debug("starting round", "round", s.state.round, "setID", s.state.setID)

// save start time
start := time.Now()
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()

isPrimary, err := s.handleIsPrimary()
if err != nil {
return err
}

logger.Debug("receiving pre-vote messages...")

go s.receiveMessages(func() bool {
if s.paused.Load().(bool) {
return true
}

end := start.Add(interval * 2)

// ignore err, since if round isn't completable then this will continue
completable, _ := s.isCompletable()

if time.Since(end) >= 0 || completable {
return true
}

return false
})

time.Sleep(interval * 2)
go s.receiveMessages(ctx)
time.Sleep(interval)

if s.paused.Load().(bool) {
return ErrServicePaused
Expand All @@ -494,28 +476,14 @@ func (s *Service) playGrandpaRound() error {
if !isPrimary {
s.prevotes.Store(s.publicKeyBytes(), spv)
}
logger.Debug("sending pre-vote message...", "vote", pv)

logger.Debug("sending pre-vote message...", "vote", pv)
roundComplete := make(chan struct{})

// continue to send prevote messages until round is done
go s.sendVoteMessage(prevote, vm, roundComplete)

logger.Debug("receiving pre-commit messages...")

go s.receiveMessages(func() bool {
end := start.Add(interval * 4)

// ignore err, since if round isn't completable then this will continue
completable, _ := s.isCompletable()

if time.Since(end) >= 0 || completable {
return true
}

return false
})

time.Sleep(interval)

if s.paused.Load().(bool) {
Expand All @@ -539,44 +507,6 @@ func (s *Service) playGrandpaRound() error {
// continue to send precommit messages until round is done
go s.sendVoteMessage(precommit, pcm, roundComplete)

go func() {
// receive messages until current round is completable and previous round is finalisable
// and the last finalised block is greater than the best final candidate from the previous round
s.receiveMessages(func() bool {
if s.paused.Load().(bool) {
return true
}

completable, err := s.isCompletable() //nolint
if err != nil {
return false
}

round := s.state.round
finalisable, err := s.isFinalisable(round)
if err != nil {
return false
}

s.mapLock.Lock()
prevBfc := s.bestFinalCandidate[s.state.round-1]
s.mapLock.Unlock()

// this shouldn't happen as long as playGrandpaRound is called through initiate
if prevBfc == nil {
return false
}

if completable && finalisable && uint32(s.head.Number.Int64()) >= prevBfc.Number {
return true
}

return false
})
}()

time.Sleep(interval)

err = s.attemptToFinalize()
if err != nil {
logger.Error("failed to finalise", "error", err)
Expand Down Expand Up @@ -671,7 +601,7 @@ func (s *Service) attemptToFinalize() error {
}

logger.Debug("sending CommitMessage", "msg", cm)
s.network.SendMessage(msg)
s.network.GossipMessage(msg)
return nil
}
}
Expand Down Expand Up @@ -947,10 +877,6 @@ func (s *Service) getBestFinalCandidate() (*Vote, error) {

// if there are multiple blocks, get the one with the highest number
// that is also an ancestor of the prevoted block (or is the prevoted block)
if blocks[prevoted.Hash] != 0 {
return &prevoted, nil
}

bfc := &Vote{
Number: 0,
}
Expand Down
5 changes: 4 additions & 1 deletion lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
vm, ok := m.(*VoteMessage)
if h.grandpa != nil && ok {
// send vote message to grandpa service
h.grandpa.in <- vm
h.grandpa.in <- &networkVoteMessage{
from: from,
msg: vm,
}
}
return nil, nil
case commitType:
Expand Down
2 changes: 1 addition & 1 deletion lib/grandpa/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestMessageHandler_VoteMessage(t *testing.T) {

select {
case vote := <-gs.in:
require.Equal(t, vm, vote)
require.Equal(t, vm, vote.msg)
case <-time.After(time.Second):
t.Fatal("did not receive VoteMessage")
}
Expand Down
Loading