Skip to content

Commit

Permalink
bitswap: WIP extract provider workers
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Oct 24, 2017
1 parent 005d243 commit 561d784
Show file tree
Hide file tree
Showing 29 changed files with 301 additions and 155 deletions.
16 changes: 14 additions & 2 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
exchange "github.com/ipfs/go-ipfs/exchange"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"

"github.com/ipfs/go-ipfs/exchange/providers"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Expand All @@ -38,34 +39,38 @@ type BlockService interface {
type blockService struct {
blockstore blockstore.Blockstore
exchange exchange.Interface
providers providers.Interface

// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
}

// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
func New(bs blockstore.Blockstore, rem exchange.Interface, p providers.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &blockService{
blockstore: bs,
exchange: rem,
providers: p,
checkFirst: true,
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface, p providers.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &blockService{
blockstore: bs,
exchange: rem,
providers: p,
checkFirst: false,
}
}
Expand Down Expand Up @@ -119,6 +124,10 @@ func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
return nil, errors.New("blockservice is closed")
}

if err := s.providers.Provide(o.Cid()); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
}

Expand Down Expand Up @@ -148,6 +157,9 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}
if err := s.providers.Provide(o.Cid()); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}

ks = append(ks, o.Cid())
}
Expand Down
2 changes: 1 addition & 1 deletion blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestWriteThroughWorks(t *testing.T) {
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
bserv := NewWriteThrough(bstore, exch)
bserv := NewWriteThrough(bstore, exch, offline.Providers())
bgen := butil.NewBlockGenerator()

block := bgen.Next()
Expand Down
2 changes: 1 addition & 1 deletion blockservice/test/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func newObject(data []byte) blocks.Block {

func TestBlocks(t *testing.T) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bs := New(bstore, offline.Exchange(bstore))
bs := New(bstore, offline.Exchange(bstore), offline.Providers())
defer bs.Close()

o := newObject([]byte("beep boop"))
Expand Down
2 changes: 1 addition & 1 deletion blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Mocks(n int) []BlockService {

var servs []BlockService
for _, i := range instances {
servs = append(servs, New(i.Blockstore(), i.Exchange))
servs = append(servs, New(i.Blockstore(), i.Exchange, i.Providers))
}
return servs
}
5 changes: 3 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
} else {
n.Exchange = offline.Exchange(n.Blockstore)
n.Providers = offline.Providers()
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
n.Blocks = bserv.New(n.Blockstore, n.Exchange, n.Providers)
n.DAG = dag.NewDAGService(n.Blocks)

internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)))
internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore), offline.Providers()))
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicity on
Expand Down
4 changes: 3 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,14 @@ You can now refer to the added file in a gateway, like so:
}

exch := n.Exchange
prov := n.Providers
local, _, _ := req.Option("local").Bool()
if local {
exch = offline.Exchange(addblockstore)
prov = offline.Providers()
}

bserv := blockservice.New(addblockstore, exch)
bserv := blockservice.New(addblockstore, exch, prov)
dserv := dag.NewDAGService(bserv)

fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv)
Expand Down
11 changes: 10 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
provider "github.com/ipfs/go-ipfs/exchange/providers"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
"gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
Expand Down Expand Up @@ -157,6 +158,14 @@ var bitswapStatCmd = &cmds.Command{
return
}

// ProvideBuf has been moved out of bitswap
ps, err := nd.Providers.Stat()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
st.ProvideBufLen = ps.ProvideBufLen

res.SetOutput(st)
},
Marshalers: cmds.MarshalerMap{
Expand All @@ -167,7 +176,7 @@ var bitswapStatCmd = &cmds.Command{
}
buf := new(bytes.Buffer)
fmt.Fprintln(buf, "bitswap status")
fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, provider.HasBlockBufferSize)
fmt.Fprintf(buf, "\tblocks received: %d\n", out.BlocksReceived)
fmt.Fprintf(buf, "\tblocks sent: %d\n", out.BlocksSent)
fmt.Fprintf(buf, "\tdata received: %d\n", out.DataReceived)
Expand Down
6 changes: 4 additions & 2 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ The JSON output contains type information.

dserv := nd.DAG
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
exch := offline.Exchange(nd.Blockstore)
prov := offline.Providers()

bserv := blockservice.New(nd.Blockstore, exch, prov)
dserv = merkledag.NewDAGService(bserv)
}

Expand Down
7 changes: 6 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
exchange "github.com/ipfs/go-ipfs/exchange"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
provider "github.com/ipfs/go-ipfs/exchange/providers"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
Expand Down Expand Up @@ -128,6 +129,7 @@ type IpfsNode struct {
PeerHost p2phost.Host // the network host (server+client)
Bootstrapper io.Closer // the periodic bootstrapper
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Providers provider.Interface // the content routing abstraction layer
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Ping *ping.PingService
Expand Down Expand Up @@ -435,9 +437,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
// Wrap standard peer host with routing system to allow unknown peer lookups
n.PeerHost = rhost.Wrap(host, n.Routing)

// Wrap content routing with a buffering layer
n.Providers = provider.NewProviders(ctx, n.Routing)

// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing, n.Providers)
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)

size, err := n.getCacheSize()
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func outputDagnode(out chan interface{}, name string, dn node.Node) error {
func NewMemoryDagService() dag.DAGService {
// build mem-datastore for editor's intermediary nodes
bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
bsrv := bserv.New(bs, offline.Exchange(bs))
bsrv := bserv.New(bs, offline.Exchange(bs), offline.Providers())
return dag.NewDAGService(bsrv)
}

Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
}

bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
bserv := blockservice.New(bs, node.Exchange)
bserv := blockservice.New(bs, node.Exchange, node.Providers)
dserv := dag.NewDAGService(bserv)
adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func getDagserv(t *testing.T) merkledag.DAGService {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv := bserv.New(bs, offline.Exchange(bs))
blockserv := bserv.New(bs, offline.Exchange(bs), offline.Providers())
return merkledag.NewDAGService(blockserv)
}

Expand Down
37 changes: 7 additions & 30 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
Expand All @@ -37,29 +36,18 @@ const (
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32

self = peer.ID("")
)

var (
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 512

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

func init() {
if flags.LowMemMode {
HasBlockBufferSize = 64
provideKeysBufferSize = 512
provideWorkerMax = 16
}
}

var rebroadcastDelay = delay.Fixed(time.Minute)

// New initializes a BitSwap instance that communicates over the provided
Expand Down Expand Up @@ -96,8 +84,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
counters: new(counters),

Expand Down Expand Up @@ -143,12 +129,6 @@ type Bitswap struct {

// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid

process process.Process

Expand Down Expand Up @@ -298,7 +278,8 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlockFrom(blk, "")
//TODO: call provide here?
return bs.receiveBlockFrom(blk, self)
}

// TODO: Some of this stuff really only needs to be done when adding a block
Expand Down Expand Up @@ -333,13 +314,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
}

bs.engine.AddBlock(blk)

select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
return nil
}

Expand Down Expand Up @@ -395,6 +369,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
if err := bs.receiveBlockFrom(b, p); err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
}
if err := bs.network.Provide(ctx, b.Cid()); err != nil {
log.Warningf("ReceiveMessage Provide error: %s", err)
}
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}(block)
}
Expand Down
16 changes: 10 additions & 6 deletions exchange/bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
provider "github.com/ipfs/go-ipfs/exchange/providers"

inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
Expand All @@ -25,10 +26,11 @@ var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, p provider.Interface) BitSwapNetwork {
bitswapNetwork := impl{
host: host,
routing: r,
host: host,
routing: r,
provider: p,
}
host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
host.SetStreamHandler(ProtocolBitswapOne, bitswapNetwork.handleNewStream)
Expand All @@ -42,8 +44,9 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
type impl struct {
host host.Host
routing routing.ContentRouting
host host.Host
routing routing.ContentRouting
provider provider.Interface

// inbound messages from the network are forwarded to the receiver
receiver Receiver
Expand Down Expand Up @@ -137,6 +140,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
}

// FindProvidersAsync returns a channel of providers for the given key
// TODO: move this and other FindProvider stuff out to exch.provider
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {

// Since routing queries are expensive, give bitswap the peers to which we
Expand Down Expand Up @@ -174,7 +178,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int)

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error {
return bsnet.routing.Provide(ctx, k, true)
return bsnet.provider.Provide(k)
}

// handleNewStream receives a new stream from the network.
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Stat struct {

func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
st.ProvideBufLen = -1
st.Wantlist = bs.GetWantlist()
bs.counterLk.Lock()
c := bs.counters
Expand Down
Loading

0 comments on commit 561d784

Please sign in to comment.