-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
243 lines (202 loc) · 5.59 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package main
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"sync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
dht "github.com/libp2p/go-libp2p-kad-dht"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
)
// Creates a LibP2P host with a random peer ID
func makeRoutedHost(listenPort int, bootstrapPeers []peer.AddrInfo) (host.Host, *dht.IpfsDHT, error) {
// Generate a key pair for this host
priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
if err != nil {
return nil, nil, err
}
ctx := context.Background()
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", listenPort)),
libp2p.Identity(priv),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
libp2p.NATPortMap(),
}
basicHost, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, nil, err
}
// Construct a datastore
// TOFIX not sure it is needed
dstore := dsync.MutexWrap(ds.NewMapDatastore())
// Make the DHT
ndht := dht.NewDHT(ctx, basicHost, dstore)
// Make the routed host
routedHost := rhost.Wrap(basicHost, ndht)
// Connect to bootstrap nodes
if len(bootstrapPeers) > 0 {
err = bootstrapConnect(ctx, routedHost, bootstrapPeers)
if err != nil {
return nil, nil, err
}
}
// Bootstrap the host
// TOFIX not sure it is needed
err = ndht.Bootstrap(ctx)
if err != nil {
return nil, nil, err
}
// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", routedHost.ID().Pretty()))
addrs := routedHost.Addrs()
log.Println("My ID is :", routedHost.ID().Pretty())
log.Println("I can be reached at:")
for _, addr := range addrs {
log.Println(addr.Encapsulate(hostAddr))
}
return routedHost, ndht, nil
}
func main() {
// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial")
bootstrapPeer := flag.String("b", "", "bootstrap peer")
flag.Parse()
if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}
// Make routed host
var bootstrapPeers []peer.AddrInfo
if *bootstrapPeer != "" {
bootstrapPeers = convertPeers([]string{*bootstrapPeer})
}
ha, ndht, err := makeRoutedHost(*listenF, bootstrapPeers)
if err != nil {
log.Fatal(err)
}
// Set a stream handler on host A.
ha.SetStreamHandler("/custom-node/0.1.0", func(s network.Stream) {
log.Println("Got a new stream!")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
if *target == "" {
log.Println("listening for connections")
select {} // hang forever
}
/**** This is where the listener code ends ****/
//
peerid, err := peer.IDB58Decode(*target)
if err != nil {
log.Fatalln(err)
}
ctx := context.Background()
// For debug only
log.Println("DEBUG looking for peer " + *target)
pi, err := ndht.FindPeer(ctx, peerid)
if err != nil {
log.Println("DEBUG Didn't found addresses for peer " + *target)
}
log.Println("DEBUG found addresses for peer " + *target + ":")
log.Println(pi.Addrs)
//
log.Println("opening stream")
s, err := ha.NewStream(ctx, peerid, "/custom-node/0.1.0")
if err != nil {
log.Fatalln(err)
}
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Fatalln(err)
}
out, err := ioutil.ReadAll(s)
if err != nil {
log.Fatalln(err)
}
log.Printf("read reply: %q\n", out)
}
// doEcho reads a line of data from a stream and writes it back
func doEcho(s network.Stream) error {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
return err
}
log.Printf("read: %s\n", str)
_, err = s.Write([]byte(str))
return err
}
func convertPeers(peers []string) []peer.AddrInfo {
pinfos := make([]peer.AddrInfo, len(peers))
for i, addr := range peers {
maddr := ma.StringCast(addr)
p, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
log.Fatalln(err)
}
pinfos[i] = *p
}
return pinfos
}
// This code is borrowed from the go-ipfs bootstrap process
func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo) error {
if len(peers) < 1 {
return errors.New("not enough bootstrap peers")
}
errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {
// performed asynchronously because when performed synchronously, if
// one `Connect` call hangs, subsequent calls are more likely to
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.
wg.Add(1)
go func(p peer.AddrInfo) {
defer wg.Done()
defer log.Println(ctx, "bootstrapDial", ph.ID(), p.ID)
log.Printf("%s bootstrapping to %s", ph.ID(), p.ID)
ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Println(ctx, "bootstrapDialFailed", p.ID)
log.Printf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Println(ctx, "bootstrapDialSuccess", p.ID)
log.Printf("bootstrapped with %v", p.ID)
}(p)
}
wg.Wait()
// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
}