diff --git a/pkg/receive/tsdb.go b/pkg/receive/tsdb.go index b2f504a535..16fdae6e8f 100644 --- a/pkg/receive/tsdb.go +++ b/pkg/receive/tsdb.go @@ -1,6 +1,8 @@ package receive import ( + "os" + "path/filepath" "sync" "github.com/go-kit/kit/log" @@ -86,6 +88,9 @@ func (f *FlushableStorage) Flush() error { if err := ro.FlushWAL(f.Dir()); err != nil { return errors.Wrap(err, "flushing WAL") } + if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil { + return errors.Wrap(err, "removing stale WAL") + } if reopen { return errors.Wrap(f.open(), "re-starting storage") } diff --git a/pkg/receive/tsdb_test.go b/pkg/receive/tsdb_test.go new file mode 100644 index 0000000000..cd9ed3651e --- /dev/null +++ b/pkg/receive/tsdb_test.go @@ -0,0 +1,80 @@ +package receive + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/tsdb" + "github.com/prometheus/prometheus/tsdb/labels" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestFlushableStorage(t *testing.T) { + { + // Ensure that flushing storage does not cause data loss. + // This test: + // * opens a flushable storage; + // * appends values; + // * flushes the storage; and + // * queries the storage to ensure the samples are present. + + dbDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dbDir)) }() + + tsdbCfg := &tsdb.Options{ + RetentionDuration: model.Duration(time.Hour * 24 * 15), + NoLockfile: true, + MinBlockDuration: model.Duration(time.Hour * 2), + MaxBlockDuration: model.Duration(time.Hour * 2), + WALCompression: true, + } + + db := NewFlushableStorage( + dbDir, + log.NewNopLogger(), + prometheus.NewRegistry(), + tsdbCfg, + ) + + testutil.Ok(t, db.Open()) + defer func() { testutil.Ok(t, db.Close()) }() + + // Append data to the WAL. + app := db.Appender() + maxt := 1000 + for i := 0; i < maxt; i++ { + _, err := app.Add(labels.FromStrings("thanos", "flush"), int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Flush the WAL. + testutil.Ok(t, db.Flush()) + + querier, err := db.Querier(0, int64(maxt)-1) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum the values. + seriesSet, err := querier.Select(labels.NewEqualMatcher("thanos", "flush")) + testutil.Ok(t, err) + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + testutil.Equals(t, 1000.0, sum) + } +}