Skip to content

Commit

Permalink
snapshot restore-from-archive streaming and filtering (#13658)
Browse files Browse the repository at this point in the history
Stream snapshot to FSM when restoring from archive
The `RestoreFromArchive` helper decompresses the snapshot archive to a
temporary file before reading it into the FSM. For large snapshots
this performs a lot of disk IO. Stream decompress the snapshot as we
read it, without first writing to a temporary file.

Add bexpr filters to the `RestoreFromArchive` helper.
The operator can pass these as `-filter` arguments to `nomad operator
snapshot state` (and other commands in the future) to include only
desired data when reading the snapshot.
  • Loading branch information
tgross committed Jul 11, 2022
1 parent d442e1b commit 596203c
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 101 deletions.
3 changes: 3 additions & 0 deletions .changelog/13658.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: `operator snapshot state` supports `-filter` expressions and avoids writing large temporary files
```
33 changes: 29 additions & 4 deletions command/operator_snapshot_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"os"
"strings"

flaghelper "github.com/hashicorp/nomad/helper/flags"
"github.com/hashicorp/nomad/helper/raftutil"
"github.com/hashicorp/nomad/nomad"
"github.com/posener/complete"
)

Expand All @@ -16,13 +18,19 @@ type OperatorSnapshotStateCommand struct {

func (c *OperatorSnapshotStateCommand) Help() string {
helpText := `
Usage: nomad operator snapshot state <file>
Usage: nomad operator snapshot state [options] <file>
Displays a JSON representation of state in the snapshot.
To inspect the file "backup.snap":
$ nomad operator snapshot state backup.snap
Snapshot State Options:
-filter
Specifies an expression used to filter query results.
`
return strings.TrimSpace(helpText)
}
Expand All @@ -42,22 +50,39 @@ func (c *OperatorSnapshotStateCommand) Synopsis() string {
func (c *OperatorSnapshotStateCommand) Name() string { return "operator snapshot state" }

func (c *OperatorSnapshotStateCommand) Run(args []string) int {
var filterExpr flaghelper.StringFlag

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }

flags.Var(&filterExpr, "filter", "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}

filter, err := nomad.NewFSMFilter(filterExpr.String())
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid filter expression %q: %s", filterExpr, err))
return 1
}

// Check that we either got no filename or exactly one.
if len(args) != 1 {
if len(flags.Args()) != 1 {
c.Ui.Error("This command takes one argument: <file>")
c.Ui.Error(commandErrorText(c))
return 1
}

path := args[0]
path := flags.Args()[0]
f, err := os.Open(path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err))
return 1
}
defer f.Close()

state, meta, err := raftutil.RestoreFromArchive(f)
state, meta, err := raftutil.RestoreFromArchive(f, filter)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read archive file: %s", err))
return 1
Expand Down
1 change: 1 addition & 0 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type nomadFSM interface {
raft.FSM
State() *state.StateStore
Restore(io.ReadCloser) error
RestoreWithFilter(io.ReadCloser, *nomad.FSMFilter) error
}

type FSMHelper struct {
Expand Down
43 changes: 23 additions & 20 deletions helper/raftutil/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,47 @@ package raftutil
import (
"fmt"
"io"
"io/ioutil"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"

"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
)

func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMeta, error) {
func RestoreFromArchive(archive io.Reader, filter *nomad.FSMFilter) (*state.StateStore, *raft.SnapshotMeta, error) {
logger := hclog.L()

fsm, err := dummyFSM(logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to create FSM: %w", err)
}

snap, err := ioutil.TempFile("", "snap-")
if err != nil {
return nil, nil, fmt.Errorf("failed to create a temp file: %w", err)
}
defer os.Remove(snap.Name())
defer snap.Close()
// r is closed by RestoreFiltered, w is closed by CopySnapshot
r, w := io.Pipe()

meta, err := snapshot.CopySnapshot(archive, snap)
if err != nil {
return nil, nil, fmt.Errorf("failed to read snapshot: %w", err)
}
errCh := make(chan error)
metaCh := make(chan *raft.SnapshotMeta)

_, err = snap.Seek(0, 0)
if err != nil {
return nil, nil, fmt.Errorf("failed to seek: %w", err)
}
go func() {
meta, err := snapshot.CopySnapshot(archive, w)
if err != nil {
errCh <- fmt.Errorf("failed to read snapshot: %w", err)
} else {
metaCh <- meta
}
}()

err = fsm.Restore(snap)
err = fsm.RestoreWithFilter(r, filter)
if err != nil {
return nil, nil, fmt.Errorf("failed to restore from snapshot: %w", err)
}

return fsm.State(), meta, nil
select {
case err := <-errCh:
return nil, nil, err
case meta := <-metaCh:
return fsm.State(), meta, nil
}
}
15 changes: 12 additions & 3 deletions helper/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,22 @@ func (s *Snapshot) Close() error {
return os.Remove(s.file.Name())
}

type Discard struct {
io.Writer
}

func (dc Discard) Close() error { return nil }

// Verify takes the snapshot from the reader and verifies its contents.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
return CopySnapshot(in, ioutil.Discard)
return CopySnapshot(in, Discard{Writer: io.Discard})
}

// CopySnapshot copies the snapshot content from snapshot archive to dest
func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) {
// CopySnapshot copies the snapshot content from snapshot archive to dest.
// It will close the destination once complete.
func CopySnapshot(in io.Reader, dest io.WriteCloser) (*raft.SnapshotMeta, error) {
defer dest.Close()

// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
Expand Down
Loading

0 comments on commit 596203c

Please sign in to comment.