Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: obey http status and backoff on 429, 50x #156

Merged
merged 9 commits into from
Dec 15, 2023
36 changes: 35 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -66,6 +67,9 @@ type metrics struct {
closed chan struct{}
ctx context.Context
cancel func()
maxSkips float64
errors float64
skips float64
}

func newMetrics(options metricsOptions, channels metricsChannels) *metrics {
Expand All @@ -75,6 +79,9 @@ func newMetrics(options metricsOptions, channels metricsChannels) *metrics {
started: time.Now(),
close: make(chan struct{}),
closed: make(chan struct{}),
maxSkips: 10,
errors: 0,
skips: 0,
}
ctx, cancel := context.WithCancel(context.Background())
m.ctx = ctx
Expand Down Expand Up @@ -111,7 +118,11 @@ func (m *metrics) sync() {
for {
select {
case <-m.ticker.C:
m.sendMetrics()
if m.skips == 0 {
m.sendMetrics()
} else {
m.decrementSkip()
}
case <-m.close:
close(m.closed)
return
Expand All @@ -136,7 +147,24 @@ func (m *metrics) registerInstance() {

m.registered <- payload
}
func (m *metrics) backoff() {
m.errors = math.Min(m.maxSkips, m.errors+1)
m.skips = m.errors
}

func (m *metrics) configurationError() {
m.errors = m.maxSkips
m.skips = m.errors
}

func (m *metrics) successfulPost() {
m.errors = math.Max(0, m.errors-1)
m.skips = m.errors
}

func (m *metrics) decrementSkip() {
m.skips = math.Max(0, m.skips-1)
}
func (m *metrics) sendMetrics() {
m.bucketMu.Lock()
bucket := m.resetBucket()
Expand All @@ -160,6 +188,11 @@ func (m *metrics) sendMetrics() {
defer resp.Body.Close()

if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusMultipleChoices {
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound {
m.configurationError()
} else if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= http.StatusInternalServerError {
m.backoff()
}
m.warn(fmt.Errorf("%s return %d", u.String(), resp.StatusCode))
// The post failed, re-add the metrics we attempted to send so
// they are included in the next post.
Expand All @@ -174,6 +207,7 @@ func (m *metrics) sendMetrics() {
m.bucket.Start = bucket.Start
m.bucketMu.Unlock()
} else {
m.successfulPost()
m.sent <- payload
}
}
Expand Down
91 changes: 89 additions & 2 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,101 @@ func TestMetrics_ShouldNotCountMetricsForParentToggles(t *testing.T) {
WithInstanceId(mockInstanceId),
WithListener(mockListener),
)

assert.Nil(err, "client should not return an error")
client.WaitForReady()
client.IsEnabled("child")

assert.EqualValues(client.metrics.bucket.Toggles["child"].Yes, 1)
assert.EqualValues(client.metrics.bucket.Toggles["parent"].Yes, 0)
client.Close()
err = client.Close()

assert.Nil(err, "client should not return an error")
assert.True(gock.IsDone(), "there should be no more mocks")
}

func TestMetrics_ShouldBackoffOn500(t *testing.T) {
assert := assert.New(t)
defer gock.OffAll()

gock.New(mockerServer).
Post("/client/register").
Reply(200)
gock.New(mockerServer).
Post("/client/metrics").
Persist().
Reply(500)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
JSON(api.FeatureResponse{})
mockListener := &MockedListener{}
mockListener.On("OnReady").Return()
mockListener.On("OnRegistered", mock.AnythingOfType("ClientData")).Return()
mockListener.On("OnCount", "foo", false).Return()
mockListener.On("OnCount", "bar", false).Return()
mockListener.On("OnCount", "baz", false).Return()
mockListener.On("OnWarning", mock.MatchedBy(func(e error) bool {
return strings.HasSuffix(e.Error(), "http://foo.com/client/metrics return 500")
})).Return()
mockListener.On("OnError", mock.Anything).Return()

client, err := NewClient(
WithUrl(mockerServer),
WithMetricsInterval(50*time.Millisecond),
WithAppName(mockAppName),
WithInstanceId(mockInstanceId),
WithListener(mockListener),
)
assert.Nil(err, "client should not return an error")

client.WaitForReady()
client.IsEnabled("foo")
client.IsEnabled("bar")
client.IsEnabled("baz")

time.Sleep(320 * time.Millisecond)
err = client.Close()
assert.Equal(float64(3), client.metrics.errors)
assert.Nil(err, "Client should close without a problem")

}

func TestMetrics_ErrorCountShouldDecreaseIfSuccessful(t *testing.T) {
assert := assert.New(t)
defer gock.OffAll()

gock.New(mockerServer).
Post("/client/register").
Reply(200)
gock.New(mockerServer).
Post("/client/metrics").
Times(2).
Reply(500)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
JSON(api.FeatureResponse{})
gock.New(mockerServer).
Post("/client/metrics").
Persist().
Reply(200)

client, err := NewClient(
WithUrl(mockerServer),
WithMetricsInterval(50*time.Millisecond),
WithAppName(mockAppName),
WithInstanceId(mockInstanceId),
)
assert.Nil(err, "client should not return an error")

client.WaitForReady()
client.IsEnabled("foo")
client.IsEnabled("bar")
client.IsEnabled("baz")
time.Sleep(360 * time.Millisecond)
client.IsEnabled("foo")
time.Sleep(100 * time.Millisecond)
err = client.Close()
assert.Equal(float64(0), client.metrics.errors)
assert.Nil(err, "Client should close without a problem")
}
44 changes: 40 additions & 4 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"sync"
Expand All @@ -26,6 +27,9 @@ type repository struct {
isReady bool
refreshTicker *time.Ticker
segments map[int][]api.Constraint
errors float64
maxSkips float64
skips float64
}

func newRepository(options repositoryOptions, channels repositoryChannels) *repository {
Expand All @@ -36,6 +40,9 @@ func newRepository(options repositoryOptions, channels repositoryChannels) *repo
closed: make(chan struct{}),
refreshTicker: time.NewTicker(options.refreshInterval),
segments: map[int][]api.Constraint{},
errors: 0,
maxSkips: 10,
skips: 0,
}
ctx, cancel := context.WithCancel(context.Background())
repo.ctx = ctx
Expand Down Expand Up @@ -80,11 +87,33 @@ func (r *repository) sync() {
close(r.closed)
return
case <-r.refreshTicker.C:
r.fetchAndReportError()
if r.skips == 0 {
r.fetchAndReportError()
} else {
r.decrementSkips()
}
}
}
}

func (r *repository) backoff() {
r.errors = math.Min(r.maxSkips, r.errors+1)
r.skips = r.errors
}

func (r *repository) successfulFetch() {
r.errors = math.Max(0, r.errors-1)
r.skips = r.errors
}

func (r *repository) decrementSkips() {
r.skips = math.Max(0, r.skips-1)
}
func (r *repository) configurationError() {
r.errors = r.maxSkips
r.skips = r.errors
}

func (r *repository) fetch() error {
u, _ := r.options.url.Parse(getFetchURLPath(r.options.projectName))

Expand Down Expand Up @@ -119,7 +148,7 @@ func (r *repository) fetch() error {
if resp.StatusCode == http.StatusNotModified {
return nil
}
if err := statusIsOK(resp); err != nil {
if err := r.statusIsOK(resp); err != nil {
return err
}

Expand All @@ -133,14 +162,21 @@ func (r *repository) fetch() error {
r.etag = resp.Header.Get("Etag")
r.segments = featureResp.SegmentsMap()
r.options.storage.Reset(featureResp.FeatureMap(), true)
r.successfulFetch()
r.Unlock()
return nil
}

func statusIsOK(resp *http.Response) error {
func (r *repository) statusIsOK(resp *http.Response) error {
s := resp.StatusCode
if 200 <= s && s < 300 {
if http.StatusOK <= s && s < http.StatusMultipleChoices {
return nil
} else if s == http.StatusUnauthorized || s == http.StatusForbidden || s == http.StatusNotFound {
r.configurationError()
return fmt.Errorf("%s %s returned status code %d your SDK is most likely misconfigured, backing off to maximum (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.maxSkips)
} else if s == http.StatusTooManyRequests || s >= http.StatusInternalServerError {
r.backoff()
return fmt.Errorf("%s %s returned status code %d, backing off (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.errors)
}

return fmt.Errorf("%s %s returned status code %d", resp.Request.Method, resp.Request.URL, s)
Expand Down
60 changes: 60 additions & 0 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package unleash
import (
"bytes"
"encoding/json"
"gopkg.in/h2non/gock.v1"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -127,3 +128,62 @@ func TestRepository_ParseAPIResponse(t *testing.T) {
assert.Equal(2, len(response.Features))
assert.Equal(0, len(response.Segments))
}


func TestRepository_backs_off_on_http_statuses(t *testing.T) {
a := assert.New(t)
testCases := []struct {
statusCode int
errorCount float64
}{
{ 401, 10},
{ 403, 10},
{ 404, 10},
{ 429, 1},
{ 500, 1},
{ 502, 1},
{ 503, 1},
}
defer gock.Off()
for _, tc := range testCases {
gock.New(mockerServer).
Get("/client/features").
Reply(tc.statusCode)
client, err := NewClient(
WithUrl(mockerServer),
WithAppName(mockAppName),
WithDisableMetrics(true),
WithInstanceId(mockInstanceId),
WithRefreshInterval(time.Millisecond * 15),
)
a.Nil(err)
time.Sleep(20 * time.Millisecond)
err = client.Close()
a.Equal(tc.errorCount, client.repository.errors)
a.Nil(err)
}
}
func TestRepository_back_offs_are_gradually_reduced_on_success(t *testing.T) {
a := assert.New(t)
defer gock.Off()
gock.New(mockerServer).
Get("/client/features").
Times(4).
Reply(429)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
BodyString(`{ "version": 2, "features": []}`)
client, err := NewClient(
WithUrl(mockerServer),
WithAppName(mockAppName),
WithDisableMetrics(true),
WithInstanceId(mockInstanceId),
WithRefreshInterval(time.Millisecond * 10),
)
a.Nil(err)
client.WaitForReady()
err = client.Close()
a.Equal(float64(3), client.repository.errors) // 4 failures, and then one success, should reduce error count to 3
a.Nil(err)
}
1 change: 1 addition & 0 deletions spec_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build norace
// +build norace

package unleash
Expand Down
2 changes: 1 addition & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func every(slice interface{}, condition func(interface{}) bool) bool {
return false
}

if (sliceValue.Len() == 0) {
if sliceValue.Len() == 0 {
return false
}

Expand Down
1 change: 0 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,3 @@ func TestContains(t *testing.T) {
}
})
}

Loading