diff --git a/workflow/artifacts/oss/oss.go b/workflow/artifacts/oss/oss.go index 72d3bd71d92c..e0a064d9ccea 100644 --- a/workflow/artifacts/oss/oss.go +++ b/workflow/artifacts/oss/oss.go @@ -3,6 +3,7 @@ package oss import ( "fmt" "io" + "math" "os" "path/filepath" "strings" @@ -41,6 +42,7 @@ var ( // OSS error code reference: https://error-center.alibabacloud.com/status/product/Oss ossTransientErrorCodes = []string{"RequestTimeout", "QuotaExceeded.Refresh", "Default", "ServiceUnavailable", "Throttling", "RequestTimeTooSkewed", "SocketException", "SocketTimeout", "ServiceBusy", "DomainNetWorkVisitedException", "ConnectionTimeout", "CachedTimeTooLarge"} bucketLogFilePrefix = "bucket-log-" + maxObjectSize = int64(5 * 1024 * 1024 * 1024) ) type ossCredentials struct { @@ -339,7 +341,51 @@ func isTransientOSSErr(err error) bool { func putFile(bucket *oss.Bucket, objectName, path string) error { log.Debugf("putFile from %s to %s", path, objectName) - return bucket.PutObjectFromFile(objectName, path) + fStat, err := os.Stat(path) + if err != nil { + return err + } + // Determine upload method based on file size. + if fStat.Size() <= maxObjectSize { + log.Info("OSS Simple Uploading") + return bucket.PutObjectFromFile(objectName, path) + } + log.Info("OSS Multipart Uploading") + // Calculate the number of chunks + chunkNum := int(math.Ceil(float64(fStat.Size())/float64(maxObjectSize))) + 1 + chunks, err := oss.SplitFileByPartNum(path, chunkNum) + if err != nil { + return err + } + fd, err := os.Open(filepath.Clean(path)) + if err != nil { + return err + } + defer fd.Close() + // Initialize a multipart upload event. + imur, err := bucket.InitiateMultipartUpload(objectName) + if err != nil { + return err + } + // Upload the chunks. + var parts []oss.UploadPart + for _, chunk := range chunks { + fd.Seek(chunk.Offset, io.SeekStart) + // Call the UploadPart method to upload each chunck. + part, err := bucket.UploadPart(imur, fd, chunk.Size, chunk.Number) + if err != nil { + log.Warnf("Upload part error: %v", err) + return err + } + log.Infof("Upload part number: %v, ETag: %v", part.PartNumber, part.ETag) + parts = append(parts, part) + } + _, err = bucket.CompleteMultipartUpload(imur, parts) + if err != nil { + log.Warnf("Complete multipart upload error: %v", err) + return err + } + return nil } func putDirectory(bucket *oss.Bucket, objectName, dir string) error {