Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(konnect): introduce backoff strategy for config updates #3989

Merged
merged 5 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ Adding a new version? You'll need three changes:
Kong gateway runs in router flavor `expressions`.
[#3956](https://github.com/Kong/kubernetes-ingress-controller/pull/3956)
[#3988](https://github.com/Kong/kubernetes-ingress-controller/pull/3988)
- Configuration updates to Konnect Runtime Group's Admin API now respect a backoff
  strategy that prevents KIC from exceeding API call limits.
[#3989](https://github.com/Kong/kubernetes-ingress-controller/pull/3989)

### Fixed

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/jpillora/backoff v1.0.0
github.com/kong/deck v1.20.0
github.com/kong/go-kong v0.41.0
github.com/kong/kubernetes-telemetry v0.0.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
Expand Down
16 changes: 16 additions & 0 deletions internal/adminapi/backoff_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package adminapi

// UpdateBackoffStrategy keeps state of an update backoff strategy.
type UpdateBackoffStrategy interface {
	// CanUpdate tells whether we're allowed to make an update attempt for a given config hash.
	// In case it returns false, the second return value is a human-readable explanation of why the update cannot
	// be performed at this point in time.
	CanUpdate(configHash []byte) (bool, string)

	// RegisterUpdateSuccess resets the backoff strategy, effectively making it allow the next update straight away.
	RegisterUpdateSuccess()

	// RegisterUpdateFailure registers an update failure along with its failure reason passed as a generic error, and
	// the config hash that we failed to push.
	RegisterUpdateFailure(failureReason error, configHash []byte)
}
142 changes: 142 additions & 0 deletions internal/adminapi/backoff_strategy_konnect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package adminapi

import (
"bytes"
"fmt"
"strings"
"sync"
"time"

"github.com/jpillora/backoff"
"github.com/kong/go-kong/kong"
"github.com/samber/lo"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/deckerrors"
)

const (
	// KonnectBackoffInitialInterval is the wait returned by the first backoff step after a failure.
	KonnectBackoffInitialInterval = time.Second * 3
	// KonnectBackoffMaxInterval caps the exponentially growing wait between attempts.
	KonnectBackoffMaxInterval = time.Minute * 15
	// KonnectBackoffMultiplier is the factor the wait is multiplied by on each consecutive failure.
	KonnectBackoffMultiplier = 2
)

// Clock provides the current time. It exists so tests can inject a controllable time source.
type Clock interface {
	Now() time.Time
}

// SystemClock is the Clock implementation backed by the real system time.
type SystemClock struct{}

// Now returns the current system time.
func (SystemClock) Now() time.Time { return time.Now() }

// KonnectBackoffStrategy keeps track of Konnect config push backoffs.
//
// It takes into account:
// - a regular exponential backoff that is incremented on every Update failure,
// - a last failed configuration hash (where we skip Update until a config changes).
//
// It's important to note that KonnectBackoffStrategy can use the latter (config hash)
// because of the nature of the one-directional integration where KIC is the only
// component responsible for populating configuration of Konnect's Runtime Group.
// In case that changes in the future (e.g. manual modifications to parts of the
// configuration are allowed on Konnect side for some reason), we might have to
// drop this part of the backoff strategy.
type KonnectBackoffStrategy struct {
	// b is the underlying exponential backoff; each Duration() call advances its attempt counter.
	b *backoff.Backoff
	// nextAttempt is the point in time after which the next update attempt is allowed.
	// Its zero value (always in the past) means an attempt is allowed right away.
	nextAttempt time.Time
	// clock supplies the current time; injectable for tests.
	clock Clock
	// lastFailedConfigHash holds the hash of a config that failed with a client error,
	// or nil when no such config is being tracked.
	lastFailedConfigHash []byte

	// lock guards all the fields above for concurrent use.
	lock sync.RWMutex
}

// NewKonnectBackoffStrategy creates a KonnectBackoffStrategy with the default
// exponential backoff settings, using the passed clock as its time source.
func NewKonnectBackoffStrategy(clock Clock) *KonnectBackoffStrategy {
	b := &backoff.Backoff{
		Min:    KonnectBackoffInitialInterval,
		Max:    KonnectBackoffMaxInterval,
		Factor: KonnectBackoffMultiplier,
	}
	// Start from a clean state so the first Duration() call yields the minimum interval.
	b.Reset()

	return &KonnectBackoffStrategy{b: b, clock: clock}
}

// CanUpdate tells whether we're allowed to make an update attempt for the given config hash.
// When it returns false, the second return value is a human-readable explanation of why the
// update cannot be performed at this point in time.
func (s *KonnectBackoffStrategy) CanUpdate(configHash []byte) (bool, string) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	// How much of the exponential backoff window is still left. On the very first attempt
	// s.nextAttempt is the zero time, which is always in the past, so the window is satisfied.
	remaining := s.nextAttempt.Sub(s.clock.Now())
	backoffSatisfied := remaining <= 0

	// Whether the caller is retrying the exact config hash that already failed with a client error.
	retryingFaultyConfig := s.lastFailedConfigHash != nil && bytes.Equal(s.lastFailedConfigHash, configHash)

	if !backoffSatisfied || retryingFaultyConfig {
		// Build a human-readable explanation of why the update is currently rejected.
		return false, s.whyCannotUpdate(remaining, retryingFaultyConfig)
	}

	return true, ""
}

// RegisterUpdateFailure registers an update failure together with the config hash that we
// failed to push. When the failure contains a client API error (HTTP code in [400, 500)),
// the hash is remembered so the very same config is not retried until it changes.
func (s *KonnectBackoffStrategy) RegisterUpdateFailure(err error, configHash []byte) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if apiErrs := deckerrors.ExtractAPIErrors(err); len(apiErrs) > 0 {
		_, isClientError := lo.Find(apiErrs, func(e *kong.APIError) bool {
			code := e.Code()
			return code >= 400 && code < 500
		})

		// We store the failed configuration hash only in case we receive a client error
		// code [400, 500): such a config is faulty on our side, so it only makes sense to
		// retry once it changes. Any other API error clears the tracked hash.
		s.lastFailedConfigHash = nil
		if isClientError {
			s.lastFailedConfigHash = configHash
		}
	}

	// Backoff.Duration() returns the time to wait until the next attempt and increments the
	// internal attempts counter, so consecutive failures wait progressively longer.
	wait := s.b.Duration()

	// Store the exact point in time after which the next update attempt is allowed.
	s.nextAttempt = s.clock.Now().Add(wait)
}

// RegisterUpdateSuccess resets the whole backoff state: the exponential backoff attempt
// counter goes back to zero, the next attempt is allowed immediately (zero-value
// nextAttempt is always in the past), and any tracked faulty config hash is cleared.
func (s *KonnectBackoffStrategy) RegisterUpdateSuccess() {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.b.Reset()
	s.nextAttempt = time.Time{}
	s.lastFailedConfigHash = nil
}

// whyCannotUpdate renders a human-readable, comma-separated explanation of why an update
// attempt is currently rejected, based on which of the two backoff conditions failed.
func (s *KonnectBackoffStrategy) whyCannotUpdate(
	timeLeft time.Duration,
	isTheSameFaultyConfig bool,
) string {
	var explanation strings.Builder

	if isTheSameFaultyConfig {
		fmt.Fprintf(&explanation,
			"config has to be changed: %q hash has already failed to be pushed with a client error",
			string(s.lastFailedConfigHash),
		)
	}

	if timeLeft > 0 {
		if explanation.Len() > 0 {
			explanation.WriteString(", ")
		}
		fmt.Fprintf(&explanation, "next attempt allowed in %s", timeLeft)
	}

	return explanation.String()
}
123 changes: 123 additions & 0 deletions internal/adminapi/backoff_strategy_konnect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package adminapi_test

import (
"errors"
"net/http"
"testing"
"time"

"github.com/kong/go-kong/kong"
"github.com/stretchr/testify/assert"

"github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi"
)

type mockClock struct {
n time.Time
}

func newMockClock() *mockClock {
return &mockClock{n: time.Now()}
}

func (m *mockClock) Now() time.Time {
return m.n
}

func (m *mockClock) MoveBy(d time.Duration) {
pmalek marked this conversation as resolved.
Show resolved Hide resolved
m.n = m.n.Add(d)
}

// TestKonnectBackoffStrategy exercises KonnectBackoffStrategy with a controllable clock:
// initial state (everything allowed), generic failures (time-based backoff only),
// client-error failures (faulty-hash tracking), success (full reset), and server errors
// (no faulty-hash tracking).
//
// NOTE(review): the mockClock is shared across subtests and only ever moves forward;
// each subtest creates a fresh strategy, so accumulated time does not leak between them.
func TestKonnectBackoffStrategy(t *testing.T) {
	var (
		clock   = newMockClock()
		hashOne = []byte("1")
		hashTwo = []byte("2")
	)

	t.Run("on init allows any updates", func(t *testing.T) {
		strategy := adminapi.NewKonnectBackoffStrategy(clock)

		// First try, no failure in the past, should always allow update, register failure.
		canUpdate, whyNot := strategy.CanUpdate(hashOne)
		assert.True(t, canUpdate, "should allow any update as a first one")
		assert.Empty(t, whyNot)

		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.True(t, canUpdate, "should allow any update as a first one")
		assert.Empty(t, whyNot)
	})

	t.Run("generic failure triggers backoff time requirement", func(t *testing.T) {
		strategy := adminapi.NewKonnectBackoffStrategy(clock)

		// After failure, time moves by 1s (below initial backoff time), should not allow update.
		strategy.RegisterUpdateFailure(errors.New("error occurred"), hashOne)
		clock.MoveBy(time.Second)

		canUpdate, whyNot := strategy.CanUpdate(hashTwo)
		assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied")
		assert.Equal(t, "next attempt allowed in 2s", whyNot)

		// Time moves by 5s (enough for the backoff next try).
		clock.MoveBy(time.Second * 5)

		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.True(t, canUpdate, "should allow update for different hash when enough time has passed")
		assert.Empty(t, whyNot)
	})

	t.Run("client error triggers faulty hash requirements", func(t *testing.T) {
		strategy := adminapi.NewKonnectBackoffStrategy(clock)

		// A 400-class API error marks the pushed hash as faulty.
		strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusBadRequest, ""), hashOne)
		clock.MoveBy(time.Second * 5)

		// Even with the backoff window satisfied, the same faulty hash stays blocked.
		canUpdate, whyNot := strategy.CanUpdate(hashOne)
		assert.False(t, canUpdate, "should not allow update for the same faulty hash")
		assert.Equal(t, "config has to be changed: \"1\" hash has already failed to be pushed with a client error", whyNot)

		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.True(t, canUpdate, "should allow update for another hash")
		assert.Empty(t, whyNot)
	})

	t.Run("success resets both hash and backoff requirements", func(t *testing.T) {
		strategy := adminapi.NewKonnectBackoffStrategy(clock)

		strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusBadRequest, ""), hashOne)

		// Same faulty hash: both the hash requirement and the backoff window block the update.
		canUpdate, whyNot := strategy.CanUpdate(hashOne)
		assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied")
		assert.Equal(t, "config has to be changed: \"1\" hash has already failed to be pushed with a client error, next attempt allowed in 3s", whyNot)

		// Different hash: only the backoff window blocks the update.
		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied")
		assert.Equal(t, "next attempt allowed in 3s", whyNot)

		strategy.RegisterUpdateSuccess()

		canUpdate, whyNot = strategy.CanUpdate(hashOne)
		assert.True(t, canUpdate, "should allow any update after the last success")
		assert.Empty(t, whyNot)

		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.True(t, canUpdate, "should allow any update after the last success")
		assert.Empty(t, whyNot)
	})

	t.Run("server error does not trigger faulty hash requirement", func(t *testing.T) {
		strategy := adminapi.NewKonnectBackoffStrategy(clock)

		// A 500-class API error triggers the time backoff but does not mark the hash faulty.
		strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusInternalServerError, ""), hashOne)
		clock.MoveBy(time.Second * 5)

		canUpdate, whyNot := strategy.CanUpdate(hashOne)
		assert.True(t, canUpdate, "should allow update for the same faulty hash as it was a server error")
		assert.Empty(t, whyNot)

		canUpdate, whyNot = strategy.CanUpdate(hashTwo)
		assert.True(t, canUpdate, "should allow update for another hash")
		assert.Empty(t, whyNot)
	})
}
32 changes: 22 additions & 10 deletions internal/adminapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ func NewClient(c *kong.Client) *Client {
}
}

// NewKonnectClient creates an Admin API client that is to be used with a Konnect Runtime Group Admin API.
// NOTE(review): this variant returns *Client; the diff replaces it with a *KonnectClient-returning
// constructor that additionally carries a backoff strategy.
func NewKonnectClient(c *kong.Client, runtimeGroup string) *Client {
	return &Client{
		adminAPIClient:      c,
		isKonnect:           true,
		konnectRuntimeGroup: runtimeGroup,
		pluginSchemaStore:   util.NewPluginSchemaStore(c),
	}
}

// NewTestClient creates a client for test purposes.
func NewTestClient(address string) (*Client, error) {
kongClient, err := kong.NewTestClient(lo.ToPtr(address), &http.Client{})
Expand All @@ -57,6 +47,28 @@ func NewTestClient(address string) (*Client, error) {
return NewClient(kongClient), nil
}

// KonnectClient is an Admin API client bound to a Konnect Runtime Group.
// It embeds Client and additionally carries an UpdateBackoffStrategy that callers
// should consult before pushing configuration.
type KonnectClient struct {
	Client
	// backoffStrategy throttles config update attempts against the Konnect API.
	backoffStrategy UpdateBackoffStrategy
}

// NewKonnectClient creates an Admin API client that is to be used with a Konnect Runtime Group Admin API.
func NewKonnectClient(c *kong.Client, runtimeGroup string) *KonnectClient {
	baseClient := Client{
		adminAPIClient:      c,
		isKonnect:           true,
		konnectRuntimeGroup: runtimeGroup,
		pluginSchemaStore:   util.NewPluginSchemaStore(c),
	}

	return &KonnectClient{
		Client:          baseClient,
		backoffStrategy: NewKonnectBackoffStrategy(SystemClock{}),
	}
}

// BackoffStrategy returns the backoff strategy that guards config updates made with this client.
func (c *KonnectClient) BackoffStrategy() UpdateBackoffStrategy {
	return c.backoffStrategy
}

// AdminAPIClient returns an underlying go-kong's Admin API client.
func (c *Client) AdminAPIClient() *kong.Client {
return c.adminAPIClient
Expand Down
2 changes: 1 addition & 1 deletion internal/adminapi/konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type KonnectConfig struct {
TLSClient TLSClientConfig
}

func NewKongClientForKonnectRuntimeGroup(c KonnectConfig) (*Client, error) {
func NewKongClientForKonnectRuntimeGroup(c KonnectConfig) (*KonnectClient, error) {
clientCertificate, err := tlsutil.ExtractClientCertificates(
[]byte(c.TLSClient.Cert),
c.TLSClient.CertFile,
Expand Down
Loading