From ff0f5c8bab4db3d587e37cce6df7188c3e6400ab Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 18 Sep 2024 17:28:33 -0600 Subject: [PATCH 1/3] Use a context in S3's AddFiles Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/s3backupstorage/s3.go | 3 +-- go/vt/mysqlctl/xtrabackupengine.go | 9 ++++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 1af1362ae30..9bec0ed5bbb 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -182,8 +182,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize }) object := objName(bh.dir, bh.name, filename) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) - // Using UploadWithContext breaks uploading to Minio and Ceph https://github.com/vitessio/vitess/issues/14188 - _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ + _, err := uploader.Upload(ctx, &s3.PutObjectInput{ Bucket: &bucket, Key: &object, Body: reader, diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 784d718af26..daba8bb1ec3 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -316,8 +316,15 @@ func (be *XtrabackupEngine) backupFiles( // would impose a timeout that starts counting right now, so it would // include the time spent uploading the file content. We only want to impose // a timeout on the final Close() step. + // This context also allows us to immediately abort AddFiles if we encountered + // an error in this function. addFilesCtx, cancelAddFiles := context.WithCancel(ctx) - defer cancelAddFiles() + defer func() { + if finalErr != nil { + cancelAddFiles() + } + }() + destFiles, err := addStripeFiles(addFilesCtx, params, bh, backupFileName, numStripes) if err != nil { return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) From 3f591d98ab0b98cdb1d8cdff741b9d80246c5676 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Fri, 20 Sep 2024 15:50:10 -0600 Subject: [PATCH 2/3] Fix ctx in builtin and fix endpoint resolver Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/builtinbackupengine.go | 10 +++++++--- go/vt/mysqlctl/s3backupstorage/s3.go | 21 ++++++++++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 494d765f2a9..b8350a731d4 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -399,8 +399,8 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac // Save initial state so we can restore. replicaStartRequired := false sourceIsPrimary := false - superReadOnly := true //nolint - readOnly := true //nolint + superReadOnly := true // nolint + readOnly := true // nolint var replicationPosition replication.Position semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled(ctx) @@ -793,7 +793,11 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + if finalErr != nil { + cancel() + } + }() // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 9bec0ed5bbb..eb696e09786 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -47,6 +47,8 @@ import ( "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" + smithyendpoints "github.com/aws/smithy-go/endpoints" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" stats "vitess.io/vitess/go/vt/mysqlctl/backupstats" @@ -110,6 +112,23 @@ var logNameMap logNameToLogLevel const sseCustomerPrefix = "sse_c:" +type endpointResolver struct { + r s3.EndpointResolverV2 + endpoint *string +} + +func (er *endpointResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) { + params.Endpoint = er.endpoint + return er.r.ResolveEndpoint(ctx, params) +} + +func newEndpointResolver() *endpointResolver { + return &endpointResolver{ + r: s3.NewDefaultEndpointResolverV2(), + endpoint: &endpoint, + } +} + type iClient interface { manager.UploadAPIClient manager.DownloadAPIClient @@ -493,7 +512,7 @@ func (bs *S3BackupStorage) client() (*s3.Client, error) { o.RetryMaxAttempts = retryCount o.Retryer = &ClosedConnectionRetryer{} } - }) + }, s3.WithEndpointResolverV2(newEndpointResolver())) if len(bucket) == 0 { return nil, fmt.Errorf("--s3_backup_storage_bucket required") From 1102c592c6a1da63a6849a6819647ef82e28571c Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 24 Sep 2024 10:25:27 -0600 Subject: [PATCH 3/3] review suggestion Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/s3backupstorage/s3.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index eb696e09786..b3a8117aafa 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -44,11 +44,10 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + transport "github.com/aws/smithy-go/endpoints" "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" - smithyendpoints "github.com/aws/smithy-go/endpoints" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" stats "vitess.io/vitess/go/vt/mysqlctl/backupstats" @@ -117,7 +116,7 @@ type endpointResolver struct { endpoint *string } -func (er *endpointResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) { +func (er *endpointResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (transport.Endpoint, error) { params.Endpoint = er.endpoint return er.r.ResolveEndpoint(ctx, params) }