Skip to content

Commit

Permalink
stream snapshot to FSM when restoring from archive
Browse files Browse the repository at this point in the history
The `RestoreFromArchive` helper decompresses the snapshot archive to a
temporary file before reading it into the FSM. For large snapshots
this performs a lot of disk IO. Stream decompress the snapshot as we
read it, without first writing to a temporary file.
  • Loading branch information
tgross committed Jul 8, 2022
1 parent 264d2dd commit f7d7a13
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
41 changes: 22 additions & 19 deletions helper/raftutil/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package raftutil
import (
"fmt"
"io"
"io/ioutil"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"

"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
)

func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMeta, error) {
Expand All @@ -20,27 +19,31 @@ func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMet
return nil, nil, fmt.Errorf("failed to create FSM: %w", err)
}

snap, err := ioutil.TempFile("", "snap-")
if err != nil {
return nil, nil, fmt.Errorf("failed to create a temp file: %w", err)
}
defer os.Remove(snap.Name())
defer snap.Close()
// r is closed by Restore, w is closed by CopySnapshot
r, w := io.Pipe()

meta, err := snapshot.CopySnapshot(archive, snap)
if err != nil {
return nil, nil, fmt.Errorf("failed to read snapshot: %w", err)
}
errCh := make(chan error)
metaCh := make(chan *raft.SnapshotMeta)

_, err = snap.Seek(0, 0)
if err != nil {
return nil, nil, fmt.Errorf("failed to seek: %w", err)
}
go func() {
meta, err := snapshot.CopySnapshot(archive, w)
if err != nil {
errCh <- fmt.Errorf("failed to read snapshot: %w", err)
}
if meta != nil {
metaCh <- meta
}
}()

err = fsm.Restore(snap)
err = fsm.Restore(r)
if err != nil {
return nil, nil, fmt.Errorf("failed to restore from snapshot: %w", err)
}

return fsm.State(), meta, nil
select {
case err := <-errCh:
return nil, nil, err
case meta := <-metaCh:
return fsm.State(), meta, nil
}
}
15 changes: 12 additions & 3 deletions helper/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,22 @@ func (s *Snapshot) Close() error {
return os.Remove(s.file.Name())
}

type Discard struct {
io.Writer
}

func (dc Discard) Close() error { return nil }

// Verify takes the snapshot from the reader and verifies its contents.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
return CopySnapshot(in, ioutil.Discard)
return CopySnapshot(in, Discard{Writer: io.Discard})
}

// CopySnapshot copies the snapshot content from snapshot archive to dest
func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) {
// CopySnapshot copies the snapshot content from snapshot archive to dest.
// It will close the destination once complete.
func CopySnapshot(in io.Reader, dest io.WriteCloser) (*raft.SnapshotMeta, error) {
defer dest.Close()

// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
Expand Down

0 comments on commit f7d7a13

Please sign in to comment.