-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: feat(network): Reuse stream for notification message. #1537
Conversation
@@ -198,7 +199,7 @@ func (s *Service) getBlockAnnounceHandshake() (Handshake, error) { | |||
}, nil | |||
} | |||
|
|||
func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) error { | |||
func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake, stream libp2pnetwork.Stream) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need to add stream
in this function as that would require changing all the handler functions, it should be stored in the createNotificationsMessageHandler
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing this function signature would also require exposing streams to external packages such as grandpa which we don't want
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (err error) { | ||
// get outbound stream for given peer | ||
s := h.getOutboundStream(p, pid) | ||
func (h *host) send(p peer.ID, pid protocol.ID, msg Message, stream libp2pnetwork.Stream) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this function need to be changed? generally if we know about the stream we can use writeToStream
, if we're opening it for the first time we can use send
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also this won't this panic if stream
is nil which it seems to be in the tests? based on the test of the code this can be changed back
info.handshakeData.Store(peer, &handshakeData{ | ||
validated: true, | ||
received: true, | ||
stream: stream, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we are storing the stream in this function (which is good) we don't need to pass it to handshakeValidator
@@ -202,7 +207,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |||
seen := s.gossip.hasSeen(msg) | |||
if !seen { | |||
// TODO: update this to write to stream w/ handshake established | |||
s.broadcastExcluding(info, peer, msg) | |||
s.broadcastExcluding(info, peer, msg, &stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we passing the stream? the stream is specific to the current peer, whereas broadcastExcluding
sends to our other peers. as well the streams should be stored in the handshakeData
@@ -241,7 +246,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |||
}) | |||
|
|||
logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs) | |||
err = s.host.send(peer, info.protocolID, hs) | |||
err = s.host.send(peer, info.protocolID, hs, *stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the stream here, the peer
and stream
do not correspond to the same peer, since we are iterating over all our peers
@@ -258,7 +263,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |||
|
|||
// we've already completed the handshake with the peer, send message directly | |||
logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg) | |||
err = s.host.send(peer, info.protocolID, msg) | |||
err = s.host.send(peer, info.protocolID, msg, *stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, remove stream
from send
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general this function needs to check notificationsProtocol.handshakeData[peer]
for the stream if it already exists for this peer, then use writeToStream
if the stream is opened and established, otherwise it should use send
to open a new stream and send the handshake (which is what was previously happening)
@@ -211,7 +216,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |||
|
|||
// gossipExcluding sends a message to each connected peer except the given peer | |||
// Used for notifications sub-protocols to gossip a message | |||
func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer.ID, msg NotificationsMessage) { | |||
func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer.ID, msg NotificationsMessage, stream *libp2pnetwork.Stream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove stream
@@ -42,7 +42,7 @@ type ( | |||
HandshakeDecoder = func([]byte) (Handshake, error) | |||
|
|||
// HandshakeValidator validates a handshake. It returns an error if it is invalid | |||
HandshakeValidator = func(peer.ID, Handshake) error | |||
HandshakeValidator = func(peer.ID, Handshake, libp2pnetwork.Stream) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove stream
Closing this PR as the same has been implemented in #1545 |
Changes
Tests
Issues