Skip to content

Commit

Permalink
Quick fix for leaks.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jul 9, 2020
1 parent 0062d9d commit f070bc1
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a leak on receive Store API Series, which was leaking on errors.
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive,Querier: Fixed leaks on receive and qwuerier Store API Series, which were leaking on errors.

### Changed

Expand Down
76 changes: 42 additions & 34 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ type tenantSeriesSetServer struct {

ctx context.Context

warnCh warnSender
recv chan *storepb.Series
cur *storepb.Series
directCh directSender
recv chan *storepb.Series
cur *storepb.Series

err error
tenant string
Expand All @@ -103,13 +103,13 @@ type tenantSeriesSetServer struct {
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
warnCh warnSender,
directCh directSender,
) *tenantSeriesSetServer {
return &tenantSeriesSetServer{
ctx: ctx,
tenant: tenant,
warnCh: warnCh,
recv: make(chan *storepb.Series),
ctx: ctx,
tenant: tenant,
directCh: directCh,
recv: make(chan *storepb.Series),
}
}

Expand All @@ -120,27 +120,30 @@ func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.Ser
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
err = store.Series(r, s)
})

if err != nil {
if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
} else {
// Consistently prefix tenant specific warnings as done in various other places.
err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
s.directCh.send(storepb.NewWarnSeriesResponse(err))
}
}

close(s.recv)
}

func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
s.recv <- &storepb.Series{
Labels: series.Labels,
Chunks: chunks,
if series == nil {
// Proxy non series responses directly to client
s.directCh.send(r)
return nil
}
// For series, pass it to our AggChunkSeriesSet.
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- series:
}
return nil
}
Expand All @@ -157,37 +160,39 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
return s.cur.Labels, s.cur.Chunks
}

func (s *tenantSeriesSetServer) Err() error {
return s.err
}
func (s *tenantSeriesSetServer) Err() error { return s.err }

// Series returns all series for a requested time range and label matcher. The
// returned data may exceed the requested time bounds. The data returned may
// have been read and merged from multiple underlying TSDBStore instances.
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
defer span.Finish()

stores := s.tsdbStores()
if len(stores) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())
span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)
defer span.Finish()
g, gctx := errgroup.WithContext(ctx)

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancellableRespChannel(gctx, 10)

g.Go(func() error {
// This go routine is responsible for calling store's Series concurrently. Merged results
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
// When this go routine finishes or is cancelled, respCh channel is closed.

var (
seriesSet []storepb.SeriesSet
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
close(respCh)
}()

for tenant, store := range stores {
Expand All @@ -214,13 +219,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
g.Go(func() error {
// Go routine for gathering merged responses and sending them over to client. It stops when
// respCh channel is closed OR on error from client.
for resp := range respCh {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}
}

return nil
})
return g.Wait()
}

Expand Down
53 changes: 32 additions & 21 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,23 @@ func mergeLabels(a []storepb.Label, b labels.Labels) []storepb.Label {
return res
}

type ctxRespSender struct {
// cancellableRespSender is a response channel that does need to be exhausted on cancel.
type cancellableRespSender struct {
ctx context.Context
ch chan<- *storepb.SeriesResponse
}

func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) {
func newCancellableRespChannel(ctx context.Context, buffer int) (*cancellableRespSender, chan *storepb.SeriesResponse) {
respCh := make(chan *storepb.SeriesResponse, buffer)
return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) }
return &cancellableRespSender{ctx: ctx, ch: respCh}, respCh
}

func (s ctxRespSender) send(r *storepb.SeriesResponse) {
s.ch <- r
// send or returns on cancel.
func (s cancellableRespSender) send(r *storepb.SeriesResponse) {
select {
case <-s.ctx.Done():
case s.ch <- r:
}
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
Expand All @@ -213,15 +218,17 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

var (
g, gctx = errgroup.WithContext(srv.Context())
g, gctx := errgroup.WithContext(srv.Context())

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancellableRespChannel(gctx, 10)

g.Go(func() error {
// This go routine is responsible for calling store's Series concurrently. Merged results
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
// When this go routine finishes or is cancelled, respCh channel is closed.

var (
seriesSet []storepb.SeriesSet
storeDebugMsgs []string
Expand All @@ -239,7 +246,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe

defer func() {
wg.Wait()
closeFn()
close(respCh)
}()

for _, st := range s.stores() {
Expand Down Expand Up @@ -306,21 +313,25 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
g.Go(func() error {
// Go routine for gathering merged responses and sending them over to client. It stops when
// respCh channel is closed OR on error from client.
for resp := range respCh {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}
}

return nil
})
if err := g.Wait(); err != nil {
// TODO(bwplotka): Replace with request logger.
level.Error(s.logger).Log("err", err)
return err
}
return nil
}

type warnSender interface {
type directSender interface {
send(*storepb.SeriesResponse)
}

Expand All @@ -331,7 +342,7 @@ type streamSeriesSet struct {
logger log.Logger

stream storepb.Store_SeriesClient
warnCh warnSender
warnCh directSender

currSeries *storepb.Series
recvCh chan *storepb.Series
Expand Down Expand Up @@ -367,7 +378,7 @@ func startStreamSeriesSet(
closeSeries context.CancelFunc,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh warnSender,
warnCh directSender,
name string,
partialResponse bool,
responseTimeout time.Duration,
Expand Down

0 comments on commit f070bc1

Please sign in to comment.