Skip to content

Commit

Permalink
chore: improve error messages and logging during shard opening (#25333)
Browse files Browse the repository at this point in the history
Backport from main-2.x.

(cherry picked from commit da9615f)

Closes: #25332
  • Loading branch information
gwossum authored Sep 16, 2024
1 parent 46086c8 commit cc9bd41
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 31 deletions.
8 changes: 5 additions & 3 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,24 @@ func NewEngine(id uint64, i Index, path string, walPath string, sfile *SeriesFil
options.OnNewEngine(engine)
}
return engine, nil
} else if err != nil {
return nil, fmt.Errorf("error getting file stats for %q in NewEngine: %w", path, err)
}

// If it's a dir then it's a tsm1 engine
format := DefaultEngine
if fi, err := os.Stat(path); err != nil {
return nil, err
return nil, fmt.Errorf("error calling Stat on %q in NewEngine: %w", path, err)
} else if !fi.Mode().IsDir() {
return nil, ErrUnknownEngineFormat
return nil, fmt.Errorf("error opening %q: %w", path, ErrUnknownEngineFormat)
} else {
format = "tsm1"
}

// Lookup engine by format.
fn := newEngineFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid engine format: %q", format)
return nil, fmt.Errorf("invalid engine format for %q: %q", path, format)
}

engine := fn(id, i, path, walPath, sfile, options)
Expand Down
23 changes: 13 additions & 10 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,10 @@ func (e *Engine) Open(ctx context.Context) error {
return err
}

fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger)
fieldPath := filepath.Join(e.path, "fields.idx")
fields, err := tsdb.NewMeasurementFieldSet(fieldPath, e.logger)
if err != nil {
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
e.logger.Warn("error opening fields.idx: Rebuilding.", zap.String("path", fieldPath), zap.Error(err))
}

e.mu.Lock()
Expand All @@ -750,7 +751,7 @@ func (e *Engine) Open(ctx context.Context) error {

if e.WALEnabled {
if err := e.WAL.Open(); err != nil {
return err
return fmt.Errorf("error opening WAL for %q: %w", fieldPath, err)
}
}

Expand Down Expand Up @@ -2289,7 +2290,7 @@ func (e *Engine) reloadCache() error {
now := time.Now()
files, err := segmentFileNames(e.WAL.Path())
if err != nil {
return err
return fmt.Errorf("error getting segment file names for %q in Engine.reloadCache: %w", e.WAL.Path(), err)
}

limit := e.Cache.MaxSize()
Expand Down Expand Up @@ -2318,15 +2319,16 @@ func (e *Engine) cleanup() error {
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
return fmt.Errorf("error calling ReadDir for %q in Engine.cleanup: %w", e.path, err)
}

ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
path := filepath.Join(e.path, f.Name())
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q in Engine.cleanup: %w", path, err)
}
}
}
Expand All @@ -2335,14 +2337,15 @@ func (e *Engine) cleanup() error {
}

func (e *Engine) cleanupTempTSMFiles() error {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
pattern := filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension))
files, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("error getting compaction temp files: %s", err.Error())
return fmt.Errorf("error getting compaction temp files for %q in Engine.cleanupTempTSMFiles: %w", pattern, err)
}

for _, f := range files {
if err := os.Remove(f); err != nil {
return fmt.Errorf("error removing temp compaction files: %v", err)
return fmt.Errorf("error removing temp compaction file %q in Engine.cleanupTempTSMFiles: %w", f, err)
}
}
return nil
Expand Down
24 changes: 14 additions & 10 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (f *FileStore) Open(ctx context.Context) error {
// find the current max ID for temp directories
tmpfiles, err := os.ReadDir(f.dir)
if err != nil {
return err
return fmt.Errorf("error calling ReadDir on %q in FileStore.Open: %w", f.dir, err)
}

// ascertain the current temp directory number by examining the existing
Expand All @@ -575,9 +575,10 @@ func (f *FileStore) Open(ctx context.Context) error {
f.currentTempDirID = i
}

files, err := filepath.Glob(filepath.Join(f.dir, "*."+TSMFileExtension))
pattern := filepath.Join(f.dir, "*."+TSMFileExtension)
files, err := filepath.Glob(pattern)
if err != nil {
return err
return fmt.Errorf("error in Glob for %q in FileStore.Open: %w", pattern, err)
}

// struct to hold the result of opening each reader in a goroutine
Expand All @@ -591,7 +592,7 @@ func (f *FileStore) Open(ctx context.Context) error {
// Keep track of the latest ID
generation, _, err := f.parseFileName(fn)
if err != nil {
return err
return fmt.Errorf("error parsing %q in FileStore.Open: %w", fn, err)
}

if generation >= f.currentGeneration {
Expand All @@ -600,7 +601,7 @@ func (f *FileStore) Open(ctx context.Context) error {

file, err := os.OpenFile(fn, os.O_RDONLY, 0666)
if err != nil {
return fmt.Errorf("error opening file %s: %v", fn, err)
return fmt.Errorf("error calling OpenFile on %q in FileStore.Open: %w", fn, err)
}

go func(idx int, file *os.File) {
Expand All @@ -624,17 +625,20 @@ func (f *FileStore) Open(ctx context.Context) error {
// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
if err != nil {
if cerr := file.Close(); cerr != nil {
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
}
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
file.Close()
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
return
}

df.WithObserver(f.obs)
readerC <- &res{r: df}
}(i, file)
Expand Down Expand Up @@ -668,7 +672,7 @@ func (f *FileStore) Open(ctx context.Context) error {
f.lastModified = fi.ModTime().UTC()
} else {
close(readerC)
return err
return fmt.Errorf("error calling Stat on %q in FileStore.Open: %w", f.dir, err)
}
} else {
f.lastModified = time.Unix(0, lm).UTC()
Expand Down
5 changes: 3 additions & 2 deletions tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,10 @@ func (l *WAL) Close() error {

// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
func segmentFileNames(dir string) ([]string, error) {
names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
pattern := filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension))
names, err := filepath.Glob(pattern)
if err != nil {
return nil, err
return nil, fmt.Errorf("segmentFileNames: error in Glob for %q: %w", pattern, err)
}
sort.Strings(names)
return names, nil
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3209,15 +3209,15 @@ func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
return nil, fmt.Errorf("error calling Stat on %q in NewIndex: %w", path, err)
} else if err == nil {
format = TSI1IndexName
}

// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
return nil, fmt.Errorf("invalid index format for %q in NewIndex: %q", path, format)
}
return fn(id, database, path, seriesIDSet, sfile, options), nil
}
Expand Down
5 changes: 5 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (e ShardError) Error() string {
return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
}

// Unwrap returns the underlying error.
func (e ShardError) Unwrap() error {
return e.Err
}

// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (s *Store) loadShards(ctx context.Context) error {
err = s.OpenShard(ctx, shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %s", shardID, err)}
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
return
}

Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *Store) OpenShard(ctx context.Context, sh *Shard, force bool) error {
s.badShards.setShardOpenError(sh.ID(), err)
return err
} else {
return oldErr
return fmt.Errorf("not attempting to open shard %d; %w", sh.ID(), oldErr)
}
}

Expand Down
6 changes: 4 additions & 2 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ func TestStore_BadShard(t *testing.T) {
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)

sh := tsdb.NewTempShard(t, idx)
shId := sh.ID()
err := s.OpenShard(context.Background(), sh.Shard, false)
require.NoError(t, err, "opening temp shard")
require.NoError(t, sh.Close(), "closing temporary shard")

s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr))
expErr := errors.New(errStr)
s.SetShardOpenErrorForTest(sh.ID(), expErr)
err2 := s.OpenShard(context.Background(), sh.Shard, false)
require.Error(t, err2, "no error opening bad shard")
require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
require.EqualError(t, err2, "opening shard previously failed with: "+errStr)
require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error())

// This should succeed with the force (and because opening an open shard automatically succeeds)
require.NoError(t, s.OpenShard(context.Background(), sh.Shard, true), "forced re-opening previously failing shard")
Expand Down

0 comments on commit cc9bd41

Please sign in to comment.