Skip to content

Commit

Permalink
fix(tsm1): "snapshot in progress" error during backup
Browse files Browse the repository at this point in the history
Test the skipCacheOk flag to tsdb.Shard.CreateSnapshot() and
tsdb.Engine.CreateSnapshot()
A value of true allows the backup to proceed even if a cache
snapshot cannot be taken.

influxdata/plutonium#3227
(cherry picked from commit 0dcff81)
  • Loading branch information
davidby-influx committed Nov 13, 2020
1 parent 196f600 commit 07a9c0e
Showing 1 changed file with 116 additions and 0 deletions.
116 changes: 116 additions & 0 deletions tsdb/engine/tsm1/engine_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package tsm1

import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"
)

func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping on windows")
}

tmpDir, err := ioutil.TempDir("", "shard_test")
if err != nil {
t.Fatalf("error creating temporary directory: %s", err.Error())
}
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")

sfile := NewSeriesFile(tmpDir)
defer sfile.Close()

opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile)
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})

sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()

points := make([]models.Point, 0, 10000)
for i := 0; i < cap(points); i++ {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
}
err = sh.WritePoints(points)
if err != nil {
t.Fatalf(err.Error())
}

engineInterface, err := sh.Engine()
if err != nil {
t.Fatalf("error retrieving shard.Engine(): %s", err.Error())
}

// Get the struct underlying the interface. Not a recommended practice.
realEngineStruct, ok := (engineInterface).(*Engine)
if !ok {
t.Log("Engine type does not permit simulating Cache race conditions")
return
}
// fake a race condition in snapshotting the cache.
realEngineStruct.Cache.snapshotting = true
defer func() {
realEngineStruct.Cache.snapshotting = false
}()

snapshotFunc := func(skipCacheOk bool) {
if f, err := sh.CreateSnapshot(skipCacheOk); err == nil {
if err = os.RemoveAll(f); err != nil {
t.Fatalf("Failed to clean up in TestEngine_ConcurrentShardSnapshots: %s", err.Error())
}
} else if err == ErrSnapshotInProgress {
if skipCacheOk {
t.Fatalf("failing to ignore this error,: %s", err.Error())
}
} else if err != nil {
t.Fatalf("error creating shard snapshot: %s", err.Error())
}
}

// Permit skipping cache in the snapshot
snapshotFunc(true)
// do not permit skipping the cache in the snapshot
snapshotFunc(false)
realEngineStruct.Cache.snapshotting = false
}

// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-")
if err != nil {
panic(err)
}
f := tsdb.NewSeriesFile(dir)
f.Logger = logger.New(os.Stdout)
if err := f.Open(); err != nil {
panic(err)
}
return f
}

type seriesIDSets []*tsdb.SeriesIDSet

func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
for _, v := range a {
f(v)
}
return nil
}

0 comments on commit 07a9c0e

Please sign in to comment.