go/vt/mysqlctl: instrument s3 upload time #12500

Merged 11 commits on Sep 20, 2023
14 changes: 13 additions & 1 deletion changelog/18.0/18.0.0/summary.md
@@ -19,6 +19,7 @@
- **[New stats](#new-stats)**
- [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters)
- [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status)
- [Backup and restore metrics for AWS S3](#backup-restore-metrics-aws-s3)
- **[VTTablet](#vttablet)**
- [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences)
- **[Docker](#docker)**
@@ -129,6 +130,17 @@ The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found
* `Stalled` is set to `1` when replication stops advancing.
* `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary.

#### <a id="backup-restore-metrics-aws-s3"/>Backup and restore metrics for AWS S3

Requests to AWS S3 are now instrumented in backup and restore metrics. For example:

```
vtbackup_backup_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 823
vtbackup_backup_duration_nanoseconds{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 1.33632421437e+11
vtbackup_restore_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 165
```
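
These series come from scoping the backup stats with an operation label and timing each AWS request. A minimal sketch of the pattern, assuming the `backupstats` names used in this PR (the `timeRequest` helper is hypothetical):

```go
package example

import (
	"time"

	"vitess.io/vitess/go/vt/mysqlctl/backupstats"
)

// timeRequest is a hypothetical helper showing how an operation-scoped stat
// yields the series above: TimedIncrement bumps the *_count series and adds
// to *_duration_nanoseconds for the scoped labels.
func timeRequest(s backupstats.Stats, doRequest func() error) error {
	sendStats := s.Scope(backupstats.Operation("AWS:Request:Send"))
	start := time.Now()
	err := doRequest() // one AWS request attempt
	sendStats.TimedIncrement(time.Since(start))
	return err
}
```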

### <a id="vttablet"/>VTTablet

#### <a id="vttablet-new-rpc-reset-sequences"/>New ResetSequences rpc
@@ -165,4 +177,4 @@ removing Vitess support.

#### <a id="new-durability-policies"/>New Durability Policies

Two new built-in durability policies have been added to Vitess in this release: `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. They are identical to `semi_sync` and `cross_cell` respectively, except that rdonly tablets can also send semi-sync ACKs.
30 changes: 14 additions & 16 deletions go/vt/mysqlctl/backup_blackbox_test.go
@@ -173,26 +173,25 @@ func TestExecuteBackup(t *testing.T) {
var sourceReadStats int

for _, sr := range fakeStats.ScopeReturns {
-sfs := sr.(*backupstats.FakeStats)
-switch sfs.ScopeV[backupstats.ScopeOperation] {
+switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
-require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
-require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}

@@ -529,26 +528,25 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) {
var sourceReadStats int

for _, sr := range fakeStats.ScopeReturns {
-sfs := sr.(*backupstats.FakeStats)
-switch sfs.ScopeV[backupstats.ScopeOperation] {
+switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
-require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
-require.Len(t, sfs.TimedIncrementCalls, 1)
+require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
-require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}

8 changes: 4 additions & 4 deletions go/vt/mysqlctl/backup_test.go
@@ -54,7 +54,7 @@ func TestBackupExecutesBackupWithScopedParams(t *testing.T) {
var executeBackupStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeBackupParams.Stats {
-executeBackupStats = sr.(*backupstats.FakeStats)
+executeBackupStats = sr
}
}
require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent)
@@ -87,7 +87,7 @@ func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
-storageStats = sr.(*backupstats.FakeStats)
+storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
@@ -344,7 +344,7 @@ func TestRestoreExecutesRestoreWithScopedParams(t *testing.T) {
var executeRestoreStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeRestoreParams.Stats {
-executeRestoreStats = sr.(*backupstats.FakeStats)
+executeRestoreStats = sr
}
}
require.Contains(t, executeRestoreStats.ScopeV, backupstats.ScopeComponent)
@@ -379,7 +379,7 @@ func TestRestoreParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
-storageStats = sr.(*backupstats.FakeStats)
+storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backupstats/fake_stats.go
@@ -13,7 +13,7 @@ type FakeStats struct {
Duration time.Duration
}
ScopeCalls [][]Scope
-ScopeReturns []Stats
+ScopeReturns []*FakeStats
mutex sync.Mutex
}

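Narrowing `ScopeReturns` from `[]Stats` to `[]*FakeStats` is what lets the tests above drop their type assertions. A minimal sketch of the resulting usage, assuming only the fields visible in this diff:

```go
package example

import "vitess.io/vitess/go/vt/mysqlctl/backupstats"

// countTimedIncrements sums TimedIncrement calls across all scoped fakes.
// Because ScopeReturns is now []*backupstats.FakeStats, each element is
// usable directly; the old sr.(*backupstats.FakeStats) assertion is gone.
func countTimedIncrements(fake *backupstats.FakeStats) int {
	total := 0
	for _, sr := range fake.ScopeReturns {
		total += len(sr.TimedIncrementCalls)
	}
	return total
}
```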
25 changes: 18 additions & 7 deletions go/vt/mysqlctl/s3backupstorage/s3.go
@@ -36,6 +36,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
@@ -48,6 +49,7 @@ import (

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/servenv"
)
@@ -170,16 +172,20 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
u.PartSize = partSizeBytes
})
object := objName(bh.dir, bh.name, filename)

-_, err := uploader.Upload(&s3manager.UploadInput{
+sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
+_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
> **Author (collaborator):** Not fully sure of the implications of switching to `UploadWithContext`.
>
> **Reviewer (member):** What motivated you to make the change in the first place? `ctx` will have a timeout associated with it. Assuming standard practice, `UploadWithContext` will terminate and return if that timeout is exceeded.
>
> **Author:** The motivation was that I thought `UploadWithContext` would let me specify `WithUploaderRequestOptions`, whereas `Upload` does not, but now I can see that's not the case, so I have no reason to use this. Maybe specifying a context would be a good thing, but I don't know enough to say that with any confidence. Reverted.
>
> **Reviewer:** Either change this back to be consistent with the change on line 222 (`GetObjectWithContext`), or change that back to not use a context. I vote for using a context.
>
> **Author:** Oops, I didn't even notice it was in two places when I tried switching back 😮‍💨. Will switch back to `GetObjectWithContext`.

Bucket: &bucket,
Key: object,
Body: reader,
ServerSideEncryption: bh.bs.s3SSE.awsAlg,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
-})
+}, s3manager.WithUploaderRequestOptions(func(r *request.Request) {
+r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
+sendStats.TimedIncrement(time.Since(r.AttemptTime))
+})
+}))
if err != nil {
reader.CloseWithError(err)
bh.RecordError(err)
@@ -212,12 +218,17 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
object := objName(bh.dir, bh.name, filename)
-out, err := bh.client.GetObject(&s3.GetObjectInput{
+sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
+out, err := bh.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
> **Author (collaborator):** Not fully sure of the implications of switching to `GetObjectWithContext`.
>
> **Reviewer (member):** This is a good change.

Bucket: &bucket,
Key: object,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
}, func(r *request.Request) {
r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
sendStats.TimedIncrement(time.Since(r.AttemptTime))
})
})
if err != nil {
return nil, err
@@ -272,6 +283,7 @@ type S3BackupStorage struct {
_client *s3.S3
mu sync.Mutex
s3SSE S3ServerSideEncryption
params backupstorage.Params
}

// ListBackups is part of the backupstorage.BackupStorage interface.
@@ -411,8 +423,7 @@ func (bs *S3BackupStorage) Close() error {
}

func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
-// TODO(maxeng): return a new S3BackupStorage that uses params.
-return bs
+return &S3BackupStorage{params: params}
}

var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil)
@@ -485,7 +496,7 @@ func objName(parts ...string) *string {
}

func init() {
backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{}
backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{params: backupstorage.NoParams()}

logNameMap = logNameToLogLevel{
"LogOff": aws.LogOff,
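Both instrumented call sites above share one pattern: push a handler onto the request's `CompleteAttempt` list and measure from the SDK-recorded attempt start time. A standalone sketch of that hook, assuming aws-sdk-go v1 request semantics (the `timedAttempts` helper is hypothetical):

```go
package example

import (
	"time"

	"github.com/aws/aws-sdk-go/aws/request"

	"vitess.io/vitess/go/vt/mysqlctl/backupstats"
)

// timedAttempts returns a per-request option that records each attempt's
// duration. CompleteAttempt handlers run after every attempt, including
// retries, and AttemptTime is set by the SDK when an attempt begins.
func timedAttempts(s backupstats.Stats) func(*request.Request) {
	return func(r *request.Request) {
		r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
			s.TimedIncrement(time.Since(r.AttemptTime))
		})
	}
}
```

Passed directly as a request option to `GetObjectWithContext`, or wrapped in `s3manager.WithUploaderRequestOptions` for the uploader, this reproduces the wiring in the diff.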
124 changes: 118 additions & 6 deletions go/vt/mysqlctl/s3backupstorage/s3_test.go
@@ -5,31 +5,135 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/logutil"
stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)

-type s3ErrorClient struct{ s3iface.S3API }
+type s3FakeClient struct {
+s3iface.S3API
+err error
+delay time.Duration
+}

-func (s3errclient *s3ErrorClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
+func (sfc *s3FakeClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
u, _ := url.Parse("http://localhost:1234")
req := request.Request{
-HTTPRequest: &http.Request{}, // without this we segfault ¯\_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13)
-Error: errors.New("some error"), // this forces req.Send() (which is called by the uploader) to always return a non-nil error
+HTTPRequest: &http.Request{ // without this we segfault ¯\_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13)
+Header: make(http.Header),
+URL: u,
+},
+Retryer: client.DefaultRetryer{},
}

req.Handlers.Send.PushBack(func(r *request.Request) {
r.Error = sfc.err
if sfc.delay > 0 {
time.Sleep(sfc.delay)
}
})

return &req, &s3.PutObjectOutput{}
}

func TestAddFileError(t *testing.T) {
-bh := &S3BackupHandle{client: &s3ErrorClient{}, bs: &S3BackupStorage{}, readOnly: false}
+bh := &S3BackupHandle{
+client: &s3FakeClient{err: errors.New("some error")},
+bs: &S3BackupStorage{
+params: backupstorage.NoParams(),
+},
+readOnly: false,
+}

wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000)
require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser")

n, err := wc.Write([]byte("here are some bytes"))
require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err)

err = wc.Close()
require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err)

bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error

require.True(t, bh.HasErrors(), "AddFile() expected bh to record async error but did not")
}

func TestAddFileStats(t *testing.T) {
fakeStats := stats.NewFakeStats()

delay := 10 * time.Millisecond

bh := &S3BackupHandle{
client: &s3FakeClient{delay: delay},
bs: &S3BackupStorage{
params: backupstorage.Params{
Logger: logutil.NewMemoryLogger(),
Stats: fakeStats,
},
},
readOnly: false,
}

for i := 0; i < 4; i++ {
wc, err := bh.AddFile(aws.BackgroundContext(), fmt.Sprintf("somefile-%d", i), 100000)
require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser")

n, err := wc.Write([]byte("here are some bytes"))
require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err)

err = wc.Close()
require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err)
}

bh.waitGroup.Wait() // wait for the goroutine to finish; it should complete without recording an error

require.Equal(t, bh.HasErrors(), false, "AddFile() expected bh not to record async errors but did")

require.Len(t, fakeStats.ScopeCalls, 4)
scopedStats := fakeStats.ScopeReturns[0]
require.Len(t, scopedStats.ScopeV, 1)
require.Equal(t, scopedStats.ScopeV[stats.ScopeOperation], "AWS:Request:Send")
require.Len(t, scopedStats.TimedIncrementCalls, 1)
require.GreaterOrEqual(t, scopedStats.TimedIncrementCalls[0], delay)
require.Len(t, scopedStats.TimedIncrementBytesCalls, 0)
}

func TestAddFileErrorStats(t *testing.T) {
fakeStats := stats.NewFakeStats()

delay := 10 * time.Millisecond

bh := &S3BackupHandle{
client: &s3FakeClient{
delay: delay,
err: errors.New("some error"),
},
bs: &S3BackupStorage{
params: backupstorage.Params{
Logger: logutil.NewMemoryLogger(),
Stats: fakeStats,
},
},
readOnly: false,
}

wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000)
require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
@@ -43,7 +147,15 @@ func TestAddFileError(t *testing.T) {

bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error

-require.Equal(t, bh.HasErrors(), true, "AddFile() expected bh to record async error but did not")
+require.True(t, bh.HasErrors(), "AddFile() expected bh to record async error but did not")

require.Len(t, fakeStats.ScopeCalls, 1)
scopedStats := fakeStats.ScopeReturns[0]
require.Len(t, scopedStats.ScopeV, 1)
require.Equal(t, scopedStats.ScopeV[stats.ScopeOperation], "AWS:Request:Send")
require.Len(t, scopedStats.TimedIncrementCalls, 1)
require.GreaterOrEqual(t, scopedStats.TimedIncrementCalls[0], delay)
require.Len(t, scopedStats.TimedIncrementBytesCalls, 0)
}

func TestNoSSE(t *testing.T) {
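The fake client works because the s3manager uploader builds its request through `PutObjectRequest` and then calls `Send()` on it, and anything pushed onto `Handlers.Send` runs inside that call. A condensed sketch of the same idea, assuming aws-sdk-go v1 request semantics (the `newFakeRequest` constructor is hypothetical):

```go
package example

import (
	"errors"
	"net/http"
	"net/url"
	"time"

	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
)

// newFakeRequest builds a request whose Send phase sleeps and then
// optionally fails, so tests can exercise both the timing and the error
// paths deterministically.
func newFakeRequest(delay time.Duration, fail bool) *request.Request {
	u, _ := url.Parse("http://localhost:1234")
	req := &request.Request{
		// A non-nil HTTPRequest with URL and Header avoids nil dereferences
		// inside the SDK's send machinery.
		HTTPRequest: &http.Request{Header: make(http.Header), URL: u},
		Retryer:     client.DefaultRetryer{},
	}
	req.Handlers.Send.PushBack(func(r *request.Request) {
		time.Sleep(delay) // simulated latency, visible to CompleteAttempt timing
		if fail {
			r.Error = errors.New("simulated send failure")
		}
	})
	return req
}
```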