From d5168fec19720bd02e262c2aee4986a99e92f567 Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Mon, 11 Oct 2021 21:12:57 -0700 Subject: [PATCH 1/8] enable custom task prioritization logic --- bitswap.go | 10 ++++++ go.mod | 2 ++ go.sum | 2 -- internal/decision/engine.go | 69 +++++++++++++++++++++++++++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/bitswap.go b/bitswap.go index af648972..98de8d78 100644 --- a/bitswap.go +++ b/bitswap.go @@ -148,6 +148,13 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } +// WithTaskComparator configures custom task prioritization logic. +func WithTaskComparator(comparator decision.TaskComparator) Option { + return func(bs *Bitswap) { + bs.taskComparator = comparator + } +} + // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network // delegate. Runs until context is cancelled or bitswap.Close is called. @@ -272,6 +279,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, activeEngineGauge, pendingBlocksGauge, activeBlocksGauge, + decision.WithTaskComparator(bs.taskComparator), ) bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves) @@ -375,6 +383,8 @@ type Bitswap struct { // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool + + taskComparator TaskComparator } type counters struct { diff --git a/go.mod b/go.mod index 88e2eba5..eb8aa88c 100644 --- a/go.mod +++ b/go.mod @@ -32,4 +32,6 @@ require ( go.uber.org/zap v1.16.0 ) +replace github.com/ipfs/go-peertaskqueue => ../go-peertaskqueue + go 1.16 diff --git a/go.sum b/go.sum index 939c92b0..0cd2ccb4 100644 --- a/go.sum +++ b/go.sum @@ -302,8 +302,6 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= -github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI= -github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= diff --git a/internal/decision/engine.go b/internal/decision/engine.go index df49f0bc..548917f9 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-metrics-interface" "github.com/ipfs/go-peertaskqueue" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" process "github.com/jbenet/goprocess" "github.com/libp2p/go-libp2p-core/peer" ) @@ -175,6 +176,33 @@ type Engine struct { // used to ensure metrics are reported each fixed number of operation metricsLock sync.Mutex metricUpdateCounter int + + taskComparator TaskComparator +} + +// TaskInfo represents the details of a request from a peer. +type TaskInfo struct { + Cid cid.Cid + // Tasks can be want-have or want-block + IsWantBlock bool + // Whether to immediately send a response if the block is not found + SendDontHave bool + // The size of the block corresponding to the task + BlockSize int + // Whether the block was found + HaveBlock bool +} + +// TaskComparator is used for task prioritization. +// It should return true if task 'ta' has higher priority than task 'tb' +type TaskComparator func(ta, tb *TaskInfo) bool + +type Option func(*Engine) + +func WithTaskComparator(comparator TaskComparator) Option { + return func(e *Engine) { + e.taskComparator = comparator + } } // NewEngine creates a new block sending engine for the given block store. @@ -192,6 +220,7 @@ func NewEngine( activeEngineGauge metrics.Gauge, pendingBlocksGauge metrics.Gauge, activeBlocksGauge metrics.Gauge, + opts ...Option, ) *Engine { return newEngine( ctx, @@ -207,6 +236,7 @@ func NewEngine( activeEngineGauge, pendingBlocksGauge, activeBlocksGauge, + opts..., ) } @@ -223,6 +253,7 @@ func newEngine( activeEngineGauge metrics.Gauge, pendingBlocksGauge metrics.Gauge, activeBlocksGauge metrics.Gauge, + opts ...Option, ) *Engine { if scoreLedger == nil { @@ -247,12 +278,46 @@ func newEngine( } e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String()) e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String()) - e.peerRequestQueue = peertaskqueue.New( + + for _, opt := range opts { + opt(e) + } + + // default peer task queue options + peerTaskQueueOpts := []peertaskqueue.Option{ peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved), peertaskqueue.TaskMerger(newTaskMerger()), peertaskqueue.IgnoreFreezing(true), - peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer)) + peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer), + } + + if e.taskComparator != nil { + peerTaskComparator := func(a, b *peertask.QueueTask) bool { + taskDataA := a.Task.Data.(*taskData) + taskInfoA := &TaskInfo{ + Cid: a.Task.Topic.(cid.Cid), + IsWantBlock: taskDataA.IsWantBlock, + SendDontHave: taskDataA.SendDontHave, + BlockSize: taskDataA.BlockSize, + HaveBlock: taskDataA.HaveBlock, + } + taskDataB := b.Task.Data.(*taskData) + taskInfoB := &TaskInfo{ + Cid: b.Task.Topic.(cid.Cid), + IsWantBlock: taskDataB.IsWantBlock, + SendDontHave: taskDataB.SendDontHave, + BlockSize: taskDataB.BlockSize, + HaveBlock: taskDataB.HaveBlock, + } + return e.taskComparator(taskInfoA, taskInfoB) + } + peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.PeerComparator(peertracker.TaskPriorityPeerComparator(peerTaskComparator))) + peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.TaskComparator(peerTaskComparator)) + } + + e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) + return e } From 41662895a2b84421881fa91d148b3d0b86245f03 Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Tue, 12 Oct 2021 08:47:23 -0700 Subject: [PATCH 2/8] add peer to TaskInfo --- internal/decision/engine.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 548917f9..2cede3b4 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -182,6 +182,8 @@ type Engine struct { // TaskInfo represents the details of a request from a peer. type TaskInfo struct { + Peer peer.ID + // The CID of the block Cid cid.Cid // Tasks can be want-have or want-block IsWantBlock bool @@ -296,6 +298,7 @@ func newEngine( peerTaskComparator := func(a, b *peertask.QueueTask) bool { taskDataA := a.Task.Data.(*taskData) taskInfoA := &TaskInfo{ + Peer: a.Target, Cid: a.Task.Topic.(cid.Cid), IsWantBlock: taskDataA.IsWantBlock, SendDontHave: taskDataA.SendDontHave, @@ -304,6 +307,7 @@ func newEngine( } taskDataB := b.Task.Data.(*taskData) taskInfoB := &TaskInfo{ + Peer: b.Target, Cid: b.Task.Topic.(cid.Cid), IsWantBlock: taskDataB.IsWantBlock, SendDontHave: taskDataB.SendDontHave, From 68ae19476785ae7e8de3fea99d2bad846e9bd4bb Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Tue, 12 Oct 2021 08:56:15 -0700 Subject: [PATCH 3/8] move task comparator wrapper to separate function --- internal/decision/engine.go | 51 ++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 2cede3b4..4426d8ce 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -207,6 +207,31 @@ func WithTaskComparator(comparator TaskComparator) Option { } } +// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator +func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { + return func(a, b *peertask.QueueTask) bool { + taskDataA := a.Task.Data.(*taskData) + taskInfoA := &TaskInfo{ + Peer: a.Target, + Cid: a.Task.Topic.(cid.Cid), + IsWantBlock: taskDataA.IsWantBlock, + SendDontHave: taskDataA.SendDontHave, + BlockSize: taskDataA.BlockSize, + HaveBlock: taskDataA.HaveBlock, + } + taskDataB := b.Task.Data.(*taskData) + taskInfoB := &TaskInfo{ + Peer: b.Target, + Cid: b.Task.Topic.(cid.Cid), + IsWantBlock: taskDataB.IsWantBlock, + SendDontHave: taskDataB.SendDontHave, + BlockSize: taskDataB.BlockSize, + HaveBlock: taskDataB.HaveBlock, + } + return tc(taskInfoA, taskInfoB) + } +} + // NewEngine creates a new block sending engine for the given block store. // maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum // work already outstanding. @@ -295,29 +320,9 @@ func newEngine( } if e.taskComparator != nil { - peerTaskComparator := func(a, b *peertask.QueueTask) bool { - taskDataA := a.Task.Data.(*taskData) - taskInfoA := &TaskInfo{ - Peer: a.Target, - Cid: a.Task.Topic.(cid.Cid), - IsWantBlock: taskDataA.IsWantBlock, - SendDontHave: taskDataA.SendDontHave, - BlockSize: taskDataA.BlockSize, - HaveBlock: taskDataA.HaveBlock, - } - taskDataB := b.Task.Data.(*taskData) - taskInfoB := &TaskInfo{ - Peer: b.Target, - Cid: b.Task.Topic.(cid.Cid), - IsWantBlock: taskDataB.IsWantBlock, - SendDontHave: taskDataB.SendDontHave, - BlockSize: taskDataB.BlockSize, - HaveBlock: taskDataB.HaveBlock, - } - return e.taskComparator(taskInfoA, taskInfoB) - } - peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.PeerComparator(peertracker.TaskPriorityPeerComparator(peerTaskComparator))) - peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.TaskComparator(peerTaskComparator)) + queueTaskComparator := wrapTaskComparator(e.taskComparator) + peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.PeerComparator(peertracker.TaskPriorityPeerComparator(queueTaskComparator))) + peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.TaskComparator(queueTaskComparator)) } e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) From 59e4c943503440fb1763dba6f90811383198d225 Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Tue, 12 Oct 2021 16:40:42 -0700 Subject: [PATCH 4/8] update go-peertaskqueue version --- go.mod | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go.mod b/go.mod index eb8aa88c..548ca0de 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-log v1.0.5 github.com/ipfs/go-metrics-interface v0.0.1 - github.com/ipfs/go-peertaskqueue v0.4.0 + github.com/ipfs/go-peertaskqueue v0.6.0 github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.14.3 @@ -32,6 +32,4 @@ require ( go.uber.org/zap v1.16.0 ) -replace github.com/ipfs/go-peertaskqueue => ../go-peertaskqueue - go 1.16 From b67d113637285ead9cc1abe27fe2b0e22afcc11b Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Tue, 12 Oct 2021 18:23:03 -0700 Subject: [PATCH 5/8] fix undeclared name error --- bitswap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitswap.go b/bitswap.go index 98de8d78..eebc0bb7 100644 --- a/bitswap.go +++ b/bitswap.go @@ -384,7 +384,7 @@ type Bitswap struct { // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool - taskComparator TaskComparator + taskComparator decision.TaskComparator } type counters struct { From 13bfb245d438cb6cc6af4660efa9e317b9900211 Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Wed, 13 Oct 2021 01:32:18 -0700 Subject: [PATCH 6/8] Update go.sum --- go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.sum b/go.sum index 0cd2ccb4..62b20d19 100644 --- a/go.sum +++ b/go.sum @@ -302,6 +302,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= +github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg= +github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= From 1a344b1fe5ef5d937e1f8df5e4599302c087b060 Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Wed, 27 Oct 2021 14:59:28 -0700 Subject: [PATCH 7/8] Add TaskComparator test --- internal/decision/engine_test.go | 65 ++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index d8445fde..3b7aaf3c 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/go-metrics-interface" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -92,14 +93,14 @@ type engineSet struct { Blockstore blockstore.Blockstore } -func newTestEngine(ctx context.Context, idStr string) engineSet { - return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New()) +func newTestEngine(ctx context.Context, idStr string, opts ...Option) engineSet { + return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New(), opts...) } -func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) engineSet { +func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock, opts ...Option) engineSet { fpt := &fakePeerTagger{} bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock)) + e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock), opts...) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) return engineSet{ Peer: peer.ID(idStr), @@ -193,6 +194,7 @@ func newEngineForTesting( self peer.ID, maxReplaceSize int, scoreLedger ScoreLedger, + opts ...Option, ) *Engine { testPendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge() testActiveEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge() @@ -212,6 +214,7 @@ func newEngineForTesting( testActiveEngineGauge, testPendingBlocksGauge, testActiveBlocksGauge, + opts..., ) } @@ -1054,6 +1057,60 @@ func TestWantlistForPeer(t *testing.T) { } +func TestTaskComparator(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + keys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} + cids := make(map[cid.Cid]int) + blks := make([]blocks.Block, 0, len(keys)) + for i, letter := range keys { + block := blocks.NewBlock([]byte(letter)) + blks = append(blks, block) + cids[block.Cid()] = i + } + + fpt := &fakePeerTagger{} + sl := NewTestScoreLedger(shortTerm, nil, clock.New()) + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + if err := bs.PutMany(blks); err != nil { + t.Fatal(err) + } + + // use a single task worker so that the order of outgoing messages is deterministic + engineTaskWorkerCount := 1 + e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + WithTaskComparator(func(ta, tb *TaskInfo) bool { + // prioritize based on lexicographic ordering of block content + return cids[ta.Cid] < cids[tb.Cid] + }), + ) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + + // rely on randomness of Go map's iteration order to add Want entries in random order + peerIDs := make([]peer.ID, len(keys)) + for _, i := range cids { + peerID := libp2ptest.RandPeerIDFatal(t) + peerIDs[i] = peerID + partnerWantBlocks(e, keys[i:i+1], peerID) + } + + // check that outgoing messages are sent in the correct order + for i, peerID := range peerIDs { + next := <-e.Outbox() + envelope := <-next + if peerID != envelope.Peer { + t.Errorf("expected message for peer ID %#v but instead got message for peer ID %#v", peerID, envelope.Peer) + } + responseBlocks := envelope.Message.Blocks() + if len(responseBlocks) != 1 { + t.Errorf("expected 1 block in response but instead got %v", len(blks)) + } else if responseBlocks[0].Cid() != blks[i].Cid() { + t.Errorf("expected block with CID %#v but instead got block with CID %#v", blks[i].Cid(), responseBlocks[0].Cid()) + } + } +} + func TestTaggingPeers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() From b1246539f85e99d126e83df3c91854dec083d33d Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Wed, 27 Oct 2021 15:37:05 -0700 Subject: [PATCH 8/8] Add type aliases for TaskInfo and TaskComparator --- bitswap.go | 7 +++++-- internal/decision/engine_test.go | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bitswap.go b/bitswap.go index eebc0bb7..4a15fc58 100644 --- a/bitswap.go +++ b/bitswap.go @@ -148,8 +148,11 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } +type TaskInfo = decision.TaskInfo +type TaskComparator = decision.TaskComparator + // WithTaskComparator configures custom task prioritization logic. -func WithTaskComparator(comparator decision.TaskComparator) Option { +func WithTaskComparator(comparator TaskComparator) Option { return func(bs *Bitswap) { bs.taskComparator = comparator } @@ -384,7 +387,7 @@ type Bitswap struct { // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool - taskComparator decision.TaskComparator + taskComparator TaskComparator } type counters struct { diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 3b7aaf3c..acde1795 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -1080,6 +1080,7 @@ func TestTaskComparator(t *testing.T) { // use a single task worker so that the order of outgoing messages is deterministic engineTaskWorkerCount := 1 e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl, + // if this Option is omitted, the test fails WithTaskComparator(func(ta, tb *TaskInfo) bool { // prioritize based on lexicographic ordering of block content return cids[ta.Cid] < cids[tb.Cid]