gcsupload: limit maximum upload concurrency #16658

Merged
3 changes: 3 additions & 0 deletions prow/apis/prowjobs/v1/types.go
@@ -519,6 +519,9 @@ type GCSConfiguration struct {
// builtin's and the local system's defaults. This maps extensions
// to media types, for example: MediaTypes["log"] = "text/plain"
MediaTypes map[string]string `json:"mediaTypes,omitempty"`
// MaxConcurrency limits the concurrency used when pushing
// data to GCS. Defaults to 4 concurrent uploads per available CPU.
MaxConcurrency int64 `json:"max_concurrency,omitempty"`

// LocalOutputDir specifies a directory where files should be copied INSTEAD of uploading to GCS.
// This option is useful for testing jobs that use the pod-utilities without actually uploading.
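For context, a minimal sketch of how the new field might be set programmatically. The import path and alias are assumptions based on the file's location, and the concrete values are illustrative, not taken from this PR:

package main

import (
	"fmt"

	prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1"
)

func main() {
	cfg := prowapi.GCSConfiguration{
		// Map file extensions to media types, e.g. serve *.log objects as text/plain.
		MediaTypes: map[string]string{"log": "text/plain"},
		// Cap in-flight uploads at 16. Leaving this at zero falls back to the
		// default of 4 * runtime.NumCPU(), applied in Options.Complete (next file).
		MaxConcurrency: 16,
	}
	fmt.Println(cfg.MaxConcurrency)
}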
5 changes: 5 additions & 0 deletions prow/gcsupload/options.go
@@ -21,6 +21,7 @@ import (
"errors"
"flag"
"fmt"
"runtime"
"strings"

"github.com/GoogleCloudPlatform/testgrid/util/gcs"
@@ -114,6 +115,10 @@ func (o *Options) Complete(args []string) {
o.GCSConfiguration.MediaTypes[extension] = mediaType
}
o.mediaTypes = flagutil.NewStrings()

if o.GCSConfiguration.MaxConcurrency == 0 {
o.GCSConfiguration.MaxConcurrency = int64(4 * runtime.NumCPU())
}
}

// AddFlags adds flags to the FlagSet that populate
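The defaulting added above is easy to restate in isolation. A small standalone sketch (illustrative names, not prow code) showing what the zero value resolves to:

package main

import (
	"fmt"
	"runtime"
)

// defaultMaxConcurrency mirrors the logic added to Options.Complete: an unset
// (zero) value becomes 4 concurrent uploads per available CPU, while any
// explicitly configured value is left untouched.
func defaultMaxConcurrency(configured int64) int64 {
	if configured == 0 {
		return int64(4 * runtime.NumCPU())
	}
	return configured
}

func main() {
	fmt.Println(defaultMaxConcurrency(0))  // e.g. 32 on an 8-CPU node
	fmt.Println(defaultMaxConcurrency(16)) // explicit settings win
}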
2 changes: 1 addition & 1 deletion prow/gcsupload/run.go
@@ -61,7 +61,7 @@ func (o Options) Run(spec *downwardapi.JobSpec, extra map[string]gcs.UploadFunc)
return fmt.Errorf("could not connect to GCS: %v", err)
}

if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets); err != nil {
if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets, o.MaxConcurrency); err != nil {
return fmt.Errorf("failed to upload to GCS: %v", err)
}
logrus.Info("Finished upload to GCS")
1 change: 1 addition & 0 deletions prow/pod-utils/gcs/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@com_google_cloud_go//storage:go_default_library",
"@org_golang_google_api//googleapi:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)

15 changes: 11 additions & 4 deletions prow/pod-utils/gcs/upload.go
@@ -26,6 +26,7 @@ import (

"cloud.google.com/go/storage"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"google.golang.org/api/googleapi"

"k8s.io/test-infra/prow/errorutil"
@@ -38,11 +39,11 @@ type destToWriter func(dest string) dataWriter
// Upload uploads all of the data in the
// uploadTargets map to GCS in parallel. The map is
// keyed on GCS path under the bucket
func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error {
func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
dtw := func(dest string) dataWriter {
return gcsObjectWriter{bucket.Object(dest).NewWriter(context.Background())}
Reviewer (Member) comment on the line above: This is kind of orthogonal to this PR, but we should be using a context with a timeout here so that we don't try to upload a file forever, especially now that that could prevent other files from being uploaded as well.
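A rough sketch of what that suggestion could look like, assuming a hypothetical per-object timeout; the helper and its usage are placeholders, not part of this PR:

package gcs

import (
	"context"
	"time"

	"cloud.google.com/go/storage"
)

// newBoundedWriter opens an object writer whose context expires after timeout,
// so a single stuck upload cannot hold its concurrency slot forever. The caller
// must invoke the returned cancel func once the write (including Close) finishes.
func newBoundedWriter(bucket *storage.BucketHandle, dest string, timeout time.Duration) (*storage.Writer, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	return bucket.Object(dest).NewWriter(ctx), cancel
}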

}
return upload(dtw, uploadTargets)
return upload(dtw, uploadTargets, maxConcurrency)
}

// LocalExport copies all of the data in the uploadTargets map to local files in parallel. The map
@@ -53,18 +54,24 @@ func LocalExport(exportDir string, uploadTargets map[string]UploadFunc) error {
filePath: path.Join(exportDir, dest),
}
}
return upload(dtw, uploadTargets)
return upload(dtw, uploadTargets, int64(len(uploadTargets))) // there is no use in limiting concurrency of local writes
}

func upload(dtw destToWriter, uploadTargets map[string]UploadFunc) error {
func upload(dtw destToWriter, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
errCh := make(chan error, len(uploadTargets))
group := &sync.WaitGroup{}
group.Add(len(uploadTargets))
sem := semaphore.NewWeighted(maxConcurrency)
for dest, upload := range uploadTargets {
log := logrus.WithField("dest", dest)
if err := sem.Acquire(context.Background(), 1); err != nil {
Author (Contributor) comment: Note: it's actually the dtw(dest) call we make in creating the closure that allocates, so we need to guard that.

log.WithError(err).Error("Could not begin upload.")
continue
}
log.Info("Queued for upload")
go func(f UploadFunc, writer dataWriter, log *logrus.Entry) {
defer group.Done()
defer sem.Release(1)
if err := f(writer); err != nil {
errCh <- err
} else {
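For readers skimming the diff, the change boils down to the standard weighted-semaphore fan-out: acquire a slot before any per-target setup (as the author notes above, the dtw(dest) call has to sit behind the acquire too), then release it when the goroutine finishes. A self-contained sketch with illustrative names, not the actual prow code:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// uploadAll runs each task with at most maxConcurrency goroutines in flight,
// mirroring the semaphore + WaitGroup structure added to upload().
func uploadAll(tasks map[string]func() error, maxConcurrency int64) error {
	errCh := make(chan error, len(tasks))
	var wg sync.WaitGroup
	wg.Add(len(tasks))
	sem := semaphore.NewWeighted(maxConcurrency)
	for name, task := range tasks {
		// Acquire before doing any per-task setup so that setup work
		// (such as opening a writer) is also bounded by the semaphore.
		if err := sem.Acquire(context.Background(), 1); err != nil {
			errCh <- fmt.Errorf("acquire for %s: %w", name, err)
			wg.Done()
			continue
		}
		go func(name string, task func() error) {
			defer wg.Done()
			defer sem.Release(1)
			if err := task(); err != nil {
				errCh <- fmt.Errorf("%s: %w", name, err)
			}
		}(name, task)
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		return err // the real upload() aggregates all errors via prow's errorutil
	}
	return nil
}

func main() {
	tasks := map[string]func() error{
		"a": func() error { return nil },
		"b": func() error { return nil },
	}
	fmt.Println(uploadAll(tasks, 2))
}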
2 changes: 1 addition & 1 deletion prow/pod-utils/gcs/upload_test.go
@@ -87,7 +87,7 @@ func TestUploadToGcs(t *testing.T) {
targets[fmt.Sprintf("fail-%d", i)] = fail
}

err := Upload(&storage.BucketHandle{}, targets)
err := Upload(&storage.BucketHandle{}, targets, int64(len(targets)))
if err != nil && !testCase.expectedErr {
t.Errorf("%s: expected no error but got %v", testCase.name, err)
}