Skip to content

Commit

Permalink
feat(dot/network): request block justifications when near head (#1499)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Apr 5, 2021
1 parent a6e0bd0 commit ae7012b
Show file tree
Hide file tree
Showing 18 changed files with 455 additions and 94 deletions.
16 changes: 8 additions & 8 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
return errors.New("genesis hash mismatch")
}

// if peer has higher best block than us, begin syncing
latestHeader, err := s.blockState.BestBlockHeader()
if err != nil {
return err
}

bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))

np, ok := s.notificationsProtocols[BlockAnnounceMsgType]
if !ok {
// this should never happen.
Expand All @@ -239,6 +231,14 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err

data.handshake = hs

// if peer has higher best block than us, begin syncing
latestHeader, err := s.blockState.BestBlockHeader()
if err != nil {
return err
}

bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))

// check if peer block number is greater than host block number
if latestHeader.Number.Cmp(bestBlockNum) >= 0 {
return nil
Expand Down
8 changes: 4 additions & 4 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (h *host) bootstrap() {
// peer (gets the already opened outbound message stream or opens a new one).
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (err error) {
// get outbound stream for given peer
s := h.getStream(p, pid)
s := h.getOutboundStream(p, pid)

// check if stream needs to be opened
if s == nil {
Expand Down Expand Up @@ -286,10 +286,10 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
return err
}

// getStream returns the outbound message stream for the given peer or returns
// getOutboundStream returns the outbound message stream for the given peer or returns
// nil if no outbound message stream exists. For each peer, each host opens an
// outbound message stream and writes to the same stream until closed or reset.
func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
func (h *host) getOutboundStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
conns := h.h.Network().ConnsToPeer(p)

// loop through connections (only one for now)
Expand All @@ -310,7 +310,7 @@ func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Strea

// closeStream closes a stream open to the peer with the given sub-protocol, if it exists.
func (h *host) closeStream(p peer.ID, pid protocol.ID) {
stream := h.getStream(p, pid)
stream := h.getOutboundStream(p, pid)
if stream != nil {
_ = stream.Close()
}
Expand Down
10 changes: 5 additions & 5 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func TestExistingStream(t *testing.T) {
}
require.NoError(t, err)

stream := nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream := nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.Nil(t, stream, "node A should not have an outbound stream")

// node A opens the stream to send the first message
Expand All @@ -279,15 +279,15 @@ func TestExistingStream(t *testing.T) {
time.Sleep(TestMessageTimeout)
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node A should have an outbound stream")

// node A uses the stream to send a second message
err = nodeA.host.send(addrInfosB[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
require.NoError(t, err)
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")

// node B opens the stream to send the first message
Expand All @@ -297,15 +297,15 @@ func TestExistingStream(t *testing.T) {
time.Sleep(TestMessageTimeout)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")

stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")

// node B uses the stream to send a second message
err = nodeB.host.send(addrInfosA[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
require.NoError(t, err)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")

stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")
}

Expand Down
15 changes: 10 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
return errors.New("message is not NotificationsMessage")
}

logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

if msg.IsHandshake() {
logger.Trace("received handshake on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

hs, ok := msg.(Handshake)
if !ok {
return errors.New("failed to convert message to Handshake")
Expand Down Expand Up @@ -186,6 +186,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
return nil
}

logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

err := messageHandler(peer, msg)
if err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) {
defer info.mapMu.RUnlock()

peer := conn.RemotePeer()
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received {
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint
info.handshakeData.Store(peer, &handshakeData{
validated: false,
})
Expand Down Expand Up @@ -428,6 +428,9 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,

info := s.notificationsProtocols[messageID]

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)

s.host.registerStreamHandlerWithOverwrite(sub, overwriteProtocol, func(stream libp2pnetwork.Stream) {
logger.Trace("received stream", "sub-protocol", sub)
conn := stream.Conn()
Expand All @@ -437,10 +440,6 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
}

p := conn.RemotePeer()

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)

s.readStream(stream, p, decoder, handlerWithValidate)
})

Expand Down Expand Up @@ -537,7 +536,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
// decode message based on message type
msg, err := decoder(msgBytes[:tot], peer)
if err != nil {
logger.Trace("Failed to decode message from peer", "peer", peer, "err", err)
logger.Trace("failed to decode message from peer", "protocol", stream.Protocol(), "err", err)
continue
}

Expand Down
1 change: 1 addition & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BlockState interface {
GenesisHash() common.Hash
HasBlockBody(common.Hash) (bool, error)
GetFinalizedHeader(round, setID uint64) (*types.Header, error)
GetHashByNumber(num *big.Int) (common.Hash, error)
}

// Syncer is implemented by the syncing service
Expand Down
4 changes: 4 additions & 0 deletions dot/network/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@ func (mbs *MockBlockState) HasBlockBody(common.Hash) (bool, error) {
func (mbs *MockBlockState) GetFinalizedHeader(_, _ uint64) (*types.Header, error) {
return mbs.BestBlockHeader()
}

func (mbs *MockBlockState) GetHashByNumber(_ *big.Int) (common.Hash, error) {
return common.Hash{}, nil
}
Loading

0 comments on commit ae7012b

Please sign in to comment.