Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Fixed leak on receive and querier proxying Store API Series, which was leaking on errors. #2866

Merged
merged 5 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors.

### Changed

Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
return merr.Err()
}

func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*store.TSDBStore, len(t.tenants))
res := make(map[string]storepb.StoreServer, len(t.tenants))
for k, tenant := range t.tenants {
s := tenant.store()
if s != nil {
Expand Down
92 changes: 54 additions & 38 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
)

// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
type MultiTSDBStore struct {
logger log.Logger
component component.SourceStoreAPI
tsdbStores func() map[string]*TSDBStore
tsdbStores func() map[string]storepb.StoreServer
}

// NewMultiTSDBStore creates a new MultiTSDBStore.
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -89,59 +90,70 @@ type tenantSeriesSetServer struct {

ctx context.Context

warnCh warnSender
recv chan *storepb.Series
cur *storepb.Series
directCh directSender
recv chan *storepb.Series
cur *storepb.Series

err error
tenant string
}

// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
// Details https://github.com/thanos-io/thanos/issues/2864.
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
warnCh warnSender,
directCh directSender,
) *tenantSeriesSetServer {
return &tenantSeriesSetServer{
ctx: ctx,
tenant: tenant,
warnCh: warnCh,
recv: make(chan *storepb.Series),
ctx: ctx,
tenant: tenant,
directCh: directCh,
recv: make(chan *storepb.Series),
}
}

func (s *tenantSeriesSetServer) Context() context.Context {
return s.ctx
}
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }

func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
var err error
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
err = store.Series(r, s)
})

if err != nil {
if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
} else {
// Consistently prefix tenant specific warnings as done in various other places.
err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
s.directCh.send(storepb.NewWarnSeriesResponse(err))
}
}

close(s.recv)
}

func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
if series == nil {
// Proxy non series responses directly to client
s.directCh.send(r)
return nil
}

// TODO(bwplotka): Consider avoiding the copy / learn why it has to be copied.
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
s.recv <- &storepb.Series{

// For series, pass it to our AggChunkSeriesSet.
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- &storepb.Series{
Labels: series.Labels,
Chunks: chunks,
}:
return nil
}
return nil
}

func (s *tenantSeriesSetServer) Next() (ok bool) {
Expand All @@ -156,37 +168,39 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
return s.cur.Labels, s.cur.Chunks
}

func (s *tenantSeriesSetServer) Err() error {
return s.err
}
func (s *tenantSeriesSetServer) Err() error { return s.err }

// Series returns all series for a requested time range and label matcher. The
// returned data may exceed the requested time bounds. The data returned may
// have been read and merged from multiple underlying TSDBStore instances.
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
defer span.Finish()

stores := s.tsdbStores()
if len(stores) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())
span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)
defer span.Finish()
g, gctx := errgroup.WithContext(ctx)

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancelableRespChannel(gctx, 10)

g.Go(func() error {
// This goroutine is responsible for calling the stores' Series concurrently. Merged results
// are passed to respCh and sent concurrently to the client (if the buffer of 10 has room).
// When this goroutine finishes or is canceled, the respCh channel is closed.

var (
seriesSet []storepb.SeriesSet
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
close(respCh)
}()

for tenant, store := range stores {
Expand All @@ -202,7 +216,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
defer wg.Done()
ss.Series(store, r)
}()

seriesSet = append(seriesSet, ss)
}

Expand All @@ -214,13 +227,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
g.Go(func() error {
// Goroutine for gathering merged responses and sending them over to the client. It stops when
// the respCh channel is closed OR on an error from the client.
for resp := range respCh {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}
}

return nil
})
return g.Wait()
}

Expand Down
140 changes: 138 additions & 2 deletions pkg/store/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
package store

import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand All @@ -21,6 +25,8 @@ import (
)

func TestMultiTSDBSeries(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
if ok := t.Run("headOnly", func(t testutil.TB) {
Expand Down Expand Up @@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
}

tsdbs := map[string]*TSDBStore{}
tsdbs := map[string]storepb.StoreServer{}
for i, db := range dbs {
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger, maxSamplesPerChunk: 120} // On production we have math.MaxInt64
}

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
lastLabels := storepb.Series{}
Expand Down Expand Up @@ -154,3 +160,133 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
},
)
}

// mockedStoreServer is a test double for storepb.StoreServer whose Series
// method replays a fixed list of responses. Only Series is overridden; every
// other method comes from the embedded interface, which the tests in this file
// leave unset.
type mockedStoreServer struct {
storepb.StoreServer

// responses are sent, in order, by Series.
responses []*storepb.SeriesResponse
}

// Series replays the canned responses to server in their stored order. The
// request is ignored. It stops at the first Send failure and returns that
// error unchanged; it returns nil once every response has been sent.
func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
	for i := range m.responses {
		err := server.Send(m.responses[i])
		if err != nil {
			return err
		}
	}
	return nil
}

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
t.Run("exhausted StoreSet", func(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

s := newTenantSeriesSetServer(context.Background(), "a", nil)

resps := []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}

m := &mockedStoreServer{responses: resps}

go func() {
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
}()

testutil.Ok(t, s.Err())
i := 0
for s.Next() {
l, c := s.At()

testutil.Equals(t, resps[i].GetSeries().Labels, l)
testutil.Equals(t, resps[i].GetSeries().Chunks, c)

i++
}
testutil.Ok(t, s.Err())
testutil.Equals(t, 3, i)
})

t.Run("canceled, not exhausted StoreSet", func(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx, cancel := context.WithCancel(context.Background())
s := newTenantSeriesSetServer(ctx, "a", nil)

m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}}
go func() {
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
}()

testutil.Ok(t, s.Err())
testutil.Equals(t, true, s.Next())
cancel()
})
}

// mockedSeriesServer is a test double for storepb.Store_SeriesServer that
// forwards Send to a caller-supplied function and reports a fixed context.
type mockedSeriesServer struct {
storepb.Store_SeriesServer
// ctx is the value returned by Context.
ctx context.Context

// send is invoked for every response passed to Send.
send func(*storepb.SeriesResponse) error
}

// Send hands r to the configured send function and returns its result.
func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error { return s.send(r) }
// Context returns the context the mock was constructed with.
func (s *mockedSeriesServer) Context() context.Context {
	return s.ctx
}

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
// This is a different leak than the one covered by
// TestTenantSeriesSetServert_NotLeakingIfNotExhausted: here the client-side
// Send fails on its first call, so MultiTSDBStore.Series must return an error
// without leaving goroutines behind.
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	// tenantStore returns a mocked store emitting ten series whose single
	// label is named after the tenant and takes the values "a" through "j".
	tenantStore := func(name string) *mockedStoreServer {
		values := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
		resps := make([]*storepb.SeriesResponse, 0, len(values))
		for _, v := range values {
			resps = append(resps, storeSeriesResponse(t, labels.FromStrings(name, v), []sample{{0, 0}, {2, 1}, {3, 2}}))
		}
		return &mockedStoreServer{responses: resps}
	}

	m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
		// Ensure more than 10 (internal respCh channel).
		return map[string]storepb.StoreServer{
			"a": tenantStore("a"),
			"b": tenantStore("b"),
		}
	})

	t.Run("failing send", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())

		// We mimic failing series server, but practically context cancel will do the same.
		srv := &mockedSeriesServer{
			ctx: ctx,
			send: func(*storepb.SeriesResponse) error {
				cancel()
				return ctx.Err()
			},
		}
		req := &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}

		err := m.Series(req, srv)
		testutil.NotOk(t, err)
		// The context can only be done if send was actually invoked.
		testutil.NotOk(t, ctx.Err())
	})
}
Loading