ProxyStore timeouts.
Signed-off-by: Aleksey Sin <asin@ozon.ru>
Aleksey Sin committed Nov 27, 2019
1 parent 4e06e2e commit 7977409
Showing 3 changed files with 397 additions and 53 deletions.
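
The heart of the change is in pkg/store/proxy.go: the per-message response timeout moves out of streamSeriesSet.Next() and into the receive loop itself, where each stream.Recv() runs in its own goroutine and the loop selects between its result and a timeout context, recording first-receive latency and timeout counts as Prometheus metrics along the way. Below is a minimal, self-contained sketch of that receive-with-timeout pattern, not the Thanos code itself; recvWithTimeout, recvResult, and the string payload are illustrative names only.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// recvResult mirrors the recvResponse pair introduced in this commit: a payload plus an error.
type recvResult struct {
	payload string
	err     error
}

// recvWithTimeout runs a blocking receive in its own goroutine and waits for either
// its result or a per-message timeout; this is the shape of the select on rCh and
// ctx.Done() added to startStreamSeriesSet below.
func recvWithTimeout(ctx context.Context, timeout time.Duration, recv func() (string, error)) (string, error) {
	if timeout != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	ch := make(chan recvResult, 1) // buffered so the inner goroutine can always finish its send and exit
	go func() {
		p, err := recv()
		ch <- recvResult{payload: p, err: err}
	}()

	select {
	case <-ctx.Done():
		return "", fmt.Errorf("failed to receive any data in %s: %w", timeout, ctx.Err())
	case r := <-ch:
		return r.payload, r.err
	}
}

func main() {
	slow := func() (string, error) {
		time.Sleep(2 * time.Second) // simulate a store that is slow to answer
		return "series", nil
	}
	if _, err := recvWithTimeout(context.Background(), 100*time.Millisecond, slow); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("store timed out:", err)
	}
}

The buffered channel matters: even when the select has already given up, the receiver goroutine can still complete its send and terminate, which is why the real code also allocates its recvResponse channel with capacity 1.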
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
@@ -280,7 +280,7 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
150 changes: 106 additions & 44 deletions pkg/store/proxy.go
@@ -15,6 +15,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -29,7 +30,6 @@ import (
type Client interface {
// Client to access the store.
storepb.StoreClient

// LabelSets that each apply to some data exposed by the backing store.
LabelSets() []storepb.LabelSet

@@ -41,6 +41,20 @@ type Client interface {
Addr() string
}

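// Label values for the payload dimension of thanos_proxy_first_recv_duration.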
const WITH_PAYLOAD_LABEL = "with_payload"
const WITHOUT_PAYLOAD_LABEL = "without_payload"

type firstRecvMetrics struct {
withPayload prometheus.Observer
withoutPayload prometheus.Observer
timeoutCount prometheus.Counter
}

type proxyStoreMetrics struct {
firstRecvDuration *prometheus.SummaryVec
timeoutRecvCount *prometheus.CounterVec
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
logger log.Logger
@@ -49,12 +63,37 @@ type ProxyStore struct {
selectorLabels labels.Labels

responseTimeout time.Duration
metrics *proxyStoreMetrics
}

func newProxyStoreMetrics(reg *prometheus.Registry) *proxyStoreMetrics {
var m proxyStoreMetrics

m.firstRecvDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "thanos_proxy_first_recv_duration",
Help: "Time to get first part data from store(ms).",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
MaxAge: 2 * time.Minute,
}, []string{"store", "payload"})

m.timeoutRecvCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_proxy_timeout_recv_count",
Help: "Timeout recv count.",
}, []string{"store"})

if reg != nil {
reg.MustRegister(m.firstRecvDuration)
reg.MustRegister(m.timeoutRecvCount)
}

return &m
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implement storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
logger log.Logger,
reg *prometheus.Registry,
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
@@ -64,12 +103,15 @@ func NewProxyStore(
logger = log.NewNopLogger()
}

metrics := newProxyStoreMetrics(reg)

s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
metrics: metrics,
}
return s
}
@@ -256,10 +298,16 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
continue
}

metrics := &firstRecvMetrics{
withoutPayload: s.metrics.firstRecvDuration.WithLabelValues(st.Addr(), WITHOUT_PAYLOAD_LABEL),
withPayload: s.metrics.firstRecvDuration.WithLabelValues(st.Addr(), WITH_PAYLOAD_LABEL),
timeoutCount: s.metrics.timeoutRecvCount.WithLabelValues(st.Addr()),
}

seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, metrics))
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
@@ -319,6 +367,11 @@ type streamSeriesSet struct {
closeSeries context.CancelFunc
}

type recvResponse struct {
r *storepb.SeriesResponse
err error
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
@@ -329,6 +382,7 @@ func startStreamSeriesSet(
name string,
partialResponse bool,
responseTimeout time.Duration,
metrics *firstRecvMetrics,
) *streamSeriesSet {
s := &streamSeriesSet{
ctx: ctx,
@@ -344,11 +398,55 @@ func startStreamSeriesSet(

wg.Add(1)
go func() {
isFirstRecv := true
defer wg.Done()
defer close(s.recvCh)

for {
r, err := s.stream.Recv()
var cancel context.CancelFunc
if s.responseTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, s.responseTimeout)
defer cancel()
}
rCh := make(chan *recvResponse, 1)
var rr *recvResponse
go func() {
t0 := time.Now()
r, err := s.stream.Recv()
if isFirstRecv {
if r.Size() > 0 {
metrics.withPayload.Observe(time.Since(t0).Seconds())
} else {
metrics.withoutPayload.Observe(time.Since(t0).Seconds())
}
isFirstRecv = false
}
rCh <- &recvResponse{r: r, err: err}
}()

select {
case <-ctx.Done():
s.closeSeries()
metrics.timeoutCount.Inc()
var err error
if s.responseTimeout != 0 {
err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name))
} else {
err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name))
}
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()
return
case rr = <-rCh:
}
close(rCh)
err := rr.err
r := rr.r

if err == io.EOF {
return
Expand All @@ -371,52 +469,16 @@ func startStreamSeriesSet(
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}

select {
case s.recvCh <- r.GetSeries():
continue
case <-ctx.Done():
return
}

s.recvCh <- r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// closeSeries to shutdown a goroutine in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
s.currSeries, ok = <-s.recvCh
return ok
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
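A consequence that is easy to miss in the diff above: with the timeout handled inside the receive goroutine, Next() collapses to a plain receive from recvCh, and consumers keep driving the set through the usual Next/At/Err loop. The sketch below is an assumed consumer-side illustration, not code from this commit, written against the storepb.SeriesSet shape that streamSeriesSet satisfies.

package example

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// drainSeriesSet iterates a SeriesSet the way the proxy's merge layer does after this
// commit: Next blocks on the underlying channel, and a timeout surfaces either as a
// warning (when partial response is enabled) or through Err once the set is exhausted.
func drainSeriesSet(set storepb.SeriesSet) error {
	for set.Next() {
		lset, chunks := set.At()
		fmt.Printf("series %v with %d chunks\n", lset, len(chunks))
	}
	return set.Err()
}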