Skip to content

Commit

Permalink
fix(executor): fix gcs artifact retry
Browse files Browse the repository at this point in the history
Signed-off-by: Tianchu Zhao <evantczhao@gmail.com>
  • Loading branch information
tczhao committed Jul 17, 2021
1 parent 16308fa commit 7ad386a
Showing 1 changed file with 44 additions and 7 deletions.
51 changes: 44 additions & 7 deletions workflow/artifacts/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -14,12 +15,14 @@ import (
"github.com/argoproj/pkg/file"
log "github.com/sirupsen/logrus"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
"github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
)

Expand All @@ -28,7 +31,41 @@ type ArtifactDriver struct {
ServiceAccountKey string
}

var _ common.ArtifactDriver = &ArtifactDriver{}
var (
_ common.ArtifactDriver = &ArtifactDriver{}
defaultRetry = wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
)

// from https://github.com/googleapis/google-cloud-go/blob/master/storage/go110.go
func isTransientGCSErr(err error) bool {
if err == io.ErrUnexpectedEOF {
return true
}
switch e := err.(type) {
case *googleapi.Error:
// Retry on 429 and 5xx, according to
// https://cloud.google.com/storage/docs/exponential-backoff.
return e.Code == 429 || (e.Code >= 500 && e.Code < 600)
case *url.Error:
// Retry socket-level errors ECONNREFUSED and ENETUNREACH (from syscall).
// Unfortunately the error type is unexported, so we resort to string
// matching.
retriable := []string{"connection refused", "connection reset"}
for _, s := range retriable {
if strings.Contains(e.Error(), s) {
return true
}
}
case interface{ Temporary() bool }:
if e.Temporary() {
return true
}
}
if e, ok := err.(interface{ Unwrap() error }); ok {
return isTransientGCSErr(e.Unwrap())
}
return false
}

func (g *ArtifactDriver) newGCSClient() (*storage.Client, error) {
if g.ServiceAccountKey != "" {
Expand Down Expand Up @@ -62,19 +99,19 @@ func newGCSClientDefault() (*storage.Client, error) {

// Load function downloads objects from GCS
func (g *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("GCS Load path: %s, key: %s", path, inputArtifact.GCS.Key)
gcsClient, err := g.newGCSClient()
if err != nil {
log.Warnf("Failed to create new GCS client: %v", err)
return false, err
return isTransientGCSErr(err), err
}
defer gcsClient.Close()
err = downloadObjects(gcsClient, inputArtifact.GCS.Bucket, inputArtifact.GCS.Key, path)
if err != nil {
log.Warnf("Failed to download objects from GCS: %v", err)
return false, err
return isTransientGCSErr(err), err
}
return true, nil
})
Expand Down Expand Up @@ -161,17 +198,17 @@ func listByPrefix(client *storage.Client, bucket, prefix, delim string) ([]strin

// Save an artifact to GCS compliant storage, e.g., uploading a local file to GCS bucket
func (g *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("GCS Save path: %s, key: %s", path, outputArtifact.GCS.Key)
client, err := g.newGCSClient()
if err != nil {
return false, err
return isTransientGCSErr(err), err
}
defer client.Close()
err = uploadObjects(client, outputArtifact.GCS.Bucket, outputArtifact.GCS.Key, path)
if err != nil {
return false, err
return isTransientGCSErr(err), err
}
return true, nil
})
Expand Down

0 comments on commit 7ad386a

Please sign in to comment.