Skip to content

Commit

Permalink
Support WithProgress for remote Writes (#967)
Browse files Browse the repository at this point in the history
- supported in WriteLayer, Write, WriteIndex, MultiWrite
- chan is closed when write completes
- an error is sent along the chan if there are any non-temporary errors uploading
- if a layer already exists or is mounted, progress updates immediately to account for that
- if layer upload fails and is retried, progress goes backward and goes back up
  • Loading branch information
imjasonh committed Mar 22, 2021
1 parent 4a92f6c commit 3678a26
Show file tree
Hide file tree
Showing 6 changed files with 721 additions and 22 deletions.
50 changes: 45 additions & 5 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// Current limitations:
// - All refs must share the same repository.
// - Images cannot consist of stream.Layers.
func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Determine the repository being pushed to; if asked to push to
// multiple repositories, give up.
var repo, zero name.Repository
Expand Down Expand Up @@ -86,9 +86,49 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
return err
}
w := writer{
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
lastUpdate: &v1.Update{},
}

// Collect the total size of blobs and manifests we're about to write.
if o.updates != nil {
defer close(o.updates)
defer func() { sendError(o.updates, rerr) }()
for _, b := range blobs {
size, err := b.Size()
if err != nil {
return err
}
w.lastUpdate.Total += size
}
countManifest := func(t Taggable) error {
b, err := t.RawManifest()
if err != nil {
return err
}
w.lastUpdate.Total += int64(len(b))
return nil
}
for _, i := range images {
if err := countManifest(i); err != nil {
return err
}
}
for _, nm := range newManifests {
for _, i := range nm {
if err := countManifest(i); err != nil {
return err
}
}
}
for _, i := range indexes {
if err := countManifest(i); err != nil {
return err
}
}
}

// Upload individual blobs and collect any errors.
Expand Down Expand Up @@ -160,8 +200,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
}
// Push originally requested index manifests, which might depend on
// newly discovered manifests.
return commitMany(indexes)

return commitMany(indexes)
}

// addIndexBlobs adds blobs to the set of blobs we intend to upload, and
Expand Down
2 changes: 1 addition & 1 deletion pkg/v1/remote/multi_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestMultiWrite_Deep(t *testing.T) {
if err != nil {
t.Fatal("random.Image:", err)
}
for i := 0; i < 10; i++ {
for i := 0; i < 4; i++ {
idx = mutate.AppendManifests(idx, mutate.IndexAddendum{Add: idx})
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type options struct {
jobs int
userAgent string
allowNondistributableArtifacts bool
updates chan<- v1.Update
}

var defaultPlatform = v1.Platform{
Expand Down Expand Up @@ -184,3 +185,14 @@ func WithNondistributable(o *options) error {
o.allowNondistributableArtifacts = true
return nil
}

// WithProgress takes a channel that will receive progress updates as bytes are written.
//
// Sending updates to an unbuffered channel will block writes, so callers
// should provide a buffered channel to avoid potential deadlocks.
func WithProgress(updates chan<- v1.Update) Option {
return func(o *options) error {
o.updates = updates
return nil
}
}
Loading

0 comments on commit 3678a26

Please sign in to comment.