Merge pull request ipfs#232 from libp2p/revert-225-tidy-up-bootstrapping
Revert "Tidy up bootstrapping"
Stebalien committed Jan 24, 2019
2 parents 66cc80c + 201eea5 commit 9e65f29
Showing 4 changed files with 121 additions and 76 deletions.
10 changes: 0 additions & 10 deletions dht.go
@@ -64,16 +64,6 @@ type IpfsDHT struct {
protocols []protocol.ID // DHT protocols
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.IpfsRouting = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
)

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
166 changes: 104 additions & 62 deletions dht_bootstrap.go
@@ -7,8 +7,9 @@ import (
"time"

u "github.com/ipfs/go-ipfs-util"
goprocess "github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/goprocess/periodic"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
)

@@ -38,73 +39,87 @@ var DefaultBootstrapConfig = BootstrapConfig{
Timeout: time.Duration(10 * time.Second),
}

// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The Bootstrap
// process runs a number of queries each round, and runs every time the signal fires.
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
if err != nil {
return err
}

// wait till ctx or dht.Context exits.
// we have to do it this way to satisfy the Routing interface (contexts)
go func() {
defer proc.Close()
select {
case <-ctx.Done():
case <-dht.Context().Done():
}
}()

return nil
}
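
For context, a minimal caller-side sketch of the routing-interface entry point above (not part of this diff; the helper name and wiring are assumptions):

// startDefaultBootstrap is a hypothetical helper: it bootstraps with the
// default config and stops when the supplied context is cancelled.
func startDefaultBootstrap(ctx context.Context, d *IpfsDHT) error {
	var r routing.IpfsRouting = d // *IpfsDHT satisfies the routing interface
	return r.Bootstrap(ctx)       // fails immediately only on an invalid config
}
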

// Runs cfg.Queries bootstrap queries every cfg.Period.
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
// Because this method is not synchronous, we have to duplicate sanity
// checks on the config so that callers aren't oblivious.
// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The Bootstrap
// process runs a number of queries each round, and runs every time the signal fires.
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
go func() {

proc := dht.Process().Go(func(p goprocess.Process) {
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
for {
err := dht.runBootstrap(ctx, cfg)
if err != nil {
log.Warningf("error bootstrapping: %s", err)
}
select {
case <-time.After(cfg.Period):
case <-ctx.Done():
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
case <-p.Closing():
return
}
}
}()
return nil
})

return proc, nil
}
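
For context, a rough caller-side sketch of the process-returning API above (not part of this diff; the helper name and config values are assumptions). The returned goprocess handle is what a caller closes to stop periodic bootstrapping:

// startTunedBootstrap is a hypothetical helper illustrating BootstrapWithConfig.
func startTunedBootstrap(d *IpfsDHT) (goprocess.Process, error) {
	cfg := DefaultBootstrapConfig
	cfg.Queries = 3                // random-ID queries per bootstrap round
	cfg.Period = 10 * time.Minute  // one round every ten minutes
	cfg.Timeout = 20 * time.Second // per-query timeout

	proc, err := d.BootstrapWithConfig(cfg)
	if err != nil {
		return nil, err
	}
	return proc, nil // caller stops bootstrapping with proc.Close()
}
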

// This is a synchronous bootstrap. cfg.Queries queries will run each with a
// timeout of cfg.Timeout. cfg.Period is not used.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
// BootstrapOnSignal ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The Bootstrap
// process runs a number of queries each round, and runs every time the signal fires.
// These parameters are configurable.
//
// BootstrapOnSignal returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
return dht.runBootstrap(ctx, cfg)
}

func newRandomPeerId() peer.ID {
id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
rand.Read(id)
id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it.
return peer.ID(id)
}
if signal == nil {
return nil, fmt.Errorf("invalid signal: %v", signal)
}

// Traverse the DHT toward the given ID.
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
return dht.FindPeer(ctx, target)
proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))

return proc, nil
}
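
Similarly, a hedged sketch of driving rounds from an external clock (hypothetical caller, not in the diff): any <-chan time.Time can act as the signal, so a time.Ticker is the natural source.

// startSignalledBootstrap is a hypothetical helper illustrating BootstrapOnSignal.
func startSignalledBootstrap(d *IpfsDHT) (goprocess.Process, *time.Ticker, error) {
	ticker := time.NewTicker(5 * time.Minute)
	proc, err := d.BootstrapOnSignal(DefaultBootstrapConfig, ticker.C)
	if err != nil {
		ticker.Stop()
		return nil, nil, err
	}
	// Closing proc stops the worker; the caller should also stop the ticker.
	return proc, ticker, nil
}
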

// Traverse the DHT toward a random ID.
func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
id := newRandomPeerId()
p, err := dht.walk(ctx, id)
switch err {
case routing.ErrNotFound:
return nil
case nil:
// We found a peer from a randomly generated ID. This should be very
// unlikely.
log.Warningf("random walk toward %s actually found peer: %s", id, p)
return nil
default:
return err
func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
return func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?

ctx := dht.Context()
if err := dht.runBootstrap(ctx, cfg); err != nil {
log.Warning(err)
// A bootstrapping error is important to notice but not fatal.
}
}
}

@@ -117,24 +132,51 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
defer bslog("end")
defer log.EventBegin(ctx, "dhtRunBootstrap").Done()

doQuery := func(n int, target string, f func(context.Context) error) error {
log.Debugf("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
var merr u.MultiErr

randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
id = u.Hash(id)
return peer.ID(id)
}

// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel()
return f(ctx)
}

// Do all but one of the bootstrap queries as random walks.
for i := 1; i < cfg.Queries; i++ {
err := doQuery(i, "random ID", dht.randomWalk)
if err != nil {
return err
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Warningf("%s", err)
merr = append(merr, err)
}
}

// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < cfg.Queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id)
}

// Find self to distribute peer info to our neighbors.
return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), func(ctx context.Context) error {
_, err := dht.walk(ctx, dht.self)
return err
})
// Do this after bootstrapping.
log.Debugf("Bootstrapping query to self: %s", dht.self)
runQuery(ctx, dht.self)

if len(merr) > 0 {
return merr
}
return nil
}
18 changes: 16 additions & 2 deletions dht_test.go
@@ -679,10 +679,23 @@ func TestPeriodicBootstrap(t *testing.T) {
}
}()

signals := []chan time.Time{}

var cfg BootstrapConfig
cfg = DefaultBootstrapConfig
cfg.Queries = 5

// kick off periodic bootstrappers with instrumented signals.
for _, dht := range dhts {
s := make(chan time.Time)
signals = append(signals, s)
proc, err := dht.BootstrapOnSignal(cfg, s)
if err != nil {
t.Fatal(err)
}
defer proc.Close()
}

t.Logf("dhts are not connected. %d", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
@@ -708,8 +721,9 @@ }
}

t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.BootstrapOnce(ctx, cfg)
now := time.Now()
for _, signal := range signals {
go func(s chan time.Time) { s <- now }(signal)
}

// this is async, and we don't know when it's finished with one cycle, so keep checking
3 changes: 1 addition & 2 deletions routing.go
@@ -21,7 +21,6 @@ import (
routing "github.com/libp2p/go-libp2p-routing"
notif "github.com/libp2p/go-libp2p-routing/notifications"
ropts "github.com/libp2p/go-libp2p-routing/options"
"github.com/pkg/errors"
)

// asyncQueryBuffer is the size of buffered channels in async queries. This
@@ -584,7 +583,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo

peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return pstore.PeerInfo{}, errors.WithStack(kb.ErrLookupFailure)
return pstore.PeerInfo{}, kb.ErrLookupFailure
}

// Sanity...
