This repository has been archived by the owner on Oct 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfsm.go
73 lines (66 loc) · 1.77 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package arctonyx
import (
"bytes"
"encoding/hex"
"github.com/dgraph-io/badger"
"github.com/golang/protobuf/proto"
"github.com/kataras/go-errors"
"github.com/readystock/golog"
"github.com/readystock/raft"
"io"
"time"
)
type fsm Store
func (f *fsm) Apply(l *raft.Log) interface{} {
var c Command
if err := proto.Unmarshal(l.Data, &c); err != nil {
golog.Fatalf("failed to unmarshal command: %s. %s", err.Error(), hex.Dump(l.Data))
return err
}
r := CommandResponse{
Timestamp: c.Timestamp,
Operation: c.Operation,
}
golog.Verbosef("[%d] FSM Receive Delay [%s]", f.nodeId, time.Since(time.Unix(0, int64(c.Timestamp))))
if err := func() error {
switch c.Operation {
case Operation_SET:
return f.applySet(c.Key, c.Value)
case Operation_DELETE:
return f.applyDelete(c.Key)
default:
return errors.New("unsupported command operation: %d").Format(c.Operation)
}
}(); err != nil {
r.ErrorMessage = err.Error()
r.IsSuccess = false
} else {
r.IsSuccess = true
r.AppliedTimestamp = uint64(time.Now().UnixNano())
}
return r
}
// Restore stores the key-value store to a previous state.
func (f *fsm) Restore(rc io.ReadCloser) error {
return f.badger.Load(rc)
}
// Snapshot returns a snapshot of the key-value store.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
w := &bytes.Buffer{}
f.badger.Backup(w, 0)
return &snapshot{
store: w.Bytes(),
}, nil
}
func (f *fsm) applySet(key, value []byte) error {
return f.badger.Update(func(txn *badger.Txn) error {
golog.Verbosef("[%d] FSM Setting Key: %s", f.nodeId, string(key))
return txn.Set(key, value)
})
}
func (f *fsm) applyDelete(key []byte) error {
return f.badger.Update(func(txn *badger.Txn) error {
golog.Verbosef("[%d] Deleting Key: %s", f.nodeId, string(key))
return txn.Delete(key)
})
}