-
Notifications
You must be signed in to change notification settings - Fork 0
/
fsm.go
127 lines (106 loc) · 3.09 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"sync"
"github.com/hashicorp/raft"
"github.com/tidwall/buntdb"
)
/*
fsm is the finite state machine that applies committed Raft log entries to a
BuntDB database. hashicorp/raft requires a state machine to implement the
raft.FSM interface, which consists of three methods:
  - Apply is invoked once Raft has committed a log entry.
  - Snapshot supports log compaction by capturing a point-in-time snapshot of the FSM.
  - Restore restores an FSM from a snapshot.
*/
type fsm struct {
	// mu is held by applySet and Snapshot for the duration of their work.
	mu sync.Mutex

	// db is the BuntDB handle that Apply and Restore write keys into.
	db *buntdb.DB

	// logger receives the diagnostic messages emitted by Apply and Restore.
	logger *log.Logger
}
// newFSM builds an fsm that writes to db and logs to standard error with the
// "[fsm] " prefix and the standard date/time flags.
func newFSM(db *buntdb.DB) *fsm {
	machine := new(fsm)
	machine.db = db
	machine.logger = log.New(os.Stderr, "[fsm] ", log.LstdFlags)
	return machine
}
// Apply decodes the JSON command carried by a Raft log entry and executes it
// against the key-value store.
//
// Only the "set" operation is recognized; its result is whatever applySet
// returns. Apply panics when the entry cannot be decoded or when it names an
// operation other than "set".
func (f *fsm) Apply(l *raft.Log) interface{} {
	var cmd command

	decodeErr := json.Unmarshal(l.Data, &cmd)
	if decodeErr != nil {
		f.logger.Printf("failed to unmarshal command: %s", decodeErr.Error())
		panic(decodeErr)
	}

	// Guard clause: anything that is not a "set" is a programming error.
	if cmd.Op != "set" {
		panic(fmt.Sprintf("unrecognized command op: %s", cmd.Op))
	}

	return f.applySet(cmd.Key, cmd.Value)
}
// Snapshot hands Raft the no-op snapshot built by newSnapshotNoop. The FSM
// mutex is held while the snapshot value is created.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.Lock()
	snap, err := newSnapshotNoop()
	f.mu.Unlock()

	return snap, err
}
// Restore rebuilds the key-value store from a snapshot stream. rc is always
// closed before Restore returns; a close failure is logged, not returned.
//
// Accepted snapshot contents:
//   - an empty (or whitespace-only) stream, e.g. when Persist wrote nothing,
//     as snapshotNoop does; nothing is restored and nil is returned;
//   - a JSON array of command objects; each element's Key and Value are
//     written to the store.
//
// Any other leading token, a decode failure, a truncated array, or a store
// failure aborts the restore and the error is returned. Every entry is
// committed in its own transaction, so entries written before a failure
// remain in the store.
func (f *fsm) Restore(rc io.ReadCloser) error {
	defer func() {
		if err := rc.Close(); err != nil {
			f.logger.Printf("[FINALLY RESTORE] close error %s\n", err.Error())
		}
	}()

	f.logger.Printf("[START RESTORE] read all message from snapshot\n")

	var totalRestored int
	decoder := json.NewDecoder(rc)

	// Consume the opening bracket first. Without this step Decode is handed
	// the whole array and cannot unmarshal it into a single command.
	opening, err := decoder.Token()
	if err == io.EOF {
		// Empty snapshot: there is nothing to restore, which is not a failure.
		f.logger.Printf("[END RESTORE] success restore %d messages in snapshot\n", totalRestored)
		return nil
	}
	if err != nil {
		f.logger.Printf("[END RESTORE] error %s\n", err.Error())
		return err
	}
	if delim, ok := opening.(json.Delim); !ok || delim != '[' {
		err := fmt.Errorf("restore snapshot: expected JSON array, got %v", opening)
		f.logger.Printf("[END RESTORE] error %s\n", err.Error())
		return err
	}

	for decoder.More() {
		var data = &command{}
		if err := decoder.Decode(data); err != nil {
			f.logger.Printf("[END RESTORE] error decode data %s\n", err.Error())
			return err
		}

		err := f.db.Update(func(tx *buntdb.Tx) error {
			_, _, err := tx.Set(data.Key, data.Value, nil)
			return err
		})
		if err != nil {
			f.logger.Printf("[END RESTORE] error persist data %s\n", err.Error())
			return err
		}

		totalRestored++
	}

	// Read the closing bracket; an error here means the array was truncated
	// or malformed.
	if _, err := decoder.Token(); err != nil {
		f.logger.Printf("[END RESTORE] error %s\n", err.Error())
		return err
	}

	f.logger.Printf("[END RESTORE] success restore %d messages in snapshot\n", totalRestored)
	return nil
}
// applySet stores value under key in a single BuntDB update transaction while
// holding the FSM mutex. The returned value is the result of that update: nil
// on success, otherwise the error it reported.
func (f *fsm) applySet(key, value string) interface{} {
	write := func(tx *buntdb.Tx) error {
		_, _, setErr := tx.Set(key, value, nil)
		return setErr
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	return f.db.Update(write)
}
// snapshotNoop is a snapshot implementation that carries no state: its
// Persist writes nothing and its Release has nothing to free.
type snapshotNoop struct{}
// Persist is a no-op: it writes nothing to the sink and always reports
// success by returning nil.
func (snapshotNoop) Persist(raft.SnapshotSink) error {
	return nil
}
// Release is invoked when the snapshot is no longer needed. The no-op
// snapshot holds no resources, so there is nothing to do.
func (snapshotNoop) Release() {
}
// newSnapshotNoop creates the no-op snapshot that the FSM returns from
// Snapshot. The error result is always nil.
func newSnapshotNoop() (raft.FSMSnapshot, error) {
	var snap raft.FSMSnapshot = new(snapshotNoop)
	return snap, nil
}