Skip to content

Commit

Permalink
pkg/receive: remove flushed WAL (#1654)
Browse files Browse the repository at this point in the history
This commit ensures that we delete the WAL after it has been flushed to
a block. Flushing the WAL simply creates a block but does not remove the
WAL directory or its contents. This means that once the DB is re-opened,
new samples are added to the same WAL. Flushing the WAL again does not
result in blocks with overlapping time ranges because the flushing logic
guards against this
(https://github.com/prometheus/prometheus/blob/master/tsdb/db.go#L300).
Nevertheless, we should delete the WAL after flushing it to ensure that
flushed samples are not needlessly re-processed. Also, once multi-TSDB
support is added, holding old samples in the WAL could cause problems.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat authored and brancz committed Oct 16, 2019
1 parent fb0db63 commit 48a8fb6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/receive/tsdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package receive

import (
"os"
"path/filepath"
"sync"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -86,6 +88,9 @@ func (f *FlushableStorage) Flush() error {
if err := ro.FlushWAL(f.Dir()); err != nil {
return errors.Wrap(err, "flushing WAL")
}
if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil {
return errors.Wrap(err, "removing stale WAL")
}
if reopen {
return errors.Wrap(f.open(), "re-starting storage")
}
Expand Down
80 changes: 80 additions & 0 deletions pkg/receive/tsdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package receive

import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"

"github.com/thanos-io/thanos/pkg/testutil"
)

// TestFlushableStorage verifies that samples appended to a flushable storage
// are still returned by a query after the storage has been flushed.
func TestFlushableStorage(t *testing.T) {
	// Scenario: flushing must not lose data.
	//   1. open a flushable storage backed by a temporary directory;
	//   2. append samples and commit them;
	//   3. flush the storage;
	//   4. query the whole appended range and check that the samples add up.
	const (
		numSamples = 1000
		blockRange = 2 * time.Hour
	)

	tmpDir, err := ioutil.TempDir("", "test")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

	opts := &tsdb.Options{
		RetentionDuration: model.Duration(15 * 24 * time.Hour),
		NoLockfile:        true,
		MinBlockDuration:  model.Duration(blockRange),
		MaxBlockDuration:  model.Duration(blockRange),
		WALCompression:    true,
	}

	store := NewFlushableStorage(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), opts)
	testutil.Ok(t, store.Open())
	defer func() { testutil.Ok(t, store.Close()) }()

	// Write one sample of value 1.0 per timestamp in [0, numSamples), all
	// belonging to the same series.
	lset := labels.FromStrings("thanos", "flush")
	appender := store.Appender()
	for ts := 0; ts < numSamples; ts++ {
		_, addErr := appender.Add(lset, int64(ts), 1.0)
		testutil.Ok(t, addErr)
	}
	testutil.Ok(t, appender.Commit())

	// Flush the storage before reading anything back.
	testutil.Ok(t, store.Flush())

	// Query the exact time range that was appended above.
	q, err := store.Querier(0, int64(numSamples)-1)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, q.Close()) }()

	matched, err := q.Select(labels.NewEqualMatcher("thanos", "flush"))
	testutil.Ok(t, err)

	// Every sample has value 1.0, so the total equals the number of samples
	// that were appended.
	total := 0.0
	for matched.Next() {
		it := matched.At().Iterator()
		for it.Next() {
			_, val := it.At()
			total += val
		}
		testutil.Ok(t, it.Err())
	}
	testutil.Ok(t, matched.Err())
	testutil.Equals(t, float64(numSamples), total)
}

0 comments on commit 48a8fb6

Please sign in to comment.