Skip to content

Commit

Permalink
fix: restore in-memory Manifest on write error (#23552) (#23578)
Browse files Browse the repository at this point in the history
Do not update the `FileSet` or `activeLogFile` fields in the in-memory
Partition structure if the Manifest file is not correctly saved to
disk.

closes #23553

(cherry picked from commit a8732dc)

closes #23554
  • Loading branch information
davidby-influx authored Jul 25, 2022
1 parent 37562c7 commit 619eb1c
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 40 deletions.
24 changes: 13 additions & 11 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,7 @@ func (i *Index) Open() (rErr error) {

// cleanUpFail tears the index down when the error pointed to by err is
// non-nil; it does nothing when *err is nil.
// NOTE(review): this is a flattened diff hunk. Judging by the hunk header
// (13 lines before, 7 after), the partition loop below is the removed code and
// the i.close() call is its replacement — confirm against the real file.
func (i *Index) cleanUpFail(err *error) {
if nil != *err {
for _, p := range i.partitions {
if (p != nil) && p.IsOpen() {
if e := p.Close(); e != nil {
i.logger.Warn("Failed to clean up partition")
}
}
}
// The error returned by close() is not inspected here.
i.close()
}
}

Expand Down Expand Up @@ -352,16 +346,24 @@ func (i *Index) Close() error {
// Lock index and close partitions.
i.mu.Lock()
defer i.mu.Unlock()
return i.close()
}

// close closes the index without locking; Close acquires i.mu before
// delegating here.
// NOTE(review): flattened diff hunk — the `if err := p.Close() ... return err`
// pair and the `return nil` line appear to be the pre-change code. The
// remaining lines skip nil or unopened partitions, log each partition that
// fails to close, keep only the first such error in rErr, and still mark the
// index as closed before returning rErr.
func (i *Index) close() (rErr error) {
for _, p := range i.partitions {
if err := p.Close(); err != nil {
return err
if (p != nil) && p.IsOpen() {
if pErr := p.Close(); pErr != nil {
i.logger.Warn("Failed to clean up partition", zap.String("path", p.Path()))
// Remember only the first failure; later ones are logged above.
if rErr == nil {
rErr = pErr
}
}
}
}

// Mark index as closed.
i.opened = false
return nil
return rErr
}

// Path returns the path the index was opened with.
Expand Down
116 changes: 87 additions & 29 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ type Partition struct {

// Index's version.
version int

manifestPathFn func() string
}

// NewPartition returns a new instance of Partition.
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
p := &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
Expand All @@ -104,6 +106,8 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
logger: zap.NewNop(),
version: Version,
}
p.manifestPathFn = p.manifestPath
return p
}

// bytes estimates the memory footprint of this Partition, in bytes.
Expand Down Expand Up @@ -408,27 +412,44 @@ func (p *Partition) nextSequence() int {
return p.seq
}

// ManifestPath reports the path of the index's manifest file, as produced by
// the partition's manifestPathFn hook.
func (p *Partition) ManifestPath() string {
	pathFn := p.manifestPathFn
	return pathFn()
}

// manifestPath builds the manifest location by joining ManifestFileName onto
// the partition's path.
func (p *Partition) manifestPath() string {
	dir := p.path
	return filepath.Join(dir, ManifestFileName)
}

// Manifest builds a manifest for the index from the partition's current
// file set.
func (p *Partition) Manifest() *Manifest {
	current := p.fileSet
	return p.manifest(current)
}

// manifest returns a manifest for the index, possibly using a
// new FileSet to account for compaction or log prepending.
// The manifest carries the partition's levels and version, the manifest path
// from ManifestPath(), and the base name of every file in the file set.
// NOTE(review): flattened diff hunk — the two lines that read p.fileSet.files
// appear to be the pre-change code, each followed by its newFileSet.files
// replacement; confirm against the real file.
func (p *Partition) manifest(newFileSet *FileSet) *Manifest {
m := &Manifest{
Levels: p.levels,
Files: make([]string, len(p.fileSet.files)),
Files: make([]string, len(newFileSet.files)),
Version: p.version,
path: p.ManifestPath(),
}

// Record only the base name of each file, not its full path.
for j, f := range p.fileSet.files {
for j, f := range newFileSet.files {
m.Files[j] = filepath.Base(f.Path())
}

return m
}

// SetManifestPathForTest swaps the partition's manifest path hook for one that
// always returns path. It exists only so tests can force a bad manifest path.
func (p *Partition) SetManifestPathForTest(path string) {
	fixedPath := func() string { return path }

	p.mu.Lock()
	p.manifestPathFn = fixedPath
	p.mu.Unlock()
}

// WithLogger sets the logger for the index.
func (p *Partition) WithLogger(logger *zap.Logger) {
p.logger = logger.With(zap.String("index", "tsi"))
Expand Down Expand Up @@ -468,28 +489,42 @@ func (p *Partition) retainFileSet() *FileSet {
}

// FileN returns the number of active files in the file set.
// NOTE(review): flattened diff hunk — the one-line definition appears to be
// the pre-change version; the multi-line version reads the count while holding
// the partition's read lock.
func (p *Partition) FileN() int { return len(p.fileSet.files) }
func (p *Partition) FileN() int {
p.mu.RLock()
defer p.mu.RUnlock()
return len(p.fileSet.files)
}

// prependActiveLogFile adds a new log file so that the current log file can be compacted.
// NOTE(review): flattened diff hunk — old and new lines are interleaved (two
// signatures, two manifest writes, two error returns). In the new lines the
// file set is only stored in the partition after the manifest write succeeds.
func (p *Partition) prependActiveLogFile() error {
func (p *Partition) prependActiveLogFile() (rErr error) {
// Open file and insert it into the first position.
f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence())))
if err != nil {
return err
}
p.activeLogFile = f
var oldActiveFile *LogFile
p.activeLogFile, oldActiveFile = f, p.activeLogFile

// Prepend and generate new fileset.
p.fileSet = p.fileSet.PrependLogFile(f)
// Prepend and generate new fileset but do not yet update the partition
newFileSet := p.fileSet.PrependLogFile(f)

// NOTE(review): unlike the errors2.Capture call sites in the compaction hunks
// of this diff, this call is not preceded by `defer`. If Capture returns a
// func that simply runs the callback, it executes right here, where rErr has
// not been assigned yet, so the f.Close()/activeLogFile rollback would never
// run when the manifest write below fails — confirm and add `defer` if so.
errors2.Capture(&rErr, func() error {
if rErr != nil {
// close the new file.
f.Close()
p.activeLogFile = oldActiveFile
}
return rErr
})()

// Write new manifest.
manifestSize, err := p.Manifest().Write()
manifestSize, err := p.manifest(newFileSet).Write()
if err != nil {
// TODO: Close index if write fails.
p.logger.Error("manifest write failed, index is potentially damaged", zap.Error(err))
return err
return fmt.Errorf("manifest write failed for %q: %w", p.ManifestPath(), err)
}
p.manifestSize = manifestSize
// Store the new FileSet in the partition now that the manifest has been written
p.fileSet = newFileSet
return nil
}

Expand Down Expand Up @@ -1113,20 +1148,28 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
}

// Obtain lock to swap in index file and write manifest.
if err := func() error {
if err := func() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()

// Replace previous files with new index file.
p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file)
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)

// Write new manifest.
manifestSize, err := p.Manifest().Write()
manifestSize, err := p.manifest(newFileSet).Write()
defer errors2.Capture(&rErr, func() error {
if rErr != nil {
// Close the new file to avoid leaks.
file.Close()
}
return rErr
})()
if err != nil {
// TODO: Close index if write fails.
return err
return fmt.Errorf("manifest file write failed compacting index %q: %w", p.ManifestPath(), err)
}
p.manifestSize = manifestSize
// Store the new FileSet in the partition now that the manifest has been written
p.fileSet = newFileSet
return nil
}(); err != nil {
log.Error("Cannot write manifest", zap.Error(err))
Expand Down Expand Up @@ -1262,20 +1305,29 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
}

// Obtain lock to swap in index file and write manifest.
if err := func() error {
if err := func() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()

// Replace previous log file with index file.
p.fileSet = p.fileSet.MustReplace([]File{logFile}, file)
newFileSet := p.fileSet.MustReplace([]File{logFile}, file)

defer errors2.Capture(&rErr, func() error {
if rErr != nil {
// close new file
file.Close()
}
return rErr
})()

// Write new manifest.
manifestSize, err := p.Manifest().Write()
manifestSize, err := p.manifest(newFileSet).Write()

if err != nil {
// TODO: Close index if write fails.
return err
return fmt.Errorf("manifest file write failed compacting log file %q: %w", p.ManifestPath(), err)
}

// Store the new FileSet in the partition now that the manifest has been written
p.fileSet = newFileSet
p.manifestSize = manifestSize
return nil
}(); err != nil {
Expand Down Expand Up @@ -1389,6 +1441,7 @@ func (m *Manifest) Validate() error {
// Write writes the manifest file to the provided path, returning the number of
// bytes written and an error, if any.
func (m *Manifest) Write() (int64, error) {
var tmp string
buf, err := json.MarshalIndent(m, "", " ")
if err != nil {
return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err)
Expand All @@ -1401,25 +1454,30 @@ func (m *Manifest) Write() (int64, error) {
return 0, err
}

tmp := f.Name()
// In correct operation, Remove() should fail because the file was renamed
defer os.Remove(tmp)

err = func() (rErr error) {
// Close() before rename for Windows
defer errors2.Capture(&rErr, f.Close)()

tmp = f.Name()

if err = f.Chmod(0666); err != nil {
return fmt.Errorf("failed setting permissions on manifest file %q: %w", tmp, err)
}
if _, err = f.Write(buf); err != nil {
return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err)
}
if err = f.Sync(); err != nil {
return fmt.Errorf("failed syncing temporary manifest file to disk %q: %w", tmp, err)
}
return nil
}()
if err != nil {
return 0, err
}

if err = os.Chmod(tmp, 0666); err != nil {
return 0, err
}

if err = os.Rename(tmp, m.path); err != nil {
return 0, err
}
Expand Down
65 changes: 65 additions & 0 deletions tsdb/index/tsi1/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"syscall"
"testing"

"github.com/influxdata/influxdb/v2/tsdb"
Expand Down Expand Up @@ -82,6 +83,70 @@ func TestPartition_Manifest(t *testing.T) {
})
}

// badManifestPath is a manifest location nested under os.DevNull; the tests in
// this file use it as a path that manifest writes are expected to fail on.
var badManifestPath = filepath.Join(os.DevNull, tsi1.ManifestFileName)

// TestPartition_Manifest_Write_Fail expects writing a manifest created for
// badManifestPath to fail with an error matching syscall.ENOTDIR.
func TestPartition_Manifest_Write_Fail(t *testing.T) {
	t.Run("write MANIFEST", func(t *testing.T) {
		manifest := tsi1.NewManifest(badManifestPath)
		if _, writeErr := manifest.Write(); !errors.Is(writeErr, syscall.ENOTDIR) {
			t.Fatalf("expected: syscall.ENOTDIR, got %T: %v", writeErr, writeErr)
		}
	})
}

// TestPartition_PrependLogFile_Write_Fail expects CheckLogFile to increase the
// partition's file count while the manifest path is writable, and to leave the
// count unchanged once the manifest path is redirected to badManifestPath.
func TestPartition_PrependLogFile_Write_Fail(t *testing.T) {
	t.Run("write MANIFEST", func(t *testing.T) {
		seriesFile := MustOpenSeriesFile()
		defer seriesFile.Close()

		part := MustOpenPartition(seriesFile.SeriesFile)
		defer func() {
			if closeErr := part.Close(); closeErr != nil {
				t.Fatalf("error closing partition: %v", closeErr)
			}
		}()
		part.Partition.MaxLogFileSize = -1

		// Default manifest path: the file count must grow.
		before := part.FileN()
		part.CheckLogFile()
		if part.FileN() <= before {
			t.Fatalf("manifest write prepending log file should have succeeded but number of files did not change correctly: expected more than %d files, got %d files", before, part.FileN())
		}

		// Bad manifest path: the file count must stay the same.
		part.SetManifestPathForTest(badManifestPath)
		before = part.FileN()
		part.CheckLogFile()
		if part.FileN() != before {
			t.Fatalf("manifest write prepending log file should have failed, but number of files changed: expected %d files, got %d files", before, part.FileN())
		}
	})
}

// TestPartition_Compact_Write_Fail expects Compact to add exactly one file
// while the manifest path is writable, and to leave the file count unchanged
// once the manifest path is redirected to badManifestPath.
func TestPartition_Compact_Write_Fail(t *testing.T) {
	t.Run("write MANIFEST", func(t *testing.T) {
		seriesFile := MustOpenSeriesFile()
		defer seriesFile.Close()

		part := MustOpenPartition(seriesFile.SeriesFile)
		defer func() {
			if closeErr := part.Close(); closeErr != nil {
				t.Fatalf("error closing partition: %v", closeErr)
			}
		}()
		part.Partition.MaxLogFileSize = -1

		// Default manifest path: compaction must add one file.
		before := part.FileN()
		part.Compact()
		if part.FileN() != before+1 {
			t.Fatalf("manifest write in compaction should have succeeded, but number of files did not change correctly: expected %d files, got %d files", before+1, part.FileN())
		}

		// Bad manifest path: the file count must stay the same.
		part.SetManifestPathForTest(badManifestPath)
		before = part.FileN()
		part.Compact()
		if part.FileN() != before {
			t.Fatalf("manifest write should have failed the compaction, but number of files changed: expected %d files, got %d files", before, part.FileN())
		}
	})
}

// Partition is a test wrapper for tsi1.Partition.
type Partition struct {
*tsi1.Partition
Expand Down

0 comments on commit 619eb1c

Please sign in to comment.