From 201eea5e0bb4bda1de620a064d73280e74e9207a Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Thu, 24 Jan 2019 14:06:38 -0800
Subject: [PATCH] Revert "Tidy up bootstrapping"

---
 dht.go           |  10 ---
 dht_bootstrap.go | 166 +++++++++++++++++++++++++++++------------------
 dht_test.go      |  18 ++++-
 routing.go       |   3 +-
 4 files changed, 121 insertions(+), 76 deletions(-)

diff --git a/dht.go b/dht.go
index 7a167156df2..d08c37655d7 100644
--- a/dht.go
+++ b/dht.go
@@ -64,16 +64,6 @@ type IpfsDHT struct {
 	protocols []protocol.ID // DHT protocols
 }
 
-// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
-// guarantee, but we can use them to aid refactoring.
-var (
-	_ routing.ContentRouting = (*IpfsDHT)(nil)
-	_ routing.IpfsRouting    = (*IpfsDHT)(nil)
-	_ routing.PeerRouting    = (*IpfsDHT)(nil)
-	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
-	_ routing.ValueStore     = (*IpfsDHT)(nil)
-)
-
 // New creates a new DHT with the specified host and options.
 func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
 	var cfg opts.Options
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index a31d7bf09f6..a448e7f2b22 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -7,8 +7,9 @@ import (
 	"time"
 
 	u "github.com/ipfs/go-ipfs-util"
+	goprocess "github.com/jbenet/goprocess"
+	periodicproc "github.com/jbenet/goprocess/periodic"
 	peer "github.com/libp2p/go-libp2p-peer"
-	pstore "github.com/libp2p/go-libp2p-peerstore"
 	routing "github.com/libp2p/go-libp2p-routing"
 )
 
@@ -38,73 +39,87 @@ var DefaultBootstrapConfig = BootstrapConfig{
 	Timeout: time.Duration(10 * time.Second),
 }
 
-// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
-// the default bootstrap config.
+// Bootstrap ensures the dht routing table remains healthy as peers come and go.
+// it builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and run every time signal fires.
+// These parameters are configurable.
+//
+// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
 func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
-	return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
+	proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
+	if err != nil {
+		return err
+	}
+
+	// wait till ctx or dht.Context exits.
+	// we have to do it this way to satisfy the Routing interface (contexts)
+	go func() {
+		defer proc.Close()
+		select {
+		case <-ctx.Done():
+		case <-dht.Context().Done():
+		}
+	}()
+
+	return nil
 }
 
-// Runs cfg.Queries bootstrap queries every cfg.Period.
-func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
-	// Because this method is not synchronous, we have to duplicate sanity
-	// checks on the config so that callers aren't oblivious.
+// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
+// it builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and run every time signal fires.
+// These parameters are configurable.
+//
+// BootstrapWithConfig returns a process, so the user can stop it.
+func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
 	if cfg.Queries <= 0 {
-		return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
+		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
 	}
-	go func() {
+
+	proc := dht.Process().Go(func(p goprocess.Process) {
+		<-p.Go(dht.bootstrapWorker(cfg)).Closed()
 		for {
-			err := dht.runBootstrap(ctx, cfg)
-			if err != nil {
-				log.Warningf("error bootstrapping: %s", err)
-			}
 			select {
 			case <-time.After(cfg.Period):
-			case <-ctx.Done():
+				<-p.Go(dht.bootstrapWorker(cfg)).Closed()
+			case <-p.Closing():
 				return
 			}
 		}
-	}()
-	return nil
+	})
+
+	return proc, nil
 }
 
-// This is a synchronous bootstrap. cfg.Queries queries will run each with a
-// timeout of cfg.Timeout. cfg.Period is not used.
-func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
+// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
+// it builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and run every time signal fires.
+// These parameters are configurable.
+//
+// SignalBootstrap returns a process, so the user can stop it.
+func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
 	if cfg.Queries <= 0 {
-		return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
+		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
 	}
-	return dht.runBootstrap(ctx, cfg)
-}
 
-func newRandomPeerId() peer.ID {
-	id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
-	rand.Read(id)
-	id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it.
-	return peer.ID(id)
-}
+	if signal == nil {
+		return nil, fmt.Errorf("invalid signal: %v", signal)
+	}
 
-// Traverse the DHT toward the given ID.
-func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
-	// TODO: Extract the query action (traversal logic?) inside FindPeer,
-	// don't actually call through the FindPeer machinery, which can return
-	// things out of the peer store etc.
-	return dht.FindPeer(ctx, target)
+	proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))
+
+	return proc, nil
 }
 
-// Traverse the DHT toward a random ID.
-func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
-	id := newRandomPeerId()
-	p, err := dht.walk(ctx, id)
-	switch err {
-	case routing.ErrNotFound:
-		return nil
-	case nil:
-		// We found a peer from a randomly generated ID. This should be very
-		// unlikely.
-		log.Warningf("random walk toward %s actually found peer: %s", id, p)
-		return nil
-	default:
-		return err
+func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
+	return func(worker goprocess.Process) {
+		// it would be useful to be able to send out signals of when we bootstrap, too...
+		// maybe this is a good case for whole module event pub/sub?
+
+		ctx := dht.Context()
+		if err := dht.runBootstrap(ctx, cfg); err != nil {
+			log.Warning(err)
+			// A bootstrapping error is important to notice but not fatal.
+		}
 	}
 }
 
@@ -117,24 +132,51 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
 	defer bslog("end")
 	defer log.EventBegin(ctx, "dhtRunBootstrap").Done()
 
-	doQuery := func(n int, target string, f func(context.Context) error) error {
-		log.Debugf("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
+	var merr u.MultiErr
+
+	randomID := func() peer.ID {
+		// 16 random bytes is not a valid peer id. it may be fine becuase
+		// the dht will rehash to its own keyspace anyway.
+		id := make([]byte, 16)
+		rand.Read(id)
+		id = u.Hash(id)
+		return peer.ID(id)
+	}
+
+	// bootstrap sequentially, as results will compound
+	runQuery := func(ctx context.Context, id peer.ID) {
 		ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
 		defer cancel()
-		return f(ctx)
-	}
-
-	// Do all but one of the bootstrap queries as random walks.
-	for i := 1; i < cfg.Queries; i++ {
-		err := doQuery(i, "random ID", dht.randomWalk)
-		if err != nil {
-			return err
+
+		p, err := dht.FindPeer(ctx, id)
+		if err == routing.ErrNotFound {
+			// this isn't an error. this is precisely what we expect.
+		} else if err != nil {
+			merr = append(merr, err)
+		} else {
+			// woah, actually found a peer with that ID? this shouldn't happen normally
+			// (as the ID we use is not a real ID). this is an odd error worth logging.
+			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
+			log.Warningf("%s", err)
+			merr = append(merr, err)
 		}
 	}
 
+	// these should be parallel normally. but can make them sequential for debugging.
+	// note that the core/bootstrap context deadline should be extended too for that.
+	for i := 0; i < cfg.Queries; i++ {
+		id := randomID()
+		log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
+		runQuery(ctx, id)
+	}
+
 	// Find self to distribute peer info to our neighbors.
-	return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), func(ctx context.Context) error {
-		_, err := dht.walk(ctx, dht.self)
-		return err
-	})
+	// Do this after bootstrapping.
+	log.Debugf("Bootstrapping query to self: %s", dht.self)
+	runQuery(ctx, dht.self)
+
+	if len(merr) > 0 {
+		return merr
+	}
+	return nil
 }
diff --git a/dht_test.go b/dht_test.go
index 7899fbcece9..dde2f4eb952 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -679,10 +679,23 @@ func TestPeriodicBootstrap(t *testing.T) {
 		}
 	}()
 
+	signals := []chan time.Time{}
+
 	var cfg BootstrapConfig
 	cfg = DefaultBootstrapConfig
 	cfg.Queries = 5
 
+	// kick off periodic bootstrappers with instrumented signals.
+	for _, dht := range dhts {
+		s := make(chan time.Time)
+		signals = append(signals, s)
+		proc, err := dht.BootstrapOnSignal(cfg, s)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer proc.Close()
+	}
+
 	t.Logf("dhts are not connected. %d", nDHTs)
 	for _, dht := range dhts {
 		rtlen := dht.routingTable.Size()
@@ -708,8 +721,9 @@ func TestPeriodicBootstrap(t *testing.T) {
 	}
 
 	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
-	for _, dht := range dhts {
-		go dht.BootstrapOnce(ctx, cfg)
+	now := time.Now()
+	for _, signal := range signals {
+		go func(s chan time.Time) { s <- now }(signal)
 	}
 
 	// this is async, and we dont know when it's finished with one cycle, so keep checking
diff --git a/routing.go b/routing.go
index 56d1a16b731..c777d2d5372 100644
--- a/routing.go
+++ b/routing.go
@@ -21,7 +21,6 @@ import (
 	routing "github.com/libp2p/go-libp2p-routing"
 	notif "github.com/libp2p/go-libp2p-routing/notifications"
 	ropts "github.com/libp2p/go-libp2p-routing/options"
-	"github.com/pkg/errors"
 )
 
 // asyncQueryBuffer is the size of buffered channels in async queries. This
@@ -584,7 +583,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo
 
 	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
 	if len(peers) == 0 {
-		return pstore.PeerInfo{}, errors.WithStack(kb.ErrLookupFailure)
+		return pstore.PeerInfo{}, kb.ErrLookupFailure
 	}
 
 	// Sanity...