diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 6799d43b9a3..f59bb63010c 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -309,13 +309,7 @@ func (i *Index) Open() (rErr error) { func (i *Index) cleanUpFail(err *error) { if nil != *err { - for _, p := range i.partitions { - if (p != nil) && p.IsOpen() { - if e := p.Close(); e != nil { - i.logger.Warn("Failed to clean up partition") - } - } - } + i.close() } } @@ -352,16 +346,24 @@ func (i *Index) Close() error { // Lock index and close partitions. i.mu.Lock() defer i.mu.Unlock() + return i.close() +} +// close closes the index without locking +func (i *Index) close() (rErr error) { for _, p := range i.partitions { - if err := p.Close(); err != nil { - return err + if (p != nil) && p.IsOpen() { + if pErr := p.Close(); pErr != nil { + i.logger.Warn("Failed to clean up partition", zap.String("path", p.Path())) + if rErr == nil { + rErr = pErr + } + } } } - // Mark index as closed. i.opened = false - return nil + return rErr } // Path returns the path the index was opened with. diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 03bb3ae2ed9..1b58a8c266c 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -85,11 +85,13 @@ type Partition struct { // Index's version. version int + + manifestPathFn func() string } // NewPartition returns a new instance of Partition. func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { - return &Partition{ + p := &Partition{ closing: make(chan struct{}), path: path, sfile: sfile, @@ -104,6 +106,8 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { logger: zap.NewNop(), version: Version, } + p.manifestPathFn = p.manifestPath + return p } // bytes estimates the memory footprint of this Partition, in bytes. @@ -408,27 +412,44 @@ func (p *Partition) nextSequence() int { return p.seq } -// ManifestPath returns the path to the index's manifest file. func (p *Partition) ManifestPath() string { + return p.manifestPathFn() +} + +// ManifestPath returns the path to the index's manifest file. +func (p *Partition) manifestPath() string { return filepath.Join(p.path, ManifestFileName) } // Manifest returns a manifest for the index. func (p *Partition) Manifest() *Manifest { + return p.manifest(p.fileSet) +} + +// manifest returns a manifest for the index, possibly using a +// new FileSet to account for compaction or log prepending +func (p *Partition) manifest(newFileSet *FileSet) *Manifest { m := &Manifest{ Levels: p.levels, - Files: make([]string, len(p.fileSet.files)), + Files: make([]string, len(newFileSet.files)), Version: p.version, path: p.ManifestPath(), } - for j, f := range p.fileSet.files { + for j, f := range newFileSet.files { m.Files[j] = filepath.Base(f.Path()) } return m } +// SetManifestPathForTest is only to force a bad path in testing +func (p *Partition) SetManifestPathForTest(path string) { + p.mu.Lock() + defer p.mu.Unlock() + p.manifestPathFn = func() string { return path } +} + // WithLogger sets the logger for the index. func (p *Partition) WithLogger(logger *zap.Logger) { p.logger = logger.With(zap.String("index", "tsi")) @@ -471,25 +492,35 @@ func (p *Partition) retainFileSet() *FileSet { func (p *Partition) FileN() int { return len(p.fileSet.files) } // prependActiveLogFile adds a new log file so that the current log file can be compacted. -func (p *Partition) prependActiveLogFile() error { +func (p *Partition) prependActiveLogFile() (rErr error) { // Open file and insert it into the first position. f, err := p.openLogFile(filepath.Join(p.path, FormatLogFileName(p.nextSequence()))) if err != nil { return err } - p.activeLogFile = f + var oldActiveFile *LogFile + p.activeLogFile, oldActiveFile = f, p.activeLogFile + + // Prepend and generate new fileset but do not yet update the partition + newFileSet := p.fileSet.PrependLogFile(f) - // Prepend and generate new fileset. - p.fileSet = p.fileSet.PrependLogFile(f) + errors2.Capture(&rErr, func() error { + if rErr != nil { + // close the new file. + f.Close() + p.activeLogFile = oldActiveFile + } + return rErr + })() // Write new manifest. - manifestSize, err := p.Manifest().Write() + manifestSize, err := p.manifest(newFileSet).Write() if err != nil { - // TODO: Close index if write fails. - p.logger.Error("manifest write failed, index is potentially damaged", zap.Error(err)) - return err + return fmt.Errorf("manifest write failed for %q: %w", p.ManifestPath(), err) } p.manifestSize = manifestSize + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet return nil } @@ -1119,20 +1150,28 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch } // Obtain lock to swap in index file and write manifest. - if err := func() error { + if err := func() (rErr error) { p.mu.Lock() defer p.mu.Unlock() // Replace previous files with new index file. - p.fileSet = p.fileSet.MustReplace(IndexFiles(files).Files(), file) + newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file) // Write new manifest. - manifestSize, err := p.Manifest().Write() + manifestSize, err := p.manifest(newFileSet).Write() + defer errors2.Capture(&rErr, func() error { + if rErr != nil { + // Close the new file to avoid leaks. + file.Close() + } + return rErr + })() if err != nil { - // TODO: Close index if write fails. - return err + return fmt.Errorf("manifest file write failed compacting index %q: %w", p.ManifestPath(), err) } p.manifestSize = manifestSize + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet return nil }(); err != nil { log.Error("Cannot write manifest", zap.Error(err)) @@ -1268,20 +1307,29 @@ func (p *Partition) compactLogFile(logFile *LogFile) { } // Obtain lock to swap in index file and write manifest. - if err := func() error { + if err := func() (rErr error) { p.mu.Lock() defer p.mu.Unlock() // Replace previous log file with index file. - p.fileSet = p.fileSet.MustReplace([]File{logFile}, file) + newFileSet := p.fileSet.MustReplace([]File{logFile}, file) + + defer errors2.Capture(&rErr, func() error { + if rErr != nil { + // close new file + file.Close() + } + return rErr + })() // Write new manifest. - manifestSize, err := p.Manifest().Write() + manifestSize, err := p.manifest(newFileSet).Write() + if err != nil { - // TODO: Close index if write fails. - return err + return fmt.Errorf("manifest file write failed compacting log file %q: %w", p.ManifestPath(), err) } - + // Store the new FileSet in the partition now that the manifest has been written + p.fileSet = newFileSet p.manifestSize = manifestSize return nil }(); err != nil { @@ -1395,6 +1443,7 @@ func (m *Manifest) Validate() error { // Write writes the manifest file to the provided path, returning the number of // bytes written and an error, if any. func (m *Manifest) Write() (int64, error) { + var tmp string buf, err := json.MarshalIndent(m, "", " ") if err != nil { return 0, fmt.Errorf("failed marshaling %q: %w", m.path, err) @@ -1407,25 +1456,30 @@ func (m *Manifest) Write() (int64, error) { return 0, err } - tmp := f.Name() // In correct operation, Remove() should fail because the file was renamed defer os.Remove(tmp) + err = func() (rErr error) { // Close() before rename for Windows defer errors2.Capture(&rErr, f.Close)() + + tmp = f.Name() + + if err = f.Chmod(0666); err != nil { + return fmt.Errorf("failed setting permissions on manifest file %q: %w", tmp, err) + } if _, err = f.Write(buf); err != nil { return fmt.Errorf("failed writing temporary manifest file %q: %w", tmp, err) } + if err = f.Sync(); err != nil { + return fmt.Errorf("failed syncing temporary manifest file to disk %q: %w", tmp, err) + } return nil }() if err != nil { return 0, err } - if err = os.Chmod(tmp, 0666); err != nil { - return 0, err - } - if err = os.Rename(tmp, m.path); err != nil { return 0, err } diff --git a/tsdb/index/tsi1/partition_test.go b/tsdb/index/tsi1/partition_test.go index c01e65a5f69..6f81c9a81c1 100644 --- a/tsdb/index/tsi1/partition_test.go +++ b/tsdb/index/tsi1/partition_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "testing" "github.com/influxdata/influxdb/tsdb" @@ -82,6 +83,73 @@ func TestPartition_Manifest(t *testing.T) { }) } +var badManifestPath string = filepath.Join(os.DevNull, tsi1.ManifestFileName) + +func TestPartition_Manifest_Write_Fail(t *testing.T) { + const expectedError = "not a directory" + t.Run("write MANIFEST", func(t *testing.T) { + m := tsi1.NewManifest(badManifestPath) + _, err := m.Write() + if !strings.Contains(err.Error(), expectedError) { + t.Fatalf("expected: %q, got: %q", expectedError, err.Error()) + } + }) +} + +func TestPartition_PrependLogFile_Write_Fail(t *testing.T) { + const expectedError = "not a directory" + t.Run("write MANIFEST", func(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + p := MustOpenPartition(sfile.SeriesFile) + defer func() { + if err := p.Close(); err != nil { + t.Fatalf("error closing partition: %v", err) + } + }() + p.Partition.MaxLogFileSize = -1 + fileN := p.FileN() + p.CheckLogFile() + if fileN >= p.FileN() { + t.Fatalf("manifest write prepending log file should have succeeded but number of files did not change correctly: expected more than %d files, got %d files", fileN, p.FileN()) + } + p.SetManifestPathForTest(badManifestPath) + fileN = p.FileN() + p.CheckLogFile() + if fileN != p.FileN() { + t.Fatalf("manifest write prepending log file should have failed, but number of files changed: expected %d files, got %d files", fileN, p.FileN()) + } + }) +} + +func TestPartition_Compact_Write_Fail(t *testing.T) { + const expectedError = "not a directory" + t.Run("write MANIFEST", func(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + p := MustOpenPartition(sfile.SeriesFile) + defer func() { + if err := p.Close(); err != nil { + t.Fatalf("error closing partition: %v", err) + } + }() + p.Partition.MaxLogFileSize = -1 + fileN := p.FileN() + p.Compact() + if (1 + fileN) != p.FileN() { + t.Fatalf("manifest write in compaction should have succeeded, but number of files did not change correctly: expected %d files, got %d files", fileN+1, p.FileN()) + } + p.SetManifestPathForTest(badManifestPath) + fileN = p.FileN() + p.Compact() + if fileN != p.FileN() { + t.Fatalf("manifest write should have failed the compaction, but number of files changed: expected %d files, got %d files", fileN, p.FileN()) + } + }) +} + // Partition is a test wrapper for tsi1.Partition. type Partition struct { *tsi1.Partition