diff --git a/raftwal/fsm.go b/raftwal/fsm.go index 00c2c0e54..81d9890d4 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -188,11 +188,11 @@ func (fsm *BalloonFSM) Snapshot() (raft.FSMSnapshot, error) { fsm.restoreMu.Lock() defer fsm.restoreMu.Unlock() - version, err := fsm.store.GetLastVersion() + id, err := fsm.store.Snapshot() if err != nil { return nil, err } - log.Debugf("Generating snapshot until version: %d (balloon version %d)", version, fsm.balloon.Version()) + log.Debugf("Generating snapshot until version: %d (balloon version %d)", id, fsm.balloon.Version()) // Copy the node metadata. meta, err := json.Marshal(fsm.meta) @@ -200,8 +200,8 @@ func (fsm *BalloonFSM) Snapshot() (raft.FSMSnapshot, error) { log.Debugf("failed to encode meta for snapshot: %s", err.Error()) return nil, err } - - return &fsmSnapshot{lastVersion: version, store: fsm.store, meta: meta}, nil + // change lastVersion by checkpoint structure + return &fsmSnapshot{id: id, store: fsm.store, meta: meta}, nil } // Restore restores the node to a previous state. diff --git a/raftwal/snapshot.go b/raftwal/snapshot.go index d3ef66b9e..233d1a058 100644 --- a/raftwal/snapshot.go +++ b/raftwal/snapshot.go @@ -23,16 +23,16 @@ import ( ) type fsmSnapshot struct { - lastVersion uint64 - store storage.ManagedStore - meta []byte + id uint64 + store storage.ManagedStore + meta []byte } // Persist writes the snapshot to the given sink. func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { log.Debug("Persisting snapshot...") err := func() error { - if err := f.store.Backup(sink, f.lastVersion); err != nil { + if err := f.store.Backup(sink, f.id); err != nil { return err } return sink.Close() diff --git a/storage/badger/badger_store.go b/storage/badger/badger_store.go index 72d792aac..794a78fba 100644 --- a/storage/badger/badger_store.go +++ b/storage/badger/badger_store.go @@ -335,7 +335,11 @@ func (s *BadgerStore) Load(r io.Reader) error { return s.db.Load(r) } -func (s *BadgerStore) GetLastVersion() (uint64, error) { +// Take a snapshot of the store, and returns and id +// to be used in the back up process. The state of the +// snapshot is stored in the store instance. +// In badger the id corresponds to the last version stored. +func (s *BadgerStore) Snapshot() (uint64, error) { var version uint64 err := s.db.View(func(txn *b.Txn) error { opts := b.DefaultIteratorOptions diff --git a/storage/badger/badger_store_test.go b/storage/badger/badger_store_test.go index 4908a4971..2e5ea0e56 100644 --- a/storage/badger/badger_store_test.go +++ b/storage/badger/badger_store_test.go @@ -238,7 +238,7 @@ func TestBackupLoad(t *testing.T) { } } - version, err := store.GetLastVersion() + version, err := store.Snapshot() require.NoError(t, err) backupDir := mustTempDir() @@ -251,7 +251,7 @@ func TestBackupLoad(t *testing.T) { restore, recloseF := openBadgerStore(t) defer recloseF() restore.Load(backupFile) - reversion, err := store.GetLastVersion() + reversion, err := store.Snapshot() require.NoError(t, err) require.Equal(t, reversion, version, "Error in restored version") diff --git a/storage/rocks/rocksdb_store.go b/storage/rocks/rocksdb_store.go index 81e6b7e72..dad1aa3d8 100644 --- a/storage/rocks/rocksdb_store.go +++ b/storage/rocks/rocksdb_store.go @@ -19,8 +19,8 @@ import ( "bufio" "bytes" "encoding/binary" + "fmt" "io" - "io/ioutil" "os" "github.com/bbva/qed/rocksdb" @@ -30,6 +30,15 @@ import ( type RocksDBStore struct { db *rocksdb.DB + + // checkpoints are stored in a path on the same + // folder as the database, so rocksdb uses hardlinks instead + // of copies + checkPointPath string + + // each checkpoint is created in a subdirectory + // inside checkPointPath folder + checkpoints map[uint64]string } type rocksdbOpts struct { @@ -55,8 +64,17 @@ func NewRocksDBStoreOpts(opts *rocksdbOpts) (*RocksDBStore, error) { if err != nil { return nil, err } + checkPointPath := opts.Path + "/checkpoints" + err = os.MkdirAll(checkPointPath, 0755) + if err != nil { + return nil, err + } - store := &RocksDBStore{db: db} + store := &RocksDBStore{ + db: db, + checkPointPath: checkPointPath, + checkpoints: make(map[uint64]string), + } return store, nil } @@ -172,26 +190,32 @@ func (s RocksDBStore) Delete(prefix byte, key []byte) error { return s.db.Delete(rocksdb.NewDefaultWriteOptions(), k) } -// Backup dumps a protobuf-encoded list of all entries in the database into the -// given writer, that are newer than the specified version. -func (s *RocksDBStore) Backup(w io.Writer, until uint64) error { - +// Take a snapshot of the store, and returns and id +// to be used in the back up process. The state of the +// snapshot is stored in the store instance. +func (s *RocksDBStore) Snapshot() (uint64, error) { // create temp directory - checkDir, err := ioutil.TempDir("", "rocksdb-checkpoint") - if err != nil { - return err - } + id := uint64(len(s.checkpoints) + 1) + checkDir := fmt.Sprintf("%s/rocksdb-checkpoint-%d", s.checkPointPath, id) os.RemoveAll(checkDir) // create checkpoint checkpoint, err := s.db.NewCheckpoint() if err != nil { - return err + return 0, err } defer checkpoint.Destroy() - checkpoint.CreateCheckpoint(checkDir, 0) - defer os.RemoveAll(checkDir) + + s.checkpoints[id] = checkDir + return id, nil +} + +// Backup dumps a protobuf-encoded list of all entries in the database into the +// given writer, that are newer than the specified version. +func (s *RocksDBStore) Backup(w io.Writer, id uint64) error { + + checkDir := s.checkpoints[id] // open db for read-only opts := rocksdb.NewDefaultOptions() @@ -223,6 +247,13 @@ func (s *RocksDBStore) Backup(w io.Writer, until uint64) error { } } + // remove checkpoint from list + // order must be maintained, + delete(s.checkpoints, id) + + // clean up only after we succesfully backup + os.RemoveAll(checkDir) + return nil } diff --git a/storage/rocks/rocksdb_store_test.go b/storage/rocks/rocksdb_store_test.go index 0bc8a23c8..832767a8e 100644 --- a/storage/rocks/rocksdb_store_test.go +++ b/storage/rocks/rocksdb_store_test.go @@ -207,7 +207,9 @@ func TestBackupLoad(t *testing.T) { // create backup ioBuf := bytes.NewBufferString("") - require.NoError(t, store.Backup(ioBuf, 0)) + id, err := store.Snapshot() + require.Nil(t, err) + require.NoError(t, store.Backup(ioBuf, id)) // restore backup restore, recloseF := openRocksDBStore(t) diff --git a/storage/store.go b/storage/store.go index dd4ea1802..daa4f2172 100644 --- a/storage/store.go +++ b/storage/store.go @@ -47,11 +47,12 @@ type DeletableStore interface { Store Delete(prefix byte, key []byte) error } + type ManagedStore interface { Store Backup(w io.Writer, until uint64) error + Snapshot() (uint64, error) Load(r io.Reader) error - GetLastVersion() (uint64, error) } type Mutation struct {