Skip to content

Commit

Permalink
Merge pull request #757 from vercel/gsoltis/smart-remote-failure
Browse files Browse the repository at this point in the history
Apply review suggestions to remote failure PR
  • Loading branch information
Greg Soltis authored Feb 18, 2022
2 parents 73cd20a + e7ed79b commit 42a0bf6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 57 deletions.
7 changes: 3 additions & 4 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package cache

import (
"fmt"
"sync"
"golang.org/x/sync/errgroup"

"github.com/vercel/turborepo/cli/internal/config"
"github.com/vercel/turborepo/cli/internal/ui"

"golang.org/x/sync/errgroup"
)

// Cache is an abstracted way to cache/fetch previously run tasks
Expand Down Expand Up @@ -65,7 +64,7 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files
// but it's hard to fix that without breaking the cache abstraction.
func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error {
// Attempt to store on all caches simultaneously.
g := new(errgroup.Group)
g := &errgroup.Group{}
for i, cache := range mplex.caches {
if i == stopAt {
break
Expand Down
6 changes: 0 additions & 6 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) e
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount {
return fmt.Errorf("skipping uploading artifacts to HTTP cache: too many failures")
}

r, w := io.Pipe()
go cache.write(w, hash, files)
Expand Down Expand Up @@ -106,9 +103,6 @@ func (cache *httpCache) storeFile(tw *tar.Writer, name string) error {
func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (bool, []string, error) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount {
return false, nil, fmt.Errorf("skipping downloading artifacts to HTTP cache: too many past failures")
}
m, files, err := cache.retrieve(key)
if err != nil {
return false, files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
Expand Down
100 changes: 59 additions & 41 deletions cli/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package client

import (
"context"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,24 +21,30 @@ import (

type ApiClient struct {
// The api's base URL
baseUrl string
Token string
turboVersion string
CurrentFailCount uint64
baseUrl string
Token string
turboVersion string
// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
maxRemoteFailCount uint64
// Must be used via atomic package
currentFailCount uint64
// An http client
HttpClient *retryablehttp.Client
}

// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred
var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")

func (api *ApiClient) SetToken(token string) {
api.Token = token
}

// NewClient creates a new ApiClient configured with the given base URL, logger, turbo version, and remote-failure budget.
func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient {
client := &ApiClient{
baseUrl: baseUrl,
turboVersion: turboVersion,
CurrentFailCount: 0,
baseUrl: baseUrl,
turboVersion: turboVersion,
maxRemoteFailCount: maxRemoteFailCount,
HttpClient: &retryablehttp.Client{
HTTPClient: &http.Client{
Timeout: time.Duration(20 * time.Second),
Expand All @@ -47,27 +56,26 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli
Logger: logger,
},
}
client.HttpClient.CheckRetry = client.checkRetry
return client
}

func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
if err != nil {
if v, ok := err.(*url.Error); ok {
if errors.As(err, &x509.UnknownAuthorityError{}) {
// Don't retry if the error was due to TLS cert verification failure.
if _, ok := v.Err.(x509.UnknownAuthorityError); ok {
atomic.AddUint64(&client.CurrentFailCount, 1)
return false, v
}
atomic.AddUint64(&c.currentFailCount, 1)
return false, err
}
atomic.AddUint64(&client.CurrentFailCount, 1)
atomic.AddUint64(&c.currentFailCount, 1)
return true, nil
}

// 429 Too Many Requests is recoverable. Sometimes the server puts
// a Retry-After response header to indicate when the server is
// available to start processing request from client.
if resp.StatusCode == http.StatusTooManyRequests {
atomic.AddUint64(&client.CurrentFailCount, 1)
atomic.AddUint64(&c.currentFailCount, 1)
return true, nil
}

Expand All @@ -76,13 +84,41 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool,
// errors and may relate to outages on the server side. This will catch
// invalid response codes as well, like 0 and 999.
if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
atomic.AddUint64(&client.CurrentFailCount, 1)
atomic.AddUint64(&c.currentFailCount, 1)
return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
}

return false, fmt.Errorf("unexpected HTTP status %s", resp.Status)
}

// checkRetry is the retryablehttp CheckRetry hook for this client. It aborts
// immediately on context cancellation, delegates retryability to
// retryCachePolicy for everything else, and refuses further retries once the
// remote-failure budget has been exhausted.
func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
	// A canceled or timed-out context counts as a failure but is never retried.
	if ctxErr := ctx.Err(); ctxErr != nil {
		atomic.AddUint64(&c.currentFailCount, 1)
		return false, ctxErr
	}

	// The policy decides retryability; its error (if any) replaces the raw
	// request error in what we hand back to retryablehttp.
	retry, policyErr := c.retryCachePolicy(resp, err)
	if !retry {
		return false, policyErr
	}

	// Retryable in principle — but stop once too many failures have accumulated.
	if budgetErr := c.okToRequest(); budgetErr != nil {
		return false, budgetErr
	}
	return true, policyErr
}

// okToRequest reports whether another remote cache request may be issued.
// It returns nil while the atomic failure counter is still below the
// configured ceiling, and ErrTooManyFailures once the budget is spent.
func (c *ApiClient) okToRequest() error {
	if atomic.LoadUint64(&c.currentFailCount) >= c.maxRemoteFailCount {
		return ErrTooManyFailures
	}
	return nil
}

// makeUrl joins the client's configured base URL with the given endpoint path.
// Both operands are already strings, so plain concatenation produces exactly
// the same result as the previous fmt.Sprintf("%v%v", ...) while avoiding the
// interface boxing and reflection overhead fmt incurs per argument.
func (c *ApiClient) makeUrl(endpoint string) string {
	return c.baseUrl + endpoint
}
Expand All @@ -92,6 +128,9 @@ func (c *ApiClient) UserAgent() string {
}

func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
if err := c.okToRequest(); err != nil {
return err
}
params := url.Values{}
if teamId != "" && strings.HasPrefix(teamId, "team_") {
params.Add("teamId", teamId)
Expand All @@ -113,18 +152,6 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio
return fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
}

c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
// do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1)
return false, ctx.Err()
}

// don't propagate other errors
shouldRetry, err := c.retryCachePolicy(resp, err)
return shouldRetry, err
}

if resp, err := c.HttpClient.Do(req); err != nil {
return fmt.Errorf("failed to store files in HTTP cache: %w", err)
} else {
Expand All @@ -134,6 +161,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio
}

func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
if err := c.okToRequest(); err != nil {
return nil, err
}
params := url.Values{}
if teamId != "" && strings.HasPrefix(teamId, "team_") {
params.Add("teamId", teamId)
Expand All @@ -153,18 +183,6 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBo
return nil, fmt.Errorf("invalid cache URL: %w", err)
}

c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
// do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1)
return false, ctx.Err()
}

// don't propagate other errors
shouldRetry, err := c.retryCachePolicy(resp, err)
return shouldRetry, err
}

return c.HttpClient.Do(req)
}

Expand Down
11 changes: 5 additions & 6 deletions cli/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"strings"

"github.com/vercel/turborepo/cli/internal/client"

hclog "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -49,8 +50,6 @@ type Config struct {

// CacheConfig
type CacheConfig struct {
// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
MaxRemoteFailCount uint64
// Number of async workers
Workers int
// Cache directory
Expand Down Expand Up @@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config,
Output: output,
})

apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion)
maxRemoteFailCount := 3
apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount))

c = &Config{
Logger: logger,
Expand All @@ -174,9 +174,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config,
ApiClient: apiClient,
TurboVersion: turboVersion,
Cache: &CacheConfig{
MaxRemoteFailCount: 3,
Workers: runtime.NumCPU() + 2,
Dir: filepath.Join("node_modules", ".cache", "turbo"),
Workers: runtime.NumCPU() + 2,
Dir: filepath.Join("node_modules", ".cache", "turbo"),
},
}

Expand Down

0 comments on commit 42a0bf6

Please sign in to comment.