Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed Jun 9, 2021
1 parent d871223 commit b6ca72b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 141 deletions.
195 changes: 114 additions & 81 deletions chain/gssmr/genesis-spec.json

Large diffs are not rendered by default.

19 changes: 14 additions & 5 deletions chain/gssmr/genesis.json

Large diffs are not rendered by default.

35 changes: 11 additions & 24 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ func (n *notificationsProtocol) getHandshakeData(pid peer.ID, inbound bool) (han
return handshakeData{}, false
}

hsData, ok := data.(handshakeData)
if !ok {
panic("data is not handshakeData")
}
return hsData, true
return data.(handshakeData), true
}

type handshakeData struct {
Expand Down Expand Up @@ -211,21 +207,16 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
}

func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtocol, msg NotificationsMessage) {
logger.Debug("checking if peer supports protocol", "protocol", info.protocolID)
if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support {
return
}

logger.Debug("getting peer handshake data", "protocol", info.protocolID)

hsData, has := info.getHandshakeData(peer, false)
if has && !hsData.validated {
// peer has sent us an invalid handshake in the past, ignore
return
}

logger.Debug("got peer handshake data", "protocol", info.protocolID, "hsData", hsData, "has", has)

if !has || !hsData.received || hsData.stream == nil {
if !has {
hsData = newHandshakeData(false, false, nil)
Expand All @@ -234,10 +225,10 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
hsData.Lock()
defer hsData.Unlock()

logger.Debug("sending outbound handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
logger.Trace("sending outbound handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
stream, err := s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Debug("failed to send message to peer", "peer", peer, "error", err)
logger.Trace("failed to send message to peer", "peer", peer, "error", err)
return
}

Expand All @@ -250,21 +241,17 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc

hsTimer := time.NewTimer(handshakeTimeout)

if info.handshakeDecoder == nil {
panic("handshakeDecoder is nil")
}

var hs Handshake
select {
case <-hsTimer.C:
logger.Debug("handshake timeout reached", "protocol", info.protocolID, "peer", peer)
logger.Trace("handshake timeout reached", "protocol", info.protocolID, "peer", peer)
_ = stream.Close()
info.outboundHandshakeData.Delete(peer)
return
case hsResponse := <-s.readHandshake(stream, info.handshakeDecoder):
hsTimer.Stop()
if hsResponse.err != nil {
logger.Debug("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err)
logger.Trace("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Close()
info.outboundHandshakeData.Delete(peer)
return
Expand All @@ -276,15 +263,15 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc

err = info.handshakeValidator(peer, hs)
if err != nil {
logger.Debug("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
hsData.validated = false
info.outboundHandshakeData.Store(peer, hsData)
return
}

hsData.validated = true
info.outboundHandshakeData.Store(peer, hsData)
logger.Debug("sender: validated handshake", "protocol", info.protocolID, "peer", peer)
logger.Trace("sender: validated handshake", "protocol", info.protocolID, "peer", peer)
}

if s.host.messageCache != nil {
Expand All @@ -300,19 +287,19 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}

// we've completed the handshake with the peer, send message directly
logger.Debug("sending message", "protocol", info.protocolID, "peer", peer, "message", msg)
logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg)

err := s.host.writeToStream(hsData.stream, msg)
if err != nil {
logger.Debug("failed to send message to peer", "peer", peer, "error", err)
logger.Trace("failed to send message to peer", "peer", peer, "error", err)
}
}

// broadcastExcluding sends a message to each connected peer except the given peer,
// and peers that have previously sent us the message or who we have already sent the message to.
// used for notifications sub-protocols to gossip a message
func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer.ID, msg NotificationsMessage) {
logger.Debug(
logger.Trace(
"broadcasting message from notifications sub-protocol",
"protocol", info.protocolID,
)
Expand Down Expand Up @@ -343,7 +330,7 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe
close(hsC)
}()

tot, err := readStream(stream, msgBytes[:]) // TODO: I think this will hang forever if the peer never responds, need to add stream to streamManager
tot, err := readStream(stream, msgBytes[:])
if err != nil {
hsC <- &handshakeReader{hs: nil, err: err}
return
Expand Down
25 changes: 0 additions & 25 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func NewMessageHandler(grandpa *Service, blockState BlockState) *MessageHandler
func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.NotificationsMessage, error) {
logger.Trace("handling grandpa message", "msg", m)

if m == nil {
return nil, nil
}

switch m.Type() {
case voteType:
vm, ok := m.(*VoteMessage)
Expand All @@ -68,23 +64,17 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
return h.handleCommitMessage(fm)
}
case neighbourType:
//return nil, nil

nm, ok := m.(*NeighbourMessage)
if !ok {
return nil, nil
}

return nil, h.handleNeighbourMessage(from, nm)
case catchUpRequestType:
//return nil, nil

if r, ok := m.(*catchUpRequest); ok {
return h.handleCatchUpRequest(r)
}
case catchUpResponseType:
//return nil, nil

if r, ok := m.(*catchUpResponse); ok {
return nil, h.handleCatchUpResponse(r)
}
Expand All @@ -96,22 +86,11 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
}

func (h *MessageHandler) handleNeighbourMessage(from peer.ID, msg *NeighbourMessage) error {
logger.Debug("received NeighbourMessage", "msg", msg)
currFinalized, err := h.blockState.GetFinalizedHeader(0, 0)
if err != nil {
return err
}

logger.Debug("handleNeighbourMessage", "currFinalized", currFinalized)

if currFinalized == nil || currFinalized.Number == nil {
panic("currFinalized is nil")
}

if msg == nil {
panic("msg is nil")
}

// ignore neighbour messages where our best finalised number is greater than theirs
if uint32(currFinalized.Number.Int64()) >= msg.Number {
return nil
Expand All @@ -124,10 +103,6 @@ func (h *MessageHandler) handleNeighbourMessage(from peer.ID, msg *NeighbourMess
return err
}

if head == nil {
panic("head is nil")
}

// ignore neighbour messages that are above our head
if int64(msg.Number) > head.Int64() {
return nil
Expand Down
7 changes: 1 addition & 6 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ func (hs *GrandpaHandshake) Decode(in []byte) error {
return err
}

m, ok := msg.(*GrandpaHandshake)
if !ok {
return nil
}

hs.Roles = m.Roles
hs.Roles = msg.(*GrandpaHandshake).Roles
return nil
}

Expand Down

0 comments on commit b6ca72b

Please sign in to comment.