peer_test.go
package rpc25519

import (
	"context"
	"fmt"
	//"os"
	//"strings"
	"testing"
	"time"

	cv "github.com/glycerine/goconvey/convey"
)

var _ = fmt.Sprintf
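
// testJunk bundles a TCP-only client/server pair with a syncer
// peer service registered on each side, so the tests below can
// exercise peer circuits without repeating the setup code.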
type testJunk struct {
	name           string
	cfg            *Config
	srv            *Server
	cli            *Client
	srvSync        *syncer
	cliSync        *syncer
	cliServiceName string
	srvServiceName string
}
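
// cleanup shuts down the client first, then the server.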
func (j *testJunk) cleanup() {
	j.cli.Close()
	j.srv.Close()
}
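
// newTestJunk starts a server on an ephemeral localhost port,
// dials it with a client, and registers a syncer peer service
// under a distinct name on each end.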
func newTestJunk(name string) (j *testJunk) {
	j = &testJunk{
		name:           name,
		cliServiceName: "cliSync_" + name,
		srvServiceName: "srvSync_" + name,
	}
	cfg := NewConfig()
	cfg.TCPonly_no_TLS = true
	cfg.ServerAddr = "127.0.0.1:0"

	srv := NewServer("srv_"+name, cfg)
	serverAddr, err := srv.Start()
	panicOn(err)
	//defer srv.Close() // j.cleanup() handles this.

	cfg.ClientDialToHostPort = serverAddr.String()
	cli, err := NewClient("cli_"+name, cfg)
	panicOn(err)
	err = cli.Start()
	panicOn(err)
	//defer cli.Close() // j.cleanup() handles this.

	srvSync := newSyncer(j.srvServiceName)
	err = srv.PeerAPI.RegisterPeerServiceFunc(j.srvServiceName, srvSync.Start)
	panicOn(err)

	cliSync := newSyncer(j.cliServiceName)
	err = cli.PeerAPI.RegisterPeerServiceFunc(j.cliServiceName, cliSync.Start)
	panicOn(err)

	j.cli = cli
	j.srv = srv
	j.cfg = cfg
	j.cliSync = cliSync
	j.srvSync = srvSync
	return j
}
func Test405_user_can_close_Client_and_Server(t *testing.T) {

	cv.Convey("user code calling Close on Client and Server should shut down. make sure j.cleanup works!", t, func() {

		j := newTestJunk("close_cli_srv405")
		j.cleanup()
		<-j.cli.halt.Done.Chan
		vv("cli has halted")
		<-j.srv.halt.Done.Chan
		vv("srv has halted")

		// verify children
		j = newTestJunk("close_cli_srv405")
		j.cleanup()
	})
}
func Test406_user_can_cancel_local_service_with_context(t *testing.T) {

	cv.Convey("user code calling halt on a running local peer service should stop it", t, func() {

		j := newTestJunk("local_ctx_cancel_test406")
		defer j.cleanup()

		ctx := context.Background()
		lpb, err := j.cli.PeerAPI.StartLocalPeer(ctx, j.cliServiceName, nil)
		panicOn(err)

		lpb.Close()
		<-j.cliSync.halt.Done.Chan
		vv("good: cliSync has halted")
	})

	cv.Convey("user code calling cancel on a running local peer service should stop it", t, func() {

		j := newTestJunk("local_ctx_cancel_test406")
		defer j.cleanup()

		ctx, canc := context.WithCancel(context.Background())
		lpb, err := j.cli.PeerAPI.StartLocalPeer(ctx, j.cliServiceName, nil)
		panicOn(err)
		defer lpb.Close()

		canc() // should be equivalent to lpb.Close()
		<-j.cliSync.halt.Done.Chan
		vv("good: cliSync has halted")
	})
}
func Test407_single_circuits_can_cancel_and_propagate_to_remote(t *testing.T) {

	cv.Convey("a circuit can close down, telling the remote but not closing the peer", t, func() {

		j := newTestJunk("circuit_close_test407")
		defer j.cleanup()

		ctx := context.Background()
		cli_lpb, err := j.cli.PeerAPI.StartLocalPeer(ctx, j.cliServiceName, nil)
		panicOn(err)
		defer cli_lpb.Close()

		// later test:
		//_, _, err := j.cli.PeerAPI.StartRemotePeer(ctx, j.srvServiceName, cli.RemoteAddr(), 0)
		//panicOn(err)

		server_lpb, err := j.srv.PeerAPI.StartLocalPeer(ctx, j.srvServiceName, nil)
		panicOn(err)
		defer server_lpb.Close()

		// establish a circuit, then close it
		cktname := "407ckt"

		// optional first frag
		frag0 := NewFragment()
		frag0.FragSubject = "initial setup frag0"

		ckt, ctxCkt, err := cli_lpb.NewCircuitToPeerURL(cktname, server_lpb.URL(), frag0, 0)
		panicOn(err)
		_ = ctxCkt
		defer ckt.Close(nil)

		// verify it is up
		serverCkt := <-j.srvSync.gotIncomingCkt
		vv("server got circuit '%v'", serverCkt.Name)

		fragSrvInRead0 := <-j.srvSync.gotIncomingCktReadFrag
		cv.So(fragSrvInRead0.FragSubject, cv.ShouldEqual, "initial setup frag0")

		// verify server gets Reads
		frag := NewFragment()
		frag.FragSubject = "are we live?"
		cli_lpb.SendOneWay(ckt, frag, 0)
		vv("cli_lpb.SendOneWay() are we live back.")

		fragSrvInRead1 := <-j.srvSync.gotIncomingCktReadFrag
		vv("good: past 2nd read from server. fragSrvInRead1 = '%v'", fragSrvInRead1)
		_ = fragSrvInRead1
		if fragSrvInRead1.FragSubject != "are we live?" {
			t.Fatalf("error: expected subject 'are we live?' but got: '%v'", fragSrvInRead1.FragSubject)
		}
		vv("good: past the are we live check.")

		if ckt.IsClosed() {
			t.Fatalf("error: client side circuit '%v' should NOT be closed.", ckt.Name)
		}
		if serverCkt.IsClosed() {
			t.Fatalf("error: server circuit '%v' should NOT be closed.", serverCkt.Name)
		}

		vv("about to ckt.Close() from the client side ckt")
		ckt.Close(nil)
		vv("good: past the ckt.Close()")

		if !ckt.IsClosed() {
			t.Fatalf("error: circuit '%v' should be closed.", ckt.Name)
		}

		// Verify that the server side also closed the circuit.
		// IsClosed() will race against the circuit close reaching the
		// server, so wait on serverCkt.Halt.Done.Chan first.
		select {
		case <-serverCkt.Halt.Done.Chan:
		case <-time.After(2 * time.Second):
			t.Fatalf("error: server circuit '%v' did not close after 2 sec", serverCkt.Name)
		}
		if !serverCkt.IsClosed() {
			t.Fatalf("error: server circuit '%v' should be closed.", serverCkt.Name)
		}
		vv("good: past the serverCkt.IsClosed()")

		// Did the server peer code recognize the closed ckt?
		// We have been acting as the client through cli_lpb, so the
		// client peer code has not been active. That keeps this test
		// deterministic: there are never two competing reads on the
		// response channels.
		<-j.srvSync.gotCktHaltReq.Chan
		vv("good: server saw the ckt peer code stopped reading ckt.")

		// Sends and reads on the closed ckt should give errors or hang
		// on nil channels. The server side responds well when this test
		// proxies the client.

		vv(" ======== now proxy the server and have ckt to client... separate test?")

		// Now try it the other way: proxy the server and set up
		// a circuit with the client.

		// optional first frag
		frag2 := NewFragment()
		frag2.FragSubject = "initial setup frag2"

		cktname2 := "proxy_the_server407"
		ckt2, ctxCkt2, err := server_lpb.NewCircuitToPeerURL(cktname2, cli_lpb.URL(), frag2, 0)
		panicOn(err)
		_ = ctxCkt2
		defer ckt2.Close(nil)

		cliCkt := <-j.cliSync.gotIncomingCkt
		vv("client got circuit '%v'", cliCkt.Name)
		if cliCkt.Name != "proxy_the_server407" {
			t.Fatalf("error: cliCkt.Name should be 'proxy_the_server407' but we got '%v'", cliCkt.Name)
		}
		vv("good: client got the named circuit we expected.")

		fragCliInRead2 := <-j.cliSync.gotIncomingCktReadFrag
		cv.So(fragCliInRead2.FragSubject, cv.ShouldEqual, "initial setup frag2")

		// verify client gets Reads
		frag3 := NewFragment()
		frag3.FragSubject = "frag3 to the client"
		server_lpb.SendOneWay(ckt2, frag3, 0)

		fragCliInRead3 := <-j.cliSync.gotIncomingCktReadFrag
		vv("good: past frag3 read in the client. fragCliInRead3 = '%v'", fragCliInRead3)
		_ = fragCliInRead3
		if fragCliInRead3.FragSubject != "frag3 to the client" {
			t.Fatalf("error: expected subject 'frag3 to the client' but got: '%v'", fragCliInRead3.FragSubject)
		}
		vv("good: past the client frag3 read check.")

		// TODO: shut down the peer service on one side. Does the other
		// side stay up, but clean up all the circuits associated with
		// that service?
		//select {}
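
		// A sketch of how that follow-up check might look; left
		// commented out and untested, since the ordering of circuit
		// teardown relative to the service halt is an assumption here:
		//
		//	cli_lpb.Close()            // stop only the client's peer service.
		//	<-j.cliSync.halt.Done.Chan // wait for the client service to halt.
		//	select {                   // its incoming circuit should unwind too.
		//	case <-cliCkt.Halt.Done.Chan:
		//	case <-time.After(2 * time.Second):
		//		t.Fatalf("error: client circuit '%v' did not close with its service", cliCkt.Name)
		//	}
		//	// The server's peer service should remain usable, e.g. able
		//	// to open a fresh circuit to a newly started local peer.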
	})
}