forked from holochain/holochain-proto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnode.go
264 lines (228 loc) · 6.27 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright (C) 2013-2017, The MetaCurrency Project (Eric Harris-Braun, Arthur Brock, et. al.)
// Use of this source code is governed by GPLv3 found in the LICENSE file
//----------------------------------------------------------------------------------------
// node.go implements the libp2p (IPFS) network transport used for communicating between holochain nodes
package holochain
import (
"context"
// host "github.com/libp2p/go-libp2p-host"
"encoding/gob"
"errors"
"fmt"
ic "github.com/libp2p/go-libp2p-crypto"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
swarm "github.com/libp2p/go-libp2p-swarm"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
"io"
"time"
)
// ReceiverFn is the signature of a protocol message handler. It is given the
// holochain and the received message, and returns either a response body or an
// error. StartProtocol calls it for messages arriving over the network, and
// Holochain.Send calls it directly when a message is addressed to the local node.
type ReceiverFn func(h *Holochain, m *Message) (response interface{}, err error)
// MsgType identifies the kind of a Message.
type MsgType int8
// Message type values. They are numbered sequentially by iota starting at 0, so
// inserting or reordering entries changes the numeric value of every later constant.
const (
// common messages
// ERROR_RESPONSE is sent by respondWith when a handler failed; the body is the error string.
ERROR_RESPONSE MsgType = iota
// OK_RESPONSE is sent by respondWith when a handler succeeded; the body is the handler's response.
OK_RESPONSE
// DHT messages
PUT_REQUEST
GET_REQUEST
PUTMETA_REQUEST
GETMETA_REQUEST
// Source Messages
// SRC_VALIDATE is handled by SrcReceiver, which expects the body to be a Hash.
SRC_VALIDATE
)
// Message represents data that can be sent to a node in the network.
// Messages are normally built with Node.NewMessage, which fills in Time and From.
type Message struct {
Type MsgType // kind of message; one of the MsgType constants
Time time.Time // creation timestamp (time.Now() when built by Node.NewMessage)
From peer.ID // ID of the sending node; StartProtocol rejects messages where this is empty
Body interface{} // payload; its concrete type depends on Type (e.g. an error string for ERROR_RESPONSE)
}
// Node represents a node in the network
type Node struct {
HashAddr peer.ID // peer ID; NewNode derives it from the node's private key
NetAddr ma.Multiaddr // listen address; NewNode parses it from the listenAddr string
Host *rhost.RoutedHost // host used to open streams and register stream handlers
}
// Protocol identifiers used when opening streams and registering stream handlers.
const (
// DHTProtocol is the protocol ID for DHT messages (not referenced elsewhere in this file).
DHTProtocol = protocol.ID("/holochain-dht/0.0.0")
// SourceProtocol is the protocol ID that StartSrc registers SrcReceiver under.
SourceProtocol = protocol.ID("/holochain-src/0.0.0")
)
// HolochainRouter is a placeholder router handed to rhost.Wrap by NewNode.
// Its FindPeer method always fails with "routing not implemented".
type HolochainRouter struct {
dummy int // not read or written anywhere in this file
}
// FindPeer is a stub: peer routing is not implemented, so it ignores its
// arguments and always returns a zero-valued PeerInfo together with an error.
func (r *HolochainRouter) FindPeer(context.Context, peer.ID) (peer pstore.PeerInfo, err error) {
	// peer here is the (still zero-valued) named result, not the peer package.
	return peer, errors.New("routing not implemented")
}
// NewNode creates a new node with the given identity, listening on listenAddr.
//
// listenAddr is parsed as a multiaddr and priv is the node's private key, from
// which the peer ID (Node.HashAddr) is derived. The key pair is stored in a
// fresh peerstore, a swarm network is created on the listen address, and the
// resulting basic host is wrapped in a routed host backed by HolochainRouter.
//
// On any failure the returned node is nil and err describes the problem.
func NewNode(listenAddr string, priv ic.PrivKey) (node *Node, err error) {
	addr, err := ma.NewMultiaddr(listenAddr)
	if err != nil {
		return nil, err
	}

	store := pstore.NewPeerstore()
	id, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		return nil, err
	}
	store.AddPrivKey(id, priv)
	store.AddPubKey(id, priv.GetPublic())

	// create a new swarm to be used by the service host
	network, err := swarm.NewNetwork(context.Background(), []ma.Multiaddr{addr}, id, store, nil)
	if err != nil {
		return nil, err
	}

	return &Node{
		HashAddr: id,
		NetAddr:  addr,
		Host:     rhost.Wrap(bhost.New(network), &HolochainRouter{}),
	}, nil
}
// Encode serializes the message to bytes (gob format) by delegating to ByteEncoder.
// It returns the encoded bytes, or the error reported by ByteEncoder.
// @TODO generalize for other message encoding formats
func (m *Message) Encode() (data []byte, err error) {
	return ByteEncoder(m)
}
// Decode reads one gob-encoded message from r into m.
// It returns whatever error the gob decoder reports, or nil on success.
// @TODO generalize for other message encoding formats
func (m *Message) Decode(r io.Reader) (err error) {
	return gob.NewDecoder(r).Decode(m)
}
// respondWith writes a response message to the stream: an ERROR_RESPONSE
// carrying err's text when err is non-nil, otherwise an OK_RESPONSE carrying body.
//
// This runs inside a stream handler, so it must not panic: a remote peer that
// drops the stream, or a body that cannot be encoded, would otherwise take down
// the whole process. Instead:
//   - if the response cannot be encoded, an ERROR_RESPONSE carrying the encoding
//     error's text is sent in its place;
//   - if even that cannot be encoded, or the write fails, the stream is closed
//     so the peer is not left waiting for a reply.
func (node *Node) respondWith(s net.Stream, err error, body interface{}) {
	var m *Message
	if err != nil {
		m = node.NewMessage(ERROR_RESPONSE, err.Error())
	} else {
		m = node.NewMessage(OK_RESPONSE, body)
	}

	data, err := m.Encode()
	if err != nil {
		// The body could not be serialized; tell the peer why rather than crashing.
		m = node.NewMessage(ERROR_RESPONSE, err.Error())
		data, err = m.Encode()
		if err != nil {
			// Nothing sensible can be sent; give up on this stream.
			s.Close()
			return
		}
	}

	if _, err = s.Write(data); err != nil {
		// The stream is unusable (e.g. the peer went away); release it.
		s.Close()
	}
}
// StartProtocol initiates listening for a protocol on the node.
//
// It registers a stream handler for proto on the node's host. For each incoming
// stream the handler decodes one Message, validates it, passes it to receiver,
// and writes the outcome back with respondWith:
//   - a message that fails to decode yields an error response with the decode error;
//   - a decoded message with an empty From yields "message must have a source";
//   - otherwise the response or error returned by receiver is sent.
//
// The returned error is always nil; it exists for interface stability.
func (node *Node) StartProtocol(h *Holochain, proto protocol.ID, receiver ReceiverFn) (err error) {
	node.Host.SetStreamHandler(proto, func(s net.Stream) {
		var m Message
		var response interface{}

		err := m.Decode(s)
		switch {
		case err != nil:
			// Report the decode failure itself; checking From first would
			// mask it, since an undecoded message has an empty From.
		case m.From == "":
			// @todo other sanity checks on From?
			err = errors.New("message must have a source")
		default:
			response, err = receiver(h, &m)
		}
		node.respondWith(s, err, response)
	})
	return nil
}
// SrcReceiver handles messages on the Source protocol.
//
// Only SRC_VALIDATE is supported and its body must be a Hash. The hash is
// looked up with h.chain.Get first; if that reports ErrHashNotFound the lookup
// falls back to h.chain.GetEntry. Any other message type or body type yields
// an error and a nil response.
func SrcReceiver(h *Holochain, m *Message) (response interface{}, err error) {
	if m.Type != SRC_VALIDATE {
		return nil, fmt.Errorf("message type %d not in holochain-src protocol", int(m.Type))
	}

	hash, isHash := m.Body.(Hash)
	if !isHash {
		return nil, errors.New("expected hash")
	}

	//@TODO should we really be making this distinction!!!
	// try to get the hash from the headers
	response, err = h.chain.Get(hash)
	if err == ErrHashNotFound {
		// if that fails get it from the entries
		response, err = h.chain.GetEntry(hash)
	}
	return response, err
}
// StartSrc initiates listening for Source protocol messages on the node.
// It registers SrcReceiver as the handler for SourceProtocol via StartProtocol
// and returns StartProtocol's error.
func (node *Node) StartSrc(h *Holochain) (err error) {
return node.StartProtocol(h, SourceProtocol, SrcReceiver)
}
// Close shuts down the node by closing its underlying host,
// returning whatever error the host's Close reports.
func (node *Node) Close() error {
return node.Host.Close()
}
// Send builds a message of type t carrying body and delivers it to the node `to`.
//
// When `to` is this holochain's own node the network is bypassed and receiver
// is called directly with the message. Otherwise the message is sent over
// proto with node.Send; an ERROR_RESPONSE reply is turned into an error, and
// any other reply's body is returned as the response.
func (h *Holochain) Send(proto protocol.ID, to peer.ID, t MsgType, body interface{}, receiver ReceiverFn) (response interface{}, err error) {
	msg := h.node.NewMessage(t, body)

	// if we are sending to ourselves we should bypass the network mechanics and call
	// the receiver directly
	if to == h.node.HashAddr {
		return receiver(h, msg)
	}

	reply, err := h.node.Send(proto, to, msg)
	if err != nil {
		return nil, err
	}
	if reply.Type == ERROR_RESPONSE {
		return nil, fmt.Errorf("response error: %v", reply.Body)
	}
	return reply.Body, nil
}
// Send delivers a message to a node via the given protocol and waits for the reply.
//
// It opens a new stream to addr for proto, writes the encoded message, then
// decodes a single response message from the same stream. The stream is closed
// when Send returns. An error is returned if the stream cannot be opened, the
// message cannot be encoded or fully written, or the response cannot be decoded;
// in those cases the returned response is not meaningful.
func (node *Node) Send(proto protocol.ID, addr peer.ID, m *Message) (response Message, err error) {
	s, err := node.Host.NewStream(context.Background(), addr, proto)
	if err != nil {
		return response, err
	}
	defer s.Close()

	// encode the message and send it
	data, err := m.Encode()
	if err != nil {
		return response, err
	}
	n, err := s.Write(data)
	if err != nil {
		return response, err
	}
	if n != len(data) {
		// Return here: falling through to Decode would overwrite this error
		// and wait for a reply to a request that was never fully sent.
		return response, errors.New("unable to send all data")
	}

	// decode the response
	err = response.Decode(s)
	return response, err
}
// NewMessage creates a message of type t carrying body, stamped with the
// current time and with this node's HashAddr as the sender.
func (node *Node) NewMessage(t MsgType, body interface{}) (msg *Message) {
	return &Message{
		Type: t,
		Time: time.Now(),
		From: node.HashAddr,
		Body: body,
	}
}