diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 6bd04c3abbe6..86d2496655df 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -12,6 +12,7 @@ import ( exchange "github.com/ipfs/go-ipfs/exchange" bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" + "github.com/ipfs/go-ipfs/exchange/providers" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" @@ -38,13 +39,15 @@ type BlockService interface { type blockService struct { blockstore blockstore.Blockstore exchange exchange.Interface + providers providers.Interface + // If checkFirst is true then first check that a block doesn't // already exist to avoid republishing the block on the exchange. checkFirst bool } // NewBlockService creates a BlockService with given datastore instance. -func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService { +func New(bs blockstore.Blockstore, rem exchange.Interface, p providers.Interface) BlockService { if rem == nil { log.Warning("blockservice running in local (offline) mode.") } @@ -52,13 +55,14 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService { return &blockService{ blockstore: bs, exchange: rem, + providers: p, checkFirst: true, } } // NewWriteThrough ceates a BlockService that guarantees writes will go // through to the blockstore and are not skipped by cache checks. 
-func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService { +func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface, p providers.Interface) BlockService { if rem == nil { log.Warning("blockservice running in local (offline) mode.") } @@ -66,6 +70,7 @@ func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockServ return &blockService{ blockstore: bs, exchange: rem, + providers: p, checkFirst: false, } } @@ -119,6 +124,10 @@ func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { return nil, errors.New("blockservice is closed") } + if err := s.providers.Provide(o.Cid()); err != nil { + return nil, fmt.Errorf("blockservice is closed (%s)", err) + } + return c, nil } @@ -148,6 +157,9 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { if err := s.exchange.HasBlock(o); err != nil { return nil, fmt.Errorf("blockservice is closed (%s)", err) } + if err := s.providers.Provide(o.Cid()); err != nil { + return nil, fmt.Errorf("blockservice is closed (%s)", err) + } ks = append(ks, o.Cid()) } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 1ce735f8bdf2..74290f137806 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -19,7 +19,7 @@ func TestWriteThroughWorks(t *testing.T) { } bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) exch := offline.Exchange(bstore2) - bserv := NewWriteThrough(bstore, exch) + bserv := NewWriteThrough(bstore, exch, offline.Providers()) bgen := butil.NewBlockGenerator() block := bgen.Next() diff --git a/blockservice/test/blocks_test.go b/blockservice/test/blocks_test.go index 59e1b4e91c55..200f5b5e16d8 100644 --- a/blockservice/test/blocks_test.go +++ b/blockservice/test/blocks_test.go @@ -24,7 +24,7 @@ func newObject(data []byte) blocks.Block { func TestBlocks(t *testing.T) { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - bs := 
New(bstore, offline.Exchange(bstore)) + bs := New(bstore, offline.Exchange(bstore), offline.Providers()) defer bs.Close() o := newObject([]byte("beep boop")) diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index 622d1c8d6892..570e111e3889 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -17,7 +17,7 @@ func Mocks(n int) []BlockService { var servs []BlockService for _, i := range instances { - servs = append(servs, New(i.Blockstore(), i.Exchange)) + servs = append(servs, New(i.Blockstore(), i.Exchange, i.Providers)) } return servs } diff --git a/core/builder.go b/core/builder.go index 065c9cbd87e0..16ab7569dbe1 100644 --- a/core/builder.go +++ b/core/builder.go @@ -215,12 +215,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } } else { n.Exchange = offline.Exchange(n.Blockstore) + n.Providers = offline.Providers() } - n.Blocks = bserv.New(n.Blockstore, n.Exchange) + n.Blocks = bserv.New(n.Blockstore, n.Exchange, n.Providers) n.DAG = dag.NewDAGService(n.Blocks) - internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore))) + internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore), offline.Providers())) n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag) if err != nil { // TODO: we should move towards only running 'NewPinner' explicity on diff --git a/core/commands/add.go b/core/commands/add.go index 32435eda9a67..bed3f944e1c5 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -227,12 +227,14 @@ You can now refer to the added file in a gateway, like so: } exch := n.Exchange + prov := n.Providers local, _, _ := req.Option("local").Bool() if local { exch = offline.Exchange(addblockstore) + prov = offline.Providers() } - bserv := blockservice.New(addblockstore, exch) + bserv := blockservice.New(addblockstore, exch, prov) dserv := dag.NewDAGService(bserv) fileAdder, err := coreunix.NewAdder(req.Context(), 
n.Pinning, n.Blockstore, dserv) diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index fae62c5383f7..9d5ce6071c13 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -8,6 +8,7 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision" + provider "github.com/ipfs/go-ipfs/exchange/providers" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" @@ -157,6 +158,14 @@ var bitswapStatCmd = &cmds.Command{ return } + // ProvideBuf has been moved out of bitswap + ps, err := nd.Providers.Stat() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + st.ProvideBufLen = ps.ProvideBufLen + res.SetOutput(st) }, Marshalers: cmds.MarshalerMap{ @@ -167,7 +176,7 @@ var bitswapStatCmd = &cmds.Command{ } buf := new(bytes.Buffer) fmt.Fprintln(buf, "bitswap status") - fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize) + fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, provider.HasBlockBufferSize) fmt.Fprintf(buf, "\tblocks received: %d\n", out.BlocksReceived) fmt.Fprintf(buf, "\tblocks sent: %d\n", out.BlocksSent) fmt.Fprintf(buf, "\tdata received: %d\n", out.DataReceived) diff --git a/core/commands/ls.go b/core/commands/ls.go index 8c6bbb44d91a..7ae9b551b676 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -75,8 +75,10 @@ The JSON output contains type information. 
dserv := nd.DAG if !resolve { - offlineexch := offline.Exchange(nd.Blockstore) - bserv := blockservice.New(nd.Blockstore, offlineexch) + exch := offline.Exchange(nd.Blockstore) + prov := offline.Providers() + + bserv := blockservice.New(nd.Blockstore, exch, prov) dserv = merkledag.NewDAGService(bserv) } diff --git a/core/core.go b/core/core.go index 2326a273d086..45a2b79d3491 100644 --- a/core/core.go +++ b/core/core.go @@ -26,6 +26,7 @@ import ( exchange "github.com/ipfs/go-ipfs/exchange" bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + provider "github.com/ipfs/go-ipfs/exchange/providers" rp "github.com/ipfs/go-ipfs/exchange/reprovide" filestore "github.com/ipfs/go-ipfs/filestore" mount "github.com/ipfs/go-ipfs/fuse/mount" @@ -128,6 +129,7 @@ type IpfsNode struct { PeerHost p2phost.Host // the network host (server+client) Bootstrapper io.Closer // the periodic bootstrapper Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Providers provider.Interface // the content routing abstraction layer Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Ping *ping.PingService @@ -435,9 +437,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost // Wrap standard peer host with routing system to allow unknown peer lookups n.PeerHost = rhost.Wrap(host, n.Routing) + // Wrap content routing with a buffering layer + n.Providers = provider.NewProviders(ctx, n.Routing) + // setup exchange service const alwaysSendToPeer = true // use YesManStrategy - bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) + bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing, n.Providers) n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) size, err := n.getCacheSize() diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 
eca42b0a838a..2c279b0db8ae 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -568,7 +568,7 @@ func outputDagnode(out chan interface{}, name string, dn node.Node) error { func NewMemoryDagService() dag.DAGService { // build mem-datastore for editor's intermediary nodes bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - bsrv := bserv.New(bs, offline.Exchange(bs)) + bsrv := bserv.New(bs, offline.Exchange(bs), offline.Providers()) return dag.NewDAGService(bsrv) } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index add4395eb5ea..9f6f60f26270 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -169,7 +169,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { } bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t} - bserv := blockservice.New(bs, node.Exchange) + bserv := blockservice.New(bs, node.Exchange, node.Providers) dserv := dag.NewDAGService(bserv) adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv) if err != nil { diff --git a/core/coreunix/metadata_test.go b/core/coreunix/metadata_test.go index ddb22c751152..3ab51fd7a413 100644 --- a/core/coreunix/metadata_test.go +++ b/core/coreunix/metadata_test.go @@ -25,7 +25,7 @@ import ( func getDagserv(t *testing.T) merkledag.DAGService { db := dssync.MutexWrap(ds.NewMapDatastore()) bs := bstore.NewBlockstore(db) - blockserv := bserv.New(bs, offline.Exchange(bs)) + blockserv := bserv.New(bs, offline.Exchange(bs), offline.Providers()) return merkledag.NewDAGService(blockserv) } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 35d48a35b40a..6660fc4ddb21 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -16,7 +16,6 @@ import ( bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" - flags 
"github.com/ipfs/go-ipfs/flags" "github.com/ipfs/go-ipfs/thirdparty/delay" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" @@ -37,29 +36,18 @@ const ( // TODO: if a 'non-nice' strategy is implemented, consider increasing this value maxProvidersPerRequest = 3 providerRequestTimeout = time.Second * 10 - provideTimeout = time.Second * 15 sizeBatchRequestChan = 32 // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 + + self = peer.ID("") ) var ( - HasBlockBufferSize = 256 - provideKeysBufferSize = 2048 - provideWorkerMax = 512 - // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) -func init() { - if flags.LowMemMode { - HasBlockBufferSize = 64 - provideKeysBufferSize = 512 - provideWorkerMax = 16 - } -} - var rebroadcastDelay = delay.Fixed(time.Minute) // New initializes a BitSwap instance that communicates over the provided @@ -96,8 +84,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, network: network, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan *cid.Cid, HasBlockBufferSize), - provideKeys: make(chan *cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), counters: new(counters), @@ -143,12 +129,6 @@ type Bitswap struct { // findKeys sends keys to a worker to find and connect to providers for them findKeys chan *blockRequest - // newBlocks is a channel for newly added blocks to be provided to the - // network. 
blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan *cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan *cid.Cid process process.Process @@ -298,7 +278,8 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlockFrom(blk, "") + //TODO: call provide here? + return bs.receiveBlockFrom(blk, self) } // TODO: Some of this stuff really only needs to be done when adding a block @@ -333,13 +314,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { } bs.engine.AddBlock(blk) - - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } return nil } @@ -395,6 +369,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg if err := bs.receiveBlockFrom(b, p); err != nil { log.Warningf("ReceiveMessage recvBlockFrom error: %s", err) } + if err := bs.network.Provide(ctx, b.Cid()); err != nil { + log.Warningf("ReceiveMessage Provide error: %s", err) + } log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid()) }(block) } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index d1dcbfe0fc63..4ff25b5788b6 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -7,6 +7,7 @@ import ( "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + provider "github.com/ipfs/go-ipfs/exchange/providers" inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" @@ -25,10 +26,11 @@ var log = logging.Logger("bitswap_network") var sendMessageTimeout = time.Minute * 10 // 
NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host -func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, r routing.ContentRouting, p provider.Interface) BitSwapNetwork { bitswapNetwork := impl{ - host: host, - routing: r, + host: host, + routing: r, + provider: p, } host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream) host.SetStreamHandler(ProtocolBitswapOne, bitswapNetwork.handleNewStream) @@ -42,8 +44,9 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { // impl transforms the ipfs network interface, which sends and receives // NetMessage objects, into the bitswap network interface. type impl struct { - host host.Host - routing routing.ContentRouting + host host.Host + routing routing.ContentRouting + provider provider.Interface // inbound messages from the network are forwarded to the receiver receiver Receiver @@ -137,6 +140,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { } // FindProvidersAsync returns a channel of providers for the given key +// TODO: move this and other FindProvider stuff out to exch.provider func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { // Since routing queries are expensive, give bitswap the peers to which we @@ -174,7 +178,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) // Provide provides the key to the network func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error { - return bsnet.routing.Provide(ctx, k, true) + return bsnet.provider.Provide(k) } // handleNewStream receives a new stream from the network. 
diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index 39f02c1c9315..743ee55a05ed 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -20,7 +20,7 @@ type Stat struct { func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) - st.ProvideBufLen = len(bs.newBlocks) + st.ProvideBufLen = -1 st.Wantlist = bs.GetWantlist() bs.counterLk.Lock() c := bs.counters diff --git a/exchange/bitswap/testnet/peernet.go b/exchange/bitswap/testnet/peernet.go index 32438508a131..78975b34f0a8 100644 --- a/exchange/bitswap/testnet/peernet.go +++ b/exchange/bitswap/testnet/peernet.go @@ -2,7 +2,9 @@ package bitswap import ( "context" + bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + pr "github.com/ipfs/go-ipfs/exchange/providers" mockrouting "github.com/ipfs/go-ipfs/routing/mock" ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" testutil "gx/ipfs/QmWRCn8vruNAzHx8i6SAXinuheRitKEGu8c7m26stKvsYx/go-testutil" @@ -25,7 +27,9 @@ func (pn *peernet) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { panic(err.Error()) } routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) - return bsnet.NewFromIpfsHost(client, routing) + provider := pr.NewProviders(context.TODO(), routing) + + return bsnet.NewFromIpfsHost(client, routing, provider) } func (pn *peernet) HasPeer(p peer.ID) bool { diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index ca7b9a60b568..96df0c092e18 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -10,6 +10,8 @@ import ( delay "github.com/ipfs/go-ipfs/thirdparty/delay" testutil "gx/ipfs/QmWRCn8vruNAzHx8i6SAXinuheRitKEGu8c7m26stKvsYx/go-testutil" + "github.com/ipfs/go-ipfs/exchange/offline" + "github.com/ipfs/go-ipfs/exchange/providers" p2ptestutil "gx/ipfs/QmQGX417WoxKxDJeHqouMEmmH4G1RCENNSzkZYHrXy3Xb3/go-libp2p-netutil" ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" ds_sync 
"gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync" @@ -68,6 +70,7 @@ func (g *SessionGenerator) Instances(n int) []Instance { type Instance struct { Peer peer.ID Exchange *Bitswap + Providers providers.Interface blockstore blockstore.Blockstore blockstoreDelay delay.D @@ -106,6 +109,7 @@ func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instanc return Instance{ Peer: p.ID(), Exchange: bs, + Providers: offline.Providers(), //TODO: make sure this is correct blockstore: bstore, blockstoreDelay: bsdelay, } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 3ce4f44c7c0a..8c08135f3337 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -10,7 +10,6 @@ import ( cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" - procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" ) @@ -35,16 +34,6 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { px.Go(func(px process.Process) { bs.rebroadcastWorker(ctx) }) - - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) } func (bs *Bitswap) taskWorker(ctx context.Context, id int) { @@ -85,85 +74,6 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } -func (bs *Bitswap) provideWorker(px process.Process) { - - limit := make(chan struct{}, provideWorkerMax) - - limitedGoProvide := func(k *cid.Cid, wid int) { - defer func() { - // replace token when done - <-limit - }() - ev := logging.LoggableMap{"ID": 
wid} - - ctx := procctx.OnClosingContext(px) // derive ctx from px - defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done() - - ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx - defer cancel() - - if err := bs.network.Provide(ctx, k); err != nil { - log.Warning(err) - } - } - - // worker spawner, reads from bs.provideKeys until it closes, spawning a - // _ratelimited_ number of workers to handle each key. - for wid := 2; ; wid++ { - ev := logging.LoggableMap{"ID": 1} - log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) - - select { - case <-px.Closing(): - return - case k, ok := <-bs.provideKeys: - if !ok { - log.Debug("provideKeys channel closed") - return - } - select { - case <-px.Closing(): - return - case limit <- struct{}{}: - go limitedGoProvide(k, wid) - } - } - } -} - -func (bs *Bitswap) provideCollector(ctx context.Context) { - defer close(bs.provideKeys) - var toProvide []*cid.Cid - var nextKey *cid.Cid - var keysOut chan *cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel() diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index fed9c2d8756f..d880b188c1ad 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -9,6 +9,7 @@ import ( exchange "github.com/ipfs/go-ipfs/exchange" blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" + "github.com/ipfs/go-ipfs/exchange/providers" cid 
"gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" ) @@ -72,3 +73,19 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []*cid.Cid) (<-chan func (e *offlineExchange) IsOnline() bool { return false } + +type offlineProviders struct{} + +func Providers() providers.Interface { + return &offlineProviders{} +} + +// Provide always returns nil. +func (p *offlineProviders) Provide(*cid.Cid) error { + return nil +} + +// Stat always returns an empty Stat for the offline provider. +func (p *offlineProviders) Stat() (*providers.Stat, error) { + return &providers.Stat{}, nil +} diff --git a/exchange/providers/providers.go b/exchange/providers/providers.go new file mode 100644 index 000000000000..cd9463ebb662 --- /dev/null +++ b/exchange/providers/providers.go @@ -0,0 +1,87 @@ +package providers + +import ( + "context" + "time" + + flags "github.com/ipfs/go-ipfs/flags" + + cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" + routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" + procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" +) + +var ( + provideKeysBufferSize = 2048 + HasBlockBufferSize = 256 + + provideWorkerMax = 512 + provideTimeout = time.Second * 15 +) + +var log = logging.Logger("providers") + +// Interface is the definition of the providers interface to the libp2p routing system. +type Interface interface { + Provide(*cid.Cid) error + Stat() (*Stat, error) +} + +type providers struct { + routing routing.ContentRouting + process process.Process + + // newBlocks is a channel for newly added blocks to be provided to the
blocks pushed down this channel get buffered and fed to the + // provideKeys channel later on to avoid too much network activity + newBlocks chan *cid.Cid + // provideKeys directly feeds provide workers + provideKeys chan *cid.Cid +} + +func init() { + if flags.LowMemMode { + HasBlockBufferSize = 64 + provideKeysBufferSize = 512 + provideWorkerMax = 16 + } +} + +func NewProviders(parent context.Context, routing routing.ContentRouting) Interface { + ctx, cancelFunc := context.WithCancel(parent) + + px := process.WithTeardown(func() error { + return nil + }) + + p := &providers{ + routing: routing, + process: px, + + newBlocks: make(chan *cid.Cid, HasBlockBufferSize), + provideKeys: make(chan *cid.Cid, provideKeysBufferSize), + } + + p.startWorkers(px, ctx) + // bind the context and process. + // do it over here to avoid closing before all setup is done. + go func() { + <-px.Closing() // process closes first + cancelFunc() + }() + procctx.CloseAfterContext(px, ctx) // parent cancelled first + + return p +} + +func (p *providers) Provide(b *cid.Cid) error { + select { + case p.newBlocks <- b: + // send block off to be provided to the network + case <-p.process.Closing(): + return p.process.Close() + } + return nil +} diff --git a/exchange/providers/stat.go b/exchange/providers/stat.go new file mode 100644 index 000000000000..072a95d7c3d2 --- /dev/null +++ b/exchange/providers/stat.go @@ -0,0 +1,11 @@ +package providers + +type Stat struct { + ProvideBufLen int +} + +func (p *providers) Stat() (*Stat, error) { + return &Stat{ + ProvideBufLen: len(p.newBlocks), + }, nil +} diff --git a/exchange/providers/workers.go b/exchange/providers/workers.go new file mode 100644 index 000000000000..5534120506b8 --- /dev/null +++ b/exchange/providers/workers.go @@ -0,0 +1,101 @@ +package providers + +import ( + "context" + + cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" + procctx 
"gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" +) + +func (p *providers) startWorkers(px process.Process, ctx context.Context) { + // Start up a worker to manage sending out provides messages + px.Go(func(px process.Process) { + p.provideCollector(ctx) + }) + + // Spawn up multiple workers to handle incoming blocks + // consider increasing number if providing blocks bottlenecks + // file transfers + px.Go(p.provideWorker) +} + +func (p *providers) provideWorker(px process.Process) { + + limit := make(chan struct{}, provideWorkerMax) + + limitedGoProvide := func(k *cid.Cid, wid int) { + defer func() { + // replace token when done + <-limit + }() + ev := logging.LoggableMap{"ID": wid} + + ctx := procctx.OnClosingContext(px) // derive ctx from px + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done() + + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx + defer cancel() + + if err := p.routing.Provide(ctx, k, true); err != nil { + log.Warning(err) + } + } + + // worker spawner, reads from bs.provideKeys until it closes, spawning a + // _ratelimited_ number of workers to handle each key. 
+ for wid := 2; ; wid++ { + ev := logging.LoggableMap{"ID": 1} + log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) + + select { + case <-px.Closing(): + return + case k, ok := <-p.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } + select { + case <-px.Closing(): + return + case limit <- struct{}{}: + go limitedGoProvide(k, wid) + } + } + } +} + +func (p *providers) provideCollector(ctx context.Context) { + defer close(p.provideKeys) + var toProvide []*cid.Cid + var nextKey *cid.Cid + var keysOut chan *cid.Cid + + for { + select { + case blkey, ok := <-p.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + + if keysOut == nil { + nextKey = blkey + keysOut = p.provideKeys + } else { + toProvide = append(toProvide, blkey) + } + case keysOut <- nextKey: + if len(toProvide) > 0 { + nextKey = toProvide[0] + toProvide = toProvide[1:] + } else { + keysOut = nil + } + case <-ctx.Done(): + return + } + } +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index bcf0e84a1c5d..2c87c5e0d03e 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -120,7 +120,7 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, er func (n *dagService) GetOfflineLinkService() LinkService { if n.Blocks.Exchange().IsOnline() { - bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore())) + bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()), offline.Providers()) return NewDAGService(bsrv) } else { return n diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 5caabe94bc6a..a12c63a34976 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -239,7 +239,7 @@ func TestFetchGraph(t *testing.T) { } // create an offline dagstore and ensure all blocks were fetched - bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore())) + bs := bserv.New(bsis[1].Blockstore(), 
offline.Exchange(bsis[1].Blockstore()), offline.Providers()) offline_ds := NewDAGService(bs) diff --git a/merkledag/test/utils.go b/merkledag/test/utils.go index 68e2572650e3..d2dd7b70dd09 100644 --- a/merkledag/test/utils.go +++ b/merkledag/test/utils.go @@ -15,5 +15,5 @@ func Mock() dag.DAGService { func Bserv() bsrv.BlockService { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - return bsrv.New(bstore, offline.Exchange(bstore)) + return bsrv.New(bstore, offline.Exchange(bstore), offline.Providers()) } diff --git a/merkledag/utils/utils.go b/merkledag/utils/utils.go index 6f08bf2fc2bc..bfbb143597cd 100644 --- a/merkledag/utils/utils.go +++ b/merkledag/utils/utils.go @@ -30,7 +30,7 @@ type Editor struct { func NewMemoryDagService() dag.DAGService { // build mem-datastore for editor's intermediary nodes bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - bsrv := bserv.New(bs, offline.Exchange(bs)) + bsrv := bserv.New(bs, offline.Exchange(bs), offline.Providers()) return dag.NewDAGService(bsrv) } diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index bebfa8d308d1..d893a1171f61 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -38,7 +38,7 @@ func emptyDirNode() *dag.ProtoNode { func getDagserv(t *testing.T) dag.DAGService { db := dssync.MutexWrap(ds.NewMapDatastore()) bs := bstore.NewBlockstore(db) - blockserv := bserv.New(bs, offline.Exchange(bs)) + blockserv := bserv.New(bs, offline.Exchange(bs), offline.Providers()) return dag.NewDAGService(blockserv) } diff --git a/pin/pin_test.go b/pin/pin_test.go index fc303b6f2354..603294b7e58e 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -51,7 +51,7 @@ func TestPinnerBasic(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) @@ -176,7 +176,7 @@ func 
TestIsPinnedLookup(t *testing.T) { ctx := context.Background() dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) @@ -276,7 +276,7 @@ func TestDuplicateSemantics(t *testing.T) { ctx := context.Background() dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) @@ -311,7 +311,7 @@ func TestDuplicateSemantics(t *testing.T) { func TestFlush(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) p := NewPinner(dstore, dserv, dserv) @@ -328,7 +328,7 @@ func TestPinRecursiveFail(t *testing.T) { ctx := context.Background() dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) p := NewPinner(dstore, dserv, dserv) @@ -371,7 +371,7 @@ func TestPinRecursiveFail(t *testing.T) { func TestPinUpdate(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) + bserv := bs.New(bstore, offline.Exchange(bstore), offline.Providers()) dserv := mdag.NewDAGService(bserv) p := NewPinner(dstore, dserv, dserv) diff --git a/pin/set_test.go b/pin/set_test.go index 856a86a971b1..f1dcec1f305b 100644 --- a/pin/set_test.go +++ b/pin/set_test.go @@ -39,7 +39,7 @@ func objCount(d ds.Datastore) int { func 
TestSet(t *testing.T) { dst := ds.NewMapDatastore() bstore := blockstore.NewBlockstore(dst) - ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore))) + ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore), offline.Providers())) // this value triggers the creation of a recursive shard. // If the recursive sharding is done improperly, this will result in