Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limiting and metrics to hedging #4860

Merged
merged 4 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,12 @@ You should configure the latency based on your p99 of object store requests.
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
# The maximum amount of hedge requests to be issued for a given request.
[up_to: <int> | default = 2]
# Optional. Default is 5
# The maximum amount of hedged requests to be issued per second.
[max_per_second: <int> | default = 5]

```

## local_storage_config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.1
github.com/cristalhq/hedgedhttp v0.6.2
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cristalhq/hedgedhttp v0.6.2 h1:aWnUOzqPaM8/dgmPLR7wl0AoFOPYnqgdhTkcWgWUgpA=
github.com/cristalhq/hedgedhttp v0.6.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func Test_Hedging(t *testing.T) {
})
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
Expand Down
81 changes: 44 additions & 37 deletions pkg/storage/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,25 @@ var (
}

// default Azure http client.
defaultClient = &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
}
}
)

Expand Down Expand Up @@ -132,20 +134,30 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
// Implements ObjectStorage
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig
hedgingCfg hedging.Config
cfg *BlobStorageConfig

containerURL azblob.ContainerURL

pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage")
blobStorage := &BlobStorage{
cfg: cfg,
hedgingCfg: hedgingCfg,
cfg: cfg,
}

var err error
pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
if err != nil {
return nil, err
}
blobStorage.pipeline = pipeline
hedgingPipeline, err := blobStorage.newPipeline(hedgingCfg, true)
if err != nil {
return nil, err
}
blobStorage.hedgingPipeline = hedgingPipeline
blobStorage.containerURL, err = blobStorage.buildContainerURL()
if err != nil {
return nil, err
Expand Down Expand Up @@ -210,13 +222,12 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
if err != nil {
return azblob.BlockBlobURL{}, err
}

azPipeline, err := b.newPipeline(hedging)
if err != nil {
return azblob.BlockBlobURL{}, err
pipeline := b.pipeline
if hedging {
pipeline = b.hedgingPipeline
}

return azblob.NewBlockBlobURL(*u, azPipeline), nil
return azblob.NewBlockBlobURL(*u, pipeline), nil
}

func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
Expand All @@ -225,15 +236,10 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
return azblob.ContainerURL{}, err
}

azPipeline, err := b.newPipeline(false)
if err != nil {
return azblob.ContainerURL{}, err
}

return azblob.NewContainerURL(*u, azPipeline), nil
return azblob.NewContainerURL(*u, b.pipeline), nil
}

func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
Expand All @@ -248,17 +254,18 @@ func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}
client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := defaultClient.Do(request.WithContext(ctx))
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})

if hedging {
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
client := b.hedgingCfg.Client(defaultClient)
client := hedgingCfg.Client(client)
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
Expand Down
19 changes: 11 additions & 8 deletions pkg/storage/chunk/azure/blob_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,23 @@ func Test_Hedging(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the client to count the number of calls
defaultClient = &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
}
}
c, err := NewBlobStorage(&BlobStorageConfig{
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/gcp/gcs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func Test_Hedging(t *testing.T) {
BucketName: "test-bucket",
Insecure: true,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
opts = append(opts, option.WithEndpoint(server.URL))
opts = append(opts, option.WithoutAuthentication())
Expand Down
70 changes: 68 additions & 2 deletions pkg/storage/chunk/hedging/hedging.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
package hedging

import (
"errors"
"flag"
"net/http"
"time"

"github.com/cristalhq/hedgedhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/time/rate"
)

var (
// ErrTooManyHedgeRequests is returned by the rate-limiting round tripper
// when a hedged request is rejected by the limiter.
ErrTooManyHedgeRequests = errors.New("too many hedge requests")
// totalHedgeRequests counts the hedged requests that were let through.
// It is assigned in registerMetrics.
totalHedgeRequests prometheus.Counter
// totalRateLimitedHedgeRequests counts the hedged requests that were
// rejected by the limiter. It is assigned in registerMetrics.
totalRateLimitedHedgeRequests prometheus.Counter
)

// init populates the package-level hedging counters by calling
// registerMetrics when the package is loaded.
func init() {
registerMetrics()
}

func registerMetrics() {
totalHedgeRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_total",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will these be prefixed with loki_?

Suggested change
Name: "hedged_requests_total",
Name: "hedged_requests_total",
Namespace: "loki",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I did not prefix them because I figure we might move this to dskit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should prefix them until we move it to dskit. All of our other metrics have loki_ prefixes.

Help: "The total number of hedged requests.",
})

totalRateLimitedHedgeRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_rate_limited_total",
Help: "The total number of hedged requests rejected via rate limiting.",
})
}

// Config is the configuration for hedging requests.
type Config struct {
// At is the duration after which a second request will be issued.
// A zero value disables hedging.
At time.Duration `yaml:"at"`
// UpTo is the maximum number of requests that will be issued.
UpTo int `yaml:"up_to"`
// MaxPerSecond is the maximum number of hedge requests allowed per second.
MaxPerSecond int `yaml:"max_per_second"`
}

// RegisterFlags registers flags.
Expand All @@ -25,18 +53,56 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the hedging flags on f, prepending prefix
// to every flag name.
//
// Defaults: up to 2 hedge requests, hedging disabled (at = 0) and at most 5
// hedge requests per second.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximum number of hedge requests allowed.")
	f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
	f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximum number of hedge requests allowed per second.")
}

// Client returns a hedged http client.
//
// A nil client stands for http.DefaultClient. When cfg.At is zero hedging is
// disabled and the client is returned untouched. Otherwise the transport of
// the supplied client is mutated to use the hedged round tripper and that same
// client is returned.
//
// The nil case is the one exception to the in-place mutation: a shallow copy
// of http.DefaultClient is wrapped instead, so the process-wide default client
// is never modified and its transport is not wrapped again on every call.
func (cfg *Config) Client(client *http.Client) *http.Client {
	if cfg.At == 0 {
		if client == nil {
			return http.DefaultClient
		}
		return client
	}
	if client == nil {
		// Copy rather than alias: assigning Transport below must not leak
		// into every other user of http.DefaultClient.
		defaultCopy := *http.DefaultClient
		client = &defaultCopy
	}
	client.Transport = cfg.RoundTripper(client.Transport)
	return client
}

// RoundTripper returns a hedged roundtripper that sends requests through next.
//
// A nil next stands for http.DefaultTransport. When cfg.At is zero hedging is
// disabled and next is returned unchanged. Otherwise next is first wrapped in
// the rate-limiting round tripper built from cfg.MaxPerSecond, and that wrapper
// is handed to hedgedhttp together with cfg.At and cfg.UpTo, so every hedged
// attempt passes through the limiter before reaching next.
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
	if next == nil {
		next = http.DefaultTransport
	}
	if cfg.At == 0 {
		return next
	}
	return hedgedhttp.NewRoundTripper(
		cfg.At,
		cfg.UpTo,
		newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next),
	)
}

// limitedHedgingRoundTripper is an http.RoundTripper that consults a rate
// limiter before forwarding hedged requests; requests that are not hedged are
// forwarded without consulting the limiter.
type limitedHedgingRoundTripper struct {
// next is the round tripper that requests are forwarded to.
next http.RoundTripper
// limiter decides whether a hedged request may be sent.
limiter *rate.Limiter
}

// newLimitedHedgingRoundTripper builds a limitedHedgingRoundTripper that
// forwards to next. The limiter is created with maxPerSecond as both its rate
// and its burst size.
func newLimitedHedgingRoundTripper(maxPerSecond int, next http.RoundTripper) *limitedHedgingRoundTripper {
	hedgeLimiter := rate.NewLimiter(rate.Limit(maxPerSecond), maxPerSecond)
	return &limitedHedgingRoundTripper{next: next, limiter: hedgeLimiter}
}

// RoundTrip implements http.RoundTripper.
//
// Requests that are not hedged are forwarded to the next round tripper as is.
// A hedged request is forwarded only if the limiter allows it, in which case
// it is counted in totalHedgeRequests; otherwise it is counted in
// totalRateLimitedHedgeRequests and ErrTooManyHedgeRequests is returned
// without calling the next round tripper.
func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if !hedgedhttp.IsHedgedRequest(req) {
		return rt.next.RoundTrip(req)
	}
	if rt.limiter.Allow() {
		totalHedgeRequests.Inc()
		return rt.next.RoundTrip(req)
	}
	totalRateLimitedHedgeRequests.Inc()
	return nil, ErrTooManyHedgeRequests
}
Loading