diff --git a/prow/apis/prowjobs/v1/types.go b/prow/apis/prowjobs/v1/types.go
index cc0d046cee4a..2e240b39c86d 100644
--- a/prow/apis/prowjobs/v1/types.go
+++ b/prow/apis/prowjobs/v1/types.go
@@ -519,6 +519,9 @@ type GCSConfiguration struct {
 	// builtin's and the local system's defaults. This maps extensions
 	// to media types, for example: MediaTypes["log"] = "text/plain"
 	MediaTypes map[string]string `json:"mediaTypes,omitempty"`
+	// MaxConcurrency limits the concurrency used when pushing
+	// data to GCS. Defaults to 4 concurrent uploads per available CPU.
+	MaxConcurrency int64 `json:"max_concurrency,omitempty"`
 
 	// LocalOutputDir specifies a directory where files should be copied INSTEAD of uploading to GCS.
 	// This option is useful for testing jobs that use the pod-utilities without actually uploading.
diff --git a/prow/gcsupload/options.go b/prow/gcsupload/options.go
index 23484c2ba2fa..966346b86a3e 100644
--- a/prow/gcsupload/options.go
+++ b/prow/gcsupload/options.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"runtime"
 	"strings"
 
 	"github.com/GoogleCloudPlatform/testgrid/util/gcs"
@@ -114,6 +115,10 @@ func (o *Options) Complete(args []string) {
 		o.GCSConfiguration.MediaTypes[extension] = mediaType
 	}
 	o.mediaTypes = flagutil.NewStrings()
+
+	if o.GCSConfiguration.MaxConcurrency == 0 {
+		o.GCSConfiguration.MaxConcurrency = int64(4 * runtime.NumCPU())
+	}
 }
 
 // AddFlags adds flags to the FlagSet that populate
diff --git a/prow/gcsupload/run.go b/prow/gcsupload/run.go
index a6fb86c14bd6..af50716d988a 100644
--- a/prow/gcsupload/run.go
+++ b/prow/gcsupload/run.go
@@ -61,7 +61,7 @@ func (o Options) Run(spec *downwardapi.JobSpec, extra map[string]gcs.UploadFunc)
 		return fmt.Errorf("could not connect to GCS: %v", err)
 	}
 
-	if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets); err != nil {
+	if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets, o.MaxConcurrency); err != nil {
 		return fmt.Errorf("failed to upload to GCS: %v", err)
 	}
 	logrus.Info("Finished upload to GCS")
diff --git a/prow/pod-utils/gcs/BUILD.bazel b/prow/pod-utils/gcs/BUILD.bazel
index 3f8bbc254b06..7554c1ad278c 100644
--- a/prow/pod-utils/gcs/BUILD.bazel
+++ b/prow/pod-utils/gcs/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "@com_github_sirupsen_logrus//:go_default_library",
         "@com_google_cloud_go//storage:go_default_library",
         "@org_golang_google_api//googleapi:go_default_library",
+        "@org_golang_x_sync//semaphore:go_default_library",
     ],
 )
diff --git a/prow/pod-utils/gcs/upload.go b/prow/pod-utils/gcs/upload.go
index 1ad0afc9abd4..90a0956fa07a 100644
--- a/prow/pod-utils/gcs/upload.go
+++ b/prow/pod-utils/gcs/upload.go
@@ -26,6 +26,7 @@ import (
 
 	"cloud.google.com/go/storage"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/semaphore"
 	"google.golang.org/api/googleapi"
 
 	"k8s.io/test-infra/prow/errorutil"
@@ -38,11 +39,11 @@ type destToWriter func(dest string) dataWriter
 // Upload uploads all of the data in the
 // uploadTargets map to GCS in parallel. The map is
 // keyed on GCS path under the bucket
-func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error {
+func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
 	dtw := func(dest string) dataWriter {
 		return gcsObjectWriter{bucket.Object(dest).NewWriter(context.Background())}
 	}
-	return upload(dtw, uploadTargets)
+	return upload(dtw, uploadTargets, maxConcurrency)
 }
 
 // LocalExport copies all of the data in the uploadTargets map to local files in parallel. The map
@@ -53,18 +54,24 @@ func LocalExport(exportDir string, uploadTargets map[string]UploadFunc) error {
 			filePath: path.Join(exportDir, dest),
 		}
 	}
-	return upload(dtw, uploadTargets)
+	return upload(dtw, uploadTargets, int64(len(uploadTargets))) // there is no use in limiting concurrency of local writes
 }
 
-func upload(dtw destToWriter, uploadTargets map[string]UploadFunc) error {
+func upload(dtw destToWriter, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
 	errCh := make(chan error, len(uploadTargets))
 	group := &sync.WaitGroup{}
 	group.Add(len(uploadTargets))
+	sem := semaphore.NewWeighted(maxConcurrency)
 	for dest, upload := range uploadTargets {
 		log := logrus.WithField("dest", dest)
+		if err := sem.Acquire(context.Background(), 1); err != nil {
+			log.WithError(err).Error("Could not begin upload.")
+			continue
+		}
 		log.Info("Queued for upload")
 		go func(f UploadFunc, writer dataWriter, log *logrus.Entry) {
 			defer group.Done()
+			defer sem.Release(1)
 			if err := f(writer); err != nil {
 				errCh <- err
 			} else {
diff --git a/prow/pod-utils/gcs/upload_test.go b/prow/pod-utils/gcs/upload_test.go
index dd681b968e45..d37d2fb6db42 100644
--- a/prow/pod-utils/gcs/upload_test.go
+++ b/prow/pod-utils/gcs/upload_test.go
@@ -87,7 +87,7 @@ func TestUploadToGcs(t *testing.T) {
 			targets[fmt.Sprintf("fail-%d", i)] = fail
 		}
 
-		err := Upload(&storage.BucketHandle{}, targets)
+		err := Upload(&storage.BucketHandle{}, targets, int64(len(targets)))
 		if err != nil && !testCase.expectedErr {
 			t.Errorf("%s: expected no error but got %v", testCase.name, err)
 		}
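
For readers skimming the patch, the core of the change is the weighted-semaphore fan-out in `upload()`: a goroutine is still launched per target, but `sem.Acquire` gates how many run at once, with the default weight set to `4 * runtime.NumCPU()` in `Options.Complete`, while `LocalExport` passes `len(uploadTargets)` so local writes stay effectively unbounded. The sketch below is a minimal, self-contained illustration of that pattern only; `uploadAll` and the `func() error` targets are hypothetical stand-ins for the real `UploadFunc`/`dataWriter` plumbing, not the actual prow code.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	"golang.org/x/sync/semaphore"
)

// uploadAll starts one goroutine per target but allows at most
// maxConcurrency of them to run at the same time.
func uploadAll(targets map[string]func() error, maxConcurrency int64) error {
	sem := semaphore.NewWeighted(maxConcurrency)
	errCh := make(chan error, len(targets))
	wg := &sync.WaitGroup{}
	wg.Add(len(targets))

	for dest, fn := range targets {
		// Block until one of the maxConcurrency slots frees up.
		if err := sem.Acquire(context.Background(), 1); err != nil {
			wg.Done() // keep the WaitGroup balanced for skipped targets
			errCh <- fmt.Errorf("acquire slot for %s: %v", dest, err)
			continue
		}
		go func(dest string, fn func() error) {
			defer wg.Done()
			defer sem.Release(1)
			if err := fn(); err != nil {
				errCh <- fmt.Errorf("%s: %v", dest, err)
			}
		}(dest, fn)
	}

	wg.Wait()
	close(errCh)
	for err := range errCh {
		return err // report the first error for brevity
	}
	return nil
}

func main() {
	targets := map[string]func() error{
		"a/build-log.txt": func() error { return nil },
		"b/build-log.txt": func() error { return nil },
	}
	// Same default weight the patch applies when MaxConcurrency is unset.
	if err := uploadAll(targets, int64(4*runtime.NumCPU())); err != nil {
		fmt.Println("upload failed:", err)
	}
}
```

Since `Acquire` is called in the producer loop rather than inside each goroutine, the loop itself throttles: at most `maxConcurrency` uploads are in flight, and new goroutines are only spawned as earlier ones release their slot.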