Skip to content

Commit

Permalink
e2e: Refactored compactor test; Fixed flakiness. (#2313)
Browse files Browse the repository at this point in the history
Also:

* Reduced number of services for e2e for latency
* Fixed halting
* Improved logging.
* Improved test cases (e.g., added tests for compaction and halting)


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored Mar 25, 2020
1 parent b678671 commit 2262f26
Show file tree
Hide file tree
Showing 14 changed files with 502 additions and 626 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e-ci
run: make test-e2e
15 changes: 3 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,9 @@ test-e2e: docker
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go clean -testcache
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...
# NOTE(bwplotka):
# * If you see errors on CI (timeouts), but not locally, try adding -parallel 1 to limit the run to a single CPU, reproducing a small 1-CPU machine.
@go test -failfast -timeout 10m -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func runCompact(

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
return errors.Wrap(err, "compaction")
}

if !disableDownsampling {
Expand Down
10 changes: 5 additions & 5 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,17 +585,17 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, view bool) error {
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, _ bool) error {
for u, meta := range metas {
labels := meta.Thanos.Labels
l := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
if _, exists := l[replicaLabel]; exists {
level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
delete(labels, replicaLabel)
delete(l, replicaLabel)
modified.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
metas[u].Thanos.Labels = l
}
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@ func (s *Syncer) Groups() (res []*Group, err error) {
groupKey := GroupKey(m.Thanos)
g, ok := groups[groupKey]
if !ok {
lbls := labels.FromMap(m.Thanos.Labels)
g, err = newGroup(
log.With(s.logger, "compactionGroup", groupKey),
log.With(s.logger, "compactionGroup", fmt.Sprintf("%d@%v", m.Thanos.Downsample.Resolution, lbls.String()), "compactionGroupKey", groupKey),
s.bkt,
labels.FromMap(m.Thanos.Labels),
lbls,
m.Thanos.Downsample.Resolution,
s.acceptMalformedIndex,
s.enableVerticalCompaction,
Expand Down Expand Up @@ -435,7 +436,7 @@ func (e HaltError) Error() string {
// IsHaltError returns true if the base error is a HaltError.
// If a multierror is passed, any halt error will return true.
func IsHaltError(err error) bool {
if multiErr, ok := err.(terrors.MultiError); ok {
if multiErr, ok := errors.Cause(err).(terrors.MultiError); ok {
for _, err := range multiErr {
if _, ok := errors.Cause(err).(HaltError); ok {
return true
Expand Down Expand Up @@ -581,6 +582,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// Check for overlapped blocks.
overlappingBlocks := false
if err := cg.areBlocksOverlapping(nil); err != nil {
// TODO(bwplotka): It would be really nice if we could still check for overlaps other than replica ones. In fact this should be checked
// in the syncer itself. Otherwise, with vertical compaction enabled, we will sacrifice this important check.
if !cg.enableVerticalCompaction {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
}
Expand Down Expand Up @@ -853,7 +856,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
continue
}
}
errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key()))
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestHaltMultiError(t *testing.T) {

errs.Add(haltErr)
testutil.Assert(t, IsHaltError(errs), "if any halt errors are present this should return true")
testutil.Assert(t, IsHaltError(errors.Wrap(errs, "wrap")), "halt error with wrap")

}

func TestRetryMultiError(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string,
}
defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "query body")

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "read query instant response")
}

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var m struct {
Expand All @@ -363,11 +368,6 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string,
Warnings []string `json:"warnings"`
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "read query instant response")
}

if err = json.Unmarshal(body, &m); err != nil {
return nil, nil, errors.Wrap(err, "unmarshal query instant response")
}
Expand Down
112 changes: 42 additions & 70 deletions pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package e2eutil

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -333,81 +335,22 @@ func CreateEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels,
return uid, nil
}

// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt)
// Block ID will be created with a delay of time duration blockDelay.
func CreateBlockWithBlockDelay(
// CreateBlock writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt).
func CreateBlock(
ctx context.Context,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
blockDelay time.Duration,
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
if err != nil {
return id, errors.Wrap(err, "block creation")
}

id, err = ulid.New(uint64(time.Unix(int64(blockID.Time()), 0).Add(-blockDelay*1000).Unix()), nil)
if err != nil {
return id, errors.Wrap(err, "create block id")
}

if blockID.Compare(id) == 0 {
return
}

metaFile := path.Join(dir, blockID.String(), "meta.json")
r, err := os.Open(metaFile)
if err != nil {
return id, errors.Wrap(err, "open meta file")
}

metaContent, err := ioutil.ReadAll(r)
if err != nil {
return id, errors.Wrap(err, "read meta file")
}

m := &metadata.Meta{}
if err := json.Unmarshal(metaContent, m); err != nil {
return id, errors.Wrap(err, "meta.json corrupted")
}
m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}

if err := os.MkdirAll(path.Join(dir, id.String()), 0777); err != nil {
return id, errors.Wrap(err, "create directory")
}

err = copyRecursive(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
if err != nil {
return id, errors.Wrap(err, "copy directory")
}

err = os.RemoveAll(path.Join(dir, blockID.String()))
if err != nil {
return id, errors.Wrap(err, "delete directory")
}

jsonMeta, err := json.MarshalIndent(m, "", "\t")
if err != nil {
return id, errors.Wrap(err, "meta marshal")
}

err = ioutil.WriteFile(path.Join(dir, id.String(), "meta.json"), jsonMeta, 0644)
if err != nil {
return id, errors.Wrap(err, "write meta.json file")
}

return
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
}

// CreateBlock writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt).
func CreateBlock(
// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block.
func CreateBlockWithTombstone(
ctx context.Context,
dir string,
series []labels.Labels,
Expand All @@ -416,20 +359,45 @@ func CreateBlock(
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true)
}

// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block.
func CreateBlockWithTombstone(
// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt)
// Block ID will be created with a delay of time duration blockDelay.
func CreateBlockWithBlockDelay(
ctx context.Context,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
blockDelay time.Duration,
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true)
) (ulid.ULID, error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "block creation")
}

id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create block id")
}

m, err := metadata.Read(path.Join(dir, blockID.String()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "open meta file")
}

m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}

if err := metadata.Write(log.NewNopLogger(), path.Join(dir, blockID.String()), m); err != nil {
return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
}

return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
}

func createBlock(
Expand Down Expand Up @@ -497,6 +465,10 @@ func createBlock(
return id, errors.Wrap(err, "write block")
}

if id.Compare(ulid.ULID{}) == 0 {
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Expand Down
Loading

0 comments on commit 2262f26

Please sign in to comment.