Skip to content

Commit

Permalink
Add remote.WithJobs and use it in remote.MultiWrite (#803)
Browse files Browse the repository at this point in the history
- default parallel is 4
- an error is returned if WithParallel is called with a value <= 0

We can probably reuse this to fetch and push blobs elsewhere in remote.

MultiWrite'ing 123 distroless images with WithJobs(100) took 4s
compared to 11s for the default.
  • Loading branch information
imjasonh committed Nov 3, 2020
1 parent ab3252b commit 5040388
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
5 changes: 0 additions & 5 deletions pkg/v1/remote/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ import (
"github.com/google/go-containerregistry/pkg/v1/v1util"
)

var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}

// ErrSchema1 indicates that we received a schema1 manifest from the registry.
// This library doesn't have plans to support this legacy image format:
// https://github.com/google/go-containerregistry/issues/377
Expand Down
12 changes: 4 additions & 8 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"golang.org/x/sync/errgroup"
)

// Parallelism of blob and manifest uploads
// TODO(jasonhall): Make this an Option.
const jobs = 4

// MultiWrite writes the given Images or ImageIndexes to the given refs, as
// efficiently as possible, by deduping shared layer blobs and uploading layers
// in parallel, then uploading all manifests in parallel.
Expand Down Expand Up @@ -96,9 +92,9 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
}

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*jobs)
blobChan := make(chan v1.Layer, 2*o.jobs)
var g errgroup.Group
for i := 0; i < jobs; i++ {
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
Expand Down Expand Up @@ -126,8 +122,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
i Taggable
ref name.Reference
}
taskChan := make(chan task, 2*jobs)
for i := 0; i < jobs; i++ {
taskChan := make(chan task, 2*o.jobs)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
Expand Down
25 changes: 25 additions & 0 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package remote

import (
"context"
"errors"
"net/http"

"github.com/google/go-containerregistry/pkg/authn"
Expand All @@ -33,14 +34,23 @@ type options struct {
transport http.RoundTripper
platform v1.Platform
context context.Context
jobs int
}

var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}

const defaultJobs = 4

func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o := &options{
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
}

for _, option := range opts {
Expand Down Expand Up @@ -131,3 +141,18 @@ func WithContext(ctx context.Context) Option {
return nil
}
}

// WithJobs is a functional option for setting the parallelism of remote
// operations performed by a given function. Note that not all remote
// operations support parallelism.
//
// The default value is 4.
func WithJobs(jobs int) Option {
return func(o *options) error {
if jobs <= 0 {
return errors.New("jobs must be greater than zero")
}
o.jobs = jobs
return nil
}
}
2 changes: 2 additions & 0 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error {
uploaded[h] = true
}

// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
g.Go(func() error {
return w.uploadOne(l)
})
Expand Down Expand Up @@ -501,6 +502,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
context: o.context,
}

// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
for _, desc := range index.Manifests {
ref := ref.Context().Digest(desc.Digest.String())
exists, err := w.checkExistingManifest(desc.Digest, desc.MediaType)
Expand Down

0 comments on commit 5040388

Please sign in to comment.