Skip to content

Commit

Permalink
Implementation for retrying requests
Browse files Browse the repository at this point in the history
- Attempt to work around eventual consistency issues at the cost of some performance and potentially unnecessary requests.
- Request input structs can now contain a ConsistencyConditionFunc which determines whether a request should be retried on failure.
- ConsistencyConditionFuncs receive a copy of the response and the parsed OData with which to make such decisions.
- A canned RetryOn404ConsistencyConditionFunc can simply retry requests that receive a 404 response.
- This mitigates some eventual consistency issues, such as manipulating or referencing a newly created object, but doesn't work when listing objects.
  • Loading branch information
manicminer committed Jun 2, 2021
1 parent 74381a8 commit db2dd24
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 101 deletions.
8 changes: 5 additions & 3 deletions msgraph/app_role_assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (c *AppRoleAssignmentsClient) List(ctx context.Context, id string) (*[]AppR
// Remove removes a app role assignment.
func (c *AppRoleAssignmentsClient) Remove(ctx context.Context, id, appRoleAssignmentId string) (int, error) {
_, status, _, err := c.BaseClient.Delete(ctx, DeleteHttpRequestInput{
ValidStatusCodes: []int{http.StatusNoContent},
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusNoContent},
Uri: Uri{
Entity: fmt.Sprintf("/%s/%s/appRoleAssignments/%s", c.resourceType, id, appRoleAssignmentId),
HasTenantId: true,
Expand Down Expand Up @@ -105,8 +106,9 @@ func (c *AppRoleAssignmentsClient) Assign(ctx context.Context, clientServicePrin
return nil, status, fmt.Errorf("json.Marshal(): %v", err)
}
resp, status, _, err := c.BaseClient.Post(ctx, PostHttpRequestInput{
Body: body,
ValidStatusCodes: []int{http.StatusCreated},
Body: body,
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusCreated},
Uri: Uri{
Entity: fmt.Sprintf("/%s/%s/appRoleAssignments", c.resourceType, clientServicePrincipalId),
HasTenantId: true,
Expand Down
26 changes: 16 additions & 10 deletions msgraph/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (c *ApplicationsClient) Create(ctx context.Context, application Application
// Get retrieves an Application manifest.
func (c *ApplicationsClient) Get(ctx context.Context, id string) (*Application, int, error) {
resp, status, _, err := c.BaseClient.Get(ctx, GetHttpRequestInput{
ValidStatusCodes: []int{http.StatusOK},
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusOK},
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s", id),
HasTenantId: true,
Expand Down Expand Up @@ -145,8 +146,9 @@ func (c *ApplicationsClient) Update(ctx context.Context, application Application
return status, fmt.Errorf("json.Marshal(): %v", err)
}
_, status, _, err = c.BaseClient.Patch(ctx, PatchHttpRequestInput{
Body: body,
ValidStatusCodes: []int{http.StatusNoContent},
Body: body,
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusNoContent},
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s", *application.ID),
HasTenantId: true,
Expand All @@ -161,7 +163,8 @@ func (c *ApplicationsClient) Update(ctx context.Context, application Application
// Delete removes an Application.
func (c *ApplicationsClient) Delete(ctx context.Context, id string) (int, error) {
_, status, _, err := c.BaseClient.Delete(ctx, DeleteHttpRequestInput{
ValidStatusCodes: []int{http.StatusNoContent},
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusNoContent},
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s", id),
HasTenantId: true,
Expand Down Expand Up @@ -229,8 +232,9 @@ func (c *ApplicationsClient) AddPassword(ctx context.Context, applicationId stri
return nil, status, fmt.Errorf("json.Marshal(): %v", err)
}
resp, status, _, err := c.BaseClient.Post(ctx, PostHttpRequestInput{
Body: body,
ValidStatusCodes: []int{http.StatusOK, http.StatusCreated},
Body: body,
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusOK, http.StatusCreated},
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s/addPassword", applicationId),
HasTenantId: true,
Expand Down Expand Up @@ -316,7 +320,8 @@ func (c *ApplicationsClient) ListOwners(ctx context.Context, id string) (*[]stri
// ownerId is the object ID of the owning object.
func (c *ApplicationsClient) GetOwner(ctx context.Context, applicationId, ownerId string) (*string, int, error) {
resp, status, _, err := c.BaseClient.Get(ctx, GetHttpRequestInput{
ValidStatusCodes: []int{http.StatusOK},
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusOK},
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s/owners/%s/$ref", applicationId, ownerId),
Params: url.Values{"$select": []string{"id,url"}},
Expand Down Expand Up @@ -374,9 +379,10 @@ func (c *ApplicationsClient) AddOwners(ctx context.Context, application *Applica
return status, fmt.Errorf("json.Marshal(): %v", err)
}
_, status, _, err = c.BaseClient.Post(ctx, PostHttpRequestInput{
Body: body,
ValidStatusCodes: []int{http.StatusNoContent},
ValidStatusFunc: checkOwnerAlreadyExists,
Body: body,
ConsistencyFailureFunc: RetryOn404ConsistencyFailureFunc,
ValidStatusCodes: []int{http.StatusNoContent},
ValidStatusFunc: checkOwnerAlreadyExists,
Uri: Uri{
Entity: fmt.Sprintf("/applications/%s/owners/$ref", *application.ID),
HasTenantId: true,
Expand Down
126 changes: 97 additions & 29 deletions msgraph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"strconv"
Expand All @@ -25,16 +25,25 @@ const (
)

const (
consistencyRetryDelay = 2 * time.Second
defaultInitialBackoff = 1 * time.Second
defaultBackoffCap = 64 * time.Second
requestAttempts = 10
)

// ConsistencyFailureFunc is a function that determines whether an HTTP request has failed due to eventual consistency and should be retried
type ConsistencyFailureFunc func(*http.Response, *odata.OData) bool

func RetryOn404ConsistencyFailureFunc(resp *http.Response, _ *odata.OData) bool {
return resp.StatusCode == http.StatusNotFound
}

// ValidStatusFunc is a function that tests whether an HTTP response is considered valid for the particular request.
type ValidStatusFunc func(response *http.Response, o *odata.OData) bool
type ValidStatusFunc func(*http.Response, *odata.OData) bool

// HttpRequestInput is any type that can validate the response to an HTTP request.
type HttpRequestInput interface {
GetConsistencyFailureFunc() ConsistencyFailureFunc
GetValidStatusCodes() []int
GetValidStatusFunc() ValidStatusFunc
}
Expand Down Expand Up @@ -64,6 +73,10 @@ type Client struct {
// Authorizer is anything that can provide an access token with which to authorize requests.
Authorizer auth.Authorizer

// DisableRetries prevents the client from reattempting failed requests (which it does to work around eventual consistency issues).
// This does not impact handling of retries related to rate limiting, which are always performed.
DisableRetries bool

httpClient *http.Client
}

Expand All @@ -74,7 +87,7 @@ func NewClient(apiVersion ApiVersion, tenantId string) Client {
ApiVersion: apiVersion,
TenantId: tenantId,
UserAgent: "Hamilton (Go-http-client/1.1)",
httpClient: http.DefaultClient,
httpClient: &http.Client{},
}
}

Expand Down Expand Up @@ -109,6 +122,7 @@ func (c Client) performRequest(req *http.Request, input HttpRequestInput) (*http

req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json; charset=utf-8")
//req.Header.Add("ConsistencyLevel", "eventual")

if c.UserAgent != "" {
req.Header.Add("User-Agent", c.UserAgent)
Expand All @@ -126,6 +140,14 @@ func (c Client) performRequest(req *http.Request, input HttpRequestInput) (*http
return base * backoffPower(base, exp-1)
}

var reqBody []byte
if req.Body != nil {
reqBody, err = io.ReadAll(req.Body)
if err != nil {
return nil, status, nil, fmt.Errorf("reading request body: %v", err)
}
}

var attempts, backoff, multiplier int64
for attempts = 0; attempts < requestAttempts; attempts++ {
// sleep after the previous failed attempt
Expand All @@ -140,6 +162,7 @@ func (c Client) performRequest(req *http.Request, input HttpRequestInput) (*http
backoff = cap
}

req.Body = io.NopCloser(bytes.NewBuffer(reqBody))
resp, err = c.httpClient.Do(req)
if err != nil {
return nil, status, nil, err
Expand All @@ -150,15 +173,30 @@ func (c Client) performRequest(req *http.Request, input HttpRequestInput) (*http
return nil, status, o, err
}

if !c.DisableRetries {
f := input.GetConsistencyFailureFunc()
if f != nil && f(resp, o) {
// eventual consistency, just try again
backoff = int64(consistencyRetryDelay)
multiplier = 0
continue
}
}

status = resp.StatusCode
if !containsStatusCode(input.GetValidStatusCodes(), status) {
f := input.GetValidStatusFunc()
if f != nil && f(resp, o) {
return resp, status, o, nil
}

// rate limiting
if containsStatusCode([]int{424, 429, 503}, status) {
rateLimitingStatuses := []int{
http.StatusFailedDependency,
http.StatusTooManyRequests,
http.StatusServiceUnavailable,
}
if containsStatusCode(rateLimitingStatuses, status) {
// rate limiting
if retryAfter := resp.Header.Get("Retry-After"); retryAfter != "" {
if r, err := strconv.ParseFloat(retryAfter, 64); err == nil && r > 0 {
// Retry-After header detected, use that instead of default backoff
Expand All @@ -175,7 +213,7 @@ func (c Client) performRequest(req *http.Request, input HttpRequestInput) (*http
errText = fmt.Sprintf("OData error: %s", o.Error)
default:
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, status, o, fmt.Errorf("unexpected status %d, could not read response body", resp.StatusCode)
}
Expand Down Expand Up @@ -203,9 +241,15 @@ func containsStatusCode(expected []int, actual int) bool {

// DeleteHttpRequestInput configures a DELETE request.
type DeleteHttpRequestInput struct {
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
ConsistencyFailureFunc ConsistencyFailureFunc
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
}

// GetConsistencyFailureFunc returns a function used to evaluate whether a failed request is due to eventual consistency and should be retried.
func (i DeleteHttpRequestInput) GetConsistencyFailureFunc() ConsistencyFailureFunc {
return i.ConsistencyFailureFunc
}

// GetValidStatusCodes returns a []int of status codes considered valid for a DELETE request.
Expand Down Expand Up @@ -238,10 +282,16 @@ func (c Client) Delete(ctx context.Context, input DeleteHttpRequestInput) (*http

// GetHttpRequestInput configures a GET request.
type GetHttpRequestInput struct {
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
rawUri string
ConsistencyFailureFunc ConsistencyFailureFunc
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
rawUri string
}

// GetConsistencyFailureFunc returns a function used to evaluate whether a failed request is due to eventual consistency and should be retried.
func (i GetHttpRequestInput) GetConsistencyFailureFunc() ConsistencyFailureFunc {
return i.ConsistencyFailureFunc
}

// GetValidStatusCodes returns a []int of status codes considered valid for a GET request.
Expand Down Expand Up @@ -284,7 +334,7 @@ func (c Client) Get(ctx context.Context, input GetHttpRequestInput) (*http.Respo
contentType := strings.ToLower(resp.Header.Get("Content-Type"))
if strings.HasPrefix(contentType, "application/json") {
// Read the response body and close it
respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, status, o, fmt.Errorf("could not parse response body")
}
Expand All @@ -298,7 +348,7 @@ func (c Client) Get(ctx context.Context, input GetHttpRequestInput) (*http.Respo

if firstOdata.NextLink == nil || firstOdata.Value == nil {
// No more pages, reassign response body and return
resp.Body = ioutil.NopCloser(bytes.NewBuffer(respBody))
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
return resp, status, o, nil
}

Expand All @@ -311,7 +361,7 @@ func (c Client) Get(ctx context.Context, input GetHttpRequestInput) (*http.Respo
}

// Read the next page response body and close it
nextRespBody, err := ioutil.ReadAll(nextResp.Body)
nextRespBody, err := io.ReadAll(nextResp.Body)
if err != nil {
return nil, status, o, fmt.Errorf("could not parse response body")
}
Expand All @@ -336,18 +386,24 @@ func (c Client) Get(ctx context.Context, input GetHttpRequestInput) (*http.Respo
}

// Reassign the response body
resp.Body = ioutil.NopCloser(bytes.NewBuffer(newJson))
resp.Body = io.NopCloser(bytes.NewBuffer(newJson))
}

return resp, status, o, nil
}

// PatchHttpRequestInput configures a PATCH request.
type PatchHttpRequestInput struct {
Body []byte
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
ConsistencyFailureFunc ConsistencyFailureFunc
Body []byte
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
}

// GetConsistencyFailureFunc returns a function used to evaluate whether a failed request is due to eventual consistency and should be retried.
func (i PatchHttpRequestInput) GetConsistencyFailureFunc() ConsistencyFailureFunc {
return i.ConsistencyFailureFunc
}

// GetValidStatusCodes returns a []int of status codes considered valid for a PATCH request.
Expand Down Expand Up @@ -380,10 +436,16 @@ func (c Client) Patch(ctx context.Context, input PatchHttpRequestInput) (*http.R

// PostHttpRequestInput configures a POST request.
type PostHttpRequestInput struct {
Body []byte
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
Body []byte
ConsistencyFailureFunc ConsistencyFailureFunc
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
}

// GetConsistencyFailureFunc returns a function used to evaluate whether a failed request is due to eventual consistency and should be retried.
func (i PostHttpRequestInput) GetConsistencyFailureFunc() ConsistencyFailureFunc {
return i.ConsistencyFailureFunc
}

// GetValidStatusCodes returns a []int of status codes considered valid for a POST request.
Expand Down Expand Up @@ -416,10 +478,16 @@ func (c Client) Post(ctx context.Context, input PostHttpRequestInput) (*http.Res

// PutHttpRequestInput configures a PUT request.
type PutHttpRequestInput struct {
Body []byte
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
ConsistencyFailureFunc ConsistencyFailureFunc
Body []byte
ValidStatusCodes []int
ValidStatusFunc ValidStatusFunc
Uri Uri
}

// GetConsistencyFailureFunc returns a function used to evaluate whether a failed request is due to eventual consistency and should be retried.
func (i PutHttpRequestInput) GetConsistencyFailureFunc() ConsistencyFailureFunc {
return i.ConsistencyFailureFunc
}

// GetValidStatusCodes returns a []int of status codes considered valid for a PUT request.
Expand Down
Loading

0 comments on commit db2dd24

Please sign in to comment.