Skip to content

Commit

Permalink
always listen on context canceled waiting on channels (#1669)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana authored Jun 27, 2022
1 parent 40b13d9 commit 67689fe
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Receive each part number from the channel allowing three parallel uploads.
for w := 1; w <= opts.getNumThreads(); w++ {
go func(w int, partSize int64) {
// Each worker will draw from the part channel and upload in parallel.
for uploadReq := range uploadPartsCh {
for {
var uploadReq uploadPartReq
var ok bool
select {
case <-ctx.Done():
return
case uploadReq, ok = <-uploadPartsCh:
if !ok {
return
}
// Each worker will draw from the part channel and upload in parallel.
}

// If partNumber was not uploaded we calculate the missing
// part offset and size. For all other part numbers we
Expand Down Expand Up @@ -214,17 +224,22 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Gather the responses as they occur and update any
// progress bar.
for u := 1; u <= totalPartsCount; u++ {
uploadRes := <-uploadedPartsCh
if uploadRes.Error != nil {
return UploadInfo{}, uploadRes.Error
select {
case <-ctx.Done():
return UploadInfo{}, ctx.Err()
case uploadRes := <-uploadedPartsCh:
if uploadRes.Error != nil {
return UploadInfo{}, uploadRes.Error
}

// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
// Store the parts to be completed in order.
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
})
}
// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
// Store the parts to be completed in order.
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
})
}

// Verify if we uploaded all the data.
Expand Down

0 comments on commit 67689fe

Please sign in to comment.