Skip to content

Commit

Permalink
compact: Add benchmark for #4084 (#4617)
Browse files Browse the repository at this point in the history
* testing: Add a bucket that adds a delay for ops

Create a bucket that simulates (a non jittering) network RTT to be used
for system like benchmarks.

Signed-off-by: Holger Hans Peter Freyther <holger@moiji-mobile.com>

* compact: Add benchmark for GatherNoCompactionMarkFilter

Create a benchmark to show the benefit of adding concurrency
to the no compaction mark filter. Set-up an in-memory bucket
with a simulated (network) delay. In the future we can refactor
this a bit more to test more/all filters.

Fixes: #4084
Signed-off-by: Holger Hans Peter Freyther <holger@moiji-mobile.com>

* Update pkg/compact/compact_test.go

Co-authored-by: Giedrius Statkevičius <giedriuswork@gmail.com>
Signed-off-by: Holger Hans Peter Freyther <holger@moiji-mobile.com>

Co-authored-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
zecke and GiedriusS authored Sep 14, 2021
1 parent 00bb6f7 commit d669cd8
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,25 @@
package compact

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -125,3 +137,40 @@ func TestGroupMaxMinTime(t *testing.T) {
testutil.Equals(t, int64(0), g.MinTime())
testutil.Equals(t, int64(30), g.MaxTime())
}

func BenchmarkGatherNoCompactionMarkFilter_Filter(b *testing.B) {
ctx := context.TODO()
logger := log.NewLogfmtLogger(ioutil.Discard)

m := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})

for blocksNum := 10; blocksNum <= 10000; blocksNum *= 10 {
bkt := objstore.NewInMemBucket()

metas := make(map[ulid.ULID]*metadata.Meta, blocksNum)

for i := 0; i < blocksNum; i++ {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ulid.MustNew(uint64(i), nil)
metas[meta.ULID] = &meta

var buf bytes.Buffer
testutil.Ok(b, json.NewEncoder(&buf).Encode(&meta))
testutil.Ok(b, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
}

for i := 10; i <= 60; i += 10 {
b.Run(fmt.Sprintf("Bench-%d-%d", blocksNum, i), func(b *testing.B) {
b.ResetTimer()

for n := 0; n <= b.N; n++ {
slowBucket := objstore.WithNoopInstr(objstore.WithDelay(bkt, time.Millisecond*2))
f := NewGatherNoCompactionMarkFilter(logger, slowBucket, i)
testutil.Ok(b, f.Filter(ctx, metas, m))
}
})
}
}

}
59 changes: 59 additions & 0 deletions pkg/objstore/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sort"
Expand Down Expand Up @@ -250,3 +251,61 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
testutil.Ok(t, bkt.Upload(ctx, "obj_6.som", bytes.NewReader(make([]byte, 1024*1024*200))))
testutil.Ok(t, bkt.Delete(ctx, "obj_6.som"))
}

type delayingBucket struct {
bkt Bucket
delay time.Duration
}

func WithDelay(bkt Bucket, delay time.Duration) Bucket {
return &delayingBucket{bkt: bkt, delay: delay}
}

func (d *delayingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
time.Sleep(d.delay)
return d.bkt.Get(ctx, name)
}

func (d *delayingBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
time.Sleep(d.delay)
return d.bkt.Attributes(ctx, name)
}

func (d *delayingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
time.Sleep(d.delay)
return d.bkt.Iter(ctx, dir, f, options...)
}

func (d *delayingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
time.Sleep(d.delay)
return d.bkt.GetRange(ctx, name, off, length)
}

func (d *delayingBucket) Exists(ctx context.Context, name string) (bool, error) {
time.Sleep(d.delay)
return d.bkt.Exists(ctx, name)
}

func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
time.Sleep(d.delay)
return d.bkt.Upload(ctx, name, r)
}

func (d *delayingBucket) Delete(ctx context.Context, name string) error {
time.Sleep(d.delay)
return d.bkt.Delete(ctx, name)
}

func (d *delayingBucket) Name() string {
time.Sleep(d.delay)
return d.bkt.Name()
}

func (d *delayingBucket) Close() error {
// No delay for a local operation.
return d.bkt.Close()
}
func (d *delayingBucket) IsObjNotFoundErr(err error) bool {
// No delay for a local operation.
return d.bkt.IsObjNotFoundErr(err)
}

0 comments on commit d669cd8

Please sign in to comment.