-
Notifications
You must be signed in to change notification settings - Fork 45
/
main.go
314 lines (306 loc) · 10.6 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// Copyright 2017,2018 Lei Ni (nilei81@gmail.com).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
helloworld is an example program for dragonboat.
*/
package main
import (
"bufio"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/logger"
"github.com/lni/goutils/syncutil"
)
// exampleShardID is the ID of the only Raft shard managed by this example.
const exampleShardID = uint64(128)

// addresses holds the fixed Raft addresses of the three initial members of
// the shard. The count is fixed to three; main uses (index + 1) of each entry
// as that member's ReplicaID, so the order of the entries matters.
var addresses = []string{
	"localhost:63001",
	"localhost:63002",
	"localhost:63003",
}

// errNotMembershipChange is returned by splitMembershipChangeCmd when an input
// line is not a membership change request.
var errNotMembershipChange = errors.New("not a membership change request")
// makeMembershipChange asks the NodeHost to add or remove a replica of the
// example shard and prints the outcome.
//
// cmd must be "add" or "remove" (the values splitMembershipChangeCmd returns);
// any other value is a programming error and panics. addr is the Raft address
// of the replica being added and is unused for "remove". The request is
// submitted with a 3 second timeout and this function blocks until its result
// is delivered. Success is reported on stdout; submission errors and
// unsuccessful results are reported on stderr together with the reason.
func makeMembershipChange(nh *dragonboat.NodeHost,
	cmd string, addr string, replicaID uint64) {
	const timeout = 3 * time.Second
	var rs *dragonboat.RequestState
	var err error
	switch cmd {
	case "add":
		// The 0 argument is kept from the original example, which described it
		// as an order ID that is ignored in standalone mode.
		// NOTE(review): presumably this is the config change index where 0 skips
		// the check — confirm against the RequestAddReplica documentation.
		rs, err = nh.RequestAddReplica(exampleShardID, replicaID, addr, 0, timeout)
	case "remove":
		rs, err = nh.RequestDeleteReplica(exampleShardID, replicaID, 0, timeout)
	default:
		panic(fmt.Sprintf("unknown cmd %q", cmd))
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "membership change failed, %v\n", err)
		return
	}
	// Only one channel is awaited, so a plain receive is used instead of a
	// single-case select.
	r := <-rs.CompletedC
	switch {
	case r.Completed():
		fmt.Fprintf(os.Stdout, "membership change completed successfully\n")
	case r.Timeout():
		fmt.Fprintf(os.Stderr, "membership change failed, timeout\n")
	case r.Rejected():
		fmt.Fprintf(os.Stderr, "membership change failed, rejected\n")
	case r.Terminated():
		fmt.Fprintf(os.Stderr, "membership change failed, terminated\n")
	case r.Dropped():
		fmt.Fprintf(os.Stderr, "membership change failed, dropped\n")
	default:
		fmt.Fprintf(os.Stderr, "membership change failed\n")
	}
}
// splitMembershipChangeCmd tries to parse the input string as a membership
// change request. Fields are separated by any run of whitespace (so a trailing
// '\r' or extra spaces are tolerated) and the command word is case-insensitive.
// An ADD request has the following expected format -
//
//	add localhost:63100 4
//
// A REMOVE request has the following expected format -
//
//	remove 4
//
// On success it returns the lower-cased command, the Raft address (empty for
// remove) and the ReplicaID. Any other input — a different first word, the
// wrong number of fields for the command, or a ReplicaID that is not a
// base-10 uint64 — yields errNotMembershipChange.
func splitMembershipChangeCmd(v string) (string, string, uint64, error) {
	fields := strings.Fields(v)
	if len(fields) == 0 {
		return "", "", 0, errNotMembershipChange
	}
	cmd := strings.ToLower(fields[0])
	var addr, replicaIDStr string
	switch {
	case cmd == "add" && len(fields) == 3:
		addr, replicaIDStr = fields[1], fields[2]
	case cmd == "remove" && len(fields) == 2:
		replicaIDStr = fields[1]
	default:
		// Checking the field count per command matters: "add 4" has only two
		// fields and must not be indexed as if it had three.
		return "", "", 0, errNotMembershipChange
	}
	replicaID, err := strconv.ParseUint(replicaIDStr, 10, 64)
	if err != nil {
		return "", "", 0, errNotMembershipChange
	}
	return cmd, addr, replicaID, nil
}
// main starts one replica of the example shard and runs until "exit" is typed
// on stdin.
//
// Flags:
//
//	-replicaid  ReplicaID of this node, must be >= 1. When -addr is not given
//	            it must be 1, 2 or 3 and selects one of the hard-coded
//	            initial member addresses.
//	-addr       Raft address of this NodeHost. Required together with -join.
//	-join       start the replica as a new node joining the shard instead of
//	            as one of the initial members.
//
// Each line read from stdin is either handled as a membership change request
// (see splitMembershipChangeCmd) or proposed to the shard as a regular
// message. A background worker performs a linearizable read every 10 seconds
// and prints the count it returns.
func main() {
	replicaID := flag.Int("replicaid", 1, "ReplicaID to use")
	addr := flag.String("addr", "", "Nodehost address")
	join := flag.Bool("join", false, "Joining a new node")
	flag.Parse()
	// ReplicaID is not allowed to be 0, and a negative value would silently
	// wrap around when converted to uint64 below.
	if *replicaID < 1 {
		fmt.Fprintf(os.Stderr, "node id must be a positive integer\n")
		os.Exit(1)
	}
	if len(*addr) == 0 && *replicaID != 1 && *replicaID != 2 && *replicaID != 3 {
		fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
		os.Exit(1)
	}
	// initialMembers is left empty when joining, so a joining node's address
	// cannot be looked up there; without an explicit -addr the RaftAddress
	// below would be empty.
	if *join && len(*addr) == 0 {
		fmt.Fprintf(os.Stderr, "address must be specified when joining a new node\n")
		os.Exit(1)
	}
	// https://github.com/golang/go/issues/17393
	if runtime.GOOS == "darwin" {
		signal.Ignore(syscall.Signal(0xd))
	}
	initialMembers := make(map[uint64]string)
	// When joining a new node which is not an initial member, the
	// initialMembers map should be empty.
	// When restarting a node that is not a member of the initial nodes, you can
	// leave initialMembers empty. We still populate initialMembers here for
	// simplicity.
	if !*join {
		for idx, v := range addresses {
			// key is the ReplicaID, ReplicaID is not allowed to be 0
			// value is the raft address
			initialMembers[uint64(idx+1)] = v
		}
	}
	// For simplicity, in this example program, the addresses of all 3 initial
	// raft members are hard coded. When an address is not specified on the
	// command line, we assume the node being launched is an initial raft member.
	nodeAddr := *addr
	if len(nodeAddr) == 0 {
		nodeAddr = initialMembers[uint64(*replicaID)]
	}
	fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
	// change the log verbosity
	logger.GetLogger("raft").SetLevel(logger.ERROR)
	logger.GetLogger("rsm").SetLevel(logger.WARNING)
	logger.GetLogger("transport").SetLevel(logger.WARNING)
	logger.GetLogger("grpc").SetLevel(logger.WARNING)
	// config for raft node
	// See GoDoc for all available options
	rc := config.Config{
		// ShardID and ReplicaID of the raft node
		ReplicaID: uint64(*replicaID),
		ShardID:   exampleShardID,
		// In this example, we assume the end-to-end round trip time (RTT) between
		// NodeHost instances (on different machines, VMs or containers) is 200
		// milliseconds, it is set in the RTTMillisecond field of the
		// config.NodeHostConfig instance below.
		// ElectionRTT is set to 10 in this example, it determines that the node
		// should start an election if there is no heartbeat from the leader for
		// 10 * RTT time intervals.
		ElectionRTT: 10,
		// HeartbeatRTT is set to 1 in this example, it determines that when the
		// node is a leader, it should broadcast heartbeat messages to its followers
		// every such 1 * RTT time interval.
		HeartbeatRTT: 1,
		CheckQuorum:  true,
		// SnapshotEntries determines how often we should take a snapshot of the
		// replicated state machine, it is set to 10 here which means a snapshot
		// will be captured for every 10 applied proposals (writes).
		// In your real world application, it should be set to much higher values.
		// You need to determine a suitable value based on how much space you are
		// willing to use on Raft Logs, how fast you can capture a snapshot of your
		// replicated state machine, how often such snapshot is going to be used
		// etc.
		SnapshotEntries: 10,
		// Once a snapshot is captured and saved, how many Raft entries already
		// covered by the new snapshot should be kept. This is useful when some
		// followers are just a little bit behind, with such overhead Raft
		// entries, the leader can send them regular entries rather than the full
		// snapshot image.
		CompactionOverhead: 5,
	}
	datadir := filepath.Join(
		"example-data",
		"helloworld-data",
		fmt.Sprintf("node%d", *replicaID))
	// config for the nodehost
	// See GoDoc for all available options
	// by default, insecure transport is used, you can choose to use Mutual TLS
	// Authentication to authenticate both servers and clients. To use Mutual
	// TLS Authentication, set the MutualTLS field in NodeHostConfig to true, set
	// the CAFile, CertFile and KeyFile fields to point to the path of your CA
	// file, certificate and key files.
	nhc := config.NodeHostConfig{
		// WALDir is the directory to store the WAL of all Raft Logs. It is
		// recommended to use Enterprise SSDs with good fsync() performance
		// to get the best performance. A few SSDs that we tested or that are
		// known to work very well
		// Recommended SATA SSDs -
		// Intel S3700, Intel S3710, Micron 500DC
		// Other SATA enterprise class SSDs with power loss protection
		// Recommended NVME SSDs -
		// Most enterprise NVME currently available on the market.
		// SSD to avoid -
		// Consumer class SSDs, no matter whether they are SATA or NVME based, as
		// they usually have very poor fsync() performance.
		//
		// You can use the pg_test_fsync tool shipped with PostgreSQL to test the
		// fsync performance of your WAL disk. It is recommended to use SSDs with
		// fsync latency of well below 1 millisecond.
		//
		// Note that this is only for storing the WAL of Raft Logs, its size is
		// usually pretty small, 64GB per NodeHost is usually more than enough.
		//
		// If you just have one disk in your system, just set WALDir and NodeHostDir
		// to the same location.
		WALDir: datadir,
		// NodeHostDir is where everything else is stored.
		NodeHostDir: datadir,
		// RTTMillisecond is the average round trip time between NodeHosts (usually
		// on two machines/vms), it is in millisecond. Such RTT includes the
		// processing delays caused by NodeHosts, not just the network delay between
		// two NodeHost instances.
		RTTMillisecond: 200,
		// RaftAddress is used to identify the NodeHost instance
		RaftAddress: nodeAddr,
	}
	nh, err := dragonboat.NewNodeHost(nhc)
	if err != nil {
		panic(err)
	}
	if err := nh.StartReplica(initialMembers, *join, NewExampleStateMachine, rc); err != nil {
		fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
		os.Exit(1)
	}
	raftStopper := syncutil.NewStopper()
	consoleStopper := syncutil.NewStopper()
	// ch carries input lines, already stripped of their line terminator, from
	// the console reader to the proposal worker.
	ch := make(chan string, 16)
	consoleStopper.RunWorker(func() {
		reader := bufio.NewReader(os.Stdin)
		for {
			s, err := reader.ReadString('\n')
			if err != nil {
				// stdin is closed: stop feeding the proposal worker. raftStopper is
				// not stopped, so the read worker and the NodeHost keep running.
				close(ch)
				return
			}
			// Strip the line terminator, including the '\r' of a CRLF ending.
			line := strings.TrimRight(s, "\r\n")
			if line == "exit" {
				// Stop the raft workers; main closes the NodeHost once they have
				// all returned.
				raftStopper.Stop()
				return
			}
			ch <- line
		}
	})
	raftStopper.RunWorker(func() {
		// this goroutine makes a linearizable read every 10 seconds. it returns the
		// Count value maintained in IStateMachine. see datastore.go for details.
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
				result, err := nh.SyncRead(ctx, exampleShardID, []byte{})
				cancel()
				if err != nil {
					fmt.Fprintf(os.Stderr, "SyncRead returned error %v\n", err)
					continue
				}
				// Assumes the state machine answers with at least 8 little-endian
				// bytes; check instead of letting the type assertion or the
				// decoding panic.
				data, ok := result.([]byte)
				if !ok || len(data) < 8 {
					fmt.Fprintf(os.Stderr, "unexpected SyncRead result %v\n", result)
					continue
				}
				count := binary.LittleEndian.Uint64(data)
				fmt.Fprintf(os.Stdout, "count: %d\n", count)
			case <-raftStopper.ShouldStop():
				return
			}
		}
	})
	raftStopper.RunWorker(func() {
		// use a NO-OP client session here
		// check the example in godoc to see how to use a regular client session
		cs := nh.GetNoOPSession(exampleShardID)
		for {
			select {
			case line, ok := <-ch:
				if !ok {
					return
				}
				cmd, memberAddr, memberID, err := splitMembershipChangeCmd(line)
				if err == nil {
					// input is a membership change request
					makeMembershipChange(nh, cmd, memberAddr, memberID)
					continue
				}
				// input is a regular message that needs to be proposed
				ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
				// make a proposal to update the IStateMachine instance
				_, err = nh.SyncPropose(ctx, cs, []byte(line))
				cancel()
				if err != nil {
					fmt.Fprintf(os.Stderr, "SyncPropose returned error %v\n", err)
				}
			case <-raftStopper.ShouldStop():
				return
			}
		}
	})
	raftStopper.Wait()
	// Close the NodeHost here, after every worker using it has returned, rather
	// than from the console goroutine: there the close raced with main
	// returning. No data will be lost/corrupted if Close is skipped (e.g. when
	// the process is killed), but a clean shutdown is preferable.
	nh.Close()
}