From a06156c7d19cd8882da0b87bef777fa9fe9db162 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 21 Jun 2024 01:22:28 -0700 Subject: [PATCH] Fix wantlist overflow handling to select newer entries. wantlist overflow handling now cancels existing entries to make room for newer requests. This fix prevents the wantlist from filling up with CIDs that the server does not have. Fixes #527 --- CHANGELOG.md | 3 +- bitswap/server/internal/decision/engine.go | 89 +++++++++++++------ .../server/internal/decision/engine_test.go | 58 ++++++++++++ .../server/internal/decision/peer_ledger.go | 3 +- 4 files changed, 123 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85176369f..2addb5a44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The following emojis are used to highlight certain changes: ### Changed - `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0) -- `bitswap/client` supports additional tracing +- `bitswap/client` supports additional tracing ### Removed @@ -41,6 +41,7 @@ The following emojis are used to highlight certain changes: - `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found - `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup +- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. ## [v0.20.0] diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index bc934db5a..564e27ad6 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -4,7 +4,6 @@ package decision import ( "context" "fmt" - "math/bits" "sync" "time" @@ -134,7 +133,7 @@ type PeerLedger interface { // Wants informs the ledger that [peer.ID] wants [wl.Entry]. 
Wants(p peer.ID, e wl.Entry) - // CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID]. + // CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID]. CancelWant(p peer.ID, k cid.Cid) bool // CancelWantWithType will not cancel WantBlock if we sent a HAVE message. @@ -702,38 +701,72 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap e.peerLedger.ClearPeerWantlist(p) } - s := uint(e.peerLedger.WantlistSizeForPeer(p)) - if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer { - log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe) - // truncate wantlist to avoid overflow - available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0) - if o != 0 { - available = 0 + if len(wants) != 0 { + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if entry.Cid.Prefix().MhType == mh.IDENTITY { + // This is a truly broken client, let's kill the connection. + e.lock.Unlock() + log.Warnw("peer wants an identity CID", "local", e.self, "remote", p) + return true + } + if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { + // Ignore requests about CIDs that big. + continue + } + filteredWants = append(filteredWants, entry) + if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) { + // filteredWants at limit, ignore remaining wants from request. + log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants)) + break + } } - wants = wants[:available] - } - - filteredWants := wants[:0] // shift inplace - - for _, entry := range wants { - if entry.Cid.Prefix().MhType == mh.IDENTITY { - // This is a truely broken client, let's kill the connection. 
- e.lock.Unlock() - log.Warnw("peer wants an identity CID", "local", e.self, "remote", p) - return true + wants = wants[len(filteredWants):] + for i := range wants { + wants[i] = bsmsg.Entry{} // early GC } - if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { - // Ignore requests about CIDs that big. - continue + wants = filteredWants + + // Ensure sufficient space for new wants. + s := e.peerLedger.WantlistSizeForPeer(p) + available := int(e.maxQueuedWantlistEntriesPerPeer) - s + if len(wants) > available { + needSpace := len(wants) - available + log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace) + // Cancel any wants that are being requested again. This makes room + // for new wants and minimizes the number of existing wants to cancel that + // are not in the new request. + for _, entry := range wants { + if e.peerLedger.CancelWant(p, entry.Cid) { + e.peerRequestQueue.Remove(entry.Cid, p) + needSpace-- + if needSpace == 0 { + break + } + } + } + // Cancel additional wants, that are not being replaced, to make + // room for new wants. + if needSpace != 0 { + wl := e.peerLedger.WantlistForPeer(p) + for i := range wl { + entCid := wl[i].Cid + if e.peerLedger.CancelWant(p, entCid) { + e.peerRequestQueue.Remove(entCid, p) + needSpace-- + if needSpace == 0 { + break + } + } + } + } } - e.peerLedger.Wants(p, entry.Entry) - filteredWants = append(filteredWants, entry) + for _, entry := range wants { + e.peerLedger.Wants(p, entry.Entry) + } } - // Clear truncated entries - early GC. 
- clear(wants[len(filteredWants):]) - wants = filteredWants for _, entry := range cancels { c := entry.Cid if c.Prefix().MhType == mh.IDENTITY { diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index c25e3508d..a9595968d 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -1733,3 +1733,61 @@ func TestKillConnectionForInlineCid(t *testing.T) { t.Fatal("connection was not killed when receiving inline in cancel") } } + +func TestWantlistOverflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const limit = 32 + warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) + riga := newTestEngine(ctx, "riga") + + m := message.New(false) + for i := 0; i < limit+(limit/2); i++ { + m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true) + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + + if warsaw.Peer == riga.Peer { + t.Fatal("Sanity Check: Peers have same Key!") + } + + // Check that the wantlist is at the size limit, and limit/2 wants ignored. + wl := warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist does not match limit", len(wl)) + } + + m = message.New(false) + blockCids := make([]cid.Cid, limit/2+4) + for i := 0; i < limit/2+4; i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(i + limit))).Cid() + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) + blockCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + + // Check that wantlist is still at size limit. + if len(wl) != limit { + t.Fatalf("wantlist size %d does not match limit %d", len(wl), limit) + } + + // Check that all new blocks are in wantlist. 
+ var missing int + for _, c := range blockCids { + var found bool + for i := range wl { + if wl[i].Cid == c { + found = true + break + } + } + if !found { + missing++ + } + } + if missing != 0 { + t.Fatalf("Missing %d new wants expected in wantlist", missing) + } +} diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index b79db226d..714e28d42 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -42,13 +42,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool { if !ok { return false } + _, had := wants[k] delete(wants, k) if len(wants) == 0 { delete(l.peers, p) } l.removePeerFromCid(p, k) - return true + return had } func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {