Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use multipart upload method to put files larger than 5Gi to OSS. Fixes #12877 #12897

Merged
merged 2 commits into from
Apr 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package oss
import (
"fmt"
"io"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -41,6 +42,7 @@ var (
// OSS error code reference: https://error-center.alibabacloud.com/status/product/Oss
ossTransientErrorCodes = []string{"RequestTimeout", "QuotaExceeded.Refresh", "Default", "ServiceUnavailable", "Throttling", "RequestTimeTooSkewed", "SocketException", "SocketTimeout", "ServiceBusy", "DomainNetWorkVisitedException", "ConnectionTimeout", "CachedTimeTooLarge"}
bucketLogFilePrefix = "bucket-log-"
maxObjectSize = int64(5 * 1024 * 1024 * 1024)
)

type ossCredentials struct {
Expand Down Expand Up @@ -337,9 +339,66 @@ func isTransientOSSErr(err error) bool {
return false
}

// OSS simple upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/simple-upload?spm=a2c63.p38356.0.0.2c072398fh5k3W#section-ym8-svm-rmu
func simpleUpload(bucket *oss.Bucket, objectName, path string) error {
log.Info("OSS Simple Uploading")
return bucket.PutObjectFromFile(objectName, path)
}

// OSS multipart upload code reference: https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload?spm=a2c63.p38356.0.0.4ebe423fzsaPiN#section-trz-mpy-tes
func multipartUpload(bucket *oss.Bucket, objectName, path string, objectSize int64) error {
Comment on lines +348 to +349
Copy link
Contributor

@agilgur5 agilgur5 Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is mostly taken from the Alibaba docs, but I wonder if the chunking / UploadPart section could be parallelized?

If so, could potentially contribute that back upstream if Alibaba's docs are open source (idk?)

log.Info("OSS Multipart Uploading")
AlbeeSo marked this conversation as resolved.
Show resolved Hide resolved
// Calculate the number of chunks
chunkNum := int(math.Ceil(float64(objectSize)/float64(maxObjectSize))) + 1
chunks, err := oss.SplitFileByPartNum(path, chunkNum)
if err != nil {
return err
}
fd, err := os.Open(filepath.Clean(path))
if err != nil {
return err
}
defer fd.Close()
// Initialize a multipart upload event.
imur, err := bucket.InitiateMultipartUpload(objectName)
if err != nil {
return err
}
// Upload the chunks.
var parts []oss.UploadPart
for _, chunk := range chunks {
_, err := fd.Seek(chunk.Offset, io.SeekStart)
if err != nil {
return err
}
// Call the UploadPart method to upload each chunck.
part, err := bucket.UploadPart(imur, fd, chunk.Size, chunk.Number)
if err != nil {
log.Warnf("Upload part error: %v", err)
return err
}
log.Infof("Upload part number: %v, ETag: %v", part.PartNumber, part.ETag)
parts = append(parts, part)
}
_, err = bucket.CompleteMultipartUpload(imur, parts)
if err != nil {
log.Warnf("Complete multipart upload error: %v", err)
return err
}
return nil
}

func putFile(bucket *oss.Bucket, objectName, path string) error {
log.Debugf("putFile from %s to %s", path, objectName)
return bucket.PutObjectFromFile(objectName, path)
fStat, err := os.Stat(path)
if err != nil {
return err
}
// Determine upload method based on file size.
if fStat.Size() <= maxObjectSize {
return simpleUpload(bucket, objectName, path)
}
return multipartUpload(bucket, objectName, path, fStat.Size())
}

func putDirectory(bucket *oss.Bucket, objectName, dir string) error {
Expand Down
Loading