-
Notifications
You must be signed in to change notification settings - Fork 3
/
discovery.go
183 lines (157 loc) · 4.49 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package p2pnet
import (
"context"
"errors"
"fmt"
"path"
"time"
"github.com/libp2p/go-libp2p-kad-dht/dual"
libp2pdiscovery "github.com/libp2p/go-libp2p/core/discovery"
libp2phost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
libp2prouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
)
const (
tryAdvertiseTimeout = time.Second * 30
defaultAdvertiseTTL = time.Minute * 5
defaultMinPeers = 3 // TODO: make this configurable
defaultMaxPeers = 50 // TODO: make this configurable
)
type discovery struct {
ctx context.Context
dht *dual.DHT
h libp2phost.Host
rd *libp2prouting.RoutingDiscovery
advertiseCh chan struct{} // signals to advertise
namespacePrefix string // Prefix to append before all advertised namespaces
advertisedNamespaces func() []string
}
func (d *discovery) getAdvertisedNamespaces() []string {
if d.advertisedNamespaces == nil {
return []string{""}
}
return d.advertisedNamespaces()
}
func (d *discovery) start() error {
err := d.dht.Bootstrap(d.ctx)
if err != nil {
return fmt.Errorf("failed to bootstrap DHT: %w", err)
}
// wait to connect to bootstrap peers
time.Sleep(time.Second)
go d.advertiseLoop()
go d.discoverLoop()
log.Debug("discovery started!")
return nil
}
func (d *discovery) stop() error {
return d.dht.Close()
}
func (d *discovery) advertiseLoop() {
ttl := time.Duration(0) // don't block on first loop iteration
for {
select {
case <-d.ctx.Done():
return
case <-d.advertiseCh:
// we've been asked to publish advertisements immediately, presumably
// because a new namespace was added
case <-time.After(ttl):
// publish advertisements on regular interval
}
// The DHT clears provider records (ie. who is advertising what content)
// every 24 hours. So, if we don't advertise a namespace for 24 hours,
// then we are no longer present in the DHT under that namespace.
ttl = d.advertise(d.getAdvertisedNamespaces())
}
}
// advertise advertises the passed set of namespaces in the DHT.
func (d *discovery) advertise(namespaces []string) time.Duration {
err := d.dht.Bootstrap(d.ctx)
if err != nil {
log.Warnf("failed to bootstrap DHT: %s", err)
return tryAdvertiseTimeout
}
for _, provides := range namespaces {
_, err = d.rd.Advertise(d.ctx, path.Join(d.namespacePrefix, provides))
if err != nil {
log.Debugf("did not advertise %q in the DHT: %s", provides, err)
return tryAdvertiseTimeout
}
log.Debugf("advertised %q in the DHT", provides)
}
return defaultAdvertiseTTL
}
func (d *discovery) discoverLoop() {
const discoverLoopDuration = time.Minute
timer := time.NewTicker(discoverLoopDuration)
for {
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-timer.C:
if len(d.h.Network().Peers()) >= defaultMinPeers {
continue
}
// if our peer count is low, try to find some peers
_, err := d.findPeers("", discoverLoopDuration)
if err != nil {
log.Errorf("failed to find peers: %s", err)
}
}
}
}
func (d *discovery) findPeers(provides string, timeout time.Duration) ([]peer.ID, error) {
peerCh, err := d.rd.FindPeers(
d.ctx,
path.Join(d.namespacePrefix, provides),
libp2pdiscovery.Limit(defaultMaxPeers),
)
if err != nil {
return nil, err
}
ourPeerID := d.h.ID()
var peerIDs []peer.ID
ctx, cancel := context.WithTimeout(d.ctx, timeout)
defer cancel()
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return peerIDs, nil
}
return peerIDs, ctx.Err()
case peerAddr, ok := <-peerCh:
if !ok {
// channel was closed, no more peers to read
return peerIDs, nil
}
if peerAddr.ID == ourPeerID {
continue
}
log.Debugf("found new peer via DHT: %s", peerAddr)
peerIDs = append(peerIDs, peerAddr.ID)
// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) < defaultMaxPeers {
err = d.h.Connect(d.ctx, peerAddr)
if err != nil {
log.Debugf("failed to connect to discovered peer %s: %s", peerAddr.ID, err)
}
} else {
d.h.Peerstore().AddAddrs(peerAddr.ID, peerAddr.Addrs, peerstore.PermanentAddrTTL)
}
}
}
}
func (d *discovery) discover(
provides string,
searchTime time.Duration,
) ([]peer.ID, error) {
log.Debugf("attempting to find DHT peers that provide [%s] for %vs",
provides,
searchTime.Seconds(),
)
return d.findPeers(provides, searchTime)
}