Skip to content

Commit

Permalink
Implement snapshoting for FSM
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Jul 11, 2019
1 parent a86eada commit 82a8a64
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 52 deletions.
64 changes: 12 additions & 52 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dkron

import (
"encoding/json"
"io"
"sync"

Expand Down Expand Up @@ -113,73 +112,34 @@ func (d *dkronFSM) applySetExecution(buf []byte) interface{} {
}

// Snapshot returns a snapshot of the key-value store. We wrap
// the things we need in fsmSnapshot and then send that over to Persist.
// Persist encodes the needed data from fsmsnapshot and transport it to
// the things we need in dkronSnapshot and then send that over to Persist.
// Persist encodes the needed data from dkronSnapshot and transport it to
// Restore where the necessary data is replicated into the finite state machine.
// This allows the consensus algorithm to truncate the replicated log.
func (d *dkronFSM) Snapshot() (raft.FSMSnapshot, error) {
d.mu.Lock()
defer d.mu.Unlock()

// Clone the kvstore into a map for easy transport
mapClone := make(map[string]string)
// opt := badger.DefaultIteratorOptions
// itr := f.kvs.kv.NewIterator(opt)
// for itr.Rewind(); itr.Valid(); itr.Next() {
// item := itr.Item()
// mapClone[string(item.Key()[:])] = string(item.Value()[:])
// }
// itr.Close()

return &dkronSnapshot{kvMap: mapClone}, nil
return &dkronSnapshot{store: d.store}, nil
}

// Restore stores the key-value store to a previous state.
func (d *dkronFSM) Restore(kvMap io.ReadCloser) error {
kvSnapshot := make(map[string]string)
if err := json.NewDecoder(kvMap).Decode(&kvSnapshot); err != nil {
return err
}

// Set the state from the snapshot, no lock required according to
// Hashicorp docs.
//for k, v := range kvSnapshot {
// f.kvs.Set([]byte(k), []byte(v))
//}

return nil
func (d *dkronFSM) Restore(r io.ReadCloser) error {
return d.Restore(r)
}

type dkronSnapshot struct {
kvMap map[string]string
store Storage
}

func (d *dkronSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Encode data.
b, err := json.Marshal(d.kvMap)
if err != nil {
return err
}

// Write data to sink.
if _, err := sink.Write(b); err != nil {
return err
}

// Close the sink.
if err := sink.Close(); err != nil {
return err
}

return nil
}()

if err != nil {
if err := d.store.Snapshot(sink); err != nil {
sink.Cancel()
return err
}

// Close the sink.
if err := sink.Close(); err != nil {
return err
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions dkron/storage.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dkron

import "io"

// Storage is the interface that should be used by any
// storage engine implemented for dkron. It contains the
// minumum set of operations that are needed to have a working
Expand All @@ -18,4 +20,6 @@ type Storage interface {
GetExecutionGroup(execution *Execution) ([]*Execution, error)
GetGroupedExecutions(jobName string) (map[int64][]*Execution, []int64, error)
Shutdown() error
Snapshot(w io.WriteCloser) error
Restore(r io.ReadCloser) error
}
12 changes: 12 additions & 0 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -670,6 +671,17 @@ func (s *Store) Shutdown() error {
return s.db.Close()
}

// Snapshot creates a backup of the data stored in Badger
func (s *Store) Snapshot(w io.WriteCloser) error {
_, err := s.db.Backup(w, 0)
return err
}

// Restore load data created with backup in to Badger
func (s *Store) Restore(r io.ReadCloser) error {
return s.db.Load(r)
}

func (s *Store) unmarshalExecutions(items []*kv, stopWord string) ([]*Execution, error) {
var executions []*Execution
for _, item := range items {
Expand Down

0 comments on commit 82a8a64

Please sign in to comment.