Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#189 from ipfs/feat/proto-ext-poc
Browse files Browse the repository at this point in the history
PoC of Bitswap protocol extensions implementation

This commit was moved from ipfs/go-bitswap@86178ba
  • Loading branch information
Stebalien authored Jan 30, 2020
2 parents 602dbce + eab2bf8 commit fcea5cd
Show file tree
Hide file tree
Showing 58 changed files with 8,205 additions and 1,838 deletions.
434 changes: 350 additions & 84 deletions bitswap/benchmarks_test.go

Large diffs are not rendered by default.

125 changes: 66 additions & 59 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package bitswap
import (
"context"
"errors"

"sync"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

bsbpm "github.com/ipfs/go-bitswap/blockpresencemanager"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -20,6 +21,7 @@ import (
bspm "github.com/ipfs/go-bitswap/peermanager"
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/session"
bssim "github.com/ipfs/go-bitswap/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"
Expand Down Expand Up @@ -113,24 +115,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm := bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

engine := decision.NewEngine(ctx, bstore, network.ConnectionManager()) // TODO close the engine with Close() method
bs := &Bitswap{
blockstore: bstore,
engine: engine,
Expand All @@ -139,8 +147,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
Expand All @@ -156,7 +166,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
option(bs)
}

bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)

Expand All @@ -181,6 +190,8 @@ type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager

Expand Down Expand Up @@ -215,9 +226,13 @@ type Bitswap struct {
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// the sessionmanager manages tracking sessions
// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager

// the SessionInterestManager keeps track of which sessions are interested
// in which CIDs
sim *bssim.SessionInterestManager

// whether or not to make provide announcements
provideEnabled bool

Expand Down Expand Up @@ -275,14 +290,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -293,22 +308,20 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// If blocks came from the network
if from != "" {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
}

// Put wanted blocks into blockstore
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
}

// NOTE: There exists the possiblity for a race condition here. If a user
Expand All @@ -322,33 +335,25 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}

wantedKs := allKs
if len(blks) != len(wanted) {
wantedKs = make([]cid.Cid, 0, len(wanted))
for _, b := range wanted {
wantedKs = append(wantedKs, b.Cid())
}
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(from, allKs)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted block keys to decision engine
bs.engine.AddBlocks(wantedKs)
// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of received
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
}

// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, k := range wantedKs {
for _, blk := range wanted {
select {
case bs.newBlocks <- k:
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
Expand Down Expand Up @@ -380,20 +385,22 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

iblocks := incoming.Blocks()

if len(iblocks) == 0 {
return
}

bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
}

// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
haves := incoming.Haves()
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
}
}

Expand Down Expand Up @@ -479,12 +486,12 @@ func (bs *Bitswap) Close() error {

// GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
}
return out
return bs.pm.CurrentWants()
}

// GetWanthaves returns the current list of want-haves.
func (bs *Bitswap) GetWantHaves() []cid.Cid {
return bs.pm.CurrentWantHaves()
}

// IsOnline is needed to match go-ipfs-exchange-interface
Expand Down
25 changes: 18 additions & 7 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,15 +571,17 @@ func TestWantlistCleanup(t *testing.T) {
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

instances := ig.Instances(1)[0]
bswap := instances.Exchange
instances := ig.Instances(2)
instance := instances[0]
bswap := instance.Exchange
blocks := bg.Blocks(20)

var keys []cid.Cid
for _, b := range blocks {
keys = append(keys, b.Cid())
}

// Once context times out, key should be removed from wantlist
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := bswap.GetBlock(ctx, keys[0])
Expand All @@ -589,10 +591,11 @@ func TestWantlistCleanup(t *testing.T) {

time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Once context times out, keys should be removed from wantlist
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err = bswap.GetBlocks(ctx, keys[:10])
Expand All @@ -603,29 +606,37 @@ func TestWantlistCleanup(t *testing.T) {
<-ctx.Done()
time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Send want for single block, with no timeout
_, err = bswap.GetBlocks(context.Background(), keys[:1])
if err != nil {
t.Fatal(err)
}

// Send want for 10 blocks
ctx, cancel = context.WithCancel(context.Background())
_, err = bswap.GetBlocks(ctx, keys[10:])
if err != nil {
t.Fatal(err)
}

// Even after 50 milli-seconds we haven't explicitly cancelled anything
// and no timeouts have expired, so we should have 11 want-haves
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) != 5 {
t.Fatal("should have 5 keys in wantlist")
if len(bswap.GetWantHaves()) != 11 {
t.Fatal("should have 11 keys in wantlist")
}

// Cancel the timeout for the request for 10 blocks. This should remove
// the want-haves
cancel()

// Once the cancel is processed, we are left with the request for 1 block
time.Sleep(time.Millisecond * 50)
if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
t.Fatal("should only have keys[0] in wantlist")
}
}
Expand Down
Loading

0 comments on commit fcea5cd

Please sign in to comment.