-
Notifications
You must be signed in to change notification settings - Fork 236
/
Copy pathdual.go
392 lines (342 loc) · 11 KB
/
dual.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
// Package dual provides an implementation of a split or "dual" dht, where two parallel instances
// are maintained for the global internet and the local LAN respectively.
package dual
import (
"context"
"fmt"
"sync"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/ipfs/go-cid"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
helper "github.com/libp2p/go-libp2p-routing-helpers"
ci "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/hashicorp/go-multierror"
)
// tracer provides the tracing hooks that wrap the routing methods of this package.
const tracer = tracing.Tracer("go-libp2p-kad-dht/dual")
// dualName is the label passed as the first argument to every tracer hook in this package.
const dualName = "Dual"
// DHT implements the routing interface to provide two concrete DHT implementations for use
// in IPFS that are used to support both global network users and disjoint LAN usecases.
type DHT struct {
// WAN is the DHT that New configures with the public query, routing table and address filters.
WAN *dht.IpfsDHT
// LAN is the DHT that New configures with the LanExtension protocol extension and the
// private query and routing table filters.
LAN *dht.IpfsDHT
}
// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
// New passes it to dht.ProtocolExtension when configuring the LAN DHT.
const LanExtension protocol.ID = "/lan"
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
//
// Each line fails to compile if *DHT stops satisfying the named routing interface.
var (
_ routing.ContentRouting = (*DHT)(nil)
_ routing.Routing = (*DHT)(nil)
_ routing.PeerRouting = (*DHT)(nil)
_ routing.PubKeyFetcher = (*DHT)(nil)
_ routing.ValueStore = (*DHT)(nil)
)
// Limits handed to dht.NewRTPeerDiversityFilter when New configures the peer
// diversity filter of the WAN DHT's routing table.
// NOTE(review): presumably these cap how many peers sharing an IP prefix are
// admitted per common-prefix-length bucket and overall — confirm against the
// dht package documentation.
var (
maxPrefixCountPerCpl = 2
maxPrefixCount = 3
)
// config accumulates the option lists for the two underlying DHTs.
type config struct {
// wan and lan are the options New passes to dht.New for the WAN and the LAN
// DHT respectively.
wan, lan []dht.Option
}
// apply runs the given options against cfg in order. It stops at the first
// option that fails and returns its error wrapped together with the option's
// position in opts.
func (cfg *config) apply(opts ...Option) error {
	for idx := range opts {
		err := opts[idx](cfg)
		if err == nil {
			continue
		}
		return fmt.Errorf("dual dht option %d failed: %w", idx, err)
	}
	return nil
}
// Option is an option used to configure the Dual DHT. It mutates the config it
// is given; a non-nil error makes config.apply stop and report the failing
// option.
type Option func(*config) error
// WanDHTOption returns an Option that appends the given DHT options to the
// option list of the WAN DHT only. The returned Option never fails.
func WanDHTOption(opts ...dht.Option) Option {
	appendWAN := func(cfg *config) error {
		cfg.wan = append(cfg.wan, opts...)
		return nil
	}
	return appendWAN
}
// LanDHTOption returns an Option that appends the given DHT options to the
// option list of the LAN DHT only. The returned Option never fails.
func LanDHTOption(opts ...dht.Option) Option {
	appendLAN := func(cfg *config) error {
		cfg.lan = append(cfg.lan, opts...)
		return nil
	}
	return appendLAN
}
// DHTOption returns an Option that appends the given DHT options to the option
// lists of both the LAN and the WAN DHT. The returned Option never fails.
func DHTOption(opts ...dht.Option) Option {
	appendBoth := func(cfg *config) error {
		cfg.lan = append(cfg.lan, opts...)
		cfg.wan = append(cfg.wan, opts...)
		return nil
	}
	return appendBoth
}
// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
//
// The WAN DHT is constructed first, then the LAN DHT, both on the same host h.
// If constructing the LAN DHT fails, the WAN DHT is closed before the error is
// returned so that a failed call does not leave a DHT instance behind.
//
// Note: query or routing table functional options provided as arguments to this function
// will be overridden by this constructor.
// NOTE(review): the built-in defaults are appended to the option lists before
// the caller's options; confirm which of the two takes precedence in dht.New.
func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) {
	var cfg config

	// Defaults for the WAN DHT.
	err := cfg.apply(
		WanDHTOption(
			dht.QueryFilter(dht.PublicQueryFilter),
			dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
			dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
			// filter out all private addresses
			dht.AddressFilter(func(addrs []ma.Multiaddr) []ma.Multiaddr { return ma.FilterAddrs(addrs, manet.IsPublicAddr) }),
		),
	)
	if err != nil {
		return nil, err
	}

	// Defaults for the LAN DHT.
	err = cfg.apply(
		LanDHTOption(
			dht.ProtocolExtension(LanExtension),
			dht.QueryFilter(dht.PrivateQueryFilter),
			dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
			// filter out localhost IP addresses
			dht.AddressFilter(func(addrs []ma.Multiaddr) []ma.Multiaddr {
				return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) })
			}),
		),
	)
	if err != nil {
		return nil, err
	}

	// Caller-supplied options are appended after the defaults above.
	err = cfg.apply(options...)
	if err != nil {
		return nil, err
	}

	wan, err := dht.New(ctx, h, cfg.wan...)
	if err != nil {
		return nil, err
	}

	// When the WAN DHT did not end up in client mode, ask for the LAN DHT to
	// run in server mode. The option is appended after any caller-supplied
	// LAN options.
	if wan.Mode() != dht.ModeClient {
		cfg.lan = append(cfg.lan, dht.Mode(dht.ModeServer))
	}

	lan, err := dht.New(ctx, h, cfg.lan...)
	if err != nil {
		// Do not leak the already-constructed WAN DHT. The construction
		// error is the one worth reporting, so a failure to close is
		// deliberately dropped.
		_ = wan.Close()
		return nil, err
	}

	impl := DHT{wan, lan}
	return &impl, nil
}
// Close shuts down the WAN DHT and then the LAN DHT, and reports the outcome
// of both through combineErrors.
func (dht *DHT) Close() error {
	wanErr := dht.WAN.Close()
	lanErr := dht.LAN.Close()
	return combineErrors(wanErr, lanErr)
}
// WANActive reports whether the WAN DHT is active, i.e. whether its routing
// table currently holds at least one entry.
func (dht *DHT) WANActive() bool {
	table := dht.WAN.RoutingTable()
	return table.Size() > 0
}
// Provide adds the given cid to the content routing system. The call goes to
// the WAN DHT when it is active (see WANActive) and to the LAN DHT otherwise;
// only one of the two is used per call.
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) (err error) {
	ctx, end := tracer.Provide(dualName, ctx, key, announce)
	defer func() { end(err) }()

	target := dht.LAN
	if dht.WANActive() {
		target = dht.WAN
	}
	return target.Provide(ctx, key, announce)
}
// GetRoutingTableDiversityStats returns the routing table diversity stats of
// the WAN DHT, or nil while the WAN DHT is not active (see WANActive). The LAN
// DHT is never consulted.
func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
	if !dht.WANActive() {
		return nil
	}
	return dht.WAN.GetRoutingTableDiversityStats()
}
// FindProvidersAsync searches for peers who are able to provide a given key.
//
// The WAN and LAN DHTs are both queried with the same key and count, and their
// results are merged onto the returned channel; a peer ID that has already been
// delivered is not sent again. A count of zero disables the result limit applied
// by this method. The returned channel is closed when the merging goroutine
// exits: after count results were delivered, after both underlying channels
// were closed, or when ctx is done while a result is waiting to be delivered.
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) {
ctx, end := tracer.FindProvidersAsync(dualName, ctx, key, count)
// Hand the outgoing channel to the tracer hook and return whatever channel it gives back.
defer func() { ch = end(ch, nil) }()
// reqCtx is cancelled when the merging goroutine below exits.
reqCtx, cancel := context.WithCancel(ctx)
outCh := make(chan peer.AddrInfo)
// Register for and merge query events if we care about them.
subCtx := reqCtx
// evtCh stays nil when the caller did not subscribe; receiving from a nil
// channel blocks forever, so that select case is then never chosen.
var evtCh <-chan *routing.QueryEvent
if routing.SubscribesToQueryEvents(ctx) {
subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx)
}
subCtx, span := internal.StartSpan(subCtx, "Dual.worker")
// Both queries run under subCtx, so their query events arrive on evtCh (when registered).
wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count)
lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count)
// Remember whether the caller asked for "no limit" before count is decremented below.
zeroCount := (count == 0)
go func() {
defer span.End()
defer cancel()
defer close(outCh)
// found holds the IDs of the peers already sent on outCh.
found := make(map[peer.ID]struct{}, count)
var pi peer.AddrInfo
// qEv keeps the value most recently received from evtCh; it is inspected after the loop.
var qEv *routing.QueryEvent
// Keep going while more results are wanted and at least one DHT may still produce some.
for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) {
var ok bool
select {
case qEv, ok = <-evtCh:
if !ok {
// Event stream ended: stop selecting on it.
evtCh = nil
} else if qEv != nil && qEv.Type != routing.QueryError {
// Forward everything except errors right away; an error is only
// published after the loop, and only if nothing was found.
routing.PublishQueryEvent(reqCtx, qEv)
}
continue
case pi, ok = <-wanCh:
if !ok {
span.AddEvent("wan finished")
// A nil channel is never ready, which removes this case from the select.
wanCh = nil
continue
}
case pi, ok = <-lanCh:
if !ok {
span.AddEvent("lan finished")
// A nil channel is never ready, which removes this case from the select.
lanCh = nil
continue
}
}
// already found
if _, ok = found[pi.ID]; ok {
continue
}
select {
case outCh <- pi:
found[pi.ID] = struct{}{}
// With zeroCount set this goes negative, which the loop condition ignores.
count--
case <-ctx.Done():
return
}
}
// Surface the last received query event only if it was an error and no provider was delivered.
if qEv != nil && qEv.Type == routing.QueryError && len(found) == 0 {
routing.PublishQueryEvent(reqCtx, qEv)
}
}()
return outCh
}
// FindPeer searches for a peer with given ID.
//
// Both the WAN and the LAN DHT are queried concurrently and the call waits for
// both to finish. The returned AddrInfo always carries pid as its ID and the
// union of the addresses reported by the two DHTs, without duplicates: WAN
// addresses come first, followed by LAN addresses not already present, each
// group in the order its DHT reported them.
//
// The error is nil if at least one of the two lookups succeeded. If both
// failed, the addresses gathered so far are returned together with the
// combined error.
//
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (pi peer.AddrInfo, err error) {
	ctx, end := tracer.FindPeer(dualName, ctx, pid)
	defer func() { end(pi, err) }()

	var wg sync.WaitGroup
	wg.Add(2)
	var wanInfo, lanInfo peer.AddrInfo
	var wanErr, lanErr error
	go func() {
		defer wg.Done()
		wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
	}()
	go func() {
		defer wg.Done()
		lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
	}()
	wg.Wait()

	// Combine addresses. Try to avoid doing unnecessary work while we're at
	// it. Note: We're ignoring the errors for now as many of our DHT
	// commands can return both a result and an error.
	ai := peer.AddrInfo{ID: pid}
	switch {
	case len(wanInfo.Addrs) == 0:
		ai.Addrs = lanInfo.Addrs
	case len(lanInfo.Addrs) == 0:
		ai.Addrs = wanInfo.Addrs
	default:
		// Deduplicate on the binary form of the address while keeping the
		// order in which addresses were reported, so that the result does
		// not depend on map iteration order.
		total := len(wanInfo.Addrs) + len(lanInfo.Addrs)
		seen := make(map[string]struct{}, total)
		ai.Addrs = make([]ma.Multiaddr, 0, total)
		for _, addrs := range [][]ma.Multiaddr{wanInfo.Addrs, lanInfo.Addrs} {
			for _, addr := range addrs {
				key := string(addr.Bytes())
				if _, dup := seen[key]; dup {
					continue
				}
				seen[key] = struct{}{}
				ai.Addrs = append(ai.Addrs, addr)
			}
		}
	}

	// If one of the commands succeeded, don't return an error.
	if wanErr == nil || lanErr == nil {
		return ai, nil
	}

	// Otherwise, return what we have _and_ return the error.
	return ai, combineErrors(wanErr, lanErr)
}
// combineErrors merges the errors of a WAN/LAN pair of calls into one.
//
//   - Identical values (including two nils) collapse into that single value.
//   - If exactly one side is kb.ErrLookupFailure (no peers in routing table),
//     the other side is returned as is, which may be nil.
//   - Anything else is aggregated with multierror.
func combineErrors(erra, errb error) error {
	switch {
	case erra == errb:
		return erra
	case erra == kb.ErrLookupFailure:
		return errb
	case errb == kb.ErrLookupFailure:
		return erra
	default:
		return multierror.Append(erra, errb).ErrorOrNil()
	}
}
// Bootstrap allows callers to hint to the routing system to get into a
// bootstrapped state and remain there. The WAN DHT is bootstrapped first, then
// the LAN DHT; the LAN DHT is attempted even if the WAN DHT failed, and both
// outcomes are reported through combineErrors.
func (dht *DHT) Bootstrap(ctx context.Context) (err error) {
	ctx, end := tracer.Bootstrap(dualName, ctx)
	defer func() { end(err) }()

	return combineErrors(dht.WAN.Bootstrap(ctx), dht.LAN.Bootstrap(ctx))
}
// PutValue stores val under key. The write goes to the WAN DHT when it is
// active (see WANActive) and to the LAN DHT otherwise; only one of the two is
// used per call.
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) (err error) {
	ctx, end := tracer.PutValue(dualName, ctx, key, val, opts...)
	defer func() { end(err) }()

	target := dht.LAN
	if dht.WANActive() {
		target = dht.WAN
	}
	return target.PutValue(ctx, key, val, opts...)
}
// GetValue searches for the value corresponding to given Key.
//
// The LAN lookup runs in the background while the WAN lookup runs on the
// calling goroutine. A successful WAN lookup cancels the LAN lookup, but the
// call still waits for the LAN goroutine to finish before returning. The WAN
// value wins whenever the WAN lookup succeeded; otherwise a successful LAN
// value is returned; if both failed the errors are combined.
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) {
	ctx, end := tracer.GetValue(dualName, ctx, key, opts...)
	defer func() { end(result, err) }()

	lanCtx, stopLAN := context.WithCancel(ctx)
	defer stopLAN()

	var (
		lanResult []byte
		lanErr    error
	)
	// lanDone is closed once the LAN lookup has stored its outcome.
	lanDone := make(chan struct{})
	go func() {
		defer close(lanDone)
		lanResult, lanErr = d.LAN.GetValue(lanCtx, key, opts...)
	}()

	wanResult, wanErr := d.WAN.GetValue(ctx, key, opts...)
	wanOK := wanErr == nil
	if wanOK {
		// The LAN answer is no longer needed.
		stopLAN()
	}
	<-lanDone

	switch {
	case wanOK:
		return wanResult, nil
	case lanErr == nil:
		return lanResult, nil
	default:
		return nil, combineErrors(wanErr, lanErr)
	}
}
// SearchValue searches for better values for the given key. The search is
// delegated to a helper.Parallel router built over the WAN and LAN DHTs, using
// the WAN DHT's validator.
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) {
	ctx, end := tracer.SearchValue(dualName, ctx, key, opts...)
	defer func() { ch, err = end(ch, err) }()

	routers := []routing.Routing{dht.WAN, dht.LAN}
	parallel := helper.Parallel{
		Routers:   routers,
		Validator: dht.WAN.Validator,
	}
	return parallel.SearchValue(ctx, key, opts...)
}
// GetPublicKey returns the public key for the given peer. The lookup is
// delegated to a helper.Parallel router built over the WAN and LAN DHTs, using
// the WAN DHT's validator.
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
	routers := []routing.Routing{dht.WAN, dht.LAN}
	parallel := helper.Parallel{
		Routers:   routers,
		Validator: dht.WAN.Validator,
	}
	return parallel.GetPublicKey(ctx, pid)
}