Skip to content

Commit

Permalink
Hedgin Bets and Requests (#750)
Browse files Browse the repository at this point in the history
* hedged client

Signed-off-by: Joe Elliott <number101010@gmail.com>

* edited vendored files!

Signed-off-by: Joe Elliott <number101010@gmail.com>

* 2 seconds?

Signed-off-by: Joe Elliott <number101010@gmail.com>

* don't cancel

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Upgraded to 0.4.0

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added config and docs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Cleaned up bucket creation logic

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added to s3

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* go mod tidy

Signed-off-by: Joe Elliott <number101010@gmail.com>

* go mod vendor

Signed-off-by: Joe Elliott <number101010@gmail.com>

* be a better developer

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added doc details

Signed-off-by: Joe Elliott <number101010@gmail.com>

* s3 tests?

Signed-off-by: Joe Elliott <number101010@gmail.com>

* gcs tests?

Signed-off-by: Joe Elliott <number101010@gmail.com>

* azure support

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* vendor

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fixed races

Signed-off-by: Joe Elliott <number101010@gmail.com>

* review

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Jun 14, 2021
1 parent 3b9f309 commit f57f515
Show file tree
Hide file tree
Showing 20 changed files with 860 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [FEATURE] Added the ability to hedge requests with all backends [#750](https://github.com/grafana/tempo/pull/750) (@joe-elliott)

## v1.0.0

Expand Down
21 changes: 21 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ storage:
# Set to true to enable authentication and certificate checks on gcs requests
[insecure: <bool>]
# Optional. Default is 0 (disabled)
# Example: "hedge_requests_at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of GCS requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge_requests_at: <duration>]
# S3 configuration. Will be used only if value of backend is "s3"
# Check the S3 doc within this folder for information on s3 specific permissions.
s3:
Expand Down Expand Up @@ -283,6 +290,13 @@ storage:
# enable to use path-style requests.
[forcepathstyle: <bool>]
# Optional. Default is 0 (disabled)
# Example: "hedge_requests_at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of S3 requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge_requests_at: <duration>]
# azure configuration. Will be used only if value of backend is "azure"
# EXPERIMENTAL
azure:
Expand All @@ -305,6 +319,13 @@ storage:
# access key when using access key credentials.
[storage-account-key: <string>]
# Optional. Default is 0 (disabled)
# Example: "hedge-requests-at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of Axure Blog Storage requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge-requests-at: <duration>]
# How often to repoll the backend for new blocks. Default is 5m
[blocklist_poll: <duration>]
Expand Down
3 changes: 3 additions & 0 deletions docs/tempo/website/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ storage:
chunk_buffer_size: 10485760
endpoint: ""
insecure: false
hedge_requests_at: 0
s3:
bucket: ""
endpoint: ""
Expand All @@ -317,13 +318,15 @@ storage:
part_size: 0
signature_v2: false
forcepathstyle: false
hedge_requests_at: 0
azure:
storage-account-name: ""
storage-account-key: ""
container-name: ""
endpoint-suffix: blob.core.windows.net
max-buffers: 4
buffer-size: 3145728
hedge-requests-at: 0
cache: ""
background_cache:
writeback_goroutines: 10
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go 1.16
require (
cloud.google.com/go/storage v1.15.0
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/alecthomas/kong v0.2.11
github.com/cespare/xxhash v1.1.0
github.com/cortexproject/cortex v1.8.1-0.20210422151339-cf1c444e0905
github.com/cristalhq/hedgedhttp v0.4.0
github.com/dustin/go-humanize v1.0.0
github.com/go-kit/kit v0.10.0
github.com/gogo/protobuf v1.3.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.4.0 h1:J1z1zKJ1bEFpMLjZWgtX0inUWlWecNyouWRIQknGzgM=
github.com/cristalhq/hedgedhttp v0.4.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ=
github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8=
github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg=
Expand Down
21 changes: 14 additions & 7 deletions tempodb/backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ const (
)

type readerWriter struct {
cfg *Config
containerURL blob.ContainerURL
cfg *Config
containerURL blob.ContainerURL
hedgedContainerURL blob.ContainerURL
}

type appendTracker struct {
Expand All @@ -39,14 +40,20 @@ type appendTracker struct {
func New(cfg *Config) (backend.Reader, backend.Writer, backend.Compactor, error) {
ctx := context.Background()

container, err := GetContainer(ctx, cfg)
container, err := GetContainer(ctx, cfg, false)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "getting storage container")
}

hedgedContainer, err := GetContainer(ctx, cfg, true)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "getting hedged storage container")
}

rw := &readerWriter{
cfg: cfg,
containerURL: container,
cfg: cfg,
containerURL: container,
hedgedContainerURL: hedgedContainer,
}

return rw, rw, rw, nil
Expand Down Expand Up @@ -280,7 +287,7 @@ func (rw *readerWriter) writer(ctx context.Context, src io.Reader, name string)
}

func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64, destBuffer []byte) error {
blobURL := rw.containerURL.NewBlockBlobURL(name)
blobURL := rw.hedgedContainerURL.NewBlockBlobURL(name)

var props *blob.BlobGetPropertiesResponse
props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
Expand Down Expand Up @@ -319,7 +326,7 @@ func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64
}

func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) {
blobURL := rw.containerURL.NewBlockBlobURL(name)
blobURL := rw.hedgedContainerURL.NewBlockBlobURL(name)

var props *blob.BlobGetPropertiesResponse
props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
Expand Down
36 changes: 28 additions & 8 deletions tempodb/backend/azure/azure_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import (
"strings"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
blob "github.com/Azure/azure-storage-blob-go/azblob"
"github.com/cristalhq/hedgedhttp"
)

const maxRetries = 3
const (
maxRetries = 1
uptoHedgedRequests = 2
)

func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
func GetContainerURL(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(conf.StorageAccountName.String(), conf.StorageAccountKey.String())
if err != nil {
return blob.ContainerURL{}, err
Expand All @@ -26,9 +31,24 @@ func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, erro
retryOptions.TryTimeout = time.Until(deadline)
}

var httpSender pipeline.Factory
if hedge && conf.HedgeRequestsAt != 0 {
httpSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
client := hedgedhttp.NewClient(conf.HedgeRequestsAt, uptoHedgedRequests, nil)

// Send the request over the network
resp, err := client.Do(request.WithContext(ctx))

return pipeline.NewHTTPResponse(resp), err
}
})
}

p := blob.NewPipeline(c, blob.PipelineOptions{
Retry: retryOptions,
Telemetry: blob.TelemetryOptions{Value: "Tempo"},
Retry: retryOptions,
Telemetry: blob.TelemetryOptions{Value: "Tempo"},
HTTPSender: httpSender,
})

u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
Expand All @@ -48,8 +68,8 @@ func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, erro
return service.NewContainerURL(conf.ContainerName), nil
}

func GetContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
c, err := GetContainerURL(ctx, conf)
func GetContainer(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
c, err := GetContainerURL(ctx, conf, hedge)
if err != nil {
return blob.ContainerURL{}, err
}
Expand All @@ -59,15 +79,15 @@ func GetContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error)
}

func GetBlobURL(ctx context.Context, conf *Config, blobName string) (blob.BlockBlobURL, error) {
c, err := GetContainerURL(ctx, conf)
c, err := GetContainerURL(ctx, conf, false)
if err != nil {
return blob.BlockBlobURL{}, err
}
return c.NewBlockBlobURL(blobName), nil
}

func CreateContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
c, err := GetContainerURL(ctx, conf)
c, err := GetContainerURL(ctx, conf, false)
if err != nil {
return blob.ContainerURL{}, err
}
Expand Down
110 changes: 110 additions & 0 deletions tempodb/backend/azure/azure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package azure

import (
"context"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHedge(t *testing.T) {
tests := []struct {
name string
returnIn time.Duration
hedgeAt time.Duration
expectedHedgedRequests int32
}{
{
name: "hedge disabled",
expectedHedgedRequests: 1,
},
{
name: "hedge enabled doesn't hit",
hedgeAt: time.Hour,
expectedHedgedRequests: 1,
},
{
name: "hedge enabled and hits",
hedgeAt: time.Millisecond,
returnIn: 100 * time.Millisecond,
expectedHedgedRequests: 2,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
count := int32(0)
server := fakeServer(t, tc.returnIn, &count)

r, w, _, err := New(&Config{
MaxBuffers: 3,
BufferSize: 1000,
ContainerName: "blerg",
Endpoint: server.URL[7:], // [7:] -> strip http://,
HedgeRequestsAt: tc.hedgeAt,
})
require.NoError(t, err)

ctx := context.Background()

// the first call on each client initiates an extra http request
// clearing that here
_, _ = r.Read(ctx, "object", uuid.New(), "tenant")
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _ = r.Read(ctx, "object", uuid.New(), "tenant")
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count)) // *2 b/c reads execute a HEAD and GET
atomic.StoreInt32(&count, 0)

// this panics with the garbage test setup. todo: make it not panic
// _ = r.ReadRange(ctx, "object", uuid.New(), "tenant", 10, make([]byte, 100))
// time.Sleep(tc.returnIn)
// assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count))
// atomic.StoreInt32(&count, 0)

_, _ = r.BlockMeta(ctx, uuid.New(), "tenant") // *2 b/c reads execute a HEAD and GET
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

// calls that should not hedge
_, _ = r.Tenants(ctx)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_, _ = r.Blocks(ctx, "tenant")
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", uuid.New(), "tenant", make([]byte, 10))
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.WriteBlockMeta(ctx, &backend.BlockMeta{})
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
}
}

func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(returnIn)

atomic.AddInt32(counter, 1)
_, _ = w.Write([]byte(`{}`))
}))
t.Cleanup(server.Close)

return server
}
7 changes: 6 additions & 1 deletion tempodb/backend/azure/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package azure

import "github.com/cortexproject/cortex/pkg/util/flagext"
import (
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
)

type Config struct {
StorageAccountName flagext.Secret `yaml:"storage-account-name"`
Expand All @@ -9,4 +13,5 @@ type Config struct {
Endpoint string `yaml:"endpoint-suffix"`
MaxBuffers int `yaml:"max-buffers"`
BufferSize int `yaml:"buffer-size"`
HedgeRequestsAt time.Duration `yaml:"hedge-requests-at"`
}
11 changes: 7 additions & 4 deletions tempodb/backend/gcs/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package gcs

import "time"

type Config struct {
BucketName string `yaml:"bucket_name"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
Endpoint string `yaml:"endpoint"`
Insecure bool `yaml:"insecure"`
BucketName string `yaml:"bucket_name"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
Endpoint string `yaml:"endpoint"`
Insecure bool `yaml:"insecure"`
HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"`
}
Loading

0 comments on commit f57f515

Please sign in to comment.