Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

PoC of Bitswap protocol extensions implementation #189

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
434 changes: 350 additions & 84 deletions benchmarks_test.go

Large diffs are not rendered by default.

125 changes: 66 additions & 59 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package bitswap
import (
"context"
"errors"

"sync"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

bsbpm "github.com/ipfs/go-bitswap/blockpresencemanager"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -20,6 +21,7 @@ import (
bspm "github.com/ipfs/go-bitswap/peermanager"
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/session"
bssim "github.com/ipfs/go-bitswap/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"
Expand Down Expand Up @@ -113,24 +115,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm := bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

engine := decision.NewEngine(ctx, bstore, network.ConnectionManager()) // TODO close the engine with Close() method
bs := &Bitswap{
blockstore: bstore,
engine: engine,
Expand All @@ -139,8 +147,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
Expand All @@ -156,7 +166,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
option(bs)
}

bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)

Expand All @@ -181,6 +190,8 @@ type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager

Expand Down Expand Up @@ -215,9 +226,13 @@ type Bitswap struct {
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// the sessionmanager manages tracking sessions
// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager

// the SessionInterestManager keeps track of which sessions are interested
// in which CIDs
sim *bssim.SessionInterestManager

// whether or not to make provide announcements
provideEnabled bool

Expand Down Expand Up @@ -275,14 +290,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -293,22 +308,20 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// If blocks came from the network
if from != "" {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
}

// Put wanted blocks into blockstore
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
}

// NOTE: There exists the possiblity for a race condition here. If a user
Expand All @@ -322,33 +335,25 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}

wantedKs := allKs
if len(blks) != len(wanted) {
wantedKs = make([]cid.Cid, 0, len(wanted))
for _, b := range wanted {
wantedKs = append(wantedKs, b.Cid())
}
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(from, allKs)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted block keys to decision engine
bs.engine.AddBlocks(wantedKs)
// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of received
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
}

// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, k := range wantedKs {
for _, blk := range wanted {
select {
case bs.newBlocks <- k:
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
Expand Down Expand Up @@ -380,20 +385,22 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

iblocks := incoming.Blocks()

if len(iblocks) == 0 {
return
}

bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
}

// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
haves := incoming.Haves()
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
}
}

Expand Down Expand Up @@ -479,12 +486,12 @@ func (bs *Bitswap) Close() error {

// GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
}
return out
return bs.pm.CurrentWants()
}

// GetWanthaves returns the current list of want-haves.
func (bs *Bitswap) GetWantHaves() []cid.Cid {
return bs.pm.CurrentWantHaves()
}

// IsOnline is needed to match go-ipfs-exchange-interface
Expand Down
25 changes: 18 additions & 7 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,15 +571,17 @@ func TestWantlistCleanup(t *testing.T) {
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

instances := ig.Instances(1)[0]
bswap := instances.Exchange
instances := ig.Instances(2)
instance := instances[0]
bswap := instance.Exchange
blocks := bg.Blocks(20)

var keys []cid.Cid
for _, b := range blocks {
keys = append(keys, b.Cid())
}

// Once context times out, key should be removed from wantlist
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := bswap.GetBlock(ctx, keys[0])
Expand All @@ -589,10 +591,11 @@ func TestWantlistCleanup(t *testing.T) {

time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Once context times out, keys should be removed from wantlist
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err = bswap.GetBlocks(ctx, keys[:10])
Expand All @@ -603,29 +606,37 @@ func TestWantlistCleanup(t *testing.T) {
<-ctx.Done()
time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Send want for single block, with no timeout
_, err = bswap.GetBlocks(context.Background(), keys[:1])
if err != nil {
t.Fatal(err)
}

// Send want for 10 blocks
ctx, cancel = context.WithCancel(context.Background())
_, err = bswap.GetBlocks(ctx, keys[10:])
if err != nil {
t.Fatal(err)
}

// Even after 50 milli-seconds we haven't explicitly cancelled anything
// and no timeouts have expired, so we should have 11 want-haves
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) != 5 {
t.Fatal("should have 5 keys in wantlist")
if len(bswap.GetWantHaves()) != 11 {
t.Fatal("should have 11 keys in wantlist")
}

// Cancel the timeout for the request for 10 blocks. This should remove
// the want-haves
cancel()

// Once the cancel is processed, we are left with the request for 1 block
time.Sleep(time.Millisecond * 50)
if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
t.Fatal("should only have keys[0] in wantlist")
}
}
Expand Down
Loading