Skip to content

Commit

Permalink
providers: ProvideRecursive
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Mar 25, 2018
1 parent 04a50a2 commit f868b7c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
3 changes: 3 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
resolver "github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
providers "github.com/ipfs/go-ipfs/providers"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/verifbs"
Expand Down Expand Up @@ -220,7 +221,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}
} else {
n.SetupOfflineRouting()
n.Exchange = offline.Exchange(n.Blockstore)
n.Providers = providers.NewProviders(n.ctx, n.Routing, nil)
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
Expand Down
8 changes: 7 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ You can now check what blocks have been created by:
}

// copy intermediary nodes from editor to our actual dagservice
_, err := fileAdder.Finalize()
nd, err := fileAdder.Finalize()
if err != nil {
return err
}
Expand All @@ -322,6 +322,12 @@ You can now check what blocks have been created by:
return nil
}

if !local {
if err := n.Providers.ProvideRecursive(req.Context(), nd, dserv); err != nil {
return err
}
}

return fileAdder.PinRoot()
}

Expand Down
35 changes: 32 additions & 3 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

const (
provideTimeout = time.Second * 15

// maxProvidersPerRequest specifies the maximum number of providers desired
// MaxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
Expand Down Expand Up @@ -46,8 +47,10 @@ type blockRequest struct {

// Interface is an definition of providers interface to libp2p routing system
type Interface interface {
Provide(*cid.Cid) error
FindProviders(ctx context.Context, c *cid.Cid) error
Provide(k *cid.Cid) error
ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error

FindProviders(ctx context.Context, k *cid.Cid) error
FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID

Stat() (*Stat, error)
Expand Down Expand Up @@ -119,6 +122,29 @@ func (p *providers) Provide(b *cid.Cid) error {
return nil
}

func (p *providers) provideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter, done *cid.Set) error {
p.Provide(n.Cid())

for _, l := range n.Links() {
if !done.Visit(l.Cid) {
continue
}

sub, err := l.GetNode(ctx, serv)
if err != nil {
return err
}
if err := p.provideRecursive(ctx, sub, serv, done); err != nil {
return err
}
}
return nil
}

func (p *providers) ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error {
return p.provideRecursive(ctx, n, serv, cid.NewSet())
}

func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error {
select {
case <-ctx.Done():
Expand All @@ -130,6 +156,9 @@ func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error {

// FindProvidersAsync returns a channel of providers for the given key
func (p *providers) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
if p.host == nil {
return nil
}

// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
Expand Down
7 changes: 7 additions & 0 deletions providers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
)

func (p *providers) startWorkers(ctx context.Context, px process.Process) {
if p.host != nil {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
p.providerQueryManager(ctx)
})
}

// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
p.provideCollector(ctx)
Expand Down

0 comments on commit f868b7c

Please sign in to comment.