Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Feat/bitswap pr improvements #19

Merged
merged 3 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

var log = logging.Logger("bitswap")

var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// maxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
Expand Down
292 changes: 292 additions & 0 deletions dup_blocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
package bitswap

import (
"context"
"encoding/json"
"io/ioutil"
"math/rand"
"sync"
"testing"
"time"

tn "github.com/ipfs/go-bitswap/testnet"

"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)

type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)

type runStats struct {
Dups uint64
MsgSent uint64
MsgRecd uint64
Time time.Duration
Name string
}

var benchmarkLog []runStats

func BenchmarkDups2Nodes(b *testing.B) {
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
})
b.Run("AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
})

b.Run("Overlap1-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
})

b.Run("Overlap2-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
})

b.Run("Overlap3-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
})
b.Run("Overlap3-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
})
b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
})
b.Run("Overlap3-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
})
b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
})
b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
})
b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
})
b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
})
b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
})
b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
})
b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
})
b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
})
b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
})
b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
})

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666)
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
sg := NewTestSessionGenerator(net)
defer sg.Close()

bg := blocksutil.NewBlockGenerator()

instances := sg.Instances(numnodes)
blocks := bg.Blocks(numblks)

fetcher := instances[numnodes-1]

df(b, instances[:numnodes-1], blocks)

var ks []cid.Cid
for _, blk := range blocks {
ks = append(ks, blk.Cid())
}

ff(b, fetcher.Exchange, ks)

st, err := fetcher.Exchange.Stat()
if err != nil {
b.Fatal(err)
}

nst := fetcher.Exchange.network.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
MsgSent: nst.MessagesSent,
Dups: st.DupBlksReceived,
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, stats)
b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
if st.DupBlksReceived != 0 {
b.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
}
}

func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
b.Fatal(err)
}
}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
b.Fatal(err)
}
}

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. it also gives every third block to both peers
func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]

bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
} else {
jeff.Blockstore().Put(blk)
}
}
}

func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap3 only works with 2 provs")
}

bill := provs[0]
jeff := provs[1]

bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
} else {
jeff.Blockstore().Put(blk)
}
}
}

// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
}
}

func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}
b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
}

// fetch data in batches, 10 at a time
func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
if err != nil {
b.Fatal(err)
}
for range out {
}
}
}

// fetch each block at the same time concurrently
func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())

var wg sync.WaitGroup
for _, c := range ks {
wg.Add(1)
go func(c cid.Cid) {
defer wg.Done()
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}(c)
}
wg.Wait()
}

func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
b.Fatal(err)
}
for range out {
}
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
b.Fatal(err)
}

out, err := ses.GetBlocks(context.Background(), ks[1:11])
if err != nil {
b.Fatal(err)
}
for range out {
}

out, err = ses.GetBlocks(context.Background(), ks[11:])
if err != nil {
b.Fatal(err)
}
for range out {
}
}
10 changes: 10 additions & 0 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type BitSwapNetwork interface {

ConnectionManager() ifconnmgr.ConnManager

Stats() NetworkStats

Routing
}

Expand Down Expand Up @@ -68,3 +70,11 @@ type Routing interface {
// Provide provides the key to the network
Provide(context.Context, cid.Cid) error
}

// NetworkStats is a container for statistics about the bitswap network
// the numbers inside are specific to bitswap, and not any other protocols
// using the same underlying network.
type NetworkStats struct {
MessagesSent uint64
MessagesRecvd uint64
}
13 changes: 13 additions & 0 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
Expand Down Expand Up @@ -48,6 +49,8 @@ type impl struct {

// inbound messages from the network are forwarded to the receiver
receiver Receiver

stats NetworkStats
}

type streamMessageSender struct {
Expand Down Expand Up @@ -130,6 +133,8 @@ func (bsnet *impl) SendMessage(
s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
go inet.AwaitEOF(s)
return s.Close()
Expand Down Expand Up @@ -210,13 +215,21 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
}
}

func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
return bsnet.host.ConnManager()
}

func (bsnet *impl) Stats() NetworkStats {
return NetworkStats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
}

type netNotifiee impl

func (nn *netNotifiee) impl() *impl {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@
},
{
"author": "hsanjuan",
"hash": "QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM",
"hash": "QmUdh9184Bozfinyn5YDhgPRg33E3KR3btfZXcVoFgTxD4",
"name": "go-ipfs-exchange-interface",
"version": "0.1.0"
"version": "0.1.1"
},
{
"author": "whyrusleeping",
Expand Down
3 changes: 2 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
Expand Down Expand Up @@ -51,7 +52,7 @@ type Session struct {

// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[cid.Cid]time.Time),
Expand Down
Loading