diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md
index 61cf6be1cf8..6c355d79575 100644
--- a/changelog/18.0/18.0.0/summary.md
+++ b/changelog/18.0/18.0.0/summary.md
@@ -19,6 +19,7 @@
- **[New stats](#new-stats)**
- [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters)
- [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status)
+ - [Backup and restore metrics for AWS S3](#backup-restore-metrics-aws-s3)
- **[VTTablet](#vttablet)**
- [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences)
- **[Docker](#docker)**
@@ -129,6 +130,17 @@ The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found
* `Stalled` is set to `1` when replication stops advancing.
* `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary.
+#### Backup and restore metrics for AWS S3
+
+Requests to AWS S3 are instrumented in backup and restore metrics. For example:
+
+```
+vtbackup_backup_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 823
+vtbackup_backup_duration_nanoseconds{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 1.33632421437e+11
+vtbackup_restore_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 165
+vtbackup_restore_duration_nanoseconds{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 6.0963803695e+10
+```
+
### VTTablet
#### New ResetSequences rpc
@@ -165,4 +177,4 @@ removing Vitess support.
#### New Durability Policies
-2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
\ No newline at end of file
+2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/backup_blackbox_test.go
index 62b58f2a5c8..8de6a8679fa 100644
--- a/go/vt/mysqlctl/backup_blackbox_test.go
+++ b/go/vt/mysqlctl/backup_blackbox_test.go
@@ -173,26 +173,25 @@ func TestExecuteBackup(t *testing.T) {
var sourceReadStats int
for _, sr := range fakeStats.ScopeReturns {
- sfs := sr.(*backupstats.FakeStats)
- switch sfs.ScopeV[backupstats.ScopeOperation] {
+ switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
- require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+ require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
- require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+ require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}
@@ -529,26 +528,25 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) {
var sourceReadStats int
for _, sr := range fakeStats.ScopeReturns {
- sfs := sr.(*backupstats.FakeStats)
- switch sfs.ScopeV[backupstats.ScopeOperation] {
+ switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
- require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+ require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
- require.Len(t, sfs.TimedIncrementCalls, 1)
+ require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
- require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
+ require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}
diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go
index 5a135c26a30..5b97f709c2f 100644
--- a/go/vt/mysqlctl/backup_test.go
+++ b/go/vt/mysqlctl/backup_test.go
@@ -54,7 +54,7 @@ func TestBackupExecutesBackupWithScopedParams(t *testing.T) {
var executeBackupStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeBackupParams.Stats {
- executeBackupStats = sr.(*backupstats.FakeStats)
+ executeBackupStats = sr
}
}
require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent)
@@ -87,7 +87,7 @@ func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
- storageStats = sr.(*backupstats.FakeStats)
+ storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
@@ -344,7 +344,7 @@ func TestRestoreExecutesRestoreWithScopedParams(t *testing.T) {
var executeRestoreStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeRestoreParams.Stats {
- executeRestoreStats = sr.(*backupstats.FakeStats)
+ executeRestoreStats = sr
}
}
require.Contains(t, executeRestoreStats.ScopeV, backupstats.ScopeComponent)
@@ -379,7 +379,7 @@ func TestRestoreParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
- storageStats = sr.(*backupstats.FakeStats)
+ storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
diff --git a/go/vt/mysqlctl/backupstats/fake_stats.go b/go/vt/mysqlctl/backupstats/fake_stats.go
index e8e84431eb9..29728d86db5 100644
--- a/go/vt/mysqlctl/backupstats/fake_stats.go
+++ b/go/vt/mysqlctl/backupstats/fake_stats.go
@@ -13,7 +13,7 @@ type FakeStats struct {
Duration time.Duration
}
ScopeCalls [][]Scope
- ScopeReturns []Stats
+ ScopeReturns []*FakeStats
mutex sync.Mutex
}
diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go
index 4d10cd7f080..06cff1793b5 100644
--- a/go/vt/mysqlctl/s3backupstorage/s3.go
+++ b/go/vt/mysqlctl/s3backupstorage/s3.go
@@ -36,6 +36,7 @@ import (
"sort"
"strings"
"sync"
+ "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
@@ -48,6 +49,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
+ stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/servenv"
)
@@ -170,8 +172,8 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
u.PartSize = partSizeBytes
})
object := objName(bh.dir, bh.name, filename)
-
- _, err := uploader.Upload(&s3manager.UploadInput{
+ sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
+ _, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: &bucket,
Key: object,
Body: reader,
@@ -179,7 +181,11 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
- })
+ }, s3manager.WithUploaderRequestOptions(func(r *request.Request) {
+ r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
+ sendStats.TimedIncrement(time.Since(r.AttemptTime))
+ })
+ }))
if err != nil {
reader.CloseWithError(err)
bh.RecordError(err)
@@ -212,12 +218,17 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
object := objName(bh.dir, bh.name, filename)
- out, err := bh.client.GetObject(&s3.GetObjectInput{
+ sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
+ out, err := bh.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: object,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
+ }, func(r *request.Request) {
+ r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
+ sendStats.TimedIncrement(time.Since(r.AttemptTime))
+ })
})
if err != nil {
return nil, err
@@ -272,6 +283,7 @@ type S3BackupStorage struct {
_client *s3.S3
mu sync.Mutex
s3SSE S3ServerSideEncryption
+ params backupstorage.Params
}
// ListBackups is part of the backupstorage.BackupStorage interface.
@@ -411,8 +423,7 @@ func (bs *S3BackupStorage) Close() error {
}
func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
- // TODO(maxeng): return a new S3BackupStorage that uses params.
- return bs
+ return &S3BackupStorage{params: params}
}
var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil)
@@ -485,7 +496,7 @@ func objName(parts ...string) *string {
}
func init() {
- backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{}
+ backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{params: backupstorage.NoParams()}
logNameMap = logNameToLogLevel{
"LogOff": aws.LogOff,
diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go
index 5303d88e5e5..a10432b78c2 100644
--- a/go/vt/mysqlctl/s3backupstorage/s3_test.go
+++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go
@@ -5,31 +5,135 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
+ "fmt"
"net/http"
+ "net/url"
"os"
"testing"
+ "time"
"github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/vt/logutil"
+ stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)
-type s3ErrorClient struct{ s3iface.S3API }
+type s3FakeClient struct {
+ s3iface.S3API
+ err error
+ delay time.Duration
+}
-func (s3errclient *s3ErrorClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
+func (sfc *s3FakeClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
+ u, _ := url.Parse("http://localhost:1234")
req := request.Request{
- HTTPRequest: &http.Request{}, // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13)
- Error: errors.New("some error"), // this forces req.Send() (which is called by the uploader) to always return non-nil error
+ HTTPRequest: &http.Request{ // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13)
+ Header: make(http.Header),
+ URL: u,
+ },
+ Retryer: client.DefaultRetryer{},
}
+ req.Handlers.Send.PushBack(func(r *request.Request) {
+ r.Error = sfc.err
+ if sfc.delay > 0 {
+ time.Sleep(sfc.delay)
+ }
+ })
+
return &req, &s3.PutObjectOutput{}
}
func TestAddFileError(t *testing.T) {
- bh := &S3BackupHandle{client: &s3ErrorClient{}, bs: &S3BackupStorage{}, readOnly: false}
+ bh := &S3BackupHandle{
+ client: &s3FakeClient{err: errors.New("some error")},
+ bs: &S3BackupStorage{
+ params: backupstorage.NoParams(),
+ },
+ readOnly: false,
+ }
+
+ wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000)
+ require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
+ assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser")
+
+ n, err := wc.Write([]byte("here are some bytes"))
+ require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err)
+
+ err = wc.Close()
+ require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err)
+
+ bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error
+
+ require.True(t, bh.HasErrors(), "AddFile() expected bh to record async error but did not")
+}
+
+func TestAddFileStats(t *testing.T) {
+ fakeStats := stats.NewFakeStats()
+
+ delay := 10 * time.Millisecond
+
+ bh := &S3BackupHandle{
+ client: &s3FakeClient{delay: delay},
+ bs: &S3BackupStorage{
+ params: backupstorage.Params{
+ Logger: logutil.NewMemoryLogger(),
+ Stats: fakeStats,
+ },
+ },
+ readOnly: false,
+ }
+
+ for i := 0; i < 4; i++ {
+ wc, err := bh.AddFile(aws.BackgroundContext(), fmt.Sprintf("somefile-%d", i), 100000)
+ require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
+ assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser")
+
+ n, err := wc.Write([]byte("here are some bytes"))
+ require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err)
+
+ err = wc.Close()
+ require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err)
+ }
+
+	bh.waitGroup.Wait() // wait for the async uploads to finish, at which point stats should have been recorded
+
+ require.Equal(t, bh.HasErrors(), false, "AddFile() expected bh not to record async errors but did")
+
+ require.Len(t, fakeStats.ScopeCalls, 4)
+ scopedStats := fakeStats.ScopeReturns[0]
+ require.Len(t, scopedStats.ScopeV, 1)
+ require.Equal(t, scopedStats.ScopeV[stats.ScopeOperation], "AWS:Request:Send")
+ require.Len(t, scopedStats.TimedIncrementCalls, 1)
+ require.GreaterOrEqual(t, scopedStats.TimedIncrementCalls[0], delay)
+ require.Len(t, scopedStats.TimedIncrementBytesCalls, 0)
+}
+
+func TestAddFileErrorStats(t *testing.T) {
+ fakeStats := stats.NewFakeStats()
+
+ delay := 10 * time.Millisecond
+
+ bh := &S3BackupHandle{
+ client: &s3FakeClient{
+ delay: delay,
+ err: errors.New("some error"),
+ },
+ bs: &S3BackupStorage{
+ params: backupstorage.Params{
+ Logger: logutil.NewMemoryLogger(),
+ Stats: fakeStats,
+ },
+ },
+ readOnly: false,
+ }
wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000)
require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
@@ -43,7 +147,15 @@ func TestAddFileError(t *testing.T) {
bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error
- require.Equal(t, bh.HasErrors(), true, "AddFile() expected bh to record async error but did not")
+	require.True(t, bh.HasErrors(), "AddFile() expected bh to record async error but did not")
+
+ require.Len(t, fakeStats.ScopeCalls, 1)
+ scopedStats := fakeStats.ScopeReturns[0]
+ require.Len(t, scopedStats.ScopeV, 1)
+ require.Equal(t, scopedStats.ScopeV[stats.ScopeOperation], "AWS:Request:Send")
+ require.Len(t, scopedStats.TimedIncrementCalls, 1)
+ require.GreaterOrEqual(t, scopedStats.TimedIncrementCalls[0], delay)
+ require.Len(t, scopedStats.TimedIncrementBytesCalls, 0)
}
func TestNoSSE(t *testing.T) {