Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: add peer block filter option (#549)
Browse files Browse the repository at this point in the history
* feat: add peer block filter option

This feature lets a user configure a function that will
allow / deny request for a block coming from a peer.
  • Loading branch information
laurentsenta authored Mar 17, 2022
1 parent dbfc6a1 commit b6f0cc7
Show file tree
Hide file tree
Showing 3 changed files with 414 additions and 28 deletions.
11 changes: 11 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,15 @@ func WithTargetMessageSize(tms int) Option {
}
}

func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
return func(bs *Bitswap) {
bs.peerBlockRequestFilter = pbrf
}
}

type TaskInfo = decision.TaskInfo
type TaskComparator = decision.TaskComparator
type PeerBlockRequestFilter = decision.PeerBlockRequestFilter

// WithTaskComparator configures custom task prioritization logic.
func WithTaskComparator(comparator TaskComparator) Option {
Expand Down Expand Up @@ -291,6 +298,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
decision.WithTargetMessageSize(bs.engineTargetMessageSize),
decision.WithPeerBlockRequestFilter(bs.peerBlockRequestFilter),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -399,6 +407,9 @@ type Bitswap struct {
simulateDontHavesOnTimeout bool

taskComparator TaskComparator

// an optional feature to accept / deny requests for blocks
peerBlockRequestFilter PeerBlockRequestFilter
}

type counters struct {
Expand Down
91 changes: 69 additions & 22 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type Engine struct {
metricUpdateCounter int

taskComparator TaskComparator

peerBlockRequestFilter PeerBlockRequestFilter
}

// TaskInfo represents the details of a request from a peer.
Expand All @@ -201,6 +203,10 @@ type TaskInfo struct {
// It should return true if task 'ta' has higher priority than task 'tb'
type TaskComparator func(ta, tb *TaskInfo) bool

// PeerBlockRequestFilter is used to accept / deny requests for a CID coming from a PeerID
// It should return true if the request should be fullfilled.
type PeerBlockRequestFilter func(p peer.ID, c cid.Cid) bool

type Option func(*Engine)

func WithTaskComparator(comparator TaskComparator) Option {
Expand All @@ -209,6 +215,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
}
}

func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
return func(e *Engine) {
e.peerBlockRequestFilter = pbrf
}
}

func WithTargetMessageSize(size int) Option {
return func(e *Engine) {
e.targetMessageSize = size
Expand Down Expand Up @@ -598,8 +610,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
}
}()

// Get block sizes
// Dispatch entries
wants, cancels := e.splitWantsCancels(entries)
wants, denials := e.splitWantsDenials(p, wants)

// Get block sizes
wantKs := cid.NewSet()
for _, entry := range wants {
wantKs.Add(entry.Cid)
Expand Down Expand Up @@ -639,6 +654,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
}
}

// Cancel a block operation
sendDontHave := func(entry bsmsg.Entry) {
// Only add the task to the queue if the requester wants a DONT_HAVE
if e.sendDontHaves && entry.SendDontHave {
c := entry.Cid

newWorkExists = true
isWantBlock := false
if entry.WantType == pb.Message_Wantlist_Block {
isWantBlock = true
}

activeEntries = append(activeEntries, peertask.Task{
Topic: c,
Priority: int(entry.Priority),
Work: bsmsg.BlockPresenceSize(c),
Data: &taskData{
BlockSize: 0,
HaveBlock: false,
IsWantBlock: isWantBlock,
SendDontHave: entry.SendDontHave,
},
})
}
}

// Deny access to blocks
for _, entry := range denials {
log.Debugw("Bitswap engine: block denied access", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)
sendDontHave(entry)
}

// For each want-have / want-block
for _, entry := range wants {
c := entry.Cid
Expand All @@ -650,27 +697,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
// If the block was not found
if !found {
log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

// Only add the task to the queue if the requester wants a DONT_HAVE
if e.sendDontHaves && entry.SendDontHave {
newWorkExists = true
isWantBlock := false
if entry.WantType == pb.Message_Wantlist_Block {
isWantBlock = true
}

activeEntries = append(activeEntries, peertask.Task{
Topic: c,
Priority: int(entry.Priority),
Work: bsmsg.BlockPresenceSize(c),
Data: &taskData{
BlockSize: 0,
HaveBlock: false,
IsWantBlock: isWantBlock,
SendDontHave: entry.SendDontHave,
},
})
}
sendDontHave(entry)
} else {
// The block was found, add it to the queue
newWorkExists = true
Expand Down Expand Up @@ -722,6 +749,26 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent
return wants, cancels
}

// Split the want-have / want-block entries from the block that will be denied access
func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if e.peerBlockRequestFilter == nil {
return allWants, nil
}

wants := make([]bsmsg.Entry, 0, len(allWants))
denied := make([]bsmsg.Entry, 0, len(allWants))

for _, et := range allWants {
if e.peerBlockRequestFilter(p, et.Cid) {
wants = append(wants, et)
} else {
denied = append(denied, et)
}
}

return wants, denied
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
Expand Down
Loading

0 comments on commit b6f0cc7

Please sign in to comment.