diff --git a/gossipsub.go b/gossipsub.go
index 11d9b1c3..caea93f5 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -45,6 +45,7 @@ var (
 	GossipSubFanoutTTL = 60 * time.Second
 	GossipSubPrunePeers = 16
 	GossipSubPruneBackoff = time.Minute
+	GossipSubUnsubscribeBackoff = 10 * time.Second
 	GossipSubConnectors = 8
 	GossipSubMaxPendingConnections = 128
 	GossipSubConnectionTimeout = 30 * time.Second
@@ -153,6 +154,11 @@ type GossipSubParams struct {
 	// before attempting to re-graft.
 	PruneBackoff time.Duration

+	// UnsubscribeBackoff controls the backoff time to use when unsubscribing
+	// from a topic. A peer should not resubscribe to this topic before this
+	// duration has elapsed.
+	UnsubscribeBackoff time.Duration
+
 	// Connectors controls the number of active connection attempts for peers obtained through PX.
 	Connectors int

@@ -244,6 +250,7 @@ func DefaultGossipSubParams() GossipSubParams {
 		FanoutTTL: GossipSubFanoutTTL,
 		PrunePeers: GossipSubPrunePeers,
 		PruneBackoff: GossipSubPruneBackoff,
+		UnsubscribeBackoff: GossipSubUnsubscribeBackoff,
 		Connectors: GossipSubConnectors,
 		MaxPendingConnections: GossipSubMaxPendingConnections,
 		ConnectionTimeout: GossipSubConnectionTimeout,
@@ -777,7 +784,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 				gs.score.AddPenalty(p, 1)
 			}
 			// refresh the backoff
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 			prune = append(prune, topic)
 			continue
 		}
@@ -791,7 +798,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 			// but we won't PX to them
 			doPX = false
 			// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 			continue
 		}

@@ -800,7 +807,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 		// mesh takeover attacks combined with love bombing
 		if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
 			prune = append(prune, topic)
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 			continue
 		}

@@ -815,7 +822,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.

 	cprune := make([]*pb.ControlPrune, 0, len(prune))
 	for _, topic := range prune {
-		cprune = append(cprune, gs.makePrune(p, topic, doPX))
+		cprune = append(cprune, gs.makePrune(p, topic, doPX, false))
 	}

 	return cprune
@@ -839,7 +846,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
 		if backoff > 0 {
 			gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
 		} else {
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 		}

 		px := prune.GetPeers()
@@ -855,8 +862,12 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
 	}
 }

-func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
-	gs.doAddBackoff(p, topic, gs.params.PruneBackoff)
+func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
+	backoff := gs.params.PruneBackoff
+	if isUnsubscribe {
+		backoff = gs.params.UnsubscribeBackoff
+	}
+	gs.doAddBackoff(p, topic, backoff)
 }

 func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
@@ -1096,11 +1107,11 @@ func (gs *GossipSubRouter) Leave(topic string) {
 	for p := range gmap {
 		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
 		gs.tracer.Prune(p, topic)
-		gs.sendPrune(p, topic)
+		gs.sendPrune(p, topic, true)
 		// Add a backoff to this peer to prevent us from eagerly
 		// re-grafting this peer into our mesh if we rejoin this
 		// topic before the backoff period ends.
-		gs.addBackoff(p, topic)
+		gs.addBackoff(p, topic, true)
 	}
 }

@@ -1110,8 +1121,8 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
 	gs.sendRPC(p, out)
 }

-func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
-	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
+func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
+	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
 	out := rpcWithControl(nil, nil, nil, nil, prune)
 	gs.sendRPC(p, out)
 }
@@ -1368,7 +1379,7 @@ func (gs *GossipSubRouter) heartbeat() {
 		prunePeer := func(p peer.ID) {
 			gs.tracer.Prune(p, topic)
 			delete(peers, p)
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 			topics := toprune[p]
 			toprune[p] = append(topics, topic)
 		}
@@ -1668,7 +1679,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
 			delete(toprune, p)
 			prune = make([]*pb.ControlPrune, 0, len(pruning))
 			for _, topic := range pruning {
-				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
+				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
 			}
 		}

@@ -1679,7 +1690,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
 	for p, topics := range toprune {
 		prune := make([]*pb.ControlPrune, 0, len(topics))
 		for _, topic := range topics {
-			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
+			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
 		}

 		out := rpcWithControl(nil, nil, nil, nil, prune)
@@ -1834,13 +1845,17 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
 	}
 }

-func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
+func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
 	if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
 		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
 		return &pb.ControlPrune{TopicID: &topic}
 	}

 	backoff := uint64(gs.params.PruneBackoff / time.Second)
+	if isUnsubscribe {
+		backoff = uint64(gs.params.UnsubscribeBackoff / time.Second)
+	}
+
 	var px []*pb.PeerInfo
 	if doPX {
 		// select peers for Peer eXchange
diff --git a/gossipsub_test.go b/gossipsub_test.go
index 96e822e0..4d09c317 100644
--- a/gossipsub_test.go
+++ b/gossipsub_test.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"

@@ -581,6 +582,104 @@ func TestGossipsubPrune(t *testing.T) {
 	}
 }

+func TestGossipsubPruneBackoffTime(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	hosts := getNetHosts(t, ctx, 10)
+
+	// App specific score that we'll change later.
+	currentScoreForHost0 := int32(0)
+
+	params := DefaultGossipSubParams()
+	params.HeartbeatInitialDelay = time.Millisecond * 10
+	params.HeartbeatInterval = time.Millisecond * 100
+
+	psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params), WithPeerScore(
+		&PeerScoreParams{
+			AppSpecificScore: func(p peer.ID) float64 {
+				if p == hosts[0].ID() {
+					return float64(atomic.LoadInt32(&currentScoreForHost0))
+				} else {
+					return 0
+				}
+			},
+			AppSpecificWeight: 1,
+			DecayInterval: time.Second,
+			DecayToZero: 0.01,
+		},
+		&PeerScoreThresholds{
+			GossipThreshold: -1,
+			PublishThreshold: -1,
+			GraylistThreshold: -1,
+		}))
+
+	var msgs []*Subscription
+	for _, ps := range psubs {
+		subch, err := ps.Subscribe("foobar")
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		msgs = append(msgs, subch)
+	}
+
+	connectAll(t, hosts)
+
+	// wait for heartbeats to build mesh
+	time.Sleep(time.Second)
+
+	pruneTime := time.Now()
+	// Flip the score. Host 0 should be pruned from everyone.
+	atomic.StoreInt32(&currentScoreForHost0, -1000)
+
+	// wait for heartbeats to run and prune
+	time.Sleep(time.Second)
+
+	wg := sync.WaitGroup{}
+	var missingBackoffs uint32 = 0
+	for i := 1; i < 10; i++ {
+		wg.Add(1)
+		// Copy i so this func keeps the correct value in the closure.
+		var idx = i
+		// Run this check in the eval thunk so that we don't step over the heartbeat goroutine and trigger a race.
+		psubs[idx].rt.(*GossipSubRouter).p.eval <- func() {
+			defer wg.Done()
+			backoff, ok := psubs[idx].rt.(*GossipSubRouter).backoff["foobar"][hosts[0].ID()]
+			if !ok {
+				atomic.AddUint32(&missingBackoffs, 1)
+			}
+			if ok && backoff.Sub(pruneTime)-params.PruneBackoff > time.Second {
+				t.Errorf("backoff time should be equal to the prune backoff (with some slack); got %v", backoff.Sub(pruneTime)-params.PruneBackoff)
+			}
+		}
+	}
+	wg.Wait()
+
+	// Sometimes not all the peers will have updated their backoffs by this point. If the majority haven't, we'll fail this test.
+	if missingBackoffs >= 5 {
+		t.Errorf("missing too many backoffs: %v", missingBackoffs)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
+
+		// Don't publish from host 0, since everyone should have pruned it.
+		owner := rand.Intn(len(psubs)-1) + 1
+
+		psubs[owner].Publish("foobar", msg)
+
+		for _, sub := range msgs[1:] {
+			got, err := sub.Next(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !bytes.Equal(msg, got.Data) {
+				t.Fatal("got wrong message!")
+			}
+		}
+	}
+}
+
 func TestGossipsubGraft(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -1839,27 +1938,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {

 	time.Sleep(time.Second)

-	psubs[0].rt.Leave("test")
-	time.Sleep(time.Second)
-	peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
-	if len(peerMap) != 1 {
-		t.Fatalf("No peer is populated in the backoff map for peer 0")
-	}
-	_, ok := peerMap[h[1].ID()]
-	if !ok {
-		t.Errorf("Expected peer does not exist in the backoff map")
+	leaveTime := time.Now()
+	done := make(chan struct{})
+
+	psubs[0].rt.(*GossipSubRouter).p.eval <- func() {
+		defer close(done)
+		psubs[0].rt.Leave("test")
+		time.Sleep(time.Second)
+		peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
+		if len(peerMap) != 1 {
+			t.Fatalf("No peer is populated in the backoff map for peer 0")
+		}
+		_, ok := peerMap[h[1].ID()]
+		if !ok {
+			t.Errorf("Expected peer does not exist in the backoff map")
+		}
+
+		backoffTime := peerMap[h[1].ID()].Sub(leaveTime)
+		// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s).
+		if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
+			t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
+		}
 	}
+	<-done

+	done = make(chan struct{})
 	// Ensure that remote peer 1 also applies the backoff appropriately
 	// for peer 0.
-	peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
-	if len(peerMap2) != 1 {
-		t.Fatalf("No peer is populated in the backoff map for peer 1")
-	}
-	_, ok = peerMap2[h[0].ID()]
-	if !ok {
-		t.Errorf("Expected peer does not exist in the backoff map")
+	psubs[1].rt.(*GossipSubRouter).p.eval <- func() {
+		defer close(done)
+		peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
+		if len(peerMap2) != 1 {
+			t.Fatalf("No peer is populated in the backoff map for peer 1")
+		}
+		_, ok := peerMap2[h[0].ID()]
+		if !ok {
+			t.Errorf("Expected peer does not exist in the backoff map")
+		}
+
+		backoffTime := peerMap2[h[0].ID()].Sub(leaveTime)
+		// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s).
+		if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
+			t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
+		}
 	}
+	<-done
 }

 func TestGossipSubJoinTopic(t *testing.T) {
@@ -1880,7 +2003,7 @@ func TestGossipSubJoinTopic(t *testing.T) {

 	// Add in backoff for peer.
 	peerMap := make(map[peer.ID]time.Time)
-	peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
+	peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff)
 	router0.backoff["test"] = peerMap
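
Usage: the new UnsubscribeBackoff rides the same params plumbing as PruneBackoff, so a consumer overrides it through the option path this patch extends. Below is a minimal sketch, assuming the public constructors referenced in the diff (DefaultGossipSubParams, WithGossipSubParams) plus pubsub.NewGossipSub and a go-libp2p host; the exact libp2p.New signature varies across go-libp2p releases.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func main() {
	ctx := context.Background()

	// Bare-bones host; real code would configure keys, transports, etc.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Start from the defaults so every other gossipsub parameter keeps its
	// documented value, then override only the unsubscribe backoff.
	params := pubsub.DefaultGossipSubParams()
	params.UnsubscribeBackoff = 30 * time.Second // default is 10s (GossipSubUnsubscribeBackoff)

	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(params))
	if err != nil {
		panic(err)
	}
	_ = ps // subscribe/publish as usual; Leave on a topic now advertises the 30s backoff
}

With this configuration, PRUNEs sent on Leave carry the 30s backoff (see makePrune above), while score- and churn-driven prunes continue to advertise PruneBackoff.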