Skip to content

Commit

Permalink
fix(storage/transfermanager): WaitAndClose waits for Callbacks to fin…
Browse files Browse the repository at this point in the history
…ish (#10504)

Fixes #10502
  • Loading branch information
BrennaEpp committed Jul 3, 2024
1 parent 727b6c8 commit 0e81002
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec
}

if d.config.asynchronous {
go input.gatherObjectOutputs(outs, len(inputs))
d.downloadsInProgress.Add(1)
go d.gatherObjectOutputs(input, outs, len(inputs))
}
d.addNewInputs(inputs)
return nil
Expand Down Expand Up @@ -359,6 +360,22 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
d.addResult(in, shardOut)
}

// gatherObjectOutputs receives from the given channel exactly numObjects times.
// It will execute the callback once all object outputs are received.
// It does not do any verification on the outputs nor does it cancel other
// objects on error.
func (d *Downloader) gatherObjectOutputs(in *DownloadDirectoryInput, gatherOuts <-chan DownloadOutput, numObjects int) {
outs := make([]DownloadOutput, 0, numObjects)
for i := 0; i < numObjects; i++ {
obj := <-gatherOuts
outs = append(outs, obj)
}

// All objects have been gathered; execute the callback.
in.Callback(outs)
d.downloadsInProgress.Done()
}

func (d *Downloader) validateObjectInput(in *DownloadObjectInput) error {
if d.config.asynchronous && in.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
Expand Down Expand Up @@ -567,27 +584,13 @@ type DownloadDirectoryInput struct {
// Callback will run after all the objects in the directory as selected by
// the provided filters are finished downloading.
// It must be set if and only if the [WithCallbacks] option is set.
// WaitAndClose will wait for all callbacks to finish.
Callback func([]DownloadOutput)

// OnObjectDownload will run after every finished object download. Optional.
OnObjectDownload func(*DownloadOutput)
}

// gatherObjectOutputs receives from the given channel exactly numObjects times.
// It will call the callback once all object outputs are received.
// It does not do any verification on the outputs nor does it cancel other
// objects on error.
func (dirin *DownloadDirectoryInput) gatherObjectOutputs(gatherOuts <-chan DownloadOutput, numObjects int) {
outs := make([]DownloadOutput, 0, numObjects)
for i := 0; i < numObjects; i++ {
obj := <-gatherOuts
outs = append(outs, obj)
}

// All objects have been gathered; execute the callback.
dirin.Callback(outs)
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
Expand Down

0 comments on commit 0e81002

Please sign in to comment.