From c8438803571d38e33afcf5a679360a7a3a6de8e0 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 17 Feb 2023 07:51:53 +0100 Subject: [PATCH] bitswap/server/internal/decision: add filtering on CIDs - Ignore cids that are too big. - Kill connection for peers that are using inline CIDs. --- bitswap/internal/defaults/defaults.go | 6 +++ bitswap/network/ipfs_impl.go | 2 +- bitswap/options.go | 6 +++ bitswap/server/internal/decision/engine.go | 45 +++++++++++++++++++++- bitswap/server/server.go | 16 +++++++- 5 files changed, 71 insertions(+), 4 deletions(-) diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index 1b0de2497..f9494a0da 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -1,6 +1,7 @@ package defaults import ( + "encoding/binary" "time" ) @@ -27,4 +28,9 @@ const ( // Maximum size of the wantlist we are willing to keep in memory. MaxQueuedWantlistEntiresPerPeer = 1024 + + // Copied from github.com/ipfs/go-verifcid#maximumHashLength + // FIXME: expose this in go-verifcid. + MaximumHashLength = 128 + MaximumAllowedCid = binary.MaxVarintLen64*4 + MaximumHashLength ) diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 03344fa9c..caca837ca 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -370,7 +370,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { } func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error { - panic("Not implemented: DisconnectFrom() is only used by tests") + return bsnet.host.Network().ClosePeer(p) } // FindProvidersAsync returns a channel of providers for the given key. diff --git a/bitswap/options.go b/bitswap/options.go index 1e2e09018..9ccf3c4d6 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -33,6 +33,12 @@ func MaxQueuedWantlistEntriesPerPeer(count uint) Option { return Option{server.MaxQueuedWantlistEntriesPerPeer(count)} } +// MaxCidSize only affects the server. +// If it is 0 no limit is applied. +func MaxCidSize(n uint) Option { + return Option{server.MaxCidSize(n)} +} + func TaskWorkerCount(count int) Option { return Option{server.TaskWorkerCount(count)} } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 29ac1aa2a..208fef740 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -25,6 +25,7 @@ import ( "github.com/ipfs/go-peertaskqueue/peertracker" process "github.com/jbenet/goprocess" "github.com/libp2p/go-libp2p/core/peer" + mh "github.com/multiformats/go-multihash" ) // TODO consider taking responsibility for other types of requests. For @@ -187,6 +188,7 @@ type Engine struct { maxOutstandingBytesPerPeer int maxQueuedWantlistEntriesPerPeer uint + maxCidSize uint } // TaskInfo represents the details of a request from a peer. @@ -272,13 +274,20 @@ func WithMaxOutstandingBytesPerPeer(count int) Option { // WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send. // If a peer send us more than this we will truncate newest entries. -// It defaults to DefaultMaxQueuedWantlistEntiresPerPeer. func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option { return func(e *Engine) { e.maxQueuedWantlistEntriesPerPeer = count } } +// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send. +// If a peer send us more than this we will truncate newest entries. +func WithMaxCidSize(n uint) Option { + return func(e *Engine) { + e.maxCidSize = n + } +} + func WithSetSendDontHave(send bool) Option { return func(e *Engine) { e.sendDontHaves = send @@ -357,6 +366,7 @@ func newEngine( tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()), tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()), maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer, + maxCidSize: defaults.MaximumAllowedCid, } for _, opt := range opts { @@ -617,7 +627,7 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived is called when a message is received from a remote peer. // For each item in the wantlist, add a want-have or want-block entry to the // request queue (this is later popped off by the workerTasks) -func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) { +func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) { entries := m.Wantlist() if len(entries) > 0 { @@ -676,10 +686,40 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap wants = wants[:available] } + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if entry.Cid.Prefix().MhType == mh.IDENTITY { + // This is a truely broken client, let's kill the connection. + e.lock.Unlock() + log.Warnw("peer wants an identity CID", "local", e.self, "remote", p) + return true + } + if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { + // Ignore requests about CIDs that big. + continue + } + e.peerLedger.Wants(p, entry.Entry) + filteredWants = append(filteredWants, entry) + } + clear := wants[len(filteredWants):] + for i := range clear { + clear[i] = bsmsg.Entry{} // early GC } + wants = filteredWants for _, entry := range cancels { + if entry.Cid.Prefix().MhType == mh.IDENTITY { + // This is a truely broken client, let's kill the connection. + e.lock.Unlock() + log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p) + return true + } + if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { + // Ignore requests about CIDs that big. + continue + } + log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid) if e.peerLedger.CancelWant(p, entry.Cid) { e.peerRequestQueue.Remove(entry.Cid, p) @@ -765,6 +805,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...) e.updateMetrics() } + return false } // Split the want-have / want-block entries from the cancel entries diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 424456036..7918b73d7 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -220,6 +220,17 @@ func MaxQueuedWantlistEntriesPerPeer(count uint) Option { } } +// MaxCidSize limits how big CIDs we are willing to serve. +// We will ignore CIDs over this limit. +// It defaults to [defaults.MaxCidSize]. +// If it is 0 no limit is applied. +func MaxCidSize(n uint) Option { + o := decision.WithMaxCidSize(n) + return func(bs *Server) { + bs.engineOptions = append(bs.engineOptions, o) + } +} + // HasBlockBufferSize configure how big the new blocks buffer should be. func HasBlockBufferSize(count int) Option { if count < 0 { @@ -511,7 +522,10 @@ func (bs *Server) provideWorker(px process.Process) { func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. - bs.engine.MessageReceived(ctx, p, incoming) + mustKillConnection := bs.engine.MessageReceived(ctx, p, incoming) + if mustKillConnection { + bs.network.DisconnectFrom(ctx, p) + } // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger