From a63e224e24d345708f8d264b110be67c8e05db6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:07:50 +0100 Subject: [PATCH 01/13] coreapi: dht interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/dht.go | 28 +++++++++++++++++++++++++ core/coreapi/interface/options/dht.go | 30 +++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 core/coreapi/interface/dht.go create mode 100644 core/coreapi/interface/options/dht.go diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go new file mode 100644 index 00000000000..1c8e68bd162 --- /dev/null +++ b/core/coreapi/interface/dht.go @@ -0,0 +1,28 @@ +package iface + +import ( + "context" + + options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" +) + +// DhtAPI specifies the interface to the DHT +type DhtAPI interface { + // FindPeer queries the DHT for all of the multiaddresses associated with a + // Peer ID + FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error) + + // FindProviders finds peers in the DHT who can provide a specific value + // given a key. + FindProviders(context.Context, Path) (<-chan peer.ID, error) //TODO: is path the right choice here? + + // Provide announces to the network that you are providing given values + Provide(context.Context, Path, ...options.DhtProvideOption) error + + // WithRecursive is an option for Provide which specifies whether to provide + // the given path recursively + WithRecursive(recursive bool) options.DhtProvideOption +} diff --git a/core/coreapi/interface/options/dht.go b/core/coreapi/interface/options/dht.go new file mode 100644 index 00000000000..92fd14f4aa2 --- /dev/null +++ b/core/coreapi/interface/options/dht.go @@ -0,0 +1,30 @@ +package options + +type DhtProvideSettings struct { + Recursive bool +} + +type DhtProvideOption func(*DhtProvideSettings) error + +func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) { + options := &DhtProvideSettings{ + Recursive: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + return options, nil +} + +type DhtOptions struct{} + +func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption { + return func(settings *DhtProvideSettings) error { + settings.Recursive = recursive + return nil + } +} From 36168542c9dbf4cdaab43a76b278b762a3c4d9c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:12:54 +0100 Subject: [PATCH 02/13] coreapi: implement dht api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/coreapi.go | 5 + core/coreapi/dht.go | 240 ++++++++++++++++++++++++++ core/coreapi/interface/coreapi.go | 3 + core/coreapi/interface/dht.go | 6 +- core/coreapi/interface/options/dht.go | 26 +++ 5 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 core/coreapi/dht.go diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index e9ec439518a..7029866088d 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI { func (api *CoreAPI) Pin() coreiface.PinAPI { return (*PinAPI)(api) } + +// Dht returns the DhtAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) Dht() coreiface.DhtAPI { + return &DhtAPI{api, nil} +} diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go new file mode 100644 index 00000000000..1adda6dbedc --- /dev/null +++ b/core/coreapi/dht.go @@ -0,0 +1,240 @@ +package coreapi + +import ( + "context" + "errors" + "fmt" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + dag "github.com/ipfs/go-ipfs/merkledag" + + routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing" + notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications" + ipdht "gx/ipfs/QmVSep2WwKcXxMonPASsAJ3nZVjfVMKgMcaSigxKnUWpJv/go-libp2p-kad-dht" + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" + ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" +) + +var ErrNotDHT = errors.New("routing service is not a DHT") + +type DhtAPI struct { + *CoreAPI + *caopts.DhtOptions +} + +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) { + dht, ok := api.node.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, ErrNotDHT + } + + outChan := make(chan ma.Multiaddr) + events := make(chan *notif.QueryEvent) + ctx = notif.RegisterForQueryEvents(ctx, events) + + go func() { + defer close(outChan) + + sendAddrs := func(responses []*pstore.PeerInfo) error { + for _, response := range responses { + for _, addr := range response.Addrs { + select { + case outChan <- addr: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil + } + + for event := range events { + if event.Type == notif.FinalPeer { + err := sendAddrs(event.Responses) + if err != nil { + return + } + } + } + }() + + go func() { + defer close(events) + pi, err := dht.FindPeer(ctx, peer.ID(p)) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + return + } + + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.FinalPeer, + Responses: []*pstore.PeerInfo{&pi}, + }) + }() + + return outChan, nil +} + +func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) { + settings, err := caopts.DhtFindProvidersOptions(opts...) + if err != nil { + return nil, err + } + + dht, ok := api.node.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, ErrNotDHT + } + + p, err = api.ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + c := p.Cid() + + numProviders := settings.NumProviders + if numProviders < 1 { + return nil, fmt.Errorf("number of providers must be greater than 0") + } + + outChan := make(chan peer.ID) + events := make(chan *notif.QueryEvent) + ctx = notif.RegisterForQueryEvents(ctx, events) + + pchan := dht.FindProvidersAsync(ctx, c, numProviders) + go func() { + defer close(outChan) + + sendProviders := func(responses []*pstore.PeerInfo) error { + for _, response := range responses { + select { + case outChan <- response.ID: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + } + + for event := range events { + if event.Type == notif.Provider { + err := sendProviders(event.Responses) + if err != nil { + return + } + } + } + }() + + go func() { + defer close(events) + for p := range pchan { + np := p + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.Provider, + Responses: []*pstore.PeerInfo{&np}, + }) + } + }() + + return outChan, nil +} + +func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error { + settings, err := caopts.DhtProvideOptions(opts...) + if err != nil { + return err + } + + if api.node.Routing == nil { + return errors.New("cannot provide in offline mode") + } + + if len(api.node.PeerHost.Network().Conns()) == 0 { + return errors.New("cannot provide, no connected peers") + } + + c := path.Cid() + + has, err := api.node.Blockstore.Has(c) + if err != nil { + return err + } + + if !has { + return fmt.Errorf("block %s not found locally, cannot provide", c) + } + + //TODO: either remove or use + //outChan := make(chan interface{}) + + //events := make(chan *notif.QueryEvent) + //ctx = notif.RegisterForQueryEvents(ctx, events) + + /*go func() { + defer close(outChan) + for range events { + select { + case <-ctx.Done(): + return + default: + } + } + }()*/ + + //defer close(events) + if settings.Recursive { + err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c}) + } else { + err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c}) + } + if err != nil { + return err + } + + return nil +} + +func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error { + for _, c := range cids { + err := r.Provide(ctx, c, true) + if err != nil { + return err + } + } + return nil +} + +func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error { + provided := cid.NewSet() + for _, c := range cids { + kset := cid.NewSet() + + err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) + if err != nil { + return err + } + + for _, k := range kset.Keys() { + if provided.Has(k) { + continue + } + + err = r.Provide(ctx, k, true) + if err != nil { + return err + } + provided.Add(k) + } + } + + return nil +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 696eefbaf14..9811b75be85 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -31,6 +31,9 @@ type CoreAPI interface { // ObjectAPI returns an implementation of Object API Object() ObjectAPI + // Dht returns an implementation of Dht API + Dht() DhtAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index 1c8e68bd162..ce8509e01a8 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -17,7 +17,11 @@ type DhtAPI interface { // FindProviders finds peers in the DHT who can provide a specific value // given a key. - FindProviders(context.Context, Path) (<-chan peer.ID, error) //TODO: is path the right choice here? + FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan peer.ID, error) //TODO: is path the right choice here? + + // WithNumProviders is an option for FindProviders which specifies the + // number of peers to look for. Default is 20 + WithNumProviders(numProviders int) options.DhtFindProvidersOption // Provide announces to the network that you are providing given values Provide(context.Context, Path, ...options.DhtProvideOption) error diff --git a/core/coreapi/interface/options/dht.go b/core/coreapi/interface/options/dht.go index 92fd14f4aa2..3867e32c075 100644 --- a/core/coreapi/interface/options/dht.go +++ b/core/coreapi/interface/options/dht.go @@ -4,7 +4,12 @@ type DhtProvideSettings struct { Recursive bool } +type DhtFindProvidersSettings struct { + NumProviders int +} + type DhtProvideOption func(*DhtProvideSettings) error +type DhtFindProvidersOption func(*DhtFindProvidersSettings) error func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) { options := &DhtProvideSettings{ @@ -20,6 +25,20 @@ func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) { return options, nil } +func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) { + options := &DhtFindProvidersSettings{ + NumProviders: 20, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + return options, nil +} + type DhtOptions struct{} func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption { @@ -28,3 +47,10 @@ func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption { return nil } } + +func (api *DhtOptions) WithNumProviders(numProviders int) DhtFindProvidersOption { + return func(settings *DhtFindProvidersSettings) error { + settings.NumProviders = numProviders + return nil + } +} From d2e6a61d1e5dad4698d8dd71106cb5c0fad32872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:17:30 +0100 Subject: [PATCH 03/13] coreapi: test using mock swarm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht.go | 31 ++++++----- core/coreapi/interface/dht.go | 4 +- core/coreapi/name_test.go | 12 ++-- core/coreapi/unixfs_test.go | 100 ++++++++++++++++++++++++---------- 4 files changed, 99 insertions(+), 48 deletions(-) diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 1adda6dbedc..385e61ba0c3 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -7,16 +7,16 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - dag "github.com/ipfs/go-ipfs/merkledag" - - routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing" - notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications" - ipdht "gx/ipfs/QmVSep2WwKcXxMonPASsAJ3nZVjfVMKgMcaSigxKnUWpJv/go-libp2p-kad-dht" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" - pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" - cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" - ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" + + dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + ipdht "gx/ipfs/QmRNxiPpZf3skMAtmDJpgHuW9uj1ukqV1zjANj9d6bmHfE/go-libp2p-kad-dht" + ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format" + routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing" + notif "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing/notifications" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" ) var ErrNotDHT = errors.New("routing service is not a DHT") @@ -93,12 +93,12 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... return nil, ErrNotDHT } - p, err = api.ResolvePath(ctx, p) + rp, err := api.ResolvePath(ctx, p) if err != nil { return nil, err } - c := p.Cid() + c := rp.Cid() numProviders := settings.NumProviders if numProviders < 1 { @@ -162,7 +162,12 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return errors.New("cannot provide, no connected peers") } - c := path.Cid() + rp, err := api.ResolvePath(ctx, path) + if err != nil { + return err + } + + c := rp.Cid() has, err := api.node.Blockstore.Has(c) if err != nil { diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index ce8509e01a8..1d23ece1f0d 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -5,8 +5,8 @@ import ( options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" ) // DhtAPI specifies the interface to the DHT diff --git a/core/coreapi/name_test.go b/core/coreapi/name_test.go index 076a06b4311..c0460b1571c 100644 --- a/core/coreapi/name_test.go +++ b/core/coreapi/name_test.go @@ -21,11 +21,13 @@ func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path, func TestBasicPublishResolve(t *testing.T) { ctx := context.Background() - n, api, err := makeAPIIdent(ctx, true) + nds, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } + n := nds[0] + api := apis[0] p, err := addTestObject(ctx, api) if err != nil { @@ -60,11 +62,12 @@ func TestBasicPublishResolve(t *testing.T) { func TestBasicPublishResolveKey(t *testing.T) { ctx := context.Background() - _, api, err := makeAPIIdent(ctx, true) + _, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } + api := apis[0] k, err := api.Key().Generate(ctx, "foo") if err != nil { @@ -107,12 +110,13 @@ func TestBasicPublishResolveTimeout(t *testing.T) { t.Skip("ValidTime doesn't appear to work at this time resolution") ctx := context.Background() - n, api, err := makeAPIIdent(ctx, true) + nds, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } - + n := nds[0] + api := apis[0] p, err := addTestObject(ctx, api) if err != nil { t.Fatal(err) diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index 9d703566856..ccee30f1349 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/base64" + "fmt" "io" "math" "strings" @@ -14,6 +15,7 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" coreunix "github.com/ipfs/go-ipfs/core/coreunix" + mock "github.com/ipfs/go-ipfs/core/mock" keystore "github.com/ipfs/go-ipfs/keystore" repo "github.com/ipfs/go-ipfs/repo" @@ -22,8 +24,10 @@ import ( peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" datastore "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore" syncds "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore/sync" + mocknet "gx/ipfs/QmUEqyXr97aUbNmQADHYNknjwjjdVpJXEt1UZXmSG81EV4/go-libp2p/p2p/net/mock" unixfs "gx/ipfs/QmWAfTyD6KEBm7bzqNRBPvqKrZCDtn5PGbs9V1DKfnVK59/go-unixfs" config "gx/ipfs/QmYVqYJTVjetcf1guieEgWpK1PZtHPytP624vKzTF1P3r2/go-ipfs-config" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" cbor "gx/ipfs/QmepvyyduWnXHm1G7ybmGbJfQQHTAo36DjP2nvF7H7ZXjE/go-ipld-cbor" ) @@ -36,51 +40,89 @@ var helloStr = "hello, world!" // `echo -n | ipfs add` var emptyFile = "/ipfs/QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH" -func makeAPIIdent(ctx context.Context, fullIdentity bool) (*core.IpfsNode, coreiface.CoreAPI, error) { - var ident config.Identity - if fullIdentity { - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - return nil, nil, err +func makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]*core.IpfsNode, []coreiface.CoreAPI, error) { + mn := mocknet.New(ctx) + + nodes := make([]*core.IpfsNode, n) + apis := make([]coreiface.CoreAPI, n) + + for i := 0; i < n; i++ { + var ident config.Identity + if fullIdentity { + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) + if err != nil { + return nil, nil, err + } + + id, err := peer.IDFromPublicKey(pk) + if err != nil { + return nil, nil, err + } + + kbytes, err := sk.Bytes() + if err != nil { + return nil, nil, err + } + + ident = config.Identity{ + PeerID: id.Pretty(), + PrivKey: base64.StdEncoding.EncodeToString(kbytes), + } + } else { + ident = config.Identity{ + PeerID: testPeerID, + } } - id, err := peer.IDFromPublicKey(pk) - if err != nil { - return nil, nil, err + c := config.Config{} + c.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.%d.1/tcp/4001", i)} + c.Identity = ident + + r := &repo.Mock{ + C: c, + D: syncds.MutexWrap(datastore.NewMapDatastore()), + K: keystore.NewMemKeystore(), } - kbytes, err := sk.Bytes() + node, err := core.NewNode(ctx, &core.BuildCfg{ + Repo: r, + Host: mock.MockHostOption(mn), + Online: fullIdentity, + }) if err != nil { return nil, nil, err } + nodes[i] = node + apis[i] = coreapi.NewCoreAPI(node) + } - ident = config.Identity{ - PeerID: id.Pretty(), - PrivKey: base64.StdEncoding.EncodeToString(kbytes), - } - } else { - ident = config.Identity{ - PeerID: testPeerID, - } + err := mn.LinkAll() + if err != nil { + return nil, nil, err } - r := &repo.Mock{ - C: config.Config{ - Identity: ident, + bsinf := core.BootstrapConfigWithPeers( + []pstore.PeerInfo{ + nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, - D: syncds.MutexWrap(datastore.NewMapDatastore()), - K: keystore.NewMemKeystore(), + ) + + for _, n := range nodes[1:] { + if err := n.Bootstrap(bsinf); err != nil { + return nil, nil, err + } } - node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r}) + + return nodes, apis, nil +} + +func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) { + nd, api, err := makeAPISwarm(ctx, false, 1) if err != nil { return nil, nil, err } - api := coreapi.NewCoreAPI(node) - return node, api, nil -} -func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) { - return makeAPIIdent(ctx, false) + return nd[0], api[0], nil } func TestAdd(t *testing.T) { From 6cc86c11e8fe6da4ef0e07edaa75f020f9c9bdd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:17:53 +0100 Subject: [PATCH 04/13] coreapi: dht tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht_test.go | 112 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 core/coreapi/dht_test.go diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go new file mode 100644 index 00000000000..ed2ee542f08 --- /dev/null +++ b/core/coreapi/dht_test.go @@ -0,0 +1,112 @@ +package coreapi_test + +import ( + "context" + "io" + "io/ioutil" + "testing" + + coreapi "github.com/ipfs/go-ipfs/core/coreapi" + + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" +) + +func TestDhtFindPeer(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 3) + if err != nil { + t.Fatal(err) + } + + out, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity)) + if err != nil { + t.Fatal(err) + } + + addr := <-out + + if addr.String() != "/ip4/127.0.0.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", addr.String()) + } + + out, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity)) + if err != nil { + t.Fatal(err) + } + + addr = <-out + + if addr.String() != "/ip4/127.0.2.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", addr.String()) + } +} + +func TestDhtFindProviders(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 3) + if err != nil { + t.Fatal(err) + } + + p, err := addTestObject(ctx, apis[0]) + if err != nil { + t.Fatal(err) + } + + out, err := apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider := <-out + + if provider.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + } +} + +func TestDhtProvide(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 3) + if err != nil { + t.Fatal(err) + } + + // TODO: replace once there is local add on unixfs or somewhere + data, err := ioutil.ReadAll(&io.LimitedReader{R: rnd, N: 4092}) + if err != nil { + t.Fatal(err) + } + + b := blocks.NewBlock(data) + nds[0].Blockstore.Put(b) + p := coreapi.ParseCid(b.Cid()) + + out, err := apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider := <-out + + if provider.String() != "" { + t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + } + + err = apis[0].Dht().Provide(ctx, p) + if err != nil { + t.Fatal(err) + } + + out, err = apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider = <-out + + if provider.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + } +} From bda76a1874290b814441f9debd83cdc323aa2caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 19 Jul 2018 12:06:03 +0200 Subject: [PATCH 05/13] coreapi: update dht imports after rebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index ed2ee542f08..b8f5f9e054f 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -8,8 +8,8 @@ import ( coreapi "github.com/ipfs/go-ipfs/core/coreapi" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" - blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" + blocks "gx/ipfs/QmR54CzE4UcdFAZDehj6HFyy3eSHhVsJUpjfnhCmscuStS/go-block-format" + peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" ) func TestDhtFindPeer(t *testing.T) { From 906c747064f155ffa24568c835989c8c3b89990c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 19 Jul 2018 13:18:05 +0200 Subject: [PATCH 06/13] coreapi: dht: simplify the implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht.go | 146 ++++------------------------------ core/coreapi/dht_test.go | 36 ++++----- core/coreapi/interface/dht.go | 8 +- 3 files changed, 35 insertions(+), 155 deletions(-) diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 385e61ba0c3..4f371111673 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -9,143 +9,47 @@ import ( caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" + offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline" + blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - ipdht "gx/ipfs/QmRNxiPpZf3skMAtmDJpgHuW9uj1ukqV1zjANj9d6bmHfE/go-libp2p-kad-dht" - ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format" routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing" - notif "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing/notifications" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" + blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore" ) -var ErrNotDHT = errors.New("routing service is not a DHT") - type DhtAPI struct { *CoreAPI *caopts.DhtOptions } -func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) { - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { + pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p)) + if err != nil { + return pstore.PeerInfo{}, err } - outChan := make(chan ma.Multiaddr) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - go func() { - defer close(outChan) - - sendAddrs := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - for _, addr := range response.Addrs { - select { - case outChan <- addr: - case <-ctx.Done(): - return ctx.Err() - } - } - } - return nil - } - - for event := range events { - if event.Type == notif.FinalPeer { - err := sendAddrs(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - pi, err := dht.FindPeer(ctx, peer.ID(p)) - if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return - } - - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.FinalPeer, - Responses: []*pstore.PeerInfo{&pi}, - }) - }() - - return outChan, nil + return pi, nil } -func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) { +func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) { settings, err := caopts.DhtFindProvidersOptions(opts...) if err != nil { return nil, err } - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT - } - rp, err := api.ResolvePath(ctx, p) if err != nil { return nil, err } - c := rp.Cid() - numProviders := settings.NumProviders if numProviders < 1 { return nil, fmt.Errorf("number of providers must be greater than 0") } - outChan := make(chan peer.ID) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - pchan := dht.FindProvidersAsync(ctx, c, numProviders) - go func() { - defer close(outChan) - - sendProviders := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - select { - case outChan <- response.ID: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - } - - for event := range events { - if event.Type == notif.Provider { - err := sendProviders(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - for p := range pchan { - np := p - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.Provider, - Responses: []*pstore.PeerInfo{&np}, - }) - } - }() - - return outChan, nil + pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders) + return pchan, nil } func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error { @@ -158,10 +62,6 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return errors.New("cannot provide in offline mode") } - if len(api.node.PeerHost.Network().Conns()) == 0 { - return errors.New("cannot provide, no connected peers") - } - rp, err := api.ResolvePath(ctx, path) if err != nil { return err @@ -178,26 +78,8 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return fmt.Errorf("block %s not found locally, cannot provide", c) } - //TODO: either remove or use - //outChan := make(chan interface{}) - - //events := make(chan *notif.QueryEvent) - //ctx = notif.RegisterForQueryEvents(ctx, events) - - /*go func() { - defer close(outChan) - for range events { - select { - case <-ctx.Done(): - return - default: - } - } - }()*/ - - //defer close(events) if settings.Recursive { - err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c}) + err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []*cid.Cid{c}) } else { err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c}) } @@ -218,11 +100,13 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er return nil } -func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error { +func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { provided := cid.NewSet() for _, c := range cids { kset := cid.NewSet() + dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) if err != nil { return err diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index b8f5f9e054f..2d24dda2e84 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -6,10 +6,10 @@ import ( "io/ioutil" "testing" - coreapi "github.com/ipfs/go-ipfs/core/coreapi" + "github.com/ipfs/go-ipfs/core/coreapi/interface" - blocks "gx/ipfs/QmR54CzE4UcdFAZDehj6HFyy3eSHhVsJUpjfnhCmscuStS/go-block-format" - peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format" ) func TestDhtFindPeer(t *testing.T) { @@ -19,26 +19,22 @@ func TestDhtFindPeer(t *testing.T) { t.Fatal(err) } - out, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity)) + pi, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity)) if err != nil { t.Fatal(err) } - addr := <-out - - if addr.String() != "/ip4/127.0.0.1/tcp/4001" { - t.Errorf("got unexpected address from FindPeer: %s", addr.String()) + if pi.Addrs[0].String() != "/ip4/127.0.0.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String()) } - out, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity)) + pi, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity)) if err != nil { t.Fatal(err) } - addr = <-out - - if addr.String() != "/ip4/127.0.2.1/tcp/4001" { - t.Errorf("got unexpected address from FindPeer: %s", addr.String()) + if pi.Addrs[0].String() != "/ip4/127.0.2.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String()) } } @@ -61,8 +57,8 @@ func TestDhtFindProviders(t *testing.T) { provider := <-out - if provider.String() != nds[0].Identity.String() { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } } @@ -81,7 +77,7 @@ func TestDhtProvide(t *testing.T) { b := blocks.NewBlock(data) nds[0].Blockstore.Put(b) - p := coreapi.ParseCid(b.Cid()) + p := iface.IpfsPath(b.Cid()) out, err := apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) if err != nil { @@ -90,8 +86,8 @@ func TestDhtProvide(t *testing.T) { provider := <-out - if provider.String() != "" { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != "" { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } err = apis[0].Dht().Provide(ctx, p) @@ -106,7 +102,7 @@ func TestDhtProvide(t *testing.T) { provider = <-out - if provider.String() != nds[0].Identity.String() { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } } diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index 1d23ece1f0d..01b7d7367ac 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -3,21 +3,21 @@ package iface import ( "context" - options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" ) // DhtAPI specifies the interface to the DHT type DhtAPI interface { // FindPeer queries the DHT for all of the multiaddresses associated with a // Peer ID - FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error) + FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error) // FindProviders finds peers in the DHT who can provide a specific value // given a key. - FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan peer.ID, error) //TODO: is path the right choice here? + FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) // WithNumProviders is an option for FindProviders which specifies the // number of peers to look for. Default is 20 From 77ceb7546b3465c9e5f024eff9f0be3c507f8ba2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 19 Jul 2018 13:27:06 +0200 Subject: [PATCH 07/13] coreapi: dht: refactor options after rebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/coreapi.go | 2 +- core/coreapi/dht.go | 13 +++++++------ core/coreapi/dht_test.go | 7 ++++--- core/coreapi/interface/dht.go | 8 -------- core/coreapi/interface/options/dht.go | 12 +++++++++--- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 7029866088d..bb7afd61a51 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -65,5 +65,5 @@ func (api *CoreAPI) Pin() coreiface.PinAPI { // Dht returns the DhtAPI interface implementation backed by the go-ipfs node func (api *CoreAPI) Dht() coreiface.DhtAPI { - return &DhtAPI{api, nil} + return (*DhtAPI)(api) } diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 4f371111673..107dc8ed5fe 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -18,10 +18,7 @@ import ( blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore" ) -type DhtAPI struct { - *CoreAPI - *caopts.DhtOptions -} +type DhtAPI CoreAPI func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p)) @@ -38,7 +35,7 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... return nil, err } - rp, err := api.ResolvePath(ctx, p) + rp, err := api.core().ResolvePath(ctx, p) if err != nil { return nil, err } @@ -62,7 +59,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return errors.New("cannot provide in offline mode") } - rp, err := api.ResolvePath(ctx, path) + rp, err := api.core().ResolvePath(ctx, path) if err != nil { return err } @@ -127,3 +124,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Bl return nil } + +func (api *DhtAPI) core() coreiface.CoreAPI { + return (*CoreAPI)(api) +} diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index 2d24dda2e84..7dcca66b6d4 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ipfs/go-ipfs/core/coreapi/interface" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format" @@ -50,7 +51,7 @@ func TestDhtFindProviders(t *testing.T) { t.Fatal(err) } - out, err := apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) if err != nil { t.Fatal(err) } @@ -79,7 +80,7 @@ func TestDhtProvide(t *testing.T) { nds[0].Blockstore.Put(b) p := iface.IpfsPath(b.Cid()) - out, err := apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) if err != nil { t.Fatal(err) } @@ -95,7 +96,7 @@ func TestDhtProvide(t *testing.T) { t.Fatal(err) } - out, err = apis[2].Dht().FindProviders(ctx, p, apis[2].Dht().WithNumProviders(1)) + out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) if err != nil { t.Fatal(err) } diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index 01b7d7367ac..f9a08df342e 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -19,14 +19,6 @@ type DhtAPI interface { // given a key. FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) - // WithNumProviders is an option for FindProviders which specifies the - // number of peers to look for. Default is 20 - WithNumProviders(numProviders int) options.DhtFindProvidersOption - // Provide announces to the network that you are providing given values Provide(context.Context, Path, ...options.DhtProvideOption) error - - // WithRecursive is an option for Provide which specifies whether to provide - // the given path recursively - WithRecursive(recursive bool) options.DhtProvideOption } diff --git a/core/coreapi/interface/options/dht.go b/core/coreapi/interface/options/dht.go index 3867e32c075..f989fa5e785 100644 --- a/core/coreapi/interface/options/dht.go +++ b/core/coreapi/interface/options/dht.go @@ -39,16 +39,22 @@ func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersS return options, nil } -type DhtOptions struct{} +type dhtOpts struct{} -func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption { +var Dht dhtOpts + +// WithRecursive is an option for Dht.Provide which specifies whether to provide +// the given path recursively +func (dhtOpts) WithRecursive(recursive bool) DhtProvideOption { return func(settings *DhtProvideSettings) error { settings.Recursive = recursive return nil } } -func (api *DhtOptions) WithNumProviders(numProviders int) DhtFindProvidersOption { +// WithNumProviders is an option for Dht.FindProviders which specifies the +// number of peers to look for. Default is 20 +func (dhtOpts) WithNumProviders(numProviders int) DhtFindProvidersOption { return func(settings *DhtFindProvidersSettings) error { settings.NumProviders = numProviders return nil From 9fe843f1560c1f9a9a15b6027d18d5e1d719a62c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Jul 2018 15:09:26 +0200 Subject: [PATCH 08/13] coreapi dht: add a note on name change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/dht.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index f9a08df342e..cd704c3e373 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -10,6 +10,8 @@ import ( ) // DhtAPI specifies the interface to the DHT +// Note: This API will likely get renamed in near future, see +// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context. type DhtAPI interface { // FindPeer queries the DHT for all of the multiaddresses associated with a // Peer ID From e1bdf6cd280bcbe90b08b71500aa86e0a29a2217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Aug 2018 13:24:33 +0200 Subject: [PATCH 09/13] move streaming set to thirdparty MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht.go | 20 +++++--------- core/coreapi/interface/dht.go | 2 +- exchange/reprovide/providers.go | 41 +++++++---------------------- thirdparty/streaming-cid-set/set.go | 38 ++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 45 deletions(-) create mode 100644 thirdparty/streaming-cid-set/set.go diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 107dc8ed5fe..6217f925cf4 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -100,26 +100,20 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { provided := cid.NewSet() for _, c := range cids { - kset := cid.NewSet() - dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) + err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visit) if err != nil { return err } + } - for _, k := range kset.Keys() { - if provided.Has(k) { - continue - } - - err = r.Provide(ctx, k, true) - if err != nil { - return err - } - provided.Add(k) + for _, k := range provided.Keys() { + err := r.Provide(ctx, k, true) + if err != nil { + return err } + provided.Add(k) } return nil diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index cd704c3e373..7b8119e4494 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -10,7 +10,7 @@ import ( ) // DhtAPI specifies the interface to the DHT -// Note: This API will likely get renamed in near future, see +// Note: This API will likely get deprecated in near future, see // https://github.com/ipfs/interface-ipfs-core/issues/249 for more context. type DhtAPI interface { // FindPeer queries the DHT for all of the multiaddresses associated with a diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index df688a22907..6b6f5e69387 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -4,6 +4,7 @@ import ( "context" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" merkledag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format" @@ -29,7 +30,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) outCh := make(chan *cid.Cid) go func() { defer close(outCh) - for c := range set.new { + for c := range set.New { select { case <-ctx.Done(): return @@ -43,21 +44,23 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) } } -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingSet, error) { - set := newStreamingSet() +func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingset.StreamingSet, error) { + set := streamingset.NewStreamingSet() go func() { - defer close(set.new) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) for _, key := range pinning.DirectKeys() { - set.add(key) + set.Visitor(ctx)(key) } for _, key := range pinning.RecursiveKeys() { - set.add(key) + set.Visitor(ctx)(key) if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add) + err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx)) if err != nil { log.Errorf("reprovide indirect pins: %s", err) return @@ -68,27 +71,3 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo return set, nil } - -type streamingSet struct { - set *cid.Set - new chan *cid.Cid -} - -// NewSet initializes and returns a new Set. -func newStreamingSet() *streamingSet { - return &streamingSet{ - set: cid.NewSet(), - new: make(chan *cid.Cid), - } -} - -// add adds a Cid to the set only if it is -// not in it already. -func (s *streamingSet) add(c *cid.Cid) bool { - if s.set.Visit(c) { - s.new <- c - return true - } - - return false -} diff --git a/thirdparty/streaming-cid-set/set.go b/thirdparty/streaming-cid-set/set.go new file mode 100644 index 00000000000..05b03fe308e --- /dev/null +++ b/thirdparty/streaming-cid-set/set.go @@ -0,0 +1,38 @@ +package streamingset + +import ( + "context" + + cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" +) + +// StreamingSet is an extension of cid.Set which allows to implement back-pressure +// for the Visit function +type StreamingSet struct { + Set *cid.Set + New chan *cid.Cid +} + +// NewStreamingSet initializes and returns new Set. +func NewStreamingSet() *StreamingSet { + return &StreamingSet{ + Set: cid.NewSet(), + New: make(chan *cid.Cid), + } +} + +// Visitor creates new visitor which adds a Cids to the set and emits them to +// the set.New channel +func (s *StreamingSet) Visitor(ctx context.Context) func(c *cid.Cid) bool { + return func(c *cid.Cid) bool { + if s.Set.Visit(c) { + select { + case s.New <- c: + case <-ctx.Done(): + } + return true + } + + return false + } +} From 4b252a200f2cce27c033e1480e44d7247792ee0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Aug 2018 13:40:32 +0200 Subject: [PATCH 10/13] coreapi: dht: use shared set in provideKeysRec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht.go | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 6217f925cf4..7441e70302f 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -7,6 +7,7 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline" @@ -98,25 +99,33 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { - provided := cid.NewSet() - for _, c := range cids { - dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + provided := streamingset.NewStreamingSet() - err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visit) - if err != nil { - return err - } - } + errCh := make(chan error) + go func() { + for _, c := range cids { + dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - for _, k := range provided.Keys() { - err := r.Provide(ctx, k, true) - if err != nil { + err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx)) + if err != nil { + errCh <- err + } + } + }() + + for { + select { + case k := <-provided.New: + err := r.Provide(ctx, k, true) + if err != nil { + return err + } + case err := <-errCh: return err + case <-ctx.Done(): + return ctx.Err() } - provided.Add(k) } - - return nil } func (api *DhtAPI) core() coreiface.CoreAPI { From db646d044d4c9636f6eb15390a342253196491c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 Aug 2018 14:38:09 +0200 Subject: [PATCH 11/13] coreapi: dht: make tests less falky MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index 7dcca66b6d4..e7555db15d6 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -15,7 +15,7 @@ import ( func TestDhtFindPeer(t *testing.T) { ctx := context.Background() - nds, apis, err := makeAPISwarm(ctx, true, 3) + nds, apis, err := makeAPISwarm(ctx, true, 5) if err != nil { t.Fatal(err) } @@ -41,7 +41,7 @@ func TestDhtFindPeer(t *testing.T) { func TestDhtFindProviders(t *testing.T) { ctx := context.Background() - nds, apis, err := makeAPISwarm(ctx, true, 3) + nds, apis, err := makeAPISwarm(ctx, true, 5) if err != nil { t.Fatal(err) } @@ -65,7 +65,7 @@ func TestDhtFindProviders(t *testing.T) { func TestDhtProvide(t *testing.T) { ctx := context.Background() - nds, apis, err := makeAPISwarm(ctx, true, 3) + nds, apis, err := makeAPISwarm(ctx, true, 5) if err != nil { t.Fatal(err) } From 7139b8366666492090cb2efb66aab730fc1f5593 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 21 Aug 2018 19:24:08 -0700 Subject: [PATCH 12/13] switch to StreamingSet in cidutil License: MIT Signed-off-by: Steven Allen --- core/coreapi/dht.go | 4 +-- exchange/reprovide/providers.go | 6 ++--- thirdparty/streaming-cid-set/set.go | 38 ----------------------------- 3 files changed, 5 insertions(+), 43 deletions(-) delete mode 100644 thirdparty/streaming-cid-set/set.go diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 7441e70302f..7c1b1327ef8 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -7,10 +7,10 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline" + cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil" blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing" @@ -99,7 +99,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { - provided := streamingset.NewStreamingSet() + provided := cidutil.NewStreamingSet() errCh := make(chan error) go func() { diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 6b6f5e69387..c3ff84b86b4 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -4,9 +4,9 @@ import ( "context" pin "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" merkledag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" + cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil" ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format" cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" blocks "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore" @@ -44,8 +44,8 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) } } -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingset.StreamingSet, error) { - set := streamingset.NewStreamingSet() +func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() go func() { ctx, cancel := context.WithCancel(ctx) diff --git a/thirdparty/streaming-cid-set/set.go b/thirdparty/streaming-cid-set/set.go deleted file mode 100644 index 05b03fe308e..00000000000 --- a/thirdparty/streaming-cid-set/set.go +++ /dev/null @@ -1,38 +0,0 @@ -package streamingset - -import ( - "context" - - cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" -) - -// StreamingSet is an extension of cid.Set which allows to implement back-pressure -// for the Visit function -type StreamingSet struct { - Set *cid.Set - New chan *cid.Cid -} - -// NewStreamingSet initializes and returns new Set. -func NewStreamingSet() *StreamingSet { - return &StreamingSet{ - Set: cid.NewSet(), - New: make(chan *cid.Cid), - } -} - -// Visitor creates new visitor which adds a Cids to the set and emits them to -// the set.New channel -func (s *StreamingSet) Visitor(ctx context.Context) func(c *cid.Cid) bool { - return func(c *cid.Cid) bool { - if s.Set.Visit(c) { - select { - case s.New <- c: - case <-ctx.Done(): - } - return true - } - - return false - } -} From 86f9eb73c34aeabc2e3e5ad7eb88f8e441624033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 28 Aug 2018 02:22:09 +0200 Subject: [PATCH 13/13] coreapi: dht: remove option prefix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dht.go | 3 +-- core/coreapi/dht_test.go | 6 +++--- core/coreapi/interface/options/dht.go | 8 ++++---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 7c1b1327ef8..0585fdb860b 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -103,9 +103,8 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Bl errCh := make(chan error) go func() { + dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) for _, c := range cids { - dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx)) if err != nil { errCh <- err diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index e7555db15d6..759835b567f 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -51,7 +51,7 @@ func TestDhtFindProviders(t *testing.T) { t.Fatal(err) } - out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) if err != nil { t.Fatal(err) } @@ -80,7 +80,7 @@ func TestDhtProvide(t *testing.T) { nds[0].Blockstore.Put(b) p := iface.IpfsPath(b.Cid()) - out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) if err != nil { t.Fatal(err) } @@ -96,7 +96,7 @@ func TestDhtProvide(t *testing.T) { t.Fatal(err) } - out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.WithNumProviders(1)) + out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) if err != nil { t.Fatal(err) } diff --git a/core/coreapi/interface/options/dht.go b/core/coreapi/interface/options/dht.go index f989fa5e785..e13e1602006 100644 --- a/core/coreapi/interface/options/dht.go +++ b/core/coreapi/interface/options/dht.go @@ -43,18 +43,18 @@ type dhtOpts struct{} var Dht dhtOpts -// WithRecursive is an option for Dht.Provide which specifies whether to provide +// Recursive is an option for Dht.Provide which specifies whether to provide // the given path recursively -func (dhtOpts) WithRecursive(recursive bool) DhtProvideOption { +func (dhtOpts) Recursive(recursive bool) DhtProvideOption { return func(settings *DhtProvideSettings) error { settings.Recursive = recursive return nil } } -// WithNumProviders is an option for Dht.FindProviders which specifies the +// NumProviders is an option for Dht.FindProviders which specifies the // number of peers to look for. Default is 20 -func (dhtOpts) WithNumProviders(numProviders int) DhtFindProvidersOption { +func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption { return func(settings *DhtFindProvidersSettings) error { settings.NumProviders = numProviders return nil