GossipSub v1.2: IDONTWANT control message and priority queue. #553

Merged · 31 commits · Aug 16, 2024
Commits (31, all by ppopth)
6877d7b  Replace sending channel with the smart rpcQueue (Jan 13, 2024)
7ddc486  Implement UrgentPush in the smart rpcQueue (Feb 22, 2024)
dc45ebb  Add IDONTWANT to rpc.proto and trace.proto (Feb 28, 2024)
f06b3b2  Send IDONTWANT right before validation step (Feb 29, 2024)
22d9773  Test GossipSub IDONWANT sending (Mar 1, 2024)
62daad4  Send IDONWANT only for large messages (Mar 2, 2024)
53b1393  Handle IDONTWANT control messages (May 1, 2024)
4e930a7  Clear expired message IDs from the IDONTWANT cache (Jun 11, 2024)
1afead5  Keep the hashes of IDONTWANT message ids instead (Jun 11, 2024)
6fabcdd  Increase GossipSubMaxIHaveMessages to 1000 (Aug 9, 2024)
4db112f  fixup! Clear expired message IDs from the IDONTWANT cache (Aug 9, 2024)
24be1a7  Not send IDONTWANT if the receiver doesn't support (Aug 10, 2024)
0aaac6a  fixup! Replace sending channel with the smart rpcQueue (Aug 11, 2024)
55abc3b  Not use pointers in rpcQueue (Aug 13, 2024)
d1aa4d9  Simply rcpQueue by using only one mutex (Aug 13, 2024)
1914320  Check ctx error in rpc sending worker (Aug 13, 2024)
810f65f  fixup! Simply rcpQueue by using only one mutex (Aug 14, 2024)
0c507ef  fixup! Keep the hashes of IDONTWANT message ids instead (Aug 14, 2024)
97fa9b0  Use AfterFunc instead implementing our own (Aug 14, 2024)
3ae4239  Fix misc lint errors (Aug 14, 2024)
8e49e05  fixup! Fix misc lint errors (Aug 14, 2024)
6de02f3  Revert "Increase GossipSubMaxIHaveMessages to 1000" (Aug 15, 2024)
e5704b6  Increase GossipSubMaxIDontWantMessages to 1000 (Aug 15, 2024)
f4b05f4  fixup! Handle IDONTWANT control messages (Aug 15, 2024)
e5c971c  Skip TestGossipsubConnTagMessageDeliveries (Aug 15, 2024)
f141e13  Skip FuzzAppendOrMergeRPC (Aug 15, 2024)
0d9f553  Revert "Skip FuzzAppendOrMergeRPC" (Aug 16, 2024)
baba9e1  fixup! Send IDONWANT only for large messages (Aug 16, 2024)
9bdeb9b  fixup! fixup! Keep the hashes of IDONTWANT message ids instead (Aug 16, 2024)
6491d8e  fixup! Implement UrgentPush in the smart rpcQueue (Aug 16, 2024)
79dcdc8  fixup! Use AfterFunc instead implementing our own (Aug 16, 2024)
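
The protocol change itself is small: IDONTWANT lets a peer tell its mesh peers "don't forward me this message, I already have it", which pays off mainly for large messages that are expensive to receive twice. Commit dc45ebb adds the new control message to rpc.proto and trace.proto. Below is a sketch, in Go, of the generated pb types after that change: the `Idontwant` field is directly visible in the comm.go diff further down, while the `MessageIDs` field name is inferred from the existing `ControlIWant` convention rather than copied from the PR.

```go
package pb

// Existing gossipsub control messages (fields elided in this sketch).
type ControlIHave struct{ /* elided */ }
type ControlIWant struct{ /* elided */ }
type ControlGraft struct{ /* elided */ }
type ControlPrune struct{ /* elided */ }

// ControlIDontWant is new in gossipsub v1.2: it carries the IDs of messages
// the sender has already received, so mesh peers can skip forwarding them.
type ControlIDontWant struct {
	MessageIDs []string
}

// ControlMessage gains a repeated field for the new control type.
type ControlMessage struct {
	Ihave     []*ControlIHave
	Iwant     []*ControlIWant
	Graft     []*ControlGraft
	Prune     []*ControlPrune
	Idontwant []*ControlIDontWant
}
```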
comm.go (20 additions, 21 deletions)

```diff
@@ -114,7 +114,7 @@ func (p *PubSub) notifyPeerDead(pid peer.ID) {
 	}
 }
 
-func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
+func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing *rpcQueue) {
 	s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
 	if err != nil {
 		log.Debug("opening new stream to peer: ", err, pid)
@@ -135,7 +135,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
 	}
 }
 
-func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
+func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing *rpcQueue) {
 	select {
 	case <-time.After(backoff):
 		p.handleNewPeer(ctx, pid, outgoing)
@@ -156,7 +156,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
 	p.notifyPeerDead(pid)
 }
 
-func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
+func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) {
 	writeRpc := func(rpc *RPC) error {
 		size := uint64(rpc.Size())
 
@@ -174,20 +174,17 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
 	}
 
 	defer s.Close()
-	for {
-		select {
-		case rpc, ok := <-outgoing:
-			if !ok {
-				return
-			}
+	for ctx.Err() == nil {
+		rpc, err := outgoing.Pop(ctx)
+		if err != nil {
+			log.Debugf("popping message from the queue to send to %s: %s", s.Conn().RemotePeer(), err)
+			return
+		}
 
-			err := writeRpc(rpc)
-			if err != nil {
-				s.Reset()
-				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
-				return
-			}
-		case <-ctx.Done():
+		err = writeRpc(rpc)
+		if err != nil {
+			s.Reset()
+			log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 			return
 		}
 	}
@@ -209,15 +206,17 @@ func rpcWithControl(msgs []*pb.Message,
 	ihave []*pb.ControlIHave,
 	iwant []*pb.ControlIWant,
 	graft []*pb.ControlGraft,
-	prune []*pb.ControlPrune) *RPC {
+	prune []*pb.ControlPrune,
+	idontwant []*pb.ControlIDontWant) *RPC {
 	return &RPC{
 		RPC: pb.RPC{
 			Publish: msgs,
 			Control: &pb.ControlMessage{
-				Ihave: ihave,
-				Iwant: iwant,
-				Graft: graft,
-				Prune: prune,
+				Ihave:     ihave,
+				Iwant:     iwant,
+				Graft:     graft,
+				Prune:     prune,
+				Idontwant: idontwant,
 			},
 		},
 	}
```
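
The diff above is the heart of the "priority queue" half of the PR: each peer's outgoing `chan *RPC` becomes an `*rpcQueue`, and the sending worker blocks in `outgoing.Pop(ctx)` instead of selecting on a channel. Here is a minimal sketch of that mechanism. The method names `Push`, `UrgentPush`, and `Pop` follow the calls visible in this PR, and the `context.AfterFunc` trick mirrors the "Use AfterFunc instead implementing our own" commit; everything else is illustrative — the PR's real queue also supports blocking pushes and is structured differently.

```go
package pubsub

import (
	"context"
	"errors"
	"sync"
)

type RPC struct{} // placeholder for pubsub's real RPC type

var (
	errQueueFull   = errors.New("queue full")
	errQueueClosed = errors.New("queue closed")
)

// rpcQueue is a bounded FIFO with an urgent lane: UrgentPush inserts at the
// head so control traffic such as IDONTWANT overtakes queued publishes.
type rpcQueue struct {
	mu      sync.Mutex
	cond    *sync.Cond // signaled on push and on close
	queue   []*RPC
	maxSize int
	closed  bool
}

func newRpcQueue(maxSize int) *rpcQueue {
	q := &rpcQueue{maxSize: maxSize}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *rpcQueue) push(rpc *RPC, urgent bool) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errQueueClosed
	}
	if len(q.queue) >= q.maxSize {
		return errQueueFull // floodsub drops and traces DropRPC on this
	}
	if urgent {
		q.queue = append([]*RPC{rpc}, q.queue...) // jump the line
	} else {
		q.queue = append(q.queue, rpc)
	}
	q.cond.Signal()
	return nil
}

// Push appends at the tail. The block parameter matches the q.Push(out, false)
// call in the floodsub diff; the blocking path (wait for space rather than
// failing fast) is elided in this sketch.
func (q *rpcQueue) Push(rpc *RPC, block bool) error { return q.push(rpc, false) }

// UrgentPush inserts at the head of the queue.
func (q *rpcQueue) UrgentPush(rpc *RPC, block bool) error { return q.push(rpc, true) }

// Pop blocks until an RPC is available, the queue closes, or ctx is done —
// exactly how the new sending loop in comm.go consumes the queue.
func (q *rpcQueue) Pop(ctx context.Context) (*RPC, error) {
	// Wake any waiter when the context is canceled (Go 1.21+).
	stop := context.AfterFunc(ctx, func() {
		q.mu.Lock()
		q.cond.Broadcast()
		q.mu.Unlock()
	})
	defer stop()

	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.queue) == 0 {
		if q.closed {
			return nil, errQueueClosed
		}
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		q.cond.Wait()
	}
	rpc := q.queue[0]
	q.queue = q.queue[1:]
	return rpc, nil
}
```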
floodsub.go (7 additions, 5 deletions)

```diff
@@ -71,6 +71,8 @@ func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
 	return AcceptAll
 }
 
+func (fs *FloodSubRouter) PreValidation([]*Message) {}
+
 func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
 
 func (fs *FloodSubRouter) Publish(msg *Message) {
@@ -83,19 +85,19 @@ func (fs *FloodSubRouter) Publish(msg *Message) {
 			continue
 		}
 
-		mch, ok := fs.p.peers[pid]
+		q, ok := fs.p.peers[pid]
 		if !ok {
 			continue
 		}
 
-		select {
-		case mch <- out:
-			fs.tracer.SendRPC(out, pid)
-		default:
+		err := q.Push(out, false)
+		if err != nil {
 			log.Infof("dropping message to peer %s: queue full", pid)
 			fs.tracer.DropRPC(out, pid)
 			// Drop it. The peer is too slow.
+			continue
 		}
+		fs.tracer.SendRPC(out, pid)
 	}
 }
 
```
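
floodsub only needs the plain queue: its new `PreValidation` is a no-op stub and every publish is a non-blocking tail push. GossipSub is where the pieces come together — per the commit messages, IDONTWANT is sent right before the validation step, only for large messages, and only to peers that support it. The following is a rough sketch of that hook written against the repo's types; the threshold constant, the `mesh`/`peers`/`idGen` fields, and the omitted error handling are assumptions for illustration, not the PR's exact code.

```go
package pubsub

// Assumed threshold: only messages at least this large are worth suppressing.
const idontwantThreshold = 1024 // bytes, illustrative

// PreValidation runs on freshly received messages before validation, so the
// IDONTWANT reaches mesh peers while they might otherwise still forward the
// (possibly large) message to us.
func (gs *GossipSubRouter) PreValidation(msgs []*Message) {
	tmids := make(map[string][]string) // topic -> IDs of large messages
	for _, msg := range msgs {
		if len(msg.GetData()) < idontwantThreshold {
			continue // for small messages a duplicate is cheaper than the control traffic
		}
		topic := msg.GetTopic()
		tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
	}
	for topic, mids := range tmids {
		idontwant := []*pb.ControlIDontWant{{MessageIDs: mids}}
		out := rpcWithControl(nil, nil, nil, nil, nil, idontwant)
		for pid := range gs.mesh[topic] {
			// The real code also skips peers that don't speak v1.2
			// ("Not send IDONTWANT if the receiver doesn't support").
			q, ok := gs.p.peers[pid]
			if !ok {
				continue
			}
			// Urgent: overtake queued publishes so the hint arrives in time.
			q.UrgentPush(out, false)
		}
	}
}
```

On the receiving side, the router records the (hashed) IDs from incoming IDONTWANTs and consults that cache before forwarding a message to the peer, clearing expired entries over time — that is what the "Handle IDONTWANT control messages", "Keep the hashes of IDONTWANT message ids instead", and "Clear expired message IDs from the IDONTWANT cache" commits cover.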