fix: avoid data race by copying the buffer #1357

Merged
merged 1 commit on Aug 4, 2020

25 changes: 20 additions & 5 deletions api-put-object-streaming.go
@@ -146,9 +146,15 @@ func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketNa
uploadPartsCh <- uploadPartReq{PartNum: p}
}
close(uploadPartsCh)

+ var partsBuf = make([][]byte, opts.getNumThreads())
+ for i := range partsBuf {
+ partsBuf[i] = make([]byte, partSize)
+ }

// Receive each part number from the channel allowing three parallel uploads.
for w := 1; w <= opts.getNumThreads(); w++ {
- go func(partSize int64) {
+ go func(w int, partSize int64) {
// Each worker will draw from the part channel and upload in parallel.
for uploadReq := range uploadPartsCh {

@@ -164,12 +170,21 @@ func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketNa
partSize = lastPartSize
}

+ n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
+ if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
+ uploadedPartsCh <- uploadedPartRes{
+ Error: rerr,
+ }
+ // Exit the goroutine.
+ return
+ }

// Get a section reader on a particular offset.
- sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
+ hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)

// Proceed to upload the part.
- objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID,
- sectionReader, uploadReq.PartNum,
+ objPart, err := c.uploadPart(ctx, bucketName, objectName,
+ uploadID, hookReader, uploadReq.PartNum,
"", "", partSize, opts.ServerSideEncryption)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
@@ -189,7 +204,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketNa
Part: uploadReq.Part,
}
}
- }(partSize)
+ }(w, partSize)
}

// Gather the responses as they occur and update any
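For readers who want the pattern in isolation: each worker now owns one slice of partsBuf, the part is first copied out of the shared io.ReaderAt with readFull, and the upload then streams from a bytes.Reader over that private copy, so no two goroutines read through the same reader or buffer concurrently. Below is a minimal, self-contained sketch of that per-worker-buffer idea; the upload function, the sizes, and the names are hypothetical stand-ins for illustration, not the minio-go API.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// upload is a hypothetical stand-in for a per-part upload call.
func upload(partNum int, r io.Reader) error {
	n, err := io.Copy(io.Discard, r) // pretend to send the bytes somewhere
	fmt.Printf("part %d: uploaded %d bytes\n", partNum, n)
	return err
}

func main() {
	const (
		total    = 10000 // total object size, for illustration
		partSize = 4096  // per-part size, for illustration
		workers  = 3
	)
	var src io.ReaderAt = strings.NewReader(strings.Repeat("x", total))

	// Feed part numbers to the workers.
	parts := make(chan int)
	go func() {
		for p := 1; (p-1)*partSize < total; p++ {
			parts <- p
		}
		close(parts)
	}()

	// One buffer per worker: a worker only ever writes into its own slice,
	// so filling a buffer cannot race with another worker's upload.
	bufs := make([][]byte, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		bufs[w] = make([]byte, partSize)
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for p := range parts {
				off := int64(p-1) * partSize
				size := int64(partSize)
				if off+size > total {
					size = total - off
				}
				// Copy the part out of the shared ReaderAt first...
				n, err := io.ReadFull(io.NewSectionReader(src, off, size), bufs[w][:size])
				if err != nil && err != io.ErrUnexpectedEOF {
					fmt.Println("read error:", err)
					return
				}
				// ...then upload from an independent reader over that copy.
				if err := upload(p, bytes.NewReader(bufs[w][:n])); err != nil {
					fmt.Println("upload error:", err)
					return
				}
			}
		}(w)
	}
	wg.Wait()
}
```

The invariant the fix relies on is that each buffer is written and read by exactly one goroutine, so the workers can overlap freely without sharing reader state.
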
10 changes: 5 additions & 5 deletions api.go
@@ -513,7 +513,7 @@ var successStatus = []int{
// request upon any error up to maxRetries attempts in a binomially
// delayed manner using a standard back off algorithm.
func (c Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) {
- var isRetryable bool // Indicates if request can be retried.
+ var retryable bool // Indicates if request can be retried.
var bodySeeker io.Seeker // Extracted seeker from io.Reader.
var reqRetry = MaxRetry // Indicates how many times we can retry the request

@@ -526,13 +526,13 @@ func (c Client) executeMethod(ctx context.Context, method string, metadata reque

if metadata.contentBody != nil {
// Check if body is seekable then it is retryable.
- bodySeeker, isRetryable = metadata.contentBody.(io.Seeker)
+ bodySeeker, retryable = metadata.contentBody.(io.Seeker)
switch bodySeeker {
case os.Stdin, os.Stdout, os.Stderr:
- isRetryable = false
+ retryable = false
}
// Retry only when reader is seekable
- if !isRetryable {
+ if !retryable {
reqRetry = 1
}

@@ -559,7 +559,7 @@ func (c Client) executeMethod(ctx context.Context, method string, metadata reque
// error until maxRetries have been exhausted, retry attempts are
// performed after waiting for a given period of time in a
// binomial fashion.
- if isRetryable {
+ if retryable {
// Seek back to beginning for each attempt.
if _, err = bodySeeker.Seek(0, 0); err != nil {
// If seek failed, no need to retry.
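The api.go side of the change is only a rename (isRetryable becomes retryable), but the gate it feeds is the part worth remembering: a request body counts as retryable only if it can be rewound, i.e. it type-asserts to io.Seeker and is not one of the standard streams, and before every retry it is seeked back to offset 0 so the next attempt resends the full payload. A small sketch of that check follows, under an assumed, illustrative maxRetry constant (not the library's MaxRetry) and a hypothetical attemptsFor helper.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

const maxRetry = 10 // illustrative constant, not minio-go's MaxRetry

// attemptsFor reports how many attempts a request body allows: seekable
// bodies can be replayed, everything else gets exactly one shot.
func attemptsFor(body io.Reader) (int, io.Seeker) {
	seeker, retryable := body.(io.Seeker)
	switch seeker {
	case os.Stdin, os.Stdout, os.Stderr:
		// The standard streams satisfy io.Seeker but cannot be
		// meaningfully rewound, so treat them as non-retryable.
		retryable = false
	}
	if !retryable {
		return 1, nil
	}
	return maxRetry, seeker
}

func main() {
	body := strings.NewReader("payload")
	attempts, seeker := attemptsFor(body)
	fmt.Println("attempts allowed:", attempts)

	// Before every retry the body is rewound so the next attempt
	// sends the complete payload again.
	if seeker != nil {
		if _, err := seeker.Seek(0, io.SeekStart); err != nil {
			fmt.Println("seek failed, giving up:", err)
		}
	}
}
```

Bodies that cannot be rewound get a single attempt, which matches the reqRetry = 1 branch in the diff above.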