Skip to content

Commit

Permalink
feat(storage/transfermanager): add DownloadDirectory (#10430)
Browse files Browse the repository at this point in the history
DownloadDirectory is a convenience function that allows selection of multiple objects to be downloaded in parallel to a local directory.
  • Loading branch information
BrennaEpp authored Jul 3, 2024
1 parent b660d68 commit 0d0e5dd
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 23 deletions.
220 changes: 203 additions & 17 deletions storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2/callctx"
"google.golang.org/api/iterator"
)

// Downloader manages a set of parallelized downloads.
Expand All @@ -51,24 +55,96 @@ type Downloader struct {
// set on the ctx may time out before the download even starts. To set a timeout
// that starts with the download, use the [WithPerOpTimeout()] option.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error {
if d.config.asynchronous && input.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && input.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
if d.closed() {
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
}

select {
case <-d.doneReceivingInputs:
return errors.New("transfermanager: WaitAndClose called before DownloadObject")
default:
if err := d.validateObjectInput(input); err != nil {
return err
}

input.ctx = ctx
d.addInput(input)
return nil
}

// DownloadDirectory queues the download of a set of objects to a local path.
// This will initiate the download but is non-blocking; call Downloader.Results
// or use the callback to process the result. DownloadDirectory is thread-safe
// and can be called simultaneously from different goroutines.
// DownloadDirectory will resolve any filters on the input and create the needed
// directory structure locally as the operations progress.
// Note: DownloadDirectory overwrites existing files in the directory.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error {
if d.closed() {
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
}
if err := d.validateDirectoryInput(input); err != nil {
return err
}

query := &storage.Query{
Prefix: input.Prefix,
StartOffset: input.StartOffset,
EndOffset: input.EndOffset,
MatchGlob: input.MatchGlob,
}
if err := query.SetAttrSelection([]string{"Name"}); err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory query.SetAttrSelection: %w", err)
}

// TODO: Clean up any created directory structure on failure.

objectsToQueue := []string{}
it := d.client.Bucket(input.Bucket).Objects(ctx, query)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to list objects: %w", err)
}

objectsToQueue = append(objectsToQueue, attrs.Name)
}

outs := make(chan DownloadOutput, len(objectsToQueue))
inputs := make([]DownloadObjectInput, 0, len(objectsToQueue))

for _, object := range objectsToQueue {
objDirectory := filepath.Join(input.LocalDirectory, filepath.Dir(object))
filePath := filepath.Join(input.LocalDirectory, object)

// Make sure all directories in the object path exist.
err := os.MkdirAll(objDirectory, fs.ModeDir|fs.ModePerm)
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to make directory(%q): %w", objDirectory, err)
}

// Create file to download to.
f, fErr := os.Create(filePath)
if fErr != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to create file(%q): %w", filePath, fErr)
}

inputs = append(inputs, DownloadObjectInput{
Bucket: input.Bucket,
Object: object,
Destination: f,
Callback: input.OnObjectDownload,
ctx: ctx,
directory: true,
directoryObjectOutputs: outs,
})
}

if d.config.asynchronous {
go input.gatherObjectOutputs(outs, len(inputs))
}
d.addNewInputs(inputs)
return nil
}

// WaitAndClose waits for all outstanding downloads to complete and closes the
// Downloader. Adding new downloads after this has been called will cause an error.
//
Expand Down Expand Up @@ -143,14 +219,37 @@ func (d *Downloader) addInput(input *DownloadObjectInput) {
d.inputsMu.Unlock()
}

// addNewInputs adds a slice of inputs to the downloader.
// This should only be used to queue new objects.
func (d *Downloader) addNewInputs(inputs []DownloadObjectInput) {
d.downloadsInProgress.Add(len(inputs))

d.inputsMu.Lock()
d.inputs = append(d.inputs, inputs...)
d.inputsMu.Unlock()
}

func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) {
copiedResult := *result // make a copy so that callbacks do not affect the result

if input.directory {
f := input.Destination.(*os.File)
if err := f.Close(); err != nil && result.Err == nil {
result.Err = fmt.Errorf("closing file(%q): %w", f.Name(), err)
}

if d.config.asynchronous {
input.directoryObjectOutputs <- copiedResult
}
}
// TODO: check checksum if full object

if d.config.asynchronous {
if d.config.asynchronous || input.directory {
input.Callback(result)
} else {
}
if !d.config.asynchronous {
d.resultsMu.Lock()
d.results = append(d.results, *result)
d.results = append(d.results, copiedResult)
d.resultsMu.Unlock()
}

Expand Down Expand Up @@ -260,6 +359,35 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
d.addResult(in, shardOut)
}

func (d *Downloader) validateObjectInput(in *DownloadObjectInput) error {
if d.config.asynchronous && in.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && in.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}
return nil
}

func (d *Downloader) validateDirectoryInput(in *DownloadDirectoryInput) error {
if d.config.asynchronous && in.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && in.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}
return nil
}

func (d *Downloader) closed() bool {
select {
case <-d.doneReceivingInputs:
return true
default:
return false
}
}

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc is configured on the client that's passed in.
// The returned Downloader can be shared across goroutines to initiate downloads.
Expand Down Expand Up @@ -326,10 +454,12 @@ type DownloadObjectInput struct {
// finish.
Callback func(*DownloadOutput)

ctx context.Context
cancelCtx context.CancelCauseFunc
shard int // the piece of the object range that should be downloaded
shardOutputs chan<- *DownloadOutput
ctx context.Context
cancelCtx context.CancelCauseFunc
shard int // the piece of the object range that should be downloaded
shardOutputs chan<- *DownloadOutput
directory bool // input was queued by calling DownloadDirectory
directoryObjectOutputs chan<- DownloadOutput
}

// downloadShard will read a specific object piece into in.Destination.
Expand Down Expand Up @@ -402,6 +532,62 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
return
}

// DownloadDirectoryInput is the input for a directory to download.
type DownloadDirectoryInput struct {
// Bucket is the bucket in GCS to download from. Required.
Bucket string

// LocalDirectory specifies the directory to download the matched objects
// to. Relative paths are allowed. The directory structure and contents
// must not be modified while the download is in progress.
// The directory will be created if it does not already exist. Required.
LocalDirectory string

// Prefix is the prefix filter to download objects whose names begin with this.
// Optional.
Prefix string

// StartOffset is used to filter results to objects whose names are
// lexicographically equal to or after startOffset. If endOffset is also
// set, the objects listed will have names between startOffset (inclusive)
// and endOffset (exclusive). Optional.
StartOffset string

// EndOffset is used to filter results to objects whose names are
// lexicographically before endOffset. If startOffset is also set, the
// objects listed will have names between startOffset (inclusive) and
// endOffset (exclusive). Optional.
EndOffset string

// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
// for syntax details. Optional.
MatchGlob string

// Callback will run after all the objects in the directory as selected by
// the provided filters are finished downloading.
// It must be set if and only if the [WithCallbacks] option is set.
Callback func([]DownloadOutput)

// OnObjectDownload will run after every finished object download. Optional.
OnObjectDownload func(*DownloadOutput)
}

// gatherObjectOutputs receives from the given channel exactly numObjects times.
// It will call the callback once all object outputs are received.
// It does not do any verification on the outputs nor does it cancel other
// objects on error.
func (dirin *DownloadDirectoryInput) gatherObjectOutputs(gatherOuts <-chan DownloadOutput, numObjects int) {
outs := make([]DownloadOutput, 0, numObjects)
for i := 0; i < numObjects; i++ {
obj := <-gatherOuts
outs = append(outs, obj)
}

// All objects have been gathered; execute the callback.
dirin.Callback(outs)
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
Expand Down
2 changes: 1 addition & 1 deletion storage/transfermanager/downloader_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestDownloadBufferParallel(t *testing.T) {
if err != nil {
t.Errorf("b.WriteAt: %v", err)
}
if n != 5 {
if n != step {
t.Errorf("expected to write 5 bytes, got %d", n)
}
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion storage/transfermanager/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestWaitAndClose(t *testing.T) {
t.Fatalf("WaitAndClose: %v", err)
}

expectedErr := "transfermanager: WaitAndClose called before DownloadObject"
expectedErr := "transfermanager: Downloader used after WaitAndClose was called"
err = d.DownloadObject(context.Background(), &DownloadObjectInput{})
if err == nil {
t.Fatalf("d.DownloadObject err was nil, should be %q", expectedErr)
Expand Down
Loading

0 comments on commit 0d0e5dd

Please sign in to comment.