From 114bda2b85ba118923c58aa1e4a2c44e87b266fe Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Fri, 6 Mar 2020 11:12:58 -0800 Subject: [PATCH] gcsupload: limit maximum upload concurrency Signed-off-by: Steve Kuznetsov --- prow/apis/prowjobs/v1/types.go | 3 +++ prow/gcsupload/options.go | 5 +++++ prow/gcsupload/run.go | 2 +- prow/pod-utils/gcs/BUILD.bazel | 1 + prow/pod-utils/gcs/upload.go | 14 ++++++++++---- prow/pod-utils/gcs/upload_test.go | 2 +- 6 files changed, 21 insertions(+), 6 deletions(-) diff --git a/prow/apis/prowjobs/v1/types.go b/prow/apis/prowjobs/v1/types.go index cc0d046cee4ab..2e240b39c86de 100644 --- a/prow/apis/prowjobs/v1/types.go +++ b/prow/apis/prowjobs/v1/types.go @@ -519,6 +519,9 @@ type GCSConfiguration struct { // builtin's and the local system's defaults. This maps extensions // to media types, for example: MediaTypes["log"] = "text/plain" MediaTypes map[string]string `json:"mediaTypes,omitempty"` + // MaxConcurrency limits the concurrency used when pushing + // data to GCS. Defaults to 4 concurrent uploads per available CPU. + MaxConcurrency int64 `json:"max_concurrency,omitempty"` // LocalOutputDir specifies a directory where files should be copied INSTEAD of uploading to GCS. // This option is useful for testing jobs that use the pod-utilities without actually uploading. diff --git a/prow/gcsupload/options.go b/prow/gcsupload/options.go index 23484c2ba2fa6..3e09cccdba0ba 100644 --- a/prow/gcsupload/options.go +++ b/prow/gcsupload/options.go @@ -21,6 +21,7 @@ import ( "errors" "flag" "fmt" + "runtime" "strings" "github.com/GoogleCloudPlatform/testgrid/util/gcs" @@ -114,6 +115,10 @@ func (o *Options) Complete(args []string) { o.GCSConfiguration.MediaTypes[extension] = mediaType } o.mediaTypes = flagutil.NewStrings() + + if o.GCSConfiguration.MaxConcurrency == 0 { + o.GCSConfiguration.MaxConcurrency = 4 * runtime.NumCPU() + } } // AddFlags adds flags to the FlagSet that populate diff --git a/prow/gcsupload/run.go b/prow/gcsupload/run.go index a6fb86c14bd6a..af50716d988ad 100644 --- a/prow/gcsupload/run.go +++ b/prow/gcsupload/run.go @@ -61,7 +61,7 @@ func (o Options) Run(spec *downwardapi.JobSpec, extra map[string]gcs.UploadFunc) return fmt.Errorf("could not connect to GCS: %v", err) } - if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets); err != nil { + if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets, o.MaxConcurrency); err != nil { return fmt.Errorf("failed to upload to GCS: %v", err) } logrus.Info("Finished upload to GCS") diff --git a/prow/pod-utils/gcs/BUILD.bazel b/prow/pod-utils/gcs/BUILD.bazel index 68e899a6ddd1c..9a22f10185bbb 100644 --- a/prow/pod-utils/gcs/BUILD.bazel +++ b/prow/pod-utils/gcs/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "@com_github_googlecloudplatform_testgrid//metadata:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_google_cloud_go//storage:go_default_library", + "@org_golang_x_sync//semaphore:go_default_library", ], ) diff --git a/prow/pod-utils/gcs/upload.go b/prow/pod-utils/gcs/upload.go index 951e8437e7e9d..a719664ba3159 100644 --- a/prow/pod-utils/gcs/upload.go +++ b/prow/pod-utils/gcs/upload.go @@ -26,6 +26,7 @@ import ( "cloud.google.com/go/storage" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" "k8s.io/test-infra/prow/errorutil" ) @@ -37,11 +38,11 @@ type destToWriter func(dest string) dataWriter // Upload uploads all of the data in the // uploadTargets map to GCS in parallel. The map is // keyed on GCS path under the bucket -func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error { +func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc, maxConcurrency int64) error { dtw := func(dest string) dataWriter { return gcsObjectWriter{bucket.Object(dest).NewWriter(context.Background())} } - return upload(dtw, uploadTargets) + return upload(dtw, uploadTargets, maxConcurrency) } // LocalExport copies all of the data in the uploadTargets map to local files in parallel. The map @@ -52,18 +53,23 @@ func LocalExport(exportDir string, uploadTargets map[string]UploadFunc) error { filePath: path.Join(exportDir, dest), } } - return upload(dtw, uploadTargets) + return upload(dtw, uploadTargets, int64(len(uploadTargets))) // there is no use in limiting concurrency of local writes } -func upload(dtw destToWriter, uploadTargets map[string]UploadFunc) error { +func upload(dtw destToWriter, uploadTargets map[string]UploadFunc, maxConcurrency int64) error { errCh := make(chan error, len(uploadTargets)) group := &sync.WaitGroup{} group.Add(len(uploadTargets)) + sem := semaphore.NewWeighted(maxConcurrency) for dest, upload := range uploadTargets { log := logrus.WithField("dest", dest) + if err := sem.Acquire(context.Background(), 1); err != nil { + log.WithError(err).Error("Could not begin upload.") + } log.Info("Queued for upload") go func(f UploadFunc, writer dataWriter, log *logrus.Entry) { defer group.Done() + defer sem.Release(1) if err := f(writer); err != nil { errCh <- err } else { diff --git a/prow/pod-utils/gcs/upload_test.go b/prow/pod-utils/gcs/upload_test.go index dd681b968e45d..d37d2fb6db424 100644 --- a/prow/pod-utils/gcs/upload_test.go +++ b/prow/pod-utils/gcs/upload_test.go @@ -87,7 +87,7 @@ func TestUploadToGcs(t *testing.T) { targets[fmt.Sprintf("fail-%d", i)] = fail } - err := Upload(&storage.BucketHandle{}, targets) + err := Upload(&storage.BucketHandle{}, targets, int64(len(targets))) if err != nil && !testCase.expectedErr { t.Errorf("%s: expected no error but got %v", testCase.name, err) }