From 1c1f43313578ece6648c1dd7c93d94596b7a4302 Mon Sep 17 00:00:00 2001 From: AlbeeSo <83764662+AlbeeSo@users.noreply.github.com> Date: Sat, 6 Apr 2024 09:45:41 +0800 Subject: [PATCH] fix: use multipart upload method to put files larger than 5Gi to OSS. Fixes #12877 (#12897) Signed-off-by: AlbeeSo Co-authored-by: shuangkun (cherry picked from commit d2369c977d1c500d3aa9a4dae8352b7008b35f79) --- workflow/artifacts/oss/oss.go | 61 ++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/workflow/artifacts/oss/oss.go b/workflow/artifacts/oss/oss.go index a49b705f438a..d414a4c02a73 100644 --- a/workflow/artifacts/oss/oss.go +++ b/workflow/artifacts/oss/oss.go @@ -3,6 +3,7 @@ package oss import ( "fmt" "io" + "math" "os" "path/filepath" "strings" @@ -41,6 +42,7 @@ var ( // OSS error code reference: https://error-center.alibabacloud.com/status/product/Oss ossTransientErrorCodes = []string{"RequestTimeout", "QuotaExceeded.Refresh", "Default", "ServiceUnavailable", "Throttling", "RequestTimeTooSkewed", "SocketException", "SocketTimeout", "ServiceBusy", "DomainNetWorkVisitedException", "ConnectionTimeout", "CachedTimeTooLarge"} bucketLogFilePrefix = "bucket-log-" + maxObjectSize = int64(5 * 1024 * 1024 * 1024) ) type ossCredentials struct { @@ -337,9 +339,66 @@ func isTransientOSSErr(err error) bool { return false } +// OSS simple upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/simple-upload?spm=a2c63.p38356.0.0.2c072398fh5k3W#section-ym8-svm-rmu +func simpleUpload(bucket *oss.Bucket, objectName, path string) error { + log.Info("OSS Simple Uploading") + return bucket.PutObjectFromFile(objectName, path) +} + +// OSS multipart upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload?spm=a2c63.p38356.0.0.4ebe423fzsaPiN#section-trz-mpy-tes +func multipartUpload(bucket *oss.Bucket, objectName, path string, objectSize int64) error { + log.Info("OSS Multipart Uploading") + // Calculate the number of chunks + chunkNum := int(math.Ceil(float64(objectSize)/float64(maxObjectSize))) + 1 + chunks, err := oss.SplitFileByPartNum(path, chunkNum) + if err != nil { + return err + } + fd, err := os.Open(filepath.Clean(path)) + if err != nil { + return err + } + defer fd.Close() + // Initialize a multipart upload event. + imur, err := bucket.InitiateMultipartUpload(objectName) + if err != nil { + return err + } + // Upload the chunks. + var parts []oss.UploadPart + for _, chunk := range chunks { + _, err := fd.Seek(chunk.Offset, io.SeekStart) + if err != nil { + return err + } + // Call the UploadPart method to upload each chunck. + part, err := bucket.UploadPart(imur, fd, chunk.Size, chunk.Number) + if err != nil { + log.Warnf("Upload part error: %v", err) + return err + } + log.Infof("Upload part number: %v, ETag: %v", part.PartNumber, part.ETag) + parts = append(parts, part) + } + _, err = bucket.CompleteMultipartUpload(imur, parts) + if err != nil { + log.Warnf("Complete multipart upload error: %v", err) + return err + } + return nil +} + func putFile(bucket *oss.Bucket, objectName, path string) error { log.Debugf("putFile from %s to %s", path, objectName) - return bucket.PutObjectFromFile(objectName, path) + fStat, err := os.Stat(path) + if err != nil { + return err + } + // Determine upload method based on file size. + if fStat.Size() <= maxObjectSize { + return simpleUpload(bucket, objectName, path) + } + return multipartUpload(bucket, objectName, path, fStat.Size()) } func putDirectory(bucket *oss.Bucket, objectName, dir string) error {