Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/vt/mysqlctl: instrument s3 upload time #12500

Merged
merged 11 commits into from
Sep 20, 2023
18 changes: 16 additions & 2 deletions changelog/18.0/18.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
- [Local examples now use etcd v3 storage and API](#local-examples-etcd-v3)
- **[New command line flags and behavior](#new-flag)**
- [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers)
- [VTOrc flag `--change-tablets-with-errant-gtid-to-drained`](#new-flag-errant-gtid-convert)
- [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag)
- [VTOrc flag `--change-tablets-with-errant-gtid-to-drained`](#new-flag-errant-gtid-convert)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this back to Line 10? That is a better ordering.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry! Was unintentional but should have noticed this

- **[VTAdmin](#vtadmin)**
- [Updated to node v18.16.0](#update-node)
- **[Deprecations and Deletions](#deprecations-and-deletions)**
Expand All @@ -19,6 +19,7 @@
- **[New stats](#new-stats)**
- [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters)
- [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status)
- [Backup and restore metrics for AWS S3](#backup-restore-metrics-aws-s3)
- **[VTTablet](#vttablet)**
- [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences)
- **[Docker](#docker)**
Expand Down Expand Up @@ -63,6 +64,8 @@ for a response from all the tablets. Originally `EmergencyReparentShard` was mea
We have realized now that there are cases when the replication is broken but all the tablets are reachable. In these cases, it is advisable to
call `EmergencyReparentShard` with `--wait-for-all-tablets` so that it doesn't ignore one of the tablets.

#### <a id="new-flag-toggle-ers"/>VTOrc flag `--allow-emergency-reparent`

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 lines should go. They duplicate lines 45-46.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, yes

### <a id="vtadmin"/>VTAdmin

#### <a id="updated-node"/>vtadmin-web updated to node v18.16.0 (LTS)
Expand Down Expand Up @@ -129,6 +132,17 @@ The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found
* `Stalled` is set to `1` when replication stops advancing.
* `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary.

#### <a id="backup-restore-metrics-aws-s3"/>Backup and restore metrics for AWS S3

Requests to AWS S3 are instrumented in backup and restore metrics. For example:

```
vtbackup_backup_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 823
vtbackup_backup_duration_nanoseconds{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 1.33632421437e+11
vtbackup_restore_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 165
vtbackup_restore_count{component="BackupStorage",implementation="S3",operation="AWS:Request:Send"} 165
```

### <a id="vttablet"/>VTTablet

#### <a id="vttablet-new-rpc-reset-sequences"/>New ResetSequences rpc
Expand Down Expand Up @@ -165,4 +179,4 @@ removing Vitess support.

#### <a id="new-durability-policies"/>New Durability Policies

2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
30 changes: 14 additions & 16 deletions go/vt/mysqlctl/backup_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,26 +173,25 @@ func TestExecuteBackup(t *testing.T) {
var sourceReadStats int

for _, sr := range fakeStats.ScopeReturns {
sfs := sr.(*backupstats.FakeStats)
switch sfs.ScopeV[backupstats.ScopeOperation] {
switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}

Expand Down Expand Up @@ -529,26 +528,25 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) {
var sourceReadStats int

for _, sr := range fakeStats.ScopeReturns {
sfs := sr.(*backupstats.FakeStats)
switch sfs.ScopeV[backupstats.ScopeOperation] {
switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
destinationCloseStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Open":
destinationOpenStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Destination:Write":
destinationWriteStats++
require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
case "Source:Close":
sourceCloseStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Open":
sourceOpenStats++
require.Len(t, sfs.TimedIncrementCalls, 1)
require.Len(t, sr.TimedIncrementCalls, 1)
case "Source:Read":
sourceReadStats++
require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1)
require.GreaterOrEqual(t, len(sr.TimedIncrementBytesCalls), 1)
}
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBackupExecutesBackupWithScopedParams(t *testing.T) {
var executeBackupStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeBackupParams.Stats {
executeBackupStats = sr.(*backupstats.FakeStats)
executeBackupStats = sr
}
}
require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
storageStats = sr.(*backupstats.FakeStats)
storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestRestoreExecutesRestoreWithScopedParams(t *testing.T) {
var executeRestoreStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeRestoreParams.Stats {
executeRestoreStats = sr.(*backupstats.FakeStats)
executeRestoreStats = sr
}
}
require.Contains(t, executeRestoreStats.ScopeV, backupstats.ScopeComponent)
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestRestoreParameterizesBackupStorageWithScopedStats(t *testing.T) {
var storageStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
storageStats = sr.(*backupstats.FakeStats)
storageStats = sr
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backupstats/fake_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type FakeStats struct {
Duration time.Duration
}
ScopeCalls [][]Scope
ScopeReturns []Stats
ScopeReturns []*FakeStats
mutex sync.Mutex
}

Expand Down
23 changes: 17 additions & 6 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
Expand All @@ -48,6 +49,7 @@ import (

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/servenv"
)
Expand Down Expand Up @@ -170,7 +172,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
u.PartSize = partSizeBytes
})
object := objName(bh.dir, bh.name, filename)

sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: &bucket,
Key: object,
Expand All @@ -179,7 +181,11 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
})
}, s3manager.WithUploaderRequestOptions(func(r *request.Request) {
r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
sendStats.TimedIncrement(time.Since(r.AttemptTime))
})
}))
if err != nil {
reader.CloseWithError(err)
bh.RecordError(err)
Expand Down Expand Up @@ -212,12 +218,17 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
object := objName(bh.dir, bh.name, filename)
out, err := bh.client.GetObject(&s3.GetObjectInput{
sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send"))
out, err := bh.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fully sure of the implications of switching to GetObjectWithContext.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change.

Bucket: &bucket,
Key: object,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
}, func(r *request.Request) {
r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) {
deepthi marked this conversation as resolved.
Show resolved Hide resolved
sendStats.TimedIncrement(time.Since(r.AttemptTime))
})
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -272,6 +283,7 @@ type S3BackupStorage struct {
_client *s3.S3
mu sync.Mutex
s3SSE S3ServerSideEncryption
params backupstorage.Params
}

// ListBackups is part of the backupstorage.BackupStorage interface.
Expand Down Expand Up @@ -411,8 +423,7 @@ func (bs *S3BackupStorage) Close() error {
}

func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
// TODO(maxeng): return a new S3BackupStorage that uses params.
return bs
return &S3BackupStorage{params: params}
}

var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil)
Expand Down Expand Up @@ -485,7 +496,7 @@ func objName(parts ...string) *string {
}

func init() {
backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{}
backupstorage.BackupStorageMap["s3"] = &S3BackupStorage{params: backupstorage.NoParams()}

logNameMap = logNameToLogLevel{
"LogOff": aws.LogOff,
Expand Down
Loading
Loading