Skip to content

Commit

Permalink
fix: use multipart upload method to put files larger than 5Gi to OSS. F…
Browse files Browse the repository at this point in the history
…ixes #12877 (#12897)

Signed-off-by: AlbeeSo <suyashi1321@163.com>
Co-authored-by: shuangkun <tsk2013uestc@163.com>
(cherry picked from commit d2369c9)
  • Loading branch information
AlbeeSo authored and agilgur5 committed Apr 19, 2024
1 parent 8c9a857 commit 1c1f433
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package oss
import (
"fmt"
"io"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -41,6 +42,7 @@ var (
// OSS error code reference: https://error-center.alibabacloud.com/status/product/Oss
ossTransientErrorCodes = []string{"RequestTimeout", "QuotaExceeded.Refresh", "Default", "ServiceUnavailable", "Throttling", "RequestTimeTooSkewed", "SocketException", "SocketTimeout", "ServiceBusy", "DomainNetWorkVisitedException", "ConnectionTimeout", "CachedTimeTooLarge"}
bucketLogFilePrefix = "bucket-log-"
maxObjectSize = int64(5 * 1024 * 1024 * 1024)
)

type ossCredentials struct {
Expand Down Expand Up @@ -337,9 +339,66 @@ func isTransientOSSErr(err error) bool {
return false
}

// OSS simple upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/simple-upload?spm=a2c63.p38356.0.0.2c072398fh5k3W#section-ym8-svm-rmu
func simpleUpload(bucket *oss.Bucket, objectName, path string) error {
log.Info("OSS Simple Uploading")
return bucket.PutObjectFromFile(objectName, path)
}

// OSS multipart upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload?spm=a2c63.p38356.0.0.4ebe423fzsaPiN#section-trz-mpy-tes
func multipartUpload(bucket *oss.Bucket, objectName, path string, objectSize int64) error {
log.Info("OSS Multipart Uploading")
// Calculate the number of chunks
chunkNum := int(math.Ceil(float64(objectSize)/float64(maxObjectSize))) + 1
chunks, err := oss.SplitFileByPartNum(path, chunkNum)
if err != nil {
return err
}
fd, err := os.Open(filepath.Clean(path))
if err != nil {
return err
}
defer fd.Close()
// Initialize a multipart upload event.
imur, err := bucket.InitiateMultipartUpload(objectName)
if err != nil {
return err
}
// Upload the chunks.
var parts []oss.UploadPart
for _, chunk := range chunks {
_, err := fd.Seek(chunk.Offset, io.SeekStart)
if err != nil {
return err
}
// Call the UploadPart method to upload each chunck.
part, err := bucket.UploadPart(imur, fd, chunk.Size, chunk.Number)
if err != nil {
log.Warnf("Upload part error: %v", err)
return err
}
log.Infof("Upload part number: %v, ETag: %v", part.PartNumber, part.ETag)
parts = append(parts, part)
}
_, err = bucket.CompleteMultipartUpload(imur, parts)
if err != nil {
log.Warnf("Complete multipart upload error: %v", err)
return err
}
return nil
}

func putFile(bucket *oss.Bucket, objectName, path string) error {
log.Debugf("putFile from %s to %s", path, objectName)
return bucket.PutObjectFromFile(objectName, path)
fStat, err := os.Stat(path)
if err != nil {
return err
}
// Determine upload method based on file size.
if fStat.Size() <= maxObjectSize {
return simpleUpload(bucket, objectName, path)
}
return multipartUpload(bucket, objectName, path, fStat.Size())
}

func putDirectory(bucket *oss.Bucket, objectName, dir string) error {
Expand Down

0 comments on commit 1c1f433

Please sign in to comment.