Skip to content

Commit

Permalink
Fix wantlist overflow handling to select newer entries.
Browse files Browse the repository at this point in the history
wantlist overflow handling now cancels existing entries to make room for newer requests. This fix prevents the wantlist from filling up with CIDs that the server does not have.

Fixes #527
  • Loading branch information
gammazero committed Jun 23, 2024
1 parent dfd4a53 commit 7baf15c
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 40 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -39,6 +39,9 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

### Security

## [v0.20.0]

Expand Down
12 changes: 8 additions & 4 deletions bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,21 @@ func (m *impl) Empty() bool {
}

func (m *impl) Wantlist() []Entry {
out := make([]Entry, 0, len(m.wantlist))
out := make([]Entry, len(m.wantlist))
var i int
for _, e := range m.wantlist {
out = append(out, *e)
out[i] = *e
i++
}
return out
}

func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0, len(m.blocks))
bs := make([]blocks.Block, len(m.blocks))
var i int
for _, block := range m.blocks {
bs = append(bs, block)
bs[i] = block
i++
}
return bs
}
Expand Down
86 changes: 51 additions & 35 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package decision
import (
"context"
"fmt"
"math/bits"
"sync"
"time"

Expand Down Expand Up @@ -134,7 +133,7 @@ type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
// CancelWant returns true if the [cid.Cid] is removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
Expand Down Expand Up @@ -668,8 +667,9 @@ func (e *Engine) Peers() []peer.ID {

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the workerTasks)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) {
// request queue (this is later popped off by the workerTasks). Returns true
// if the connection to the server must be closed.
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) bool {
entries := m.Wantlist()

if len(entries) > 0 {
Expand Down Expand Up @@ -708,7 +708,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
if err != nil {
log.Info("aborting message processing", err)
return
return false
}

e.lock.Lock()
Expand All @@ -717,39 +717,52 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
e.peerLedger.ClearPeerWantlist(p)
}

s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
filteredWants = append(filteredWants, entry)
if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) {
// filteredWants has max wants, ignore remaining wants from request.
break
}
}
wants = wants[:available]
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
wants := wants[len(filteredWants):]
for i := range wants {
wants[i] = bsmsg.Entry{} // early GC
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
wants = filteredWants

// Ensure sufficient space for new wants.
s := e.peerLedger.WantlistSizeForPeer(p)
available := int(e.maxQueuedWantlistEntriesPerPeer) - s
if len(wants) > available {
needSpace := len(wants) - available
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "replacing", needSpace)
wl := e.peerLedger.WantlistForPeer(p)
for i := 0; i < needSpace; i++ {
entCid := wl[i].Cid
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
}
}
}

for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
}
clear := wants[len(filteredWants):]
for i := range clear {
clear[i] = bsmsg.Entry{} // early GC
}
wants = filteredWants

for _, entry := range cancels {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
Expand Down Expand Up @@ -852,6 +865,9 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(es) == 0 {
return nil, nil
}
wants := make([]bsmsg.Entry, 0, len(es))
cancels := make([]bsmsg.Entry, 0, len(es))
for _, et := range es {
Expand All @@ -866,12 +882,12 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent

// Split the want-have / want-block entries from the block that will be denied access
func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if e.peerBlockRequestFilter == nil {
if e.peerBlockRequestFilter == nil || len(allWants) == 0 {
return allWants, nil
}

wants := make([]bsmsg.Entry, 0, len(allWants))
denied := make([]bsmsg.Entry, 0, len(allWants))
var denied []bsmsg.Entry

for _, et := range allWants {
if e.peerBlockRequestFilter(p, et.Cid) {
Expand Down
43 changes: 43 additions & 0 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,3 +1733,46 @@ func TestKillConnectionForInlineCid(t *testing.T) {
t.Fatal("connection was not killed when receiving inline in cancel")
}
}

func TestWantlistOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const limit = 32
warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit))
riga := newTestEngine(ctx, "riga")

m := message.New(false)
for i := 0; i < limit+(limit/2); i++ {
m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true)
}
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)

if warsaw.Peer == riga.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
}

wl := warsaw.Engine.WantlistForPeer(riga.Peer)
if len(wl) != limit {
t.Fatal("wantlist does not match limit", len(wl))
}

// Add one block to the existing message
nb := blocks.NewBlock([]byte(fmt.Sprint(2*limit + 1)))
newCid := nb.Cid()
m.AddEntry(newCid, 0, pb.Message_Wantlist_Block, true)
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)

// Check that new block is in wantlist.
wl = warsaw.Engine.WantlistForPeer(riga.Peer)
var found bool
for i := range wl {
if wl[i].Cid == newCid {
found = true
break
}
}
if !found {
t.Fatal("Expected newest block in wantlist")
}
}

0 comments on commit 7baf15c

Please sign in to comment.