diff --git a/core/bootstrap/bootstrap.go b/core/bootstrap/bootstrap.go
index 4edc5ac840e8..ab2be5789844 100644
--- a/core/bootstrap/bootstrap.go
+++ b/core/bootstrap/bootstrap.go
@@ -3,10 +3,10 @@ package bootstrap
 import (
 	"context"
 	"errors"
-	"fmt"
 	"io"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"

 	logging "github.com/ipfs/go-log"
@@ -50,6 +50,21 @@ type BootstrapConfig struct {
 	// for the bootstrap process to use. This makes it possible for clients
 	// to control the peers the process uses at any moment.
 	BootstrapPeers func() []peer.AddrInfo
+
+	// FIXME(BLOCKING): Review names, default values and doc.
+	// SavePeersPeriod governs the periodic interval at which the node will
+	// attempt to save connected nodes to use as temporary bootstrap peers.
+	SavePeersPeriod time.Duration
+	// SaveConnectedPeersRatio controls the number of peers we're saving compared
+	// to the target MinPeerThreshold. For example, if MinPeerThreshold is 4,
+	// and we have a ratio of 5 we will save 20 connected peers.
+	// Note: one peer can have many addresses under its ID, so saving a peer
+	// might translate to more than one line in the config (following the above
+	// example that means TempBootstrapPeers may have more than 20 lines, but
+	// all those lines will be addresses of at most 20 peers).
+	SaveConnectedPeersRatio   int
+	SaveTempPeersForBootstrap func(context.Context, []peer.AddrInfo)
+	LoadTempPeersForBootstrap func(context.Context) []peer.AddrInfo
 }

 // DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
@@ -57,6 +72,12 @@ var DefaultBootstrapConfig = BootstrapConfig{
 	MinPeerThreshold:  4,
 	Period:            30 * time.Second,
 	ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3
+	// FIXME(BLOCKING): Review this number. We're making it ridiculously small
+	// only for testing purposes, but this is saving the peers to the config
+	// file every time so should not be run frequently. (Original proposal: 24
+	// hours.)
+	SavePeersPeriod:         10 * time.Second,
+	SaveConnectedPeersRatio: 2,
 }

 func BootstrapConfigWithPeers(pis []peer.AddrInfo) BootstrapConfig {
@@ -90,6 +111,9 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf
 			log.Debugf("%s bootstrap error: %s", id, err)
 		}

+		// Exit the first call (triggered independently by `proc.Go`, not `Tick`)
+		// only after being done with the *single* Routing.Bootstrap call. Following
+		// periodic calls (`Tick`) will not block on this.
 		<-doneWithRound
 	}

@@ -108,9 +132,88 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf
 	doneWithRound <- struct{}{}
 	close(doneWithRound) // it no longer blocks periodic
+
+	startSavePeersAsTemporaryBootstrapProc(cfg, host, proc)
+
 	return proc, nil
 }

+// Aside from the main bootstrap process we also run a secondary one that saves
+// connected peers as a backup measure if we can't connect to the official
+// bootstrap ones. These peers will serve as *temporary* bootstrap nodes.
+func startSavePeersAsTemporaryBootstrapProc(cfg BootstrapConfig, host host.Host, bootstrapProc goprocess.Process) {
+
+	savePeersFn := func(worker goprocess.Process) {
+		ctx := goprocessctx.OnClosingContext(worker)
+
+		if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil {
+			log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err)
+		}
+	}
+	savePeersProc := periodicproc.Tick(cfg.SavePeersPeriod, savePeersFn)
+	// When the main bootstrap process ends also terminate the 'save connected
+	// peers' one.
+	// Coupling the two seems the easiest way to handle this backup
+	// process without additional complexity.
+	go func() {
+		<-bootstrapProc.Closing()
+		savePeersProc.Close()
+	}()
+	// Run the first round now (after the first bootstrap process has finished)
+	// as the SavePeersPeriod can be much longer than bootstrap.
+	savePeersProc.Go(savePeersFn)
+}
+
+func saveConnectedPeersAsTemporaryBootstrap(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
+	allConnectedPeers := host.Network().Peers()
+	// Randomize the list of connected peers; we don't prioritize anyone.
+	// FIXME: Maybe use randomizeAddressList if we change from []peer.ID to
+	// []peer.AddrInfo earlier in the logic.
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(allConnectedPeers),
+		func(i, j int) {
+			allConnectedPeers[i], allConnectedPeers[j] = allConnectedPeers[j], allConnectedPeers[i]
+		})
+
+	saveNumber := cfg.SaveConnectedPeersRatio * cfg.MinPeerThreshold
+	savedPeers := make([]peer.AddrInfo, 0, saveNumber)
+
+	// Save peers from the connected list that aren't bootstrap ones.
+	bootstrapPeers := cfg.BootstrapPeers()
+OUTER:
+	for _, p := range allConnectedPeers {
+		for _, bootstrapPeer := range bootstrapPeers {
+			if p == bootstrapPeer.ID {
+				continue OUTER
+			}
+		}
+		savedPeers = append(savedPeers,
+			peer.AddrInfo{ID: p, Addrs: host.Network().Peerstore().Addrs(p)})
+		if len(savedPeers) >= saveNumber {
+			break
+		}
+	}
+
+	// If we didn't reach the target number, use previously stored connected peers.
+	if len(savedPeers) < saveNumber {
+		oldSavedPeers := cfg.LoadTempPeersForBootstrap(ctx)
+		log.Debugf("missing %d peers to reach backup bootstrap target of %d, trying from previous list of %d saved peers",
+			saveNumber-len(savedPeers), saveNumber, len(oldSavedPeers))
+		for _, p := range oldSavedPeers {
+			savedPeers = append(savedPeers, p)
+			if len(savedPeers) >= saveNumber {
+				break
+			}
+		}
+	}
+
+	cfg.SaveTempPeersForBootstrap(ctx, savedPeers)
+	log.Debugf("saved %d connected peers (of %d target) as bootstrap backup in the config", len(savedPeers), saveNumber)
+	return nil
+}
+
+// Connect to as many peers as needed to reach the BootstrapConfig.MinPeerThreshold.
+// Peers can be original bootstrap or temporary ones (drawn from a list of
+// persisted previously connected peers).
 func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {

 	ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
@@ -127,35 +230,58 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er
 			id, len(connected), cfg.MinPeerThreshold)
 		return nil
 	}
-	numToDial := cfg.MinPeerThreshold - len(connected)
+	numToDial := cfg.MinPeerThreshold - len(connected) // numToDial > 0

-	// filter out bootstrap nodes we are already connected to
-	var notConnected []peer.AddrInfo
-	for _, p := range peers {
-		if host.Network().Connectedness(p.ID) != network.Connected {
-			notConnected = append(notConnected, p)
+	if len(peers) > 0 {
+		numToDial -= int(peersConnect(ctx, host, peers, numToDial, true))
+		if numToDial <= 0 {
+			return nil
 		}
 	}

-	// if connected to all bootstrap peer candidates, exit
-	if len(notConnected) < 1 {
-		log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
-		return ErrNotEnoughBootstrapPeers
+	log.Debugf("not enough bootstrap peers to fill the remaining target of %d connections, trying backup list", numToDial)
+
+	tempBootstrapPeers := cfg.LoadTempPeersForBootstrap(ctx)
+	if len(tempBootstrapPeers) > 0 {
+		numToDial -= int(peersConnect(ctx, host, tempBootstrapPeers, numToDial, false))
+		if numToDial <= 0 {
+			return nil
+		}
 	}

-	// connect to a random susbset of bootstrap candidates
-	randSubset := randomSubsetOfPeers(notConnected, numToDial)
+	log.Debugf("tried both original bootstrap peers and temporary ones but still missing target of %d connections", numToDial)

-	log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
-	return bootstrapConnect(ctx, host, randSubset)
+	return ErrNotEnoughBootstrapPeers
 }

-func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo) error {
-	if len(peers) < 1 {
-		return ErrNotEnoughBootstrapPeers
-	}
+// Attempt to make `needed` connections from the `availablePeers` list. Mark
+// peers as either `permanent` or temporary when adding them to the Peerstore.
+// Return the number of connections completed. We eagerly over-connect in
+// parallel, so we might connect to more than needed.
+// (We spawn as many goroutines and attempt as many connections as there are
+// availablePeers, but this list comes from the restricted sets of original or
+// temporary bootstrap nodes, which keeps it at a sane size.)
+func peersConnect(ctx context.Context, ph host.Host, availablePeers []peer.AddrInfo, needed int, permanent bool) uint64 {
+	peers := randomizeAddressList(availablePeers)
+
+	// Monitor the number of connections and stop if we reach the target.
+	var connected uint64
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(1 * time.Second):
+				if int(atomic.LoadUint64(&connected)) >= needed {
+					cancel()
+					return
+				}
+			}
+		}
+	}()

-	errs := make(chan error, len(peers))
 	var wg sync.WaitGroup
 	for _, p := range peers {

@@ -164,45 +290,53 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo)
 		// fail/abort due to an expiring context.
 		// Also, performed asynchronously for dial speed.

+		if int(atomic.LoadUint64(&connected)) >= needed {
+			cancel()
+			break
+		}
+
 		wg.Add(1)
 		go func(p peer.AddrInfo) {
 			defer wg.Done()
+
+			// Skip addresses belonging to a peer we're already connected to.
+			// (Not a guarantee but a best-effort policy.)
+			if ph.Network().Connectedness(p.ID) == network.Connected {
+				return
+			}
 			log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)

-			ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
 			if err := ph.Connect(ctx, p); err != nil {
-				log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
-				errs <- err
+				if ctx.Err() != context.Canceled {
+					log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
+				}
 				return
 			}
+			if permanent {
+				// We're connecting to an original bootstrap peer, mark it as
+				// a permanent address (Connect will register it as TempAddrTTL).
+				// FIXME(BLOCKING): From the code it seems this will overwrite the
+				// temporary TTL from Connect: need confirmation from libp2p folks.
+				// Registering it *after* the connect gives less chance of registering
+				// many addresses we won't be using in case we already reached the
+				// target and the context has already been cancelled. (This applies
+				// only to the very restricted list of original bootstrap nodes so
+				// this issue is not critical.)
+				ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
+			}
+
 			log.Infof("bootstrapped with %v", p.ID)
+			atomic.AddUint64(&connected, 1)
 		}(p)
 	}
 	wg.Wait()

-	// our failure condition is when no connection attempt succeeded.
-	// So drain the errs channel, counting the results.
-	close(errs)
-	count := 0
-	var err error
-	for err = range errs {
-		if err != nil {
-			count++
-		}
-	}
-	if count == len(peers) {
-		return fmt.Errorf("failed to bootstrap. %s", err)
-	}
-	return nil
+	return connected
 }

-func randomSubsetOfPeers(in []peer.AddrInfo, max int) []peer.AddrInfo {
-	if max > len(in) {
-		max = len(in)
-	}
-
-	out := make([]peer.AddrInfo, max)
-	for i, val := range rand.Perm(len(in))[:max] {
+func randomizeAddressList(in []peer.AddrInfo) []peer.AddrInfo {
+	out := make([]peer.AddrInfo, len(in))
+	for i, val := range rand.Perm(len(in)) {
 		out[i] = in[val]
 	}
 	return out
diff --git a/core/bootstrap/bootstrap_test.go b/core/bootstrap/bootstrap_test.go
index 23128c31f26b..437c38e8751d 100644
--- a/core/bootstrap/bootstrap_test.go
+++ b/core/bootstrap/bootstrap_test.go
@@ -7,9 +7,9 @@ import (
 	"github.com/libp2p/go-libp2p-core/test"
 )

-func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {
+func TestRandomizeAddressList(t *testing.T) {
 	var ps []peer.AddrInfo
-	sizeofSlice := 100
+	sizeofSlice := 10
 	for i := 0; i < sizeofSlice; i++ {
 		pid, err := test.RandPeerID()
 		if err != nil {
@@ -18,7 +18,7 @@ func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {
 		ps = append(ps, peer.AddrInfo{ID: pid})
 	}

-	out := randomSubsetOfPeers(ps, 2*sizeofSlice)
+	out := randomizeAddressList(ps)
 	if len(out) != len(ps) {
 		t.Fail()
 	}
diff --git a/core/core.go b/core/core.go
index 1a09b85e83f3..e7ff239f0ea7 100644
--- a/core/core.go
+++ b/core/core.go
@@ -11,12 +11,15 @@ package core
 import (
 	"context"
+	"encoding/json"
 	"io"

 	"github.com/ipfs/go-filestore"
 	"github.com/ipfs/go-ipfs-pinner"

 	bserv "github.com/ipfs/go-blockservice"
+	"github.com/ipfs/go-ipfs/config"
+	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-fetcher"
 	"github.com/ipfs/go-graphsync"
 	bstore "github.com/ipfs/go-ipfs-blockstore"
@@ -161,12 +164,33 @@ func (n *IpfsNode) Bootstrap(cfg bootstrap.BootstrapConfig) error {
 			return ps
 		}
 	}
+	if cfg.SaveTempPeersForBootstrap == nil {
+		cfg.SaveTempPeersForBootstrap = func(ctx context.Context, peerList []peer.AddrInfo) {
+			err := n.saveTempBootstrapPeers(ctx, peerList)
+			if err != nil {
+				log.Warnf("saveTempBootstrapPeers failed: %s", err)
+				return
+			}
+		}
+	}
+	if cfg.LoadTempPeersForBootstrap == nil {
+		cfg.LoadTempPeersForBootstrap = func(ctx context.Context) []peer.AddrInfo {
+			peerList, err := n.loadTempBootstrapPeers(ctx)
+			if err != nil {
+				log.Warnf("loadTempBootstrapPeers failed: %s", err)
+				return nil
+			}
+			return peerList
+		}
+	}

 	var err error
 	n.Bootstrapper, err = bootstrap.Bootstrap(n.Identity, n.PeerHost, n.Routing, cfg)
 	return err
 }

+var TempBootstrapPeersKey = datastore.NewKey("/local/temp_bootstrap_peers")
+
 func (n *IpfsNode) loadBootstrapPeers() ([]peer.AddrInfo, error) {
 	cfg, err := n.Repo.Config()
 	if err != nil {
@@ -176,6 +200,31 @@ func (n *IpfsNode) loadBootstrapPeers() ([]peer.AddrInfo, error) {
 	return cfg.BootstrapPeers()
 }

+func (n *IpfsNode) saveTempBootstrapPeers(ctx context.Context, peerList []peer.AddrInfo) error {
+	ds := n.Repo.Datastore()
+	bytes, err := json.Marshal(config.BootstrapPeerStrings(peerList))
+	if err != nil {
+		return err
+	}
+
+	if err := ds.Put(ctx, TempBootstrapPeersKey, bytes); err != nil {
+		return err
+	}
+	return ds.Sync(ctx, TempBootstrapPeersKey)
+}
+
+func (n *IpfsNode) loadTempBootstrapPeers(ctx context.Context) ([]peer.AddrInfo, error) {
+	ds := n.Repo.Datastore()
+	bytes, err := ds.Get(ctx, TempBootstrapPeersKey)
+	if err != nil {
+		return nil, err
+	}
+
+	var addrs []string
+	if err := json.Unmarshal(bytes, &addrs); err != nil {
+		return nil, err
+	}
+	return config.ParseBootstrapPeers(addrs)
+}
+
 type ConstructPeerHostOpts struct {
 	AddrsFactory      p2pbhost.AddrsFactory
 	DisableNatPortMap bool
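
Note on the new knobs: each save round persists at most SaveConnectedPeersRatio * MinPeerThreshold peers (2 * 4 = 8 with the defaults in this patch). The following is a minimal sketch of how a caller could override the defaults added here, assuming this patched go-ipfs tree; the values are illustrative, not recommendations.

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/ipfs/go-ipfs/core/bootstrap"
    )

    func main() {
    	cfg := bootstrap.DefaultBootstrapConfig
    	// Illustrative overrides: save once a day instead of the testing value above.
    	cfg.SavePeersPeriod = 24 * time.Hour
    	cfg.SaveConnectedPeersRatio = 5

    	// How many peers a save round will try to persist.
    	saveNumber := cfg.SaveConnectedPeersRatio * cfg.MinPeerThreshold
    	fmt.Println("peers saved per round:", saveNumber) // 5 * 4 = 20
    }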
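
For reference, the temporary peers are kept in the repo datastore as a JSON-encoded list of multiaddr strings under /local/temp_bootstrap_peers. Below is a rough sketch of that round trip against an in-memory datastore, mirroring saveTempBootstrapPeers/loadTempBootstrapPeers above; it assumes a go-datastore version whose Put/Get take a context (as used in this diff) and reuses one of the stock libp2p bootstrap addresses purely as sample data.

    package main

    import (
    	"context"
    	"encoding/json"
    	"fmt"

    	"github.com/ipfs/go-datastore"
    	"github.com/ipfs/go-ipfs/config"
    )

    func main() {
    	ctx := context.Background()
    	ds := datastore.NewMapDatastore()
    	key := datastore.NewKey("/local/temp_bootstrap_peers")

    	// Save: []peer.AddrInfo -> []string -> JSON -> datastore.
    	peers, err := config.ParseBootstrapPeers([]string{
    		"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
    	})
    	if err != nil {
    		panic(err)
    	}
    	raw, err := json.Marshal(config.BootstrapPeerStrings(peers))
    	if err != nil {
    		panic(err)
    	}
    	if err := ds.Put(ctx, key, raw); err != nil {
    		panic(err)
    	}

    	// Load: datastore -> JSON -> []string -> []peer.AddrInfo.
    	raw, err = ds.Get(ctx, key)
    	if err != nil {
    		panic(err)
    	}
    	var addrs []string
    	if err := json.Unmarshal(raw, &addrs); err != nil {
    		panic(err)
    	}
    	loaded, err := config.ParseBootstrapPeers(addrs)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(loaded)
    }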