diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fb551c7b961..88e434d28dbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [4860](https://github.com/grafana/loki/pull/4860) **cyriltovena**: Add rate limiting and metrics to hedging * [4865](https://github.com/grafana/loki/pull/4865) **taisho6339**: Fix duplicate registry.MustRegister call in Promtail Kafka * [4845](https://github.com/grafana/loki/pull/4845) **chaudum** Return error responses consistently as JSON * [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests. diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 164c3b2d7b25..343af576179c 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -872,8 +872,12 @@ You should configure the latency based on your p99 of object store requests. # used with queriers and has minimal to no impact on other pieces. [at: | default = 0] # Optional. Default is 2 -# The maximum amount of requests to be issued. +# The maximum amount of hedge requests to be issued for a given request. [up_to: | default = 2] +# Optional. Default is 5 +# The maximum amount of hedged requests to be issued per seconds. +[max_per_second: | default = 5] + ``` ## local_storage_config diff --git a/go.mod b/go.mod index 8efc265b594b..2802df81ebdc 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab - github.com/cristalhq/hedgedhttp v0.6.1 + github.com/cristalhq/hedgedhttp v0.6.2 github.com/davecgh/go-spew v1.1.1 github.com/docker/docker v20.10.11+incompatible github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8 diff --git a/go.sum b/go.sum index d78675ad6c28..c754551c963f 100644 --- a/go.sum +++ b/go.sum @@ -482,8 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA= -github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI= +github.com/cristalhq/hedgedhttp v0.6.2 h1:aWnUOzqPaM8/dgmPLR7wl0AoFOPYnqgdhTkcWgWUgpA= +github.com/cristalhq/hedgedhttp v0.6.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI= github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go index ef6db2cafd1a..68683afdf5dc 100644 --- a/pkg/storage/chunk/aws/s3_storage_client.go +++ b/pkg/storage/chunk/aws/s3_storage_client.go @@ -316,7 +316,7 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S } if hedging { - httpClient = hedgingCfg.Client(httpClient) + httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer)) } s3Config = s3Config.WithHTTPClient(httpClient) diff --git a/pkg/storage/chunk/aws/s3_storage_client_test.go b/pkg/storage/chunk/aws/s3_storage_client_test.go index ad84691a0261..65dd8fc9b0a5 100644 --- a/pkg/storage/chunk/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/aws/s3_storage_client_test.go @@ -138,8 +138,9 @@ func Test_Hedging(t *testing.T) { }) }, }, hedging.Config{ - At: tc.hedgeAt, - UpTo: tc.upTo, + At: tc.hedgeAt, + UpTo: tc.upTo, + MaxPerSecond: 1000, }) require.NoError(t, err) tc.do(c) diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go index e571a4c3c6dd..66523e5ef79c 100644 --- a/pkg/storage/chunk/azure/blob_storage_client.go +++ b/pkg/storage/chunk/azure/blob_storage_client.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/mattn/go-ieproxy" + "github.com/prometheus/client_golang/prometheus" cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure" "github.com/cortexproject/cortex/pkg/util" @@ -57,23 +58,25 @@ var ( } // default Azure http client. - defaultClient = &http.Client{ - Transport: &http.Transport{ - Proxy: ieproxy.GetProxyFunc(), - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - DualStack: true, - }).Dial, - MaxIdleConns: 0, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - DisableKeepAlives: false, - DisableCompression: false, - MaxResponseHeaderBytes: 0, - }, + defaultClientFactory = func() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: ieproxy.GetProxyFunc(), + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).Dial, + MaxIdleConns: 0, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DisableKeepAlives: false, + DisableCompression: false, + MaxResponseHeaderBytes: 0, + }, + } } ) @@ -132,20 +135,30 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig // Implements ObjectStorage type BlobStorage struct { // blobService storage.Serv - cfg *BlobStorageConfig - hedgingCfg hedging.Config + cfg *BlobStorageConfig + containerURL azblob.ContainerURL + + pipeline pipeline.Pipeline + hedgingPipeline pipeline.Pipeline } // NewBlobStorage creates a new instance of the BlobStorage struct. func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) { log.WarnExperimentalUse("Azure Blob Storage") blobStorage := &BlobStorage{ - cfg: cfg, - hedgingCfg: hedgingCfg, + cfg: cfg, } - - var err error + pipeline, err := blobStorage.newPipeline(hedgingCfg, false) + if err != nil { + return nil, err + } + blobStorage.pipeline = pipeline + hedgingPipeline, err := blobStorage.newPipeline(hedgingCfg, true) + if err != nil { + return nil, err + } + blobStorage.hedgingPipeline = hedgingPipeline blobStorage.containerURL, err = blobStorage.buildContainerURL() if err != nil { return nil, err @@ -210,13 +223,12 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU if err != nil { return azblob.BlockBlobURL{}, err } - - azPipeline, err := b.newPipeline(hedging) - if err != nil { - return azblob.BlockBlobURL{}, err + pipeline := b.pipeline + if hedging { + pipeline = b.hedgingPipeline } - return azblob.NewBlockBlobURL(*u, azPipeline), nil + return azblob.NewBlockBlobURL(*u, pipeline), nil } func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { @@ -225,15 +237,10 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { return azblob.ContainerURL{}, err } - azPipeline, err := b.newPipeline(false) - if err != nil { - return azblob.ContainerURL{}, err - } - - return azblob.NewContainerURL(*u, azPipeline), nil + return azblob.NewContainerURL(*u, b.pipeline), nil } -func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) { +func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) { credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value) if err != nil { return nil, err @@ -248,17 +255,18 @@ func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) { MaxRetryDelay: b.cfg.MaxRetryDelay, }, } + client := defaultClientFactory() opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { - resp, err := defaultClient.Do(request.WithContext(ctx)) + resp, err := client.Do(request.WithContext(ctx)) return pipeline.NewHTTPResponse(resp), err } }) if hedging { opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { - client := b.hedgingCfg.Client(defaultClient) + client := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer)) return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { resp, err := client.Do(request.WithContext(ctx)) return pipeline.NewHTTPResponse(resp), err diff --git a/pkg/storage/chunk/azure/blob_storage_client_test.go b/pkg/storage/chunk/azure/blob_storage_client_test.go index 491ce0b3eb31..2ad8a4bd3699 100644 --- a/pkg/storage/chunk/azure/blob_storage_client_test.go +++ b/pkg/storage/chunk/azure/blob_storage_client_test.go @@ -62,20 +62,23 @@ func Test_Hedging(t *testing.T) { t.Run(tc.name, func(t *testing.T) { count := atomic.NewInt32(0) // hijack the client to count the number of calls - defaultClient = &http.Client{ - Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - count.Inc() - time.Sleep(200 * time.Millisecond) - return nil, errors.New("fo") - }), + defaultClientFactory = func() *http.Client { + return &http.Client{ + Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return nil, errors.New("fo") + }), + } } c, err := NewBlobStorage(&BlobStorageConfig{ ContainerName: "foo", Environment: azureGlobal, MaxRetries: 1, }, hedging.Config{ - At: tc.hedgeAt, - UpTo: tc.upTo, + At: tc.hedgeAt, + UpTo: tc.upTo, + MaxPerSecond: 1000, }) require.NoError(t, err) tc.do(c) diff --git a/pkg/storage/chunk/gcp/gcs_object_client.go b/pkg/storage/chunk/gcp/gcs_object_client.go index 416fe243a7f4..066e4ff4e3b6 100644 --- a/pkg/storage/chunk/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/gcp/gcs_object_client.go @@ -9,6 +9,7 @@ import ( "cloud.google.com/go/storage" cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -87,7 +88,7 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf } if hedging { - httpClient = hedgingCfg.Client(httpClient) + httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer)) } opts = append(opts, option.WithHTTPClient(httpClient)) diff --git a/pkg/storage/chunk/gcp/gcs_object_client_test.go b/pkg/storage/chunk/gcp/gcs_object_client_test.go index 68104aa3d656..cabaaf6b9727 100644 --- a/pkg/storage/chunk/gcp/gcs_object_client_test.go +++ b/pkg/storage/chunk/gcp/gcs_object_client_test.go @@ -63,8 +63,9 @@ func Test_Hedging(t *testing.T) { BucketName: "test-bucket", Insecure: true, }, hedging.Config{ - At: tc.hedgeAt, - UpTo: tc.upTo, + At: tc.hedgeAt, + UpTo: tc.upTo, + MaxPerSecond: 1000, }, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) { opts = append(opts, option.WithEndpoint(server.URL)) opts = append(opts, option.WithoutAuthentication()) diff --git a/pkg/storage/chunk/hedging/hedging.go b/pkg/storage/chunk/hedging/hedging.go index 8ba8bdcb3782..326469d5ba70 100644 --- a/pkg/storage/chunk/hedging/hedging.go +++ b/pkg/storage/chunk/hedging/hedging.go @@ -1,19 +1,49 @@ package hedging import ( + "errors" "flag" "net/http" + "sync" "time" "github.com/cristalhq/hedgedhttp" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" ) +var ( + ErrTooManyHedgeRequests = errors.New("too many hedge requests") + totalHedgeRequests prometheus.Counter + totalRateLimitedHedgeRequests prometheus.Counter + once sync.Once +) + +func init() { + initMetrics() +} + +func initMetrics() { + once = sync.Once{} + totalHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_total", + Help: "The total number of hedged requests.", + }) + + totalRateLimitedHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_rate_limited_total", + Help: "The total number of hedged requests rejected via rate limiting.", + }) +} + // Config is the configuration for hedging requests. type Config struct { // At is the duration after which a second request will be issued. At time.Duration `yaml:"at"` // UpTo is the maximum number of requests that will be issued. UpTo int `yaml:"up_to"` + // The maximun of hedge requests allowed per second. + MaxPerSecond int `yaml:"max_per_second"` } // RegisterFlags registers flags. @@ -25,18 +55,78 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximun of hedge requests allowed.") f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)") + f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximun of hedge requests allowed per seconds.") } +// Client returns a hedged http client. +// The client transport will be mutated to use the hedged roundtripper. func (cfg *Config) Client(client *http.Client) *http.Client { + return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer) +} + +// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer. +// The client transport will be mutated to use the hedged roundtripper. +func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) *http.Client { + if reg == nil { + reg = prometheus.DefaultRegisterer + } + if client == nil { + client = http.DefaultClient + } if cfg.At == 0 { return client } - return hedgedhttp.NewClient(cfg.At, cfg.UpTo, client) + client.Transport = cfg.RoundTripperWithRegisterer(client.Transport, reg) + return client } -func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper { +// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer. +func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) http.RoundTripper { + if reg == nil { + reg = prometheus.DefaultRegisterer + } + if next == nil { + next = http.DefaultTransport + } if cfg.At == 0 { return next } - return hedgedhttp.NewRoundTripper(cfg.At, cfg.UpTo, next) + // register metrics + once.Do(func() { + reg.MustRegister(totalHedgeRequests) + reg.MustRegister(totalRateLimitedHedgeRequests) + }) + return hedgedhttp.NewRoundTripper( + cfg.At, + cfg.UpTo, + newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next), + ) +} + +// RoundTripper returns a hedged roundtripper. +func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper { + return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer) +} + +type limitedHedgingRoundTripper struct { + next http.RoundTripper + limiter *rate.Limiter +} + +func newLimitedHedgingRoundTripper(max int, next http.RoundTripper) *limitedHedgingRoundTripper { + return &limitedHedgingRoundTripper{ + next: next, + limiter: rate.NewLimiter(rate.Limit(max), max), + } +} + +func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if hedgedhttp.IsHedgedRequest(req) { + if !rt.limiter.Allow() { + totalRateLimitedHedgeRequests.Inc() + return nil, ErrTooManyHedgeRequests + } + totalHedgeRequests.Inc() + } + return rt.next.RoundTrip(req) } diff --git a/pkg/storage/chunk/hedging/hedging_test.go b/pkg/storage/chunk/hedging/hedging_test.go new file mode 100644 index 000000000000..06a7a42f6008 --- /dev/null +++ b/pkg/storage/chunk/hedging/hedging_test.go @@ -0,0 +1,90 @@ +package hedging + +import ( + "net/http" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +func resetMetrics() { + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer = reg + prometheus.DefaultGatherer = reg + initMetrics() +} + +func TestHedging(t *testing.T) { + resetMetrics() + cfg := &Config{ + At: time.Duration(1), + UpTo: 3, + MaxPerSecond: 1000, + } + count := atomic.NewInt32(0) + client := cfg.Client(&http.Client{ + Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }), + }) + _, _ = client.Get("http://example.com") + + require.Equal(t, int32(3), count.Load()) + require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, + strings.NewReader(` +# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting. +# TYPE hedged_requests_rate_limited_total counter +hedged_requests_rate_limited_total 0 +# HELP hedged_requests_total The total number of hedged requests. +# TYPE hedged_requests_total counter +hedged_requests_total 2 +`, + ), "hedged_requests_total", "hedged_requests_rate_limited_total")) +} + +func TestHedgingRateLimit(t *testing.T) { + resetMetrics() + cfg := &Config{ + At: time.Duration(1), + UpTo: 20, + MaxPerSecond: 1, + } + count := atomic.NewInt32(0) + client := cfg.Client(&http.Client{ + Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }), + }) + _, _ = client.Get("http://example.com") + + require.Equal(t, int32(2), count.Load()) + require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, + strings.NewReader(` +# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting. +# TYPE hedged_requests_rate_limited_total counter +hedged_requests_rate_limited_total 18 +# HELP hedged_requests_total The total number of hedged requests. +# TYPE hedged_requests_total counter +hedged_requests_total 1 +`, + ), "hedged_requests_total", "hedged_requests_rate_limited_total")) +} diff --git a/pkg/storage/chunk/openstack/swift_object_client.go b/pkg/storage/chunk/openstack/swift_object_client.go index 4226e78f95e0..4c5afe2c342c 100644 --- a/pkg/storage/chunk/openstack/swift_object_client.go +++ b/pkg/storage/chunk/openstack/swift_object_client.go @@ -12,6 +12,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" cortex_openstack "github.com/cortexproject/cortex/pkg/chunk/openstack" cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift" @@ -110,7 +111,7 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) c.DomainId = cfg.UserDomainID } if hedging { - c.Transport = hedgingCfg.RoundTripper(c.Transport) + c.Transport = hedgingCfg.RoundTripperWithRegisterer(c.Transport, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer)) } err := c.Authenticate() diff --git a/pkg/storage/chunk/openstack/swift_object_client_test.go b/pkg/storage/chunk/openstack/swift_object_client_test.go index ca2c5d715505..75a421c53dec 100644 --- a/pkg/storage/chunk/openstack/swift_object_client_test.go +++ b/pkg/storage/chunk/openstack/swift_object_client_test.go @@ -99,8 +99,9 @@ func Test_Hedging(t *testing.T) { RequestTimeout: 10 * time.Second, }, }, hedging.Config{ - At: tc.hedgeAt, - UpTo: tc.upTo, + At: tc.hedgeAt, + UpTo: tc.upTo, + MaxPerSecond: 1000, }) require.NoError(t, err) tc.do(c) diff --git a/vendor/github.com/cristalhq/hedgedhttp/hedged.go b/vendor/github.com/cristalhq/hedgedhttp/hedged.go index f3a3f041c563..17d06630d342 100644 --- a/vendor/github.com/cristalhq/hedgedhttp/hedged.go +++ b/vendor/github.com/cristalhq/hedgedhttp/hedged.go @@ -107,7 +107,7 @@ func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error) for sent := 0; len(errOverall.Errors) < ht.upto; sent++ { if sent < ht.upto { idx := sent - subReq, cancel := reqWithCtx(req, mainCtx) + subReq, cancel := reqWithCtx(req, mainCtx, idx != 0) cancels[idx] = cancel runInPool(func() { @@ -174,12 +174,23 @@ type indexedResp struct { Resp *http.Response } -func reqWithCtx(r *http.Request, ctx context.Context) (*http.Request, func()) { +func reqWithCtx(r *http.Request, ctx context.Context, isHedged bool) (*http.Request, func()) { ctx, cancel := context.WithCancel(ctx) + if isHedged { + ctx = context.WithValue(ctx, hedgedRequest{}, struct{}{}) + } req := r.WithContext(ctx) return req, cancel } +type hedgedRequest struct{} + +// IsHedgedRequest reports when a request is hedged. +func IsHedgedRequest(r *http.Request) bool { + val := r.Context().Value(hedgedRequest{}) + return val != nil +} + // atomicCounter is a false sharing safe counter. type atomicCounter struct { count uint64 diff --git a/vendor/modules.txt b/vendor/modules.txt index 30a982c43770..6e4bdc8a953b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -308,7 +308,7 @@ github.com/cortexproject/cortex/pkg/util/spanlogger github.com/cortexproject/cortex/pkg/util/test github.com/cortexproject/cortex/pkg/util/validation github.com/cortexproject/cortex/tools/querytee -# github.com/cristalhq/hedgedhttp v0.6.1 +# github.com/cristalhq/hedgedhttp v0.6.2 ## explicit; go 1.16 github.com/cristalhq/hedgedhttp # github.com/davecgh/go-spew v1.1.1