Skip to content

Commit

Permalink
gcsupload: limit maximum upload concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Mar 6, 2020
1 parent 37714f0 commit 6b72630
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 6 deletions.
3 changes: 3 additions & 0 deletions prow/apis/prowjobs/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,9 @@ type GCSConfiguration struct {
// builtin's and the local system's defaults. This maps extensions
// to media types, for example: MediaTypes["log"] = "text/plain"
MediaTypes map[string]string `json:"mediaTypes,omitempty"`
// MaxConcurrency limits the concurrency used when pushing
// data to GCS. Defaults to 4 concurrent uploads per available CPU.
MaxConcurrency int64 `json:"max_concurrency,omitempty"`

// LocalOutputDir specifies a directory where files should be copied INSTEAD of uploading to GCS.
// This option is useful for testing jobs that use the pod-utilities without actually uploading.
Expand Down
5 changes: 5 additions & 0 deletions prow/gcsupload/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"flag"
"fmt"
"runtime"
"strings"

"github.com/GoogleCloudPlatform/testgrid/util/gcs"
Expand Down Expand Up @@ -114,6 +115,10 @@ func (o *Options) Complete(args []string) {
o.GCSConfiguration.MediaTypes[extension] = mediaType
}
o.mediaTypes = flagutil.NewStrings()

if o.GCSConfiguration.MaxConcurrency == 0 {
o.GCSConfiguration.MaxConcurrency = int64(4 * runtime.NumCPU())
}
}

// AddFlags adds flags to the FlagSet that populate
Expand Down
2 changes: 1 addition & 1 deletion prow/gcsupload/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (o Options) Run(spec *downwardapi.JobSpec, extra map[string]gcs.UploadFunc)
return fmt.Errorf("could not connect to GCS: %v", err)
}

if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets); err != nil {
if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets, o.MaxConcurrency); err != nil {
return fmt.Errorf("failed to upload to GCS: %v", err)
}
logrus.Info("Finished upload to GCS")
Expand Down
1 change: 1 addition & 0 deletions prow/pod-utils/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@com_google_cloud_go//storage:go_default_library",
"@org_golang_google_api//googleapi:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)

Expand Down
15 changes: 11 additions & 4 deletions prow/pod-utils/gcs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"cloud.google.com/go/storage"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"google.golang.org/api/googleapi"

"k8s.io/test-infra/prow/errorutil"
Expand All @@ -38,11 +39,11 @@ type destToWriter func(dest string) dataWriter
// Upload uploads all of the data in the
// uploadTargets map to GCS in parallel, running at
// most maxConcurrency uploads at once. The map is
// keyed on GCS path under the bucket.
func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error {
func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
dtw := func(dest string) dataWriter {
return gcsObjectWriter{bucket.Object(dest).NewWriter(context.Background())}
}
return upload(dtw, uploadTargets)
return upload(dtw, uploadTargets, maxConcurrency)
}

// LocalExport copies all of the data in the uploadTargets map to local files in parallel. The map
Expand All @@ -53,18 +54,24 @@ func LocalExport(exportDir string, uploadTargets map[string]UploadFunc) error {
filePath: path.Join(exportDir, dest),
}
}
return upload(dtw, uploadTargets)
return upload(dtw, uploadTargets, int64(len(uploadTargets))) // there is no use in limiting concurrency of local writes
}

func upload(dtw destToWriter, uploadTargets map[string]UploadFunc) error {
func upload(dtw destToWriter, uploadTargets map[string]UploadFunc, maxConcurrency int64) error {
errCh := make(chan error, len(uploadTargets))
group := &sync.WaitGroup{}
group.Add(len(uploadTargets))
sem := semaphore.NewWeighted(maxConcurrency)
for dest, upload := range uploadTargets {
log := logrus.WithField("dest", dest)
if err := sem.Acquire(context.Background(), 1); err != nil {
log.WithError(err).Error("Could not begin upload.")
continue
}
log.Info("Queued for upload")
go func(f UploadFunc, writer dataWriter, log *logrus.Entry) {
defer group.Done()
defer sem.Release(1)
if err := f(writer); err != nil {
errCh <- err
} else {
Expand Down
2 changes: 1 addition & 1 deletion prow/pod-utils/gcs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestUploadToGcs(t *testing.T) {
targets[fmt.Sprintf("fail-%d", i)] = fail
}

err := Upload(&storage.BucketHandle{}, targets)
err := Upload(&storage.BucketHandle{}, targets, int64(len(targets)))
if err != nil && !testCase.expectedErr {
t.Errorf("%s: expected no error but got %v", testCase.name, err)
}
Expand Down

0 comments on commit 6b72630

Please sign in to comment.