Skip to content

Commit

Permalink
fix: fix retry code to handle grpc status codes. updated newer stats …
Browse files Browse the repository at this point in the history
…retries to be wrapped with spans (#13592)

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean authored Jul 20, 2024
1 parent 7232795 commit d3e1edb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
13 changes: 10 additions & 3 deletions pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/grafana/dskit/grpcutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand Down Expand Up @@ -97,8 +96,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
return nil, ctx.Err()
}

// Retry if we get a HTTP 500 or an unknown error.
if code := grpcutil.ErrorToStatusCode(err); code == codes.Unknown || code/100 == 5 {
code := grpcutil.ErrorToStatusCode(err)
// Error handling is tricky... There are many places we wrap any error and set an HTTP style status code
// but there are also places where we return an existing GRPC object which will use GRPC status codes
// If the code is < 100 it's a gRPC status code, currently we retry all of these, even codes.Canceled
// because when our pools close connections they do so with a cancel and we want to retry these
// If it's > 100, it's an HTTP code and we only retry 5xx
if code < 100 || code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
Expand All @@ -112,10 +116,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"code", code,
"err", err,
)
bk.Wait()
continue
} else {
level.Warn(util_log.WithContext(ctx, r.log)).Log("msg", "received an error but not a retryable code, this is possibly a bug.", "code", code, "err", err)
}

return nil, err
Expand Down
23 changes: 14 additions & 9 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := mw.Wrap(next)
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
statsHandler = rm.Wrap(statsHandler)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
}
splitter := newDefaultSplitter(limits, iqo)

Expand Down Expand Up @@ -559,9 +560,10 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -631,9 +633,10 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -874,9 +877,10 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -1003,9 +1007,10 @@ func NewInstantMetricTripperware(
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down

0 comments on commit d3e1edb

Please sign in to comment.