Skip to content

Commit

Permalink
fix(dot/netwok): check for duplicate message earlier (#2435)
Browse files Browse the repository at this point in the history
Check if we have seen the block announce notification message before any processing
happens on it

don't report if we fail to find if a protocol is supported
  • Loading branch information
kishansagathiya authored Apr 4, 2022
1 parent 2deb7dd commit d62503f
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 53 deletions.
25 changes: 17 additions & 8 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ func (bm *BlockAnnounceMessage) Decode(in []byte) error {
}

// Hash returns the hash of the BlockAnnounceMessage
func (bm *BlockAnnounceMessage) Hash() common.Hash {
func (bm *BlockAnnounceMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := bm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := bm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
}

return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -144,9 +147,15 @@ func (*BlockAnnounceHandshake) Type() byte {
return 0
}

// Hash ...
func (*BlockAnnounceHandshake) Hash() common.Hash {
return common.Hash{}
// Hash returns blake2b hash of block announce handshake.
func (hs *BlockAnnounceHandshake) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, err := hs.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode handshake: %w", err)
}

return common.Blake2bHash(encMsg)
}

// IsHandshake returns true
Expand Down Expand Up @@ -174,7 +183,7 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err
return errors.New("invalid handshake type")
}

if bhs.GenesisHash != s.blockState.GenesisHash() {
if !bhs.GenesisHash.Equal(s.blockState.GenesisHash()) {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.GenesisMismatch,
Reason: peerset.GenesisMismatchReason,
Expand Down
21 changes: 14 additions & 7 deletions dot/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package network

import (
"fmt"
"sync"

"github.com/ChainSafe/gossamer/internal/log"
Expand All @@ -25,19 +26,25 @@ func newGossip() *gossip {
}
}

// hasSeen broadcasts messages that have not been seen
func (g *gossip) hasSeen(msg NotificationsMessage) bool {
// check if message has not been seen
msgHash := msg.Hash()
// hasSeen checks if we have seen the given message before.
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
msgHash, err := msg.Hash()
if err != nil {
return false, fmt.Errorf("could not hash notification message: %w", err)
}

g.seenMutex.Lock()
defer g.seenMutex.Unlock()

// check if message has not been seen
_, ok := g.seenMap[msgHash]
if !ok {
// set message to has been seen
g.seenMap[msgHash] = struct{}{}
return false
if !msg.IsHandshake() {
g.seenMap[msgHash] = struct{}{}
}
return false, nil
}

return true
return true, nil
}
9 changes: 6 additions & 3 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ func TestGossip(t *testing.T) {

time.Sleep(TestMessageTimeout)

_, ok := nodeB.gossip.seenMap[announceMessage.Hash()]
hash, err := announceMessage.Hash()
require.NoError(t, err)

_, ok := nodeB.gossip.seenMap[hash]
require.True(t, ok, "node B did not receive block request message from node A")

_, ok = nodeC.gossip.seenMap[announceMessage.Hash()]
_, ok = nodeC.gossip.seenMap[hash]
require.True(t, ok, "node C did not receive block request message from node B")

_, ok = nodeA.gossip.seenMap[announceMessage.Hash()]
_, ok = nodeA.gossip.seenMap[hash]
require.True(t, ok, "node A did not receive block request message from node C")
}
12 changes: 7 additions & 5 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Message interface {
type NotificationsMessage interface {
Message
Type() byte
Hash() common.Hash
Hash() (common.Hash, error)
IsHandshake() bool
}

Expand Down Expand Up @@ -389,11 +389,13 @@ func (cm *ConsensusMessage) Decode(in []byte) error {
}

// Hash returns the Hash of ConsensusMessage
func (cm *ConsensusMessage) Hash() common.Hash {
func (cm *ConsensusMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := cm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := cm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down
9 changes: 8 additions & 1 deletion dot/network/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package network

import (
"errors"
"fmt"
"time"

"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error)
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool {
key, err := generateCacheKey(peer, msg)
if err != nil {
logger.Errorf("could not generate cache key: %s", err)
return false
}

Expand All @@ -67,7 +69,12 @@ func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) {
return nil, errors.New("cache does not support handshake messages")
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...))
msgHash, err := msg.Hash()
if err != nil {
return nil, fmt.Errorf("cannot hash notification message: %w", err)
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msgHash.ToBytes()...))
if err != nil {
return nil, err
}
Expand Down
37 changes: 24 additions & 13 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ func (s *Service) createNotificationsMessageHandler(
return fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), msg)
}

hasSeen, err := s.gossip.hasSeen(msg)
if err != nil {
return fmt.Errorf("could not check if message was seen before: %w", err)
}

if hasSeen {
// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}

if msg.IsHandshake() {
logger.Tracef("received handshake on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)
Expand Down Expand Up @@ -207,16 +221,7 @@ func (s *Service) createNotificationsMessageHandler(
return nil
}

if !s.gossip.hasSeen(msg) {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
s.broadcastExcluding(info, peer, msg)
return nil
}
}
Expand All @@ -238,7 +243,13 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support {
support, err := s.host.supportsProtocol(peer, info.protocolID)
if err != nil {
logger.Errorf("could not check if protocol %s is supported by peer %s: %s", info.protocolID, peer, err)
return
}

if !support {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadProtocolValue,
Reason: peerset.BadProtocolReason,
Expand Down Expand Up @@ -319,7 +330,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
peer, info.protocolID, hs)
stream, err := s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Tracef("failed to send message to peer %s: %s", peer, err)
logger.Tracef("failed to send handshake to peer %s: %s", peer, err)
// don't need to close the stream here, as it's nil!
return nil, err
}
Expand All @@ -345,7 +356,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
}

if hsResponse.err != nil {
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
closeOutboundStream(info, peer, stream)
return nil, hsResponse.err
}
Expand Down
16 changes: 9 additions & 7 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
Roles: 4,
BestBlockNumber: 77,
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
// we are using a different genesis here, thus this
// handshake would be validated to be incorrect.
GenesisHash: common.Hash{2},
}

err = handler(stream, testHandshake)
Expand Down Expand Up @@ -367,43 +369,43 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
require.Len(t, txnBatch, 1)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 2)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 3)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 4)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
Expand Down
26 changes: 19 additions & 7 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (tm *TransactionMessage) Decode(in []byte) error {
}

// Hash returns the hash of the TransactionMessage
func (tm *TransactionMessage) Hash() common.Hash {
encMsg, _ := tm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
func (tm *TransactionMessage) Hash() (common.Hash, error) {
encMsg, err := tm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("could not encode message: %w", err)
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -93,8 +95,8 @@ func (*transactionHandshake) Type() byte {
}

// Hash ...
func (*transactionHandshake) Hash() common.Hash {
return common.Hash{}
func (*transactionHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
}

// IsHandshake returns true
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
logger.Warnf("could not handle transaction message: %s", err)
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}
Expand All @@ -137,7 +140,16 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
// TODO: Check if s.gossip.hasSeen should be moved before handleTransactionMessage. #2445
// That we could avoid handling the transactions again, which we would have already seen.

hasSeen, err := s.gossip.hasSeen(txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
logger.Debugf("could not check if message was seen before: %s", err)
continue
}
if !hasSeen {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
Expand Down
4 changes: 4 additions & 0 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {

// ReportPeer reports ReputationChange according to the peer behaviour.
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
for _, pid := range peers {
logger.Debugf("reporting reputation change of %d to peer %s, reason: %s", rep.Value, pid, rep.Reason)
}

h.actionQueue <- action{
actionCall: reportPeer,
reputation: rep,
Expand Down
4 changes: 2 additions & 2 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (*GrandpaHandshake) Type() byte {
}

// Hash ...
func (*GrandpaHandshake) Hash() common.Hash {
return common.Hash{}
func (*GrandpaHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
}

// IsHandshake returns true
Expand Down

0 comments on commit d62503f

Please sign in to comment.