Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Provide option to compress WAL records
Browse files Browse the repository at this point in the history
In running Prometheus instances, compressing the records was shown to
reduce disk usage by half while incurring a negligible CPU cost.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
  • Loading branch information
csmarchbanks committed Jun 6, 2019
1 parent 149c5dc commit a5ff5d0
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 489 deletions.
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
Expand Down
188 changes: 96 additions & 92 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) {
}

func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())

// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
testutil.Ok(t, err)

// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))

last += 100
}
testutil.Ok(t, w.Close())
var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())

_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
testutil.Ok(t, err)

// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])
// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
defer sr.Close()
// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
testutil.Ok(t, err)

var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)
var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))

last += 100
}
testutil.Ok(t, w.Close())

for r.Next() {
rec := r.Record()
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))

switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])

sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
defer sr.Close()

var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)

for r.Next() {
rec := r.Record()

switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
}
}
}
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
})
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
Expand All @@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.NewSize(nil, nil, dir, 64*1024)
w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99}))
w.Close()
Expand Down
6 changes: 5 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var DefaultOptions = &Options{
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
NoLockfile: false,
AllowOverlappingBlocks: false,
WALCompression: false,
}

// Options of the DB storage.
Expand Down Expand Up @@ -80,6 +81,9 @@ type Options struct {
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
// This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlocks bool

// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
}

// Appender allows appending a batch of data. It must be completed with a
Expand Down Expand Up @@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
}()

testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
Expand Down Expand Up @@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 6000))

testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.1.1 // indirect
github.com/golang/protobuf v1.2.0 // indirect
github.com/golang/snappy v0.0.1
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/oklog/ulid v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
Expand Down
Loading

0 comments on commit a5ff5d0

Please sign in to comment.