From 48a8fb6e2f6a476bcffa508d6609a19847c695ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Wed, 16 Oct 2019 11:31:44 +0200 Subject: [PATCH] pkg/receive: remove flushed WAL (#1654) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit ensures that we delete the WAL after it has been flushed to a block. Flushing the WAL simply creates a block but does not remove the WAL directory or its contents. This means that once the DB is re-opened, new samples are added to the same WAL. Flushing the WAL again does not result in blocks with overlapping time ranges because the flushing logic guards against this (https://github.com/prometheus/prometheus/blob/master/tsdb/db.go#L300). Nevertheless, we should delete the WAL after flushing it to ensure that flushed samples are not needlessly re-processed. Also, once multi-TSDB support is added, holding old samples in the WAL could cause problems. Signed-off-by: Lucas Servén Marín --- pkg/receive/tsdb.go | 5 +++ pkg/receive/tsdb_test.go | 80 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 pkg/receive/tsdb_test.go diff --git a/pkg/receive/tsdb.go b/pkg/receive/tsdb.go index b2f504a535..16fdae6e8f 100644 --- a/pkg/receive/tsdb.go +++ b/pkg/receive/tsdb.go @@ -1,6 +1,8 @@ package receive import ( + "os" + "path/filepath" "sync" "github.com/go-kit/kit/log" @@ -86,6 +88,9 @@ func (f *FlushableStorage) Flush() error { if err := ro.FlushWAL(f.Dir()); err != nil { return errors.Wrap(err, "flushing WAL") } + if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil { + return errors.Wrap(err, "removing stale WAL") + } if reopen { return errors.Wrap(f.open(), "re-starting storage") } diff --git a/pkg/receive/tsdb_test.go b/pkg/receive/tsdb_test.go new file mode 100644 index 0000000000..cd9ed3651e --- /dev/null +++ b/pkg/receive/tsdb_test.go @@ -0,0 +1,80 @@ +package receive + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/tsdb" + "github.com/prometheus/prometheus/tsdb/labels" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestFlushableStorage(t *testing.T) { + { + // Ensure that flushing storage does not cause data loss. + // This test: + // * opens a flushable storage; + // * appends values; + // * flushes the storage; and + // * queries the storage to ensure the samples are present. + + dbDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dbDir)) }() + + tsdbCfg := &tsdb.Options{ + RetentionDuration: model.Duration(time.Hour * 24 * 15), + NoLockfile: true, + MinBlockDuration: model.Duration(time.Hour * 2), + MaxBlockDuration: model.Duration(time.Hour * 2), + WALCompression: true, + } + + db := NewFlushableStorage( + dbDir, + log.NewNopLogger(), + prometheus.NewRegistry(), + tsdbCfg, + ) + + testutil.Ok(t, db.Open()) + defer func() { testutil.Ok(t, db.Close()) }() + + // Append data to the WAL. + app := db.Appender() + maxt := 1000 + for i := 0; i < maxt; i++ { + _, err := app.Add(labels.FromStrings("thanos", "flush"), int64(i), 1.0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Flush the WAL. + testutil.Ok(t, db.Flush()) + + querier, err := db.Querier(0, int64(maxt)-1) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, querier.Close()) }() + + // Sum the values. + seriesSet, err := querier.Select(labels.NewEqualMatcher("thanos", "flush")) + testutil.Ok(t, err) + sum := 0.0 + for seriesSet.Next() { + series := seriesSet.At().Iterator() + for series.Next() { + _, v := series.At() + sum += v + } + testutil.Ok(t, series.Err()) + } + testutil.Ok(t, seriesSet.Err()) + testutil.Equals(t, 1000.0, sum) + } +}