From f868b7c5fa52a934822a131b224c416da6862d8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 5 Dec 2017 15:28:31 +0100 Subject: [PATCH] providers: ProvideRecursive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/builder.go | 3 +++ core/commands/add.go | 8 +++++++- providers/providers.go | 35 ++++++++++++++++++++++++++++++++--- providers/workers.go | 7 +++++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/core/builder.go b/core/builder.go index 288368e30147..221ac1688a27 100644 --- a/core/builder.go +++ b/core/builder.go @@ -15,6 +15,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" resolver "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + providers "github.com/ipfs/go-ipfs/providers" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/verifbs" @@ -220,7 +221,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } } else { + n.SetupOfflineRouting() n.Exchange = offline.Exchange(n.Blockstore) + n.Providers = providers.NewProviders(n.ctx, n.Routing, nil) } n.Blocks = bserv.New(n.Blockstore, n.Exchange) diff --git a/core/commands/add.go b/core/commands/add.go index bffca579fa9b..ca14754679d8 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -313,7 +313,7 @@ You can now check what blocks have been created by: } // copy intermediary nodes from editor to our actual dagservice - _, err := fileAdder.Finalize() + nd, err := fileAdder.Finalize() if err != nil { return err } @@ -322,6 +322,12 @@ You can now check what blocks have been created by: return nil } + if !local { + if err := n.Providers.ProvideRecursive(req.Context(), nd, dserv); err != nil { + return err + } + } + return fileAdder.PinRoot() } diff --git a/providers/providers.go b/providers/providers.go index 0b0469ca2950..ca568fadcd40 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -13,12 +13,13 @@ import ( pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" + ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ) const ( provideTimeout = time.Second * 15 - // maxProvidersPerRequest specifies the maximum number of providers desired + // MaxProvidersPerRequest specifies the maximum number of providers desired // from the network. This value is specified because the network streams // results. // TODO: if a 'non-nice' strategy is implemented, consider increasing this value @@ -46,8 +47,10 @@ type blockRequest struct { // Interface is an definition of providers interface to libp2p routing system type Interface interface { - Provide(*cid.Cid) error - FindProviders(ctx context.Context, c *cid.Cid) error + Provide(k *cid.Cid) error + ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error + + FindProviders(ctx context.Context, k *cid.Cid) error FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID Stat() (*Stat, error) @@ -119,6 +122,29 @@ func (p *providers) Provide(b *cid.Cid) error { return nil } +func (p *providers) provideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter, done *cid.Set) error { + p.Provide(n.Cid()) + + for _, l := range n.Links() { + if !done.Visit(l.Cid) { + continue + } + + sub, err := l.GetNode(ctx, serv) + if err != nil { + return err + } + if err := p.provideRecursive(ctx, sub, serv, done); err != nil { + return err + } + } + return nil +} + +func (p *providers) ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error { + return p.provideRecursive(ctx, n, serv, cid.NewSet()) +} + func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error { select { case <-ctx.Done(): @@ -130,6 +156,9 @@ func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error { // FindProvidersAsync returns a channel of providers for the given key func (p *providers) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { + if p.host == nil { + return nil + } // Since routing queries are expensive, give bitswap the peers to which we // have open connections. Note that this may cause issues if bitswap starts diff --git a/providers/workers.go b/providers/workers.go index e949c83d0c6d..76d5d43ce7d7 100644 --- a/providers/workers.go +++ b/providers/workers.go @@ -13,6 +13,13 @@ import ( ) func (p *providers) startWorkers(ctx context.Context, px process.Process) { + if p.host != nil { + // Start up a worker to handle block requests this node is making + px.Go(func(px process.Process) { + p.providerQueryManager(ctx) + }) + } + // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { p.provideCollector(ctx)