Skip to content

Commit

Permalink
Add retry when commiting a manifest (#1041)
Browse files Browse the repository at this point in the history
- only retry on certain 'temporary' http status codes
- add tests to ensure retries do not occur on 'non-temp' http status
codes. And to make sure progress bar still works when retrying

Authored-by: Dennis Leon <leonde@vmware.com>
  • Loading branch information
DennisDenuto committed Jun 7, 2021
1 parent b69114f commit 6118d45
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 40 deletions.
73 changes: 73 additions & 0 deletions pkg/v1/remote/multi_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package remote
import (
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"

"github.com/google/go-containerregistry/pkg/name"
Expand Down Expand Up @@ -149,6 +151,77 @@ func TestMultiWriteWithNondistributableLayer(t *testing.T) {
}
}

func TestMultiWrite_Retry(t *testing.T) {
// Create a random image.
img1, err := random.Image(1024, 2)
if err != nil {
t.Fatal("random.Image:", err)
}

t.Run("retry http error 500", func(t *testing.T) {
// Set up a fake registry.
handler := registry.New()

numOfInternalServerErrors := 0
registryThatFailsOnFirstUpload := http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
if strings.Contains(request.URL.Path, "/manifests/") && numOfInternalServerErrors < 1 {
numOfInternalServerErrors++
responseWriter.WriteHeader(500)
return
}
handler.ServeHTTP(responseWriter, request)
})

s := httptest.NewServer(registryThatFailsOnFirstUpload)
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}

tag1 := mustNewTag(t, u.Host+"/repo:tag1")
if err := MultiWrite(map[name.Reference]Taggable{
tag1: img1,
}); err != nil {
t.Error("Write:", err)
}
})

t.Run("do not retry http error 401", func(t *testing.T) {
// Set up a fake registry.
handler := registry.New()

numOf401HttpErrors := 0
registryThatFailsOnFirstUpload := http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
if strings.Contains(request.URL.Path, "/manifests/") {
numOf401HttpErrors++
responseWriter.WriteHeader(401)
return
}
handler.ServeHTTP(responseWriter, request)
})

s := httptest.NewServer(registryThatFailsOnFirstUpload)
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}

tag1 := mustNewTag(t, u.Host+"/repo:tag1")
if err := MultiWrite(map[name.Reference]Taggable{
tag1: img1,
}); err == nil {
t.Fatal("Expected error:")
}

if numOf401HttpErrors > 1 {
t.Fatal("Should not retry on 401 errors:")
}

})
}

// TestMultiWrite_Deep tests that a deeply nested tree of manifest lists gets
// pushed in the correct order (i.e., each level in sequence).
func TestMultiWrite_Deep(t *testing.T) {
Expand Down
47 changes: 47 additions & 0 deletions pkg/v1/remote/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,53 @@ func TestMultiWrite_Progress(t *testing.T) {
}
}

func TestMultiWrite_Progress_Retry(t *testing.T) {
idx, err := random.Index(100000, 10, 10)
if err != nil {
t.Fatal(err)
}
c := make(chan v1.Update, 1000)

// Set up a fake registry.
handler := registry.New()
numOfInternalServerErrors := 0
registryThatFailsOnFirstUpload := http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
if strings.Contains(request.URL.Path, "/manifests/") && numOfInternalServerErrors < 1 {
numOfInternalServerErrors++
responseWriter.WriteHeader(500)
return
}
handler.ServeHTTP(responseWriter, request)
})

s := httptest.NewServer(registryThatFailsOnFirstUpload)
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}

ref, err := name.ParseReference(fmt.Sprintf("%s/test/progress/upload", u.Host))
if err != nil {
t.Fatal(err)
}
ref2, err := name.ParseReference(fmt.Sprintf("%s/test/progress/upload:again", u.Host))
if err != nil {
t.Fatal(err)
}

if err := MultiWrite(map[name.Reference]Taggable{
ref: idx,
ref2: idx,
}, WithProgress(c)); err != nil {
t.Fatalf("MultiWrite: %v", err)
}

if err := checkUpdates(c); err != nil {
t.Fatal(err)
}
}

func TestWriteLayer_Progress_Retry(t *testing.T) {
l, err := random.Layer(100000, types.OCIUncompressedLayer)
if err != nil {
Expand Down
84 changes: 44 additions & 40 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,24 @@ func (w *writer) incrProgress(written int64) {
}
}

var shouldRetry retry.Predicate = func(err error) bool {
// Various failure modes here, as we're often reading from and writing to
// the network.
if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
logs.Warn.Printf("retrying %v", err)
return true
}
return false
}

// Try this three times, waiting 1s after first failure, 3s after second.
var backoff = retry.Backoff{
Duration: 1.0 * time.Second,
Factor: 3.0,
Jitter: 0.1,
Steps: 3,
}

// uploadOne performs a complete upload of a single layer.
func (w *writer) uploadOne(l v1.Layer) error {
var from, mount string
Expand Down Expand Up @@ -437,16 +455,6 @@ func (w *writer) uploadOne(l v1.Layer) error {

ctx := w.context

shouldRetry := func(err error) bool {
// Various failure modes here, as we're often reading from and writing to
// the network.
if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
logs.Warn.Printf("retrying %v", err)
return true
}
return false
}

tryUpload := func() error {
location, mounted, err := w.initiateUpload(from, mount)
if err != nil {
Expand Down Expand Up @@ -498,14 +506,6 @@ func (w *writer) uploadOne(l v1.Layer) error {
return nil
}

// Try this three times, waiting 1s after first failure, 3s after second.
backoff := retry.Backoff{
Duration: 1.0 * time.Second,
Factor: 3.0,
Jitter: 0.1,
Steps: 3,
}

return retry.Retry(tryUpload, shouldRetry, backoff)
}

Expand Down Expand Up @@ -616,34 +616,38 @@ func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {

// commitManifest does a PUT of the image's manifest.
func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
raw, desc, err := unpackTaggable(t)
if err != nil {
return err
}
tryUpload := func() error {
raw, desc, err := unpackTaggable(t)
if err != nil {
return err
}

u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))
u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))

// Make the request to PUT the serialized manifest
req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
if err != nil {
return err
}
req.Header.Set("Content-Type", string(desc.MediaType))
// Make the request to PUT the serialized manifest
req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
if err != nil {
return err
}
req.Header.Set("Content-Type", string(desc.MediaType))

resp, err := w.client.Do(req.WithContext(w.context))
if err != nil {
return err
}
defer resp.Body.Close()
resp, err := w.client.Do(req.WithContext(w.context))
if err != nil {
return err
}
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
return err
if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
return err
}

// The image was successfully pushed!
logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size)
w.incrProgress(int64(len(raw)))
return nil
}

// The image was successfully pushed!
logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size)
w.incrProgress(int64(len(raw)))
return nil
return retry.Retry(tryUpload, shouldRetry, backoff)
}

func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
Expand Down

0 comments on commit 6118d45

Please sign in to comment.