
Add store.receive-timeout
povilasv committed Mar 15, 2019
1 parent 51ff267 commit b36db84
Showing 4 changed files with 105 additions and 19 deletions.
8 changes: 7 additions & 1 deletion cmd/thanos/query.go
@@ -91,6 +91,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified.").
Default("true").Bool()

storeReceiveTimeout := modelDuration(cmd.Flag("store.receive-timeout", "Maximum time to wait for any data from a store. If a store doesn't send any data within this timeout, the store will be ignored and partial data will be returned.").
Default("200ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
@@ -119,6 +122,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
fileSD = file.NewDiscovery(conf, logger)
}

storeReceiveMaxDuration := time.Duration(*storeReceiveTimeout)
return runQuery(
g,
logger,
@@ -139,6 +143,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*webPrefixHeaderName,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
storeReceiveMaxDuration,
*replicaLabel,
peer,
selectorLset,
@@ -254,6 +259,7 @@ func runQuery(
webPrefixHeaderName string,
maxConcurrentQueries int,
queryTimeout time.Duration,
storeReceiveTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
@@ -304,7 +310,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeReceiveTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
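For orientation, here is a minimal sketch (not part of this diff) of how a Prometheus-style duration string such as the `200ms` default ends up as the `time.Duration` handed to `runQuery`. It assumes the `modelDuration` helper ultimately relies on `github.com/prometheus/common/model` parsing, which the sketch calls directly:

```go
// Illustrative only: parse a Prometheus-style duration string and convert it,
// mirroring time.Duration(*storeReceiveTimeout) in registerQuery above.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	d, err := model.ParseDuration("200ms")
	if err != nil {
		panic(err)
	}

	// Same conversion as storeReceiveMaxDuration := time.Duration(*storeReceiveTimeout).
	storeReceiveMaxDuration := time.Duration(d)
	fmt.Println(storeReceiveMaxDuration) // 200ms
}
```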
5 changes: 5 additions & 0 deletions docs/components/query.md
@@ -270,5 +270,10 @@ Flags:
if no max_source_resolution param is specified.
--query.partial-response Enable partial response for queries if no
partial_response param is specified.
--store.receive-timeout=200ms
Maximum time to wait for any data from a store.
If a store doesn't send any data within this
timeout, the store will be ignored and partial
data will be returned.
```
105 changes: 88 additions & 17 deletions pkg/store/proxy.go
@@ -7,6 +7,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -40,6 +41,8 @@ type ProxyStore struct {
stores func() []Client
component component.StoreAPI
selectorLabels labels.Labels

receiveTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implement StoreAPI to fan-in all series to the client.
@@ -49,6 +52,7 @@ func NewProxyStore(
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
receiveTimeout time.Duration,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -58,6 +62,7 @@ func NewProxyStore(
stores: stores,
component: component,
selectorLabels: selectorLabels,
receiveTimeout: receiveTimeout,
}
return s
}
@@ -147,7 +152,11 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

sc, err := st.Series(gctx, r)
// This is used to cancel the stream when a single receive operation takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
defer closeSeries()

sc, err := st.Series(seriesCtx, r)
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
@@ -159,15 +168,15 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return err
}
respSender.send(storepb.NewWarnSeriesResponse(err))

continue
}

// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.receiveTimeout))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))

if len(seriesSet) == 0 {
// This indicates that the configured StoreAPIs are not the ones the end user expects
err := errors.New("No store matched for this query")
@@ -177,17 +186,37 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}

mergedSet := storepb.MergeSeriesSets(seriesSet...)

for mergedSet.Next() {
var series storepb.Series

series.Labels, series.Chunks = mergedSet.At()

respSender.send(storepb.NewSeriesResponse(&series))
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
// Forward responses until the channel is closed; stop early if the context is canceled.
forLoop:
for {
select {
case resp, ok := <-respRecv:
if !ok {
break forLoop
}
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
level.Info(s.logger).Log("msg", "sending series done")
case <-gctx.Done():
err := errors.Wrap(gctx.Err(), "sending series")
level.Info(s.logger).Log("msg", "sending series context timed out", "err", err)
if r.PartialResponseDisabled {
return err
}
// TODO(povilasv): check whether we actually sent something; if the response is empty, return an error even in the partial-response case.
break forLoop
}
}

@@ -196,7 +225,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return err
}
return nil

}

type warnSender interface {
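The forwarding loop added to `Series` above replaces a plain `for resp := range respRecv`. A self-contained sketch of the same pattern, with hypothetical names and simplified types rather than the actual Thanos code, looks like this:

```go
// Simplified sketch of the forwarding pattern used in Series above: drain a
// response channel until it closes, but stop early if the context is done.
package main

import (
	"context"
	"fmt"
	"time"
)

func forward(ctx context.Context, respCh <-chan string, send func(string) error) error {
loop:
	for {
		select {
		case resp, ok := <-respCh:
			if !ok {
				break loop // channel closed, everything forwarded
			}
			if err := send(resp); err != nil {
				return err
			}
		case <-ctx.Done():
			// Context canceled or timed out: stop forwarding.
			break loop
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	respCh := make(chan string, 2)
	respCh <- "series-1"
	respCh <- "series-2"
	close(respCh)

	_ = forward(ctx, respCh, func(s string) error {
		fmt.Println("sent", s)
		return nil
	})
}
```

The real loop additionally returns the context error when partial responses are disabled and wraps send failures in a gRPC status, as shown in the diff above.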
@@ -206,6 +234,8 @@ type warnSender interface {
// streamSeriesSet iterates over incoming stream of series.
// All errors are sent out of band via warning channel.
type streamSeriesSet struct {
ctx context.Context

stream storepb.Store_SeriesClient
warnCh warnSender

@@ -215,28 +245,39 @@ type streamSeriesSet struct {
errMtx sync.Mutex
err error

name string
name string
partialResponse bool

receiveTimeout time.Duration
closeSeries context.CancelFunc
}

func startStreamSeriesSet(
ctx context.Context,
closeSeries context.CancelFunc,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh warnSender,
name string,
partialResponse bool,
receiveTimeout time.Duration,
) *streamSeriesSet {
s := &streamSeriesSet{
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
ctx: ctx,
closeSeries: closeSeries,
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
partialResponse: partialResponse,
receiveTimeout: receiveTimeout,
}

wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)

for {
r, err := s.stream.Recv()
if err == io.EOF {
@@ -249,13 +290,13 @@ func startStreamSeriesSet(

if err != nil {
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrapf(err, "receive series from %s:", s.name)))
return
}

s.errMtx.Lock()
defer s.errMtx.Unlock()
s.err = err
s.errMtx.Unlock()
return
}

@@ -269,10 +310,40 @@ func startStreamSeriesSet(
return s
}

// Next blocks until new message is received or stream is closed.
// Next blocks until a new message is received, the stream is closed, or the operation times out.
func (s *streamSeriesSet) Next() (ok bool) {
s.currSeries, ok = <-s.recvCh
return ok
// Sometimes gRPC streams get stuck and don't send any data for long periods of time,
// e.g. a simple gRPC Store API that just sleeps.
//
// Right now the only way to time them out in gRPC is context.WithTimeout(), but that
// is a global timeout: if we set it to 10s, we will never get results from Thanos Store
// (backed by object storage); if we set it to 120s, we will always wait up to 120s for
// the HTTP queries to finish, since we wait for the slowest store.
//
// This fix adds a timeout per single receive operation. That way we identify a stuck
// connection and only time out a gRPC store that hasn't put any data into recvCh
// within the configured receive timeout.
ctx, done := context.WithTimeout(s.ctx, s.receiveTimeout)
defer done()
select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// Shut down the goroutine started in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s:", s.receiveTimeout.String(), s.name)
if s.partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

return false
}
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
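The per-receive timeout in `Next` is the heart of this change. A stripped-down sketch of the mechanism, using hypothetical names rather than the real `streamSeriesSet`, might look like this:

```go
// Illustrative sketch: each receive from recvCh gets its own deadline, and
// hitting it cancels the upstream stream via closeSeries.
package main

import (
	"context"
	"fmt"
	"time"
)

type set struct {
	ctx            context.Context
	closeSeries    context.CancelFunc
	recvCh         chan int
	receiveTimeout time.Duration
}

func (s *set) next() (int, bool) {
	// A fresh timeout per receive, so only a store that stays silent for longer
	// than receiveTimeout is cut off, not the whole query.
	ctx, done := context.WithTimeout(s.ctx, s.receiveTimeout)
	defer done()

	select {
	case v, ok := <-s.recvCh:
		return v, ok
	case <-ctx.Done():
		s.closeSeries() // stop the goroutine feeding recvCh
		return 0, false
	}
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	s := &set{ctx: parent, closeSeries: cancel, recvCh: make(chan int), receiveTimeout: 100 * time.Millisecond}

	go func() {
		// Simulate a store that sends one value and then goes silent.
		s.recvCh <- 42
	}()

	fmt.Println(s.next()) // 42 true
	fmt.Println(s.next()) // 0 false, after roughly receiveTimeout
}
```

In the actual code the timeout path additionally emits a warning response (partial response enabled) or records the error under `errMtx`, as shown in the diff above.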
6 changes: 5 additions & 1 deletion pkg/store/proxy_test.go
@@ -51,7 +51,7 @@ func TestProxyStore_Info(t *testing.T) {
q := NewProxyStore(nil,
func() []Client { return nil },
component.Query,
nil,
nil, 1*time.Second,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
@@ -405,6 +405,7 @@ func TestProxyStore_Series(t *testing.T) {
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
1*time.Second,
)

s := newStoreSeriesServer(context.Background())
@@ -446,6 +447,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
func() []Client { return cls },
component.Query,
nil,
1*time.Second,
)

ctx := context.Background()
@@ -504,6 +506,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
func() []Client { return cls },
component.Query,
tlabels.FromStrings("fed", "a"),
1*time.Second,
)

ctx := context.Background()
@@ -541,6 +544,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
func() []Client { return cls },
component.Query,
nil,
1*time.Second,
)

ctx := context.Background()
