-
Notifications
You must be signed in to change notification settings - Fork 0
/
0001-go-libp2p-kad-dht-implement-GetValuesAsync.patch
134 lines (122 loc) · 3.91 KB
/
0001-go-libp2p-kad-dht-implement-GetValuesAsync.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
From 39c20b22f2fc8f72857118df3b0419ccdfe8baa4 Mon Sep 17 00:00:00 2001
From: Mildred Ki'Lya <mildred-pub.git@mildred.fr>
Date: Wed, 22 Feb 2017 13:21:46 +0100
Subject: [PATCH] go-libp2p-kad-dht: implement GetValuesAsync
https://github.com/libp2p/go-libp2p-kad-dht/issues/47
---
.../go-libp2p-kad-dht/routing.go | 68 ++++++++++++++++++----
1 file changed, 56 insertions(+), 12 deletions(-)
diff --git a/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go b/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
index 4f2be3a..9f3e59e 100644
--- a/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
+++ b/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
@@ -143,8 +143,51 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
}
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]routing.RecvdVal, error) {
+ var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0)
+ var errChan chan error = make(chan error, 0)
+ go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, errChan)
+
var vals []routing.RecvdVal
+ for {
+ select {
+ case err := <-errChan:
+ if err == nil {
+ break
+ }
+ return nil, err
+ case res := <-resChan:
+ if res == nil {
+ break
+ }
+ vals = append(vals, *res)
+ }
+ }
+
+ return vals, nil
+}
+
+func (dht *IpfsDHT) GetValuesAsync(ctx context.Context, key string, nvals int) <-chan *routing.RecvdVal {
+ var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0)
+ var errChan chan error = make(chan error, 0)
+ go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, errChan)
+ go func() {
+ for err := range errChan {
+ log.Debugf("Query error: %s", err)
+ notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
+ Type: notif.QueryError,
+ Extra: err.Error(),
+ })
+ }
+ }()
+ return resChan
+}
+
+func (dht *IpfsDHT) getValuesAsyncRoutine(ctx context.Context, key string, nvals int, resChan chan<- *routing.RecvdVal, errChan chan<- error) {
var valslock sync.Mutex
+ var sentRes int
+
+ defer close(errChan)
+ defer close(resChan)
// If we have it local, dont bother doing an RPC!
lrec, err := dht.getLocal(key)
@@ -152,16 +195,18 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
// TODO: this is tricky, we dont always want to trust our own value
// what if the authoritative source updated it?
log.Debug("have it locally")
- vals = append(vals, routing.RecvdVal{
+ sentRes = sentRes + 1
+ resChan <- &routing.RecvdVal{
Val: lrec.GetValue(),
From: dht.self,
- })
+ }
- if nvals <= 1 {
- return vals, nil
+ if nvals == 0 || nvals == 1 {
+ return
}
} else if nvals == 0 {
- return nil, err
+ errChan <- err
+ return
}
// get closest peers in the routing table
@@ -169,7 +214,8 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
log.Debugf("peers in rt: %d %s", len(rtp), rtp)
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
- return nil, kb.ErrLookupFailure
+ errChan <- kb.ErrLookupFailure
+ return
}
// setup the Query
@@ -205,11 +251,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
Val: rec.GetValue(),
From: p,
}
+ resChan <- &rv
valslock.Lock()
- vals = append(vals, rv)
// If weve collected enough records, we're done
- if len(vals) >= nvals {
+ if sentRes >= nvals || nvals >= 0 {
res.success = true
}
valslock.Unlock()
@@ -226,14 +272,12 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
// run it!
_, err = query.Run(ctx, rtp)
- if len(vals) == 0 {
+ if sentRes == 0 {
if err != nil {
- return nil, err
+ errChan <- err
}
}
- return vals, nil
-
}
// Value provider layer of indirection.
--
2.9.3