-
Notifications
You must be signed in to change notification settings - Fork 2
/
disco_test.go
158 lines (136 loc) · 3.17 KB
/
disco_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package disco
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/forestgiant/disco/node"
)
// testMulticastAddress is the address the tests in this file pass to both
// Disco.Discover and Node.Multicast: a bracketed IPv6 literal followed by
// port 30000.
var testMulticastAddress = "[ff12::9000]:30000"
// Test_register asserts that d.Members() reports exactly one member after
// d.register(r, n) and zero members after d.deregister(n).
//
// A helper goroutine waits for a value on r (presumably sent by register;
// confirm against its implementation), runs the assertions, and closes
// closeCh when it is done. If nothing arrives on r within 100ms it reports a
// timeout error on errCh instead.
func Test_register(t *testing.T) {
	d := &Disco{}
	n := &node.Node{}
	r := make(chan *node.Node)
	closeCh := make(chan struct{})
	errCh := make(chan error)

	go func() {
		// Closing closeCh tells the test goroutine that this goroutine is finished.
		defer close(closeCh)

		select {
		case <-r:
			// Make sure we have 1 member
			if got := len(d.Members()); got != 1 {
				t.Errorf("Test_register: One node should be registered. Received: %d, Should be: %d", got, 1)
			}

			// Now deregister and make sure we have 0 members
			d.deregister(n)
			if got := len(d.Members()); got != 0 {
				t.Errorf("Test_register: All nodes should be deregistered. Received: %d, Should be: %d", got, 0)
			}
		case <-time.After(100 * time.Millisecond):
			// t.Fatal must run on the test goroutine, so hand the error over.
			errCh <- errors.New("Test_register timed out")
		}
	}()

	d.register(r, n)

	// Block until closeCh is closed or a timeout is reported. Both cases end
	// the test, so no surrounding loop is needed.
	select {
	case <-closeCh:
	case err := <-errCh:
		t.Fatal(err)
	}
}
// TestDiscover starts Discover on testMulticastAddress, multicasts every node
// in the table, and waits until each node whose shouldErr is false has been
// received on the channel returned by Discover.
func TestDiscover(t *testing.T) {
	cases := []struct {
		n         *node.Node
		shouldErr bool
	}{
		{&node.Node{Payload: []byte("Discover")}, false},
		{&node.Node{SendInterval: 2 * time.Second}, false},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stop disco

	var pending sync.WaitGroup
	d := &Disco{}
	found, err := d.Discover(ctx, testMulticastAddress)
	if err != nil {
		t.Fatal(err)
	}

	// Receiver: match every discovered node against the table until the
	// context is cancelled.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case rn := <-found:
				for i := range cases {
					if !rn.Equal(cases[i].n) {
						continue
					}
					cases[i].n.Stop() // stop the node from multicasting
					pending.Done()
				}
			}
		}
	}()

	// Multicast nodes so they can be discovered. Only nodes expected to
	// succeed are counted in the WaitGroup.
	for i := range cases {
		if !cases[i].shouldErr {
			pending.Add(1)
		}
		cases[i].n.Multicast(ctx, testMulticastAddress)
	}

	pending.Wait()
}
// TestDiscoverSameNode starts Discover on testMulticastAddress, multicasts the
// single node in the table, and waits until it has been received on the
// channel returned by Discover. Discovered nodes that do not match a table
// entry are reported on stdout as "not equal".
func TestDiscoverSameNode(t *testing.T) {
	cases := []struct {
		n         *node.Node
		shouldErr bool
	}{
		{&node.Node{Payload: []byte("DiscoverSameNode")}, false},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stop disco

	var pending sync.WaitGroup
	d := &Disco{}
	found, err := d.Discover(ctx, testMulticastAddress)
	if err != nil {
		t.Fatal(err)
	}

	// Receiver: match every discovered node against the table until the
	// context is cancelled.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case rn := <-found:
				for i := range cases {
					if !rn.Equal(cases[i].n) {
						fmt.Println("not equal")
						continue
					}
					cases[i].n.Stop() // stop the node from multicasting
					pending.Done()
				}
			}
		}
	}()

	// Multicast nodes so they can be discovered. Only nodes expected to
	// succeed are counted in the WaitGroup.
	for i := range cases {
		if !cases[i].shouldErr {
			pending.Add(1)
		}
		cases[i].n.Multicast(ctx, testMulticastAddress)
	}

	pending.Wait()
}