Skip to content

Commit

Permalink
added a parameter to specify the maxPartSize
Browse files Browse the repository at this point in the history
This parameter groups files into parts up to the given size.
If an object in the tar is larger than this value, that object
gets its own part whose size equals the object's size.
  • Loading branch information
rem7 committed Jan 31, 2024
1 parent 0f1fbd2 commit 8e15385
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
13 changes: 13 additions & 0 deletions cmd/s3tar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func run(args []string) error {
var maxAttempts int
var concatInMemory bool
var urlDecode bool
var userPartMaxSize int64

cli.VersionFlag = &cli.BoolFlag{
Name: "print-version",
Expand Down Expand Up @@ -205,6 +206,12 @@ func run(args []string) error {
Usage: "url decode the key value from the manifest",
Destination: &urlDecode,
},
&cli.Int64Flag{
Name: "max-part-size",
Value: 20,
Usage: "constrain the max part size of MPU, in MB",
Destination: &userPartMaxSize,
},
},
Action: func(cCtx *cli.Context) error {
logLevel := parseLogLevel(cCtx.Count("verbose"))
Expand Down Expand Up @@ -241,6 +248,11 @@ func run(args []string) error {
if create {
src := cCtx.Args().First() // TODO implement dir list

if userPartMaxSize < 5 || userPartMaxSize > 5000 {
exitError(6, "max-part-size should be >= 5 and <= 5000")
}
userPartMaxSize = userPartMaxSize * 1024 * 1024

s3opts := &s3tar.S3TarS3Options{
SrcManifest: manifestPath,
SkipManifestHeader: skipManifestHeader,
Expand All @@ -250,6 +262,7 @@ func run(args []string) error {
EndpointUrl: endpointUrl,
ConcatInMemory: concatInMemory,
UrlDecode: urlDecode,
UserMaxPartSize: userPartMaxSize,
}
s3opts.DstBucket, s3opts.DstKey = s3tar.ExtractBucketAndPath(archiveFile)
s3opts.DstPrefix = filepath.Dir(s3opts.DstKey)
Expand Down
8 changes: 6 additions & 2 deletions mem_concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func buildInMemoryConcat(ctx context.Context, client *s3.Client, objectList []*S
return uploadObject(ctx, client, opts.DstBucket, opts.DstKey, data, opts.storageClass)
} else {

sizeLimit := findMinimumPartSize(estimatedSize, largestObjectSize)
sizeLimit := findMinimumPartSize(estimatedSize, opts.UserMaxPartSize)

Infof(ctx, "mpu partsize: %s, estimated ram usage: %s\n", formatBytes(sizeLimit), formatBytes(sizeLimit*int64(threads)*3))
Infof(ctx, "mpu partsize: %s, estimated ram usage: %s\nlargestObject: %d\n", formatBytes(sizeLimit), formatBytes(sizeLimit*int64(threads)*3), largestObjectSize)

// TODO: fix TOC to be pre-appended
// tocObj, _, err := buildToc(ctx, objectList)
Expand All @@ -45,6 +45,7 @@ func buildInMemoryConcat(ctx context.Context, client *s3.Client, objectList []*S
return nil, fmt.Errorf("number of parts exceeded the number of mpu parts allowed\n")
}

Infof(ctx, "number of parts: %d\n", len(groups))
// create MPU
mpu, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: &opts.DstBucket,
Expand Down Expand Up @@ -146,11 +147,14 @@ func sumSlice[T int | int32 | int64 | float64](i []T) (o T) {

// findLargestObject returns the size in bytes of the largest object in
// objectList. It returns 0 for an empty list. Each element's Size is
// assumed non-nil (entries come from S3 listings that always carry a size).
func findLargestObject(objectList []*S3Obj) int64 {
	var largestObject int64 = 0
	for _, o := range objectList {
		if *o.Size > largestObject {
			largestObject = *o.Size
		}
	}
	return largestObject
}

Expand Down
24 changes: 16 additions & 8 deletions s3tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,21 +601,29 @@ func _processSmallFiles(ctx context.Context, objectList []*S3Obj, start, end int
// as possible. This is helpful to parallelize the workload even more.
// findMinimumPartSize will start at 5MB and increment by 5MB until we're
// within the 10,000 MPU part limit
func findMinimumPartSize(finalSizeBytes, startingSize int64) int64 {
func findMinimumPartSize(finalSizeBytes, userMaxSize int64) int64 {

const fiveMB = beginningPad
partSize := startingSize
if startingSize < fiveMB {
partSize = fiveMB
}
if startingSize > partSizeMax {
log.Fatal("part size maximum cannot exceed 5GiB")
if userMaxSize == 0 {
userMaxSize = partSizeMax
}

const fiveMB = beginningPad
partSize := int64(fiveMB)

for ; partSize <= partSizeMax; partSize = partSize + fiveMB {
if finalSizeBytes/int64(partSize) < maxPartNumLimit {
break
}
}

if partSize > userMaxSize {
partSize = userMaxSize
}

if partSize > partSizeMax {
log.Fatal("part size maximum cannot exceed 5GiB")
}

return partSize
}

Expand Down
1 change: 1 addition & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type S3TarS3Options struct {
extractPrefix string
ConcatInMemory bool
UrlDecode bool
UserMaxPartSize int64
}

func (o *S3TarS3Options) Copy() S3TarS3Options {
Expand Down

0 comments on commit 8e15385

Please sign in to comment.