Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Bitswap Refactor #2: Extract PeerManager From Want Manager + Unit Test #29

Merged
merged 5 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

decision "github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bspm "github.com/ipfs/go-bitswap/peermanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"

Expand Down Expand Up @@ -85,12 +87,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
" data blocks recived").Histogram(metricsBuckets)

sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)

notif := notifications.New()
px := process.WithTeardown(func() error {
notif.Shutdown()
return nil
})

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
}

bs := &Bitswap{
blockstore: bstore,
notifications: notif,
Expand All @@ -100,14 +109,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx, network),
wm: bswm.New(ctx),
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(),
counters: new(counters),

dupMetric: dupHist,
allMetric: allHist,
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
}
go bs.wm.Run()

bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
network.SetDelegate(bs)

// Start up bitswaps async worker routines
Expand All @@ -128,6 +141,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
pm *bspm.PeerManager

// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

// the engine is the bit of logic that decides who to send which blocks to
Expand Down Expand Up @@ -160,8 +176,9 @@ type Bitswap struct {
counters *counters

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// the sessionmanager manages tracking sessions
sm *bssm.SessionManager
Expand Down Expand Up @@ -427,13 +444,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.engine.PeerConnected(p)
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
4 changes: 2 additions & 2 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.wm.ConnectedPeers()
peers := inst.Exchange.pm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.wm.ConnectedPeers()
peers = inst.Exchange.pm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
Expand Down
Loading