diff --git a/CHANGELOG.md b/CHANGELOG.md index 590e5ba922..c8ece5bb8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -106,6 +106,9 @@ Adding a new version? You'll need three changes: Kong gateway runs in router flavor `expressions`. [#3956](https://github.com/Kong/kubernetes-ingress-controller/pull/3956) [#3988](https://github.com/Kong/kubernetes-ingress-controller/pull/3988) +- Configuration updates to Konnect Runtime Group's Admin API now respect a backoff + strategy that prevents KIC from exceeding API calls limits. + [#3989](https://github.com/Kong/kubernetes-ingress-controller/pull/3989) ### Fixed diff --git a/go.mod b/go.mod index 547f3efc42..1c5a843468 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/go-logr/logr v1.2.4 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 + github.com/jpillora/backoff v1.0.0 github.com/kong/deck v1.20.0 github.com/kong/go-kong v0.41.0 github.com/kong/kubernetes-telemetry v0.0.4 diff --git a/go.sum b/go.sum index f07f10deba..b3f1aa3f6c 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= diff --git a/internal/adminapi/backoff_strategy.go b/internal/adminapi/backoff_strategy.go new file mode 100644 index 0000000000..2cf2eade71 --- /dev/null +++ b/internal/adminapi/backoff_strategy.go @@ -0,0 +1,16 @@ +package adminapi + +// UpdateBackoffStrategy keeps state of an update backoff strategy. +type UpdateBackoffStrategy interface { + // CanUpdate tells whether we're allowed to make an update attempt for a given config hash. + // In case it returns false, the second return value is a human-readable explanation of why the update cannot + // be performed at this point in time. + CanUpdate([]byte) (bool, string) + + // RegisterUpdateSuccess resets the backoff strategy, effectively making it allow next update straight away. + RegisterUpdateSuccess() + + // RegisterUpdateFailure registers an update failure along with its failure reason passed as a generic error, and + // a config hash that we failed to push. + RegisterUpdateFailure(failureReason error, configHash []byte) +} diff --git a/internal/adminapi/backoff_strategy_konnect.go b/internal/adminapi/backoff_strategy_konnect.go new file mode 100644 index 0000000000..a376a2f56b --- /dev/null +++ b/internal/adminapi/backoff_strategy_konnect.go @@ -0,0 +1,142 @@ +package adminapi + +import ( + "bytes" + "fmt" + "strings" + "sync" + "time" + + "github.com/jpillora/backoff" + "github.com/kong/go-kong/kong" + "github.com/samber/lo" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/deckerrors" +) + +const ( + KonnectBackoffInitialInterval = time.Second * 3 + KonnectBackoffMaxInterval = time.Minute * 15 + KonnectBackoffMultiplier = 2 +) + +type Clock interface { + Now() time.Time +} + +type SystemClock struct{} + +func (SystemClock) Now() time.Time { return time.Now() } + +// KonnectBackoffStrategy keeps track of Konnect config push backoffs. +// +// It takes into account: +// - a regular exponential backoff that is incremented on every Update failure, +// - a last failed configuration hash (where we skip Update until a config changes). +// +// It's important to note that KonnectBackoffStrategy can use the latter (config hash) +// because of the nature of the one-directional integration where KIC is the only +// component responsible for populating configuration of Konnect's Runtime Group. +// In case that changes in the future (e.g. manual modifications to parts of the +// configuration are allowed on Konnect side for some reason), we might have to +// drop this part of the backoff strategy. +type KonnectBackoffStrategy struct { + b *backoff.Backoff + nextAttempt time.Time + clock Clock + lastFailedConfigHash []byte + + lock sync.RWMutex +} + +func NewKonnectBackoffStrategy(clock Clock) *KonnectBackoffStrategy { + exponentialBackoff := &backoff.Backoff{ + Min: KonnectBackoffInitialInterval, + Max: KonnectBackoffMaxInterval, + Factor: KonnectBackoffMultiplier, + } + exponentialBackoff.Reset() + + return &KonnectBackoffStrategy{ + b: exponentialBackoff, + clock: clock, + } +} + +func (s *KonnectBackoffStrategy) CanUpdate(configHash []byte) (bool, string) { + s.lock.RLock() + defer s.lock.RUnlock() + + // The exponential backoff duration is satisfied. + // In case of the first attempt it will be satisfied as s.nextAttempt will be a zero value which is always in the past. + timeLeft := s.nextAttempt.Sub(s.clock.Now()) + exponentialBackoffSatisfied := timeLeft <= 0 + + // The configuration we're attempting to update is not the same faulty config we've already tried pushing. + isTheSameFaultyConfig := s.lastFailedConfigHash != nil && bytes.Equal(s.lastFailedConfigHash, configHash) + + // In case both conditions are satisfied, we're good to make an attempt. + if exponentialBackoffSatisfied && !isTheSameFaultyConfig { + return true, "" + } + + // Otherwise, we build a human-readable explanation of why the update cannot be performed at this point in time. + return false, s.whyCannotUpdate(timeLeft, isTheSameFaultyConfig) +} + +func (s *KonnectBackoffStrategy) RegisterUpdateFailure(err error, configHash []byte) { + s.lock.Lock() + defer s.lock.Unlock() + + if errs := deckerrors.ExtractAPIErrors(err); len(errs) > 0 { + _, hasClientError := lo.Find(errs, func(item *kong.APIError) bool { + return item.Code() >= 400 && item.Code() < 500 + }) + + // We store the failed configuration hash only in case we receive a client error code [400, 500). + // It's because we don't want to repeatedly try sending the config that we know is faulty on our side. + // It only makes sense to retry when the config changes. + if hasClientError { + s.lastFailedConfigHash = configHash + } else { + s.lastFailedConfigHash = nil + } + } + + // Backoff.Duration() call returns backoff time we need to wait until next attempt. + // It also increments the internal attempts counter so the next time we call it, the + // duration will be multiplied accordingly. + timeLeft := s.b.Duration() + + // We're storing the exact point in time after which we'll be allowed to perform the next update attempt. + s.nextAttempt = s.clock.Now().Add(timeLeft) +} + +func (s *KonnectBackoffStrategy) RegisterUpdateSuccess() { + s.lock.Lock() + defer s.lock.Unlock() + + s.b.Reset() + s.nextAttempt = time.Time{} + s.lastFailedConfigHash = nil +} + +func (s *KonnectBackoffStrategy) whyCannotUpdate( + timeLeft time.Duration, + isTheSameFaultyConfig bool, +) string { + var reasons []string + + if isTheSameFaultyConfig { + reasons = append(reasons, fmt.Sprintf( + "config has to be changed: %q hash has already failed to be pushed with a client error", + string(s.lastFailedConfigHash), + )) + } + + if timeLeft > 0 { + reasons = append(reasons, fmt.Sprintf("next attempt allowed in %s", timeLeft)) + } + + return strings.Join(reasons, ", ") +} diff --git a/internal/adminapi/backoff_strategy_konnect_test.go b/internal/adminapi/backoff_strategy_konnect_test.go new file mode 100644 index 0000000000..87f6296db5 --- /dev/null +++ b/internal/adminapi/backoff_strategy_konnect_test.go @@ -0,0 +1,123 @@ +package adminapi_test + +import ( + "errors" + "net/http" + "testing" + "time" + + "github.com/kong/go-kong/kong" + "github.com/stretchr/testify/assert" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" +) + +type mockClock struct { + n time.Time +} + +func newMockClock() *mockClock { + return &mockClock{n: time.Now()} +} + +func (m *mockClock) Now() time.Time { + return m.n +} + +func (m *mockClock) MoveBy(d time.Duration) { + m.n = m.n.Add(d) +} + +func TestKonnectBackoffStrategy(t *testing.T) { + var ( + clock = newMockClock() + hashOne = []byte("1") + hashTwo = []byte("2") + ) + + t.Run("on init allows any updates", func(t *testing.T) { + strategy := adminapi.NewKonnectBackoffStrategy(clock) + + // First try, no failure in the past, should always allow update, register failure. + canUpdate, whyNot := strategy.CanUpdate(hashOne) + assert.True(t, canUpdate, "should allow any update as a first one") + assert.Empty(t, whyNot) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.True(t, canUpdate, "should allow any update as a first one") + assert.Empty(t, whyNot) + }) + + t.Run("generic failure triggers backoff time requirement", func(t *testing.T) { + strategy := adminapi.NewKonnectBackoffStrategy(clock) + + // After failure, time moves by 1s (below initial backoff time), should not allow update. + strategy.RegisterUpdateFailure(errors.New("error occurred"), hashOne) + clock.MoveBy(time.Second) + + canUpdate, whyNot := strategy.CanUpdate(hashTwo) + assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied") + assert.Equal(t, "next attempt allowed in 2s", whyNot) + + // Time moves by 5s (enough for the backoff next try). + clock.MoveBy(time.Second * 5) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.True(t, canUpdate, "should allow update for different hash when enough time has passed") + assert.Empty(t, whyNot) + }) + + t.Run("client error triggers faulty hash requirements", func(t *testing.T) { + strategy := adminapi.NewKonnectBackoffStrategy(clock) + + strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusBadRequest, ""), hashOne) + clock.MoveBy(time.Second * 5) + + canUpdate, whyNot := strategy.CanUpdate(hashOne) + assert.False(t, canUpdate, "should not allow update for the same faulty hash") + assert.Equal(t, "config has to be changed: \"1\" hash has already failed to be pushed with a client error", whyNot) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.True(t, canUpdate, "should allow update for another hash") + assert.Empty(t, whyNot) + }) + + t.Run("success resets both hash and backoff requirements", func(t *testing.T) { + strategy := adminapi.NewKonnectBackoffStrategy(clock) + + strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusBadRequest, ""), hashOne) + + canUpdate, whyNot := strategy.CanUpdate(hashOne) + assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied") + assert.Equal(t, "config has to be changed: \"1\" hash has already failed to be pushed with a client error, next attempt allowed in 3s", whyNot) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.False(t, canUpdate, "should not allow next update when last failed and backoff time wasn't satisfied") + assert.Equal(t, "next attempt allowed in 3s", whyNot) + + strategy.RegisterUpdateSuccess() + + canUpdate, whyNot = strategy.CanUpdate(hashOne) + assert.True(t, canUpdate, "should allow any update after the last success") + assert.Empty(t, whyNot) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.True(t, canUpdate, "should allow any update after the last success") + assert.Empty(t, whyNot) + }) + + t.Run("server error does not trigger faulty hash requirement", func(t *testing.T) { + strategy := adminapi.NewKonnectBackoffStrategy(clock) + + strategy.RegisterUpdateFailure(kong.NewAPIError(http.StatusInternalServerError, ""), hashOne) + clock.MoveBy(time.Second * 5) + + canUpdate, whyNot := strategy.CanUpdate(hashOne) + assert.True(t, canUpdate, "should allow update for the same faulty hash as it was a server error") + assert.Empty(t, whyNot) + + canUpdate, whyNot = strategy.CanUpdate(hashTwo) + assert.True(t, canUpdate, "should allow update for another hash") + assert.Empty(t, whyNot) + }) +} diff --git a/internal/adminapi/client.go b/internal/adminapi/client.go index 736f299b25..3ed7e052e1 100644 --- a/internal/adminapi/client.go +++ b/internal/adminapi/client.go @@ -37,16 +37,6 @@ func NewClient(c *kong.Client) *Client { } } -// NewKonnectClient creates an Admin API client that is to be used with a Konnect Runtime Group Admin API. -func NewKonnectClient(c *kong.Client, runtimeGroup string) *Client { - return &Client{ - adminAPIClient: c, - isKonnect: true, - konnectRuntimeGroup: runtimeGroup, - pluginSchemaStore: util.NewPluginSchemaStore(c), - } -} - // NewTestClient creates a client for test purposes. func NewTestClient(address string) (*Client, error) { kongClient, err := kong.NewTestClient(lo.ToPtr(address), &http.Client{}) @@ -57,6 +47,28 @@ func NewTestClient(address string) (*Client, error) { return NewClient(kongClient), nil } +type KonnectClient struct { + Client + backoffStrategy UpdateBackoffStrategy +} + +// NewKonnectClient creates an Admin API client that is to be used with a Konnect Runtime Group Admin API. +func NewKonnectClient(c *kong.Client, runtimeGroup string) *KonnectClient { + return &KonnectClient{ + Client: Client{ + adminAPIClient: c, + isKonnect: true, + konnectRuntimeGroup: runtimeGroup, + pluginSchemaStore: util.NewPluginSchemaStore(c), + }, + backoffStrategy: NewKonnectBackoffStrategy(SystemClock{}), + } +} + +func (c *KonnectClient) BackoffStrategy() UpdateBackoffStrategy { + return c.backoffStrategy +} + // AdminAPIClient returns an underlying go-kong's Admin API client. func (c *Client) AdminAPIClient() *kong.Client { return c.adminAPIClient diff --git a/internal/adminapi/konnect.go b/internal/adminapi/konnect.go index 1ca8cf1a51..62d21203ac 100644 --- a/internal/adminapi/konnect.go +++ b/internal/adminapi/konnect.go @@ -28,7 +28,7 @@ type KonnectConfig struct { TLSClient TLSClientConfig } -func NewKongClientForKonnectRuntimeGroup(c KonnectConfig) (*Client, error) { +func NewKongClientForKonnectRuntimeGroup(c KonnectConfig) (*KonnectClient, error) { clientCertificate, err := tlsutil.ExtractClientCertificates( []byte(c.TLSClient.Cert), c.TLSClient.CertFile, diff --git a/internal/clients/manager.go b/internal/clients/manager.go index 2e78e15ae4..b39851208a 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -38,7 +38,7 @@ type AdminAPIClientsManager struct { // konnectClient represents a special-case of the data-plane which is Konnect cloud. // This client is used to synchronise configuration with Konnect's Runtime Group Admin API. - konnectClient *adminapi.Client + konnectClient *adminapi.KonnectClient // lock prevents concurrent access to the manager's fields. lock sync.RWMutex @@ -101,25 +101,16 @@ func (c *AdminAPIClientsManager) Notify(discoveredAPIs []adminapi.DiscoveredAdmi // SetKonnectClient sets a client that will be used to communicate with Konnect Runtime Group Admin API. // If called multiple times, it will override the client. -func (c *AdminAPIClientsManager) SetKonnectClient(client *adminapi.Client) { +func (c *AdminAPIClientsManager) SetKonnectClient(client *adminapi.KonnectClient) { c.lock.Lock() defer c.lock.Unlock() c.konnectClient = client } -// AllClients returns a copy of current client's slice. It will also include Konnect client if set. -func (c *AdminAPIClientsManager) AllClients() []*adminapi.Client { +func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient { c.lock.RLock() defer c.lock.RUnlock() - - copied := make([]*adminapi.Client, len(c.gatewayClients)) - copy(copied, c.gatewayClients) - - if c.konnectClient != nil { - copied = append(copied, c.konnectClient) - } - - return copied + return c.konnectClient } // GatewayClients returns a copy of current client's slice. Konnect client won't be included. @@ -272,6 +263,6 @@ func (c *AdminAPIClientsManager) closeGatewayClientsSubscribers() { // AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that // we should configure. type AdminAPIClientsProvider interface { - AllClients() []*adminapi.Client + KonnectClient() *adminapi.KonnectClient GatewayClients() []*adminapi.Client } diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 05d4d83461..b52f7eeff6 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -118,7 +118,7 @@ func TestClientAddressesNotifications(t *testing.T) { requireClientsCountEventually := func(t *testing.T, c *AdminAPIClientsManager, addresses []string, args ...any) { require.Eventually(t, func() bool { - clientAddresses := lo.Map(c.AllClients(), func(cl *adminapi.Client, _ int) string { + clientAddresses := lo.Map(c.GatewayClients(), func(cl *adminapi.Client, _ int) string { return cl.BaseRootURL() }) return slices.Equal(addresses, clientAddresses) @@ -179,7 +179,7 @@ func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { manager.RunNotifyLoop() <-manager.Running() - clients := manager.AllClients() + clients := manager.GatewayClients() require.Len(t, clients, 1) require.Equal(t, "localhost:8083", clients[0].BaseRootURL()) @@ -266,14 +266,12 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { require.NoError(t, err) require.Len(t, m.GatewayClients(), 1, "expecting one initial client") require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client") - require.Len(t, m.AllClients(), 1, "expecting one initial client") - konnectTestClient, err := adminapi.NewTestClient("https://us.api.konghq.tech") - require.NoError(t, err) + konnectTestClient := &adminapi.KonnectClient{} m.SetKonnectClient(konnectTestClient) require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients") require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount") - require.Len(t, m.AllClients(), 2, "konnect client should be returned from AllClients") + require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient") } func TestAdminAPIClientsManager_GatewayClientsFromNotificationsAreExpectedToHavePodRef(t *testing.T) { diff --git a/internal/dataplane/deckerrors/api.go b/internal/dataplane/deckerrors/api.go new file mode 100644 index 0000000000..af1a0fd1a2 --- /dev/null +++ b/internal/dataplane/deckerrors/api.go @@ -0,0 +1,39 @@ +package deckerrors + +import ( + "errors" + + deckutils "github.com/kong/deck/utils" + "github.com/kong/go-kong/kong" +) + +// ExtractAPIErrors tries to extract kong.APIErrors from the generic error. +// It might be used when inspection of the error details is needed, e.g. its status code. +func ExtractAPIErrors(err error) []*kong.APIError { + // It might be a single APIError. + if apiErr, ok := castAsErr[*kong.APIError](err); ok { + return []*kong.APIError{apiErr} + } + + // It might be either a deckutils.ErrArray with APIErrors inside. + var deckErrArray deckutils.ErrArray + if errors.As(err, &deckErrArray) { + var apiErrs []*kong.APIError + for _, err := range deckErrArray.Errors { + if apiErr, ok := castAsErr[*kong.APIError](err); ok { + apiErrs = append(apiErrs, apiErr) + } + } + return apiErrs + } + + return nil +} + +func castAsErr[T error](err error) (T, bool) { + var target T + if errors.As(err, &target) { + return target, true + } + return target, false +} diff --git a/internal/dataplane/deckerrors/api_test.go b/internal/dataplane/deckerrors/api_test.go new file mode 100644 index 0000000000..d4fe73bd9f --- /dev/null +++ b/internal/dataplane/deckerrors/api_test.go @@ -0,0 +1,59 @@ +package deckerrors_test + +import ( + "errors" + "net/http" + "testing" + + deckutils "github.com/kong/deck/utils" + "github.com/kong/go-kong/kong" + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/deckerrors" +) + +func TestExtractAPIErrors(t *testing.T) { + var ( + genericErr = errors.New("not an api error") + apiErr = kong.NewAPIError(http.StatusBadRequest, "api error") + ) + + testCases := []struct { + name string + input error + expected []*kong.APIError + }{ + { + name: "nil", + input: nil, + expected: nil, + }, + { + name: "generic error", + input: genericErr, + expected: nil, + }, + { + name: "api error", + input: apiErr, + expected: []*kong.APIError{apiErr}, + }, + { + name: "deck array of errors with no api error", + input: deckutils.ErrArray{Errors: []error{genericErr, genericErr}}, + expected: nil, + }, + { + name: "deck array of errors with an api error among generic ones", + input: deckutils.ErrArray{Errors: []error{genericErr, apiErr, genericErr}}, + expected: []*kong.APIError{apiErr}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + out := deckerrors.ExtractAPIErrors(tc.input) + require.Equal(t, tc.expected, out) + }) + } +} diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 6d5e72feb8..e9f202f080 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -2,6 +2,7 @@ package dataplane import ( "context" + "errors" "fmt" "reflect" "sort" @@ -367,12 +368,14 @@ func (c *KongClient) Update(ctx context.Context) error { c.logger.Debug("successfully built data-plane configuration") } - shas, err := c.sendOutToClients(ctx, parsingResult.KongState, c.kongConfig) + shas, err := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig) if err != nil { c.configStatusNotifier.NotifyConfigStatus(ctx, clients.ConfigStatusApplyFailed) return err } + c.trySendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig) + // succeeded to apply configuration to Kong gateway. // notify the receiver of config status that translation error happened when there are translation errors, // otherwise notify that config status is OK. @@ -396,18 +399,16 @@ func (c *KongClient) Update(ctx context.Context) error { return nil } -// sendOutToClients will generate deck content (config) from the provided kong state -// and send it out to each of the configured clients. -func (c *KongClient) sendOutToClients( +// sendOutToGatewayClients will generate deck content (config) from the provided kong state +// and send it out to each of the configured gateway clients. +func (c *KongClient) sendOutToGatewayClients( ctx context.Context, s *kongstate.KongState, config sendconfig.Config, ) ([]string, error) { - clients := c.clientsProvider.AllClients() - c.logger.Debugf("sending configuration to %d clients", len(clients)) - shas, err := iter.MapErr(clients, func(client **adminapi.Client) (string, error) { - newSHA, err := c.sendToClient(ctx, *client, s, config) - return HandleSendToClientResult(*client, c.logger, newSHA, err) - }, - ) + gatewayClients := c.clientsProvider.GatewayClients() + c.logger.Debugf("sending configuration to %d gateway clients", len(gatewayClients)) + shas, err := iter.MapErr(gatewayClients, func(client **adminapi.Client) (string, error) { + return c.sendToClient(ctx, *client, s, config) + }) if err != nil { return nil, err } @@ -419,9 +420,30 @@ func (c *KongClient) sendOutToClients( return previousSHAs, nil } +// It will try to send ignore errors that are returned from Konnect client. +func (c *KongClient) trySendOutToKonnectClient(ctx context.Context, s *kongstate.KongState, config sendconfig.Config) { + konnectClient := c.clientsProvider.KonnectClient() + // There's no KonnectClient configured, that's totally fine. + if konnectClient == nil { + return + } + + if _, err := c.sendToClient(ctx, konnectClient, s, config); err != nil { + // In case of an error, we only log it since we don't want the Konnect to affect the basic functionality + // of the controller. + + if errors.Is(err, sendconfig.ErrUpdateSkippedDueToBackoffStrategy{}) { + c.logger.WithError(err).Warn("Skipped pushing configuration to Konnect") + return + } + + c.logger.WithError(err).Warn("Failed pushing configuration to Konnect") + } +} + func (c *KongClient) sendToClient( ctx context.Context, - client *adminapi.Client, + client sendconfig.AdminAPIClient, s *kongstate.KongState, config sendconfig.Config, ) (string, error) { @@ -473,20 +495,6 @@ func (c *KongClient) sendToClient( return string(newConfigSHA), nil } -// HandleSendToClientResult handles a result returned from sendToClient. -// It will ignore errors that are returned from Konnect client. -func HandleSendToClientResult(client sendconfig.KonnectAwareClient, logger logrus.FieldLogger, newSHA string, err error) (string, error) { - if err != nil { - // We do not collect errors from Konnect as they should not break the data-plane loop. - if client.IsKonnect() { - logger.WithError(err).Error("Failed pushing configuration to Konnect") - return "", nil - } - return "", err - } - return newSHA, nil -} - // SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results. // Currently it is used for uploading the node status to konnect runtime group. func (c *KongClient) SetConfigStatusNotifier(n clients.ConfigStatusNotifier) { diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index f23ef20b00..7b2b855b02 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -14,7 +14,6 @@ import ( gokong "github.com/kong/go-kong/kong" "github.com/samber/lo" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" @@ -127,81 +126,14 @@ var ( } ) -type konnectClient struct { - isKonnect bool -} - -func (c konnectClient) IsKonnect() bool { - return c.isKonnect -} - -func TestHandleSendToClientResult(t *testing.T) { - const testSHA = "2110454484b88378619111aab0d8a8b8d0ecad5c0ad1120a19810c965f8652dd" - testError := errors.New("sending to client failure") - - testCases := []struct { - name string - isKonnect bool - inputSHA string - inputErr error - - expectedErr error - expectedSHA string - }{ - { - name: "no error, sha is passed", - inputSHA: testSHA, - - expectedSHA: testSHA, - }, - { - name: "error is passed", - inputSHA: testSHA, - inputErr: testError, - - expectedErr: testError, - }, - { - name: "konnect - error is ignored", - isKonnect: true, - inputSHA: testSHA, - inputErr: testError, - - expectedErr: nil, - expectedSHA: "", - }, - { - name: "konnect - no error, sha is passed", - isKonnect: true, - inputSHA: testSHA, - - expectedSHA: testSHA, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - c := konnectClient{isKonnect: tc.isKonnect} - resultSHA, err := dataplane.HandleSendToClientResult(c, logrus.New(), tc.inputSHA, tc.inputErr) - assert.Equal(t, tc.expectedErr, err) - assert.Equal(t, tc.expectedSHA, resultSHA) - }) - } -} - // mockGatewayClientsProvider is a mock implementation of dataplane.AdminAPIClientsProvider. type mockGatewayClientsProvider struct { gatewayClients []*adminapi.Client - konnectClient *adminapi.Client + konnectClient *adminapi.KonnectClient } -func (f mockGatewayClientsProvider) AllClients() []*adminapi.Client { - all := make([]*adminapi.Client, len(f.gatewayClients)) - copy(all, f.gatewayClients) - if f.konnectClient != nil { - all = append(all, f.konnectClient) - } - return all +func (f mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { + return f.konnectClient } func (f mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { @@ -274,7 +206,7 @@ type mockUpdateStrategy struct { onUpdate func() error } -func (m *mockUpdateStrategy) Update(context.Context, *file.Content) ( +func (m *mockUpdateStrategy) Update(context.Context, sendconfig.ContentWithHash) ( err error, resourceErrors []sendconfig.ResourceError, resourceErrorsParseErr error, @@ -287,6 +219,10 @@ func (m *mockUpdateStrategy) MetricsProtocol() metrics.Protocol { return metrics.ProtocolDBLess } +func (m *mockUpdateStrategy) Type() string { + return "Mock" +} + // mockConfigurationChangeDetector is a mock implementation of sendconfig.ConfigurationChangeDetector. type mockConfigurationChangeDetector struct { hasConfigurationChanged bool @@ -321,7 +257,7 @@ func TestKongClientUpdate_AllExpectedClientsAreCalled(t *testing.T) { testCases := []struct { name string gatewayClients []*adminapi.Client - konnectClient *adminapi.Client + konnectClient *adminapi.KonnectClient errorOnUpdateForURLs []string expectError bool }{ @@ -393,7 +329,7 @@ func TestKongClientUpdate_AllExpectedClientsAreCalled(t *testing.T) { } require.NoError(t, err) - allExpectedURLs := mapClientsToUrls(clientsProvider.AllClients()) + allExpectedURLs := mapClientsToUrls(clientsProvider) updateStrategyResolver.assertUpdateCalledForURLs(allExpectedURLs) }) } @@ -461,7 +397,7 @@ func mustSampleGatewayClient(t *testing.T) *adminapi.Client { return c } -func mustSampleKonnectClient(t *testing.T) *adminapi.Client { +func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { t.Helper() c, err := gokong.NewClient(lo.ToPtr(fmt.Sprintf("https://%s.konghq.tech", uuid.NewString())), &http.Client{}) @@ -471,8 +407,12 @@ func mustSampleKonnectClient(t *testing.T) *adminapi.Client { return adminapi.NewKonnectClient(c, rgID) } -func mapClientsToUrls(clients []*adminapi.Client) []string { - return lo.Map(clients, func(c *adminapi.Client, _ int) string { +func mapClientsToUrls(clients mockGatewayClientsProvider) []string { + urls := lo.Map(clients.GatewayClients(), func(c *adminapi.Client, _ int) string { return c.BaseRootURL() }) + if clients.KonnectClient() != nil { + urls = append(urls, clients.KonnectClient().BaseRootURL()) + } + return urls } diff --git a/internal/dataplane/sendconfig/backoff_strategy.go b/internal/dataplane/sendconfig/backoff_strategy.go new file mode 100644 index 0000000000..6f990a32e1 --- /dev/null +++ b/internal/dataplane/sendconfig/backoff_strategy.go @@ -0,0 +1,80 @@ +package sendconfig + +import ( + "context" + "errors" + "fmt" + + "github.com/sirupsen/logrus" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/metrics" +) + +type ErrUpdateSkippedDueToBackoffStrategy struct { + explanation string +} + +func NewErrUpdateSkippedDueToBackoffStrategy(explanation string) ErrUpdateSkippedDueToBackoffStrategy { + return ErrUpdateSkippedDueToBackoffStrategy{explanation: explanation} +} + +func (e ErrUpdateSkippedDueToBackoffStrategy) Error() string { + return fmt.Sprintf("update skipped due to a backoff strategy not being satisfied: %s", e.explanation) +} + +func (e ErrUpdateSkippedDueToBackoffStrategy) Is(err error) bool { + return errors.Is(err, ErrUpdateSkippedDueToBackoffStrategy{}) +} + +// UpdateStrategyWithBackoff decorates any UpdateStrategy to respect a passed adminapi.UpdateBackoffStrategy. +type UpdateStrategyWithBackoff struct { + decorated UpdateStrategy + backoffStrategy adminapi.UpdateBackoffStrategy + log logrus.FieldLogger +} + +func NewUpdateStrategyWithBackoff( + decorated UpdateStrategy, + backoffStrategy adminapi.UpdateBackoffStrategy, + log logrus.FieldLogger, +) UpdateStrategyWithBackoff { + return UpdateStrategyWithBackoff{ + decorated: decorated, + backoffStrategy: backoffStrategy, + log: log, + } +} + +// Update will ensure that the decorated UpdateStrategy.Update is called only when an underlying +// UpdateBackoffStrategy.CanUpdate is satisfied. +// In case it's not, it will return a predefined ErrUpdateSkippedDueToBackoffStrategy. +// In case it is, apart from calling UpdateStrategy.Update, it will also register a success or a failure of an update +// attempt so that the UpdateBackoffStrategy can keep track of it. +func (s UpdateStrategyWithBackoff) Update(ctx context.Context, targetContent ContentWithHash) ( + err error, + resourceErrors []ResourceError, + resourceErrorsParseErr error, +) { + if canUpdate, whyNot := s.backoffStrategy.CanUpdate(targetContent.Hash); !canUpdate { + return NewErrUpdateSkippedDueToBackoffStrategy(whyNot), nil, nil + } + + err, resourceErrors, resourceErrorsParseErr = s.decorated.Update(ctx, targetContent) + if err != nil { + s.log.WithError(err).Debug("Update failed, registering it for backoff strategy") + s.backoffStrategy.RegisterUpdateFailure(err, targetContent.Hash) + } else { + s.backoffStrategy.RegisterUpdateSuccess() + } + + return err, resourceErrors, resourceErrorsParseErr +} + +func (s UpdateStrategyWithBackoff) MetricsProtocol() metrics.Protocol { + return s.decorated.MetricsProtocol() +} + +func (s UpdateStrategyWithBackoff) Type() string { + return fmt.Sprintf("WithBackoff(%s)", s.decorated.Type()) +} diff --git a/internal/dataplane/sendconfig/backoff_strategy_test.go b/internal/dataplane/sendconfig/backoff_strategy_test.go new file mode 100644 index 0000000000..0826d12b40 --- /dev/null +++ b/internal/dataplane/sendconfig/backoff_strategy_test.go @@ -0,0 +1,132 @@ +package sendconfig_test + +import ( + "context" + "errors" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v2/internal/metrics" +) + +type mockUpdateStrategy struct { + wasUpdateCalled bool + shouldSucceed bool +} + +func newMockUpdateStrategy(shouldSucceed bool) *mockUpdateStrategy { + return &mockUpdateStrategy{shouldSucceed: shouldSucceed} +} + +func (m *mockUpdateStrategy) Update(context.Context, sendconfig.ContentWithHash) ( + err error, + resourceErrors []sendconfig.ResourceError, + resourceErrorsParseErr error, +) { + m.wasUpdateCalled = true + + if !m.shouldSucceed { + return errors.New("update failure occurred"), nil, nil + } + + return nil, nil, nil +} + +func (m *mockUpdateStrategy) MetricsProtocol() metrics.Protocol { + return "Mock" +} + +func (m *mockUpdateStrategy) Type() string { + return "Mock" +} + +type mockBackoffStrategy struct { + allowUpdate bool + wasSuccessRegistered bool + wasFailureRegistered bool +} + +func (m *mockBackoffStrategy) CanUpdate([]byte) (bool, string) { + if m.allowUpdate { + return true, "" + } + + return false, "some reason" +} + +func (m *mockBackoffStrategy) RegisterUpdateSuccess() { + m.wasSuccessRegistered = true +} + +func (m *mockBackoffStrategy) RegisterUpdateFailure(error, []byte) { + m.wasFailureRegistered = true +} + +func newMockBackoffStrategy(allowUpdate bool) *mockBackoffStrategy { + return &mockBackoffStrategy{allowUpdate: allowUpdate} +} + +func TestUpdateStrategyWithBackoff(t *testing.T) { + ctx := context.Background() + log := logrus.New() + + testCases := []struct { + name string + + updateShouldBeAllowed bool + updateShouldSucceed bool + + expectUpdateCalled bool + expectSuccessRegistered bool + expectFailureRegistered bool + expectError error + }{ + { + name: "backoff allows update and it succeeds", + updateShouldBeAllowed: true, + updateShouldSucceed: true, + + expectUpdateCalled: true, + expectSuccessRegistered: true, + }, + { + name: "backoff allows update and it fails", + updateShouldBeAllowed: true, + updateShouldSucceed: false, + + expectUpdateCalled: true, + expectFailureRegistered: true, + expectError: errors.New("update failure occurred"), + }, + { + name: "backoff doesn't allow update, it doesn't happen and predefined error type is returned", + updateShouldBeAllowed: false, + + expectUpdateCalled: false, + expectError: sendconfig.NewErrUpdateSkippedDueToBackoffStrategy("some reason"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + updateStrategy := newMockUpdateStrategy(tc.updateShouldSucceed) + backoffStrategy := newMockBackoffStrategy(tc.updateShouldBeAllowed) + + decoratedStrategy := sendconfig.NewUpdateStrategyWithBackoff(updateStrategy, backoffStrategy, log) + err, _, _ := decoratedStrategy.Update(ctx, sendconfig.ContentWithHash{}) + if tc.expectError != nil { + require.Equal(t, tc.expectError, err) + } else { + require.NoError(t, err) + } + + assert.Equal(t, tc.expectUpdateCalled, updateStrategy.wasUpdateCalled) + assert.Equal(t, tc.expectSuccessRegistered, backoffStrategy.wasSuccessRegistered) + assert.Equal(t, tc.expectFailureRegistered, backoffStrategy.wasFailureRegistered) + }) + } +} diff --git a/internal/dataplane/sendconfig/dbmode.go b/internal/dataplane/sendconfig/dbmode.go index d8a9b38044..1c432409b7 100644 --- a/internal/dataplane/sendconfig/dbmode.go +++ b/internal/dataplane/sendconfig/dbmode.go @@ -39,7 +39,7 @@ func NewUpdateStrategyDBMode( } } -func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent *file.Content) ( +func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent ContentWithHash) ( err error, resourceErrors []ResourceError, resourceErrorsParseErr error, @@ -49,7 +49,7 @@ func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent *file.Co return fmt.Errorf("failed getting current state for %s: %w", s.client.BaseRootURL(), err), nil, nil } - ts, err := s.targetState(ctx, cs, targetContent) + ts, err := s.targetState(ctx, cs, targetContent.Content) if err != nil { return deckerrors.ConfigConflictError{Err: err}, nil, nil } @@ -76,6 +76,10 @@ func (s UpdateStrategyDBMode) MetricsProtocol() metrics.Protocol { return metrics.ProtocolDeck } +func (s UpdateStrategyDBMode) Type() string { + return "DBMode" +} + func (s UpdateStrategyDBMode) currentState(ctx context.Context) (*state.KongState, error) { rawState, err := dump.Get(ctx, s.client, s.dumpConfig) if err != nil { diff --git a/internal/dataplane/sendconfig/inmemory.go b/internal/dataplane/sendconfig/inmemory.go index 1415c4e826..b1e6543baf 100644 --- a/internal/dataplane/sendconfig/inmemory.go +++ b/internal/dataplane/sendconfig/inmemory.go @@ -7,7 +7,6 @@ import ( "fmt" "io" - "github.com/kong/deck/file" "github.com/sirupsen/logrus" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/deckgen" @@ -41,18 +40,18 @@ func NewUpdateStrategyInMemory( } } -func (s UpdateStrategyInMemory) Update(ctx context.Context, targetState *file.Content) ( +func (s UpdateStrategyInMemory) Update(ctx context.Context, targetState ContentWithHash) ( err error, resourceErrors []ResourceError, resourceErrorsParseErr error, ) { // Kong will error out if this is set - targetState.Info = nil + targetState.Content.Info = nil // Kong errors out if `null`s are present in `config` of plugins - deckgen.CleanUpNullsInPluginConfigs(targetState) + deckgen.CleanUpNullsInPluginConfigs(targetState.Content) - config, err := json.Marshal(targetState) + config, err := json.Marshal(targetState.Content) if err != nil { return fmt.Errorf("constructing kong configuration: %w", err), nil, nil } @@ -70,6 +69,10 @@ func (s UpdateStrategyInMemory) MetricsProtocol() metrics.Protocol { return metrics.ProtocolDBLess } +func (s UpdateStrategyInMemory) Type() string { + return "InMemory" +} + // shouldUseFlattenedErrors verifies whether we should pass flatten errors flag to ReloadDeclarativeRawConfig. // Kong's API library combines KVs in the request body (the config) and query string (check hash, flattened) // into a single set of parameters: https://github.com/Kong/go-kong/pull/271#issuecomment-1416212852 diff --git a/internal/dataplane/sendconfig/sendconfig.go b/internal/dataplane/sendconfig/sendconfig.go index ff3152a20d..0a8eff79ae 100644 --- a/internal/dataplane/sendconfig/sendconfig.go +++ b/internal/dataplane/sendconfig/sendconfig.go @@ -2,18 +2,20 @@ package sendconfig import ( "context" + "errors" "fmt" "time" "github.com/kong/deck/file" + "github.com/kong/go-kong/kong" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/deckgen" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/failures" "github.com/kong/kubernetes-ingress-controller/v2/internal/metrics" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) // ----------------------------------------------------------------------------- @@ -24,11 +26,22 @@ type UpdateStrategyResolver interface { ResolveUpdateStrategy(client UpdateClient) UpdateStrategy } +type AdminAPIClient interface { + AdminAPIClient() *kong.Client + LastConfigSHA() []byte + SetLastConfigSHA([]byte) + BaseRootURL() string + PluginSchemaStore() *util.PluginSchemaStore + + IsKonnect() bool + KonnectRuntimeGroup() string +} + // PerformUpdate writes `targetContent` to Kong Admin API specified by `kongConfig`. func PerformUpdate( ctx context.Context, log logrus.FieldLogger, - client *adminapi.Client, + client AdminAPIClient, config Config, targetContent *file.Content, promMetrics *metrics.CtrlFuncMetrics, @@ -58,12 +71,21 @@ func PerformUpdate( } updateStrategy := updateStrategyResolver.ResolveUpdateStrategy(client) + log = log.WithField("update_strategy", updateStrategy.Type()) timeStart := time.Now() - err, resourceErrors, resourceErrorsParseErr := updateStrategy.Update(ctx, targetContent) + err, resourceErrors, resourceErrorsParseErr := updateStrategy.Update(ctx, ContentWithHash{ + Content: targetContent, + Hash: newSHA, + }) duration := time.Since(timeStart) metricsProtocol := updateStrategy.MetricsProtocol() if err != nil { + // Not pushing metrics in case it's an update skip due to a backoff. + if errors.Is(err, ErrUpdateSkippedDueToBackoffStrategy{}) { + return nil, []failures.ResourceFailure{}, err + } + resourceFailures := resourceErrorsToResourceFailures(resourceErrors, resourceErrorsParseErr, log) promMetrics.RecordPushFailure(metricsProtocol, duration, client.BaseRootURL(), err) return nil, resourceFailures, err diff --git a/internal/dataplane/sendconfig/strategy.go b/internal/dataplane/sendconfig/strategy.go index fef45eacd7..31e9265cf1 100644 --- a/internal/dataplane/sendconfig/strategy.go +++ b/internal/dataplane/sendconfig/strategy.go @@ -8,13 +8,20 @@ import ( "github.com/kong/go-kong/kong" "github.com/sirupsen/logrus" + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/metrics" ) +// ContentWithHash encapsulates file.Content along with its precalculated hash. +type ContentWithHash struct { + Content *file.Content + Hash []byte +} + // UpdateStrategy is the way we approach updating data-plane's configuration, depending on its type. type UpdateStrategy interface { // Update applies targetConfig to the data-plane. - Update(ctx context.Context, targetContent *file.Content) ( + Update(ctx context.Context, targetContent ContentWithHash) ( err error, resourceErrors []ResourceError, resourceErrorsParseErr error, @@ -22,6 +29,9 @@ type UpdateStrategy interface { // MetricsProtocol returns a string describing the update strategy type to be used in metrics. MetricsProtocol() metrics.Protocol + + // Type returns a human-readable debug string representing the UpdateStrategy. + Type() string } type UpdateClient interface { @@ -30,6 +40,11 @@ type UpdateClient interface { AdminAPIClient() *kong.Client } +type UpdateClientWithBackoff interface { + UpdateClient + BackoffStrategy() adminapi.UpdateBackoffStrategy +} + // ResourceError is a Kong configuration error associated with a Kubernetes resource. type ResourceError struct { Name string @@ -55,9 +70,21 @@ func NewDefaultUpdateStrategyResolver(config Config, log logrus.FieldLogger) Def // ResolveUpdateStrategy returns an UpdateStrategy based on the client and configuration. // The UpdateStrategy can be either UpdateStrategyDBMode or UpdateStrategyInMemory. Both // of them implement different ways to populate Kong instances with data-plane configuration. +// If the client implements UpdateClientWithBackoff interface, its strategy will be decorated +// with the backoff strategy it provides. func (r DefaultUpdateStrategyResolver) ResolveUpdateStrategy( client UpdateClient, ) UpdateStrategy { + updateStrategy := r.resolveUpdateStrategy(client) + + if clientWithBackoff, ok := client.(UpdateClientWithBackoff); ok { + return NewUpdateStrategyWithBackoff(updateStrategy, clientWithBackoff.BackoffStrategy(), r.log) + } + + return updateStrategy +} + +func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy(client UpdateClient) UpdateStrategy { adminAPIClient := client.AdminAPIClient() // In case the client communicates with Konnect Admin API, we know it has to use DB-mode. There's no need to check diff --git a/internal/dataplane/sendconfig/strategy_test.go b/internal/dataplane/sendconfig/strategy_test.go index e4b48e436e..1837cbd03c 100644 --- a/internal/dataplane/sendconfig/strategy_test.go +++ b/internal/dataplane/sendconfig/strategy_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" ) @@ -34,34 +35,42 @@ func (c *clientMock) AdminAPIClient() *kong.Client { return &kong.Client{} } +type clientWithBackoffMock struct { + *clientMock +} + +func (c clientWithBackoffMock) BackoffStrategy() adminapi.UpdateBackoffStrategy { + return newMockBackoffStrategy(true) +} + func TestDefaultUpdateStrategyResolver_ResolveUpdateStrategy(t *testing.T) { testCases := []struct { isKonnect bool inMemory bool - expectedStrategy sendconfig.UpdateStrategy + expectedStrategyType string expectKonnectRuntimeGroupCall bool }{ { isKonnect: true, inMemory: false, - expectedStrategy: sendconfig.UpdateStrategyDBMode{}, + expectedStrategyType: "WithBackoff(DBMode)", expectKonnectRuntimeGroupCall: true, }, { isKonnect: true, inMemory: true, - expectedStrategy: sendconfig.UpdateStrategyDBMode{}, + expectedStrategyType: "WithBackoff(DBMode)", expectKonnectRuntimeGroupCall: true, }, { - isKonnect: false, - inMemory: false, - expectedStrategy: sendconfig.UpdateStrategyDBMode{}, + isKonnect: false, + inMemory: false, + expectedStrategyType: "DBMode", }, { - isKonnect: false, - inMemory: true, - expectedStrategy: sendconfig.UpdateStrategyInMemory{}, + isKonnect: false, + inMemory: true, + expectedStrategyType: "InMemory", }, } @@ -71,12 +80,19 @@ func TestDefaultUpdateStrategyResolver_ResolveUpdateStrategy(t *testing.T) { isKonnect: tc.isKonnect, } + var updateClient sendconfig.UpdateClient + if tc.isKonnect { + updateClient = &clientWithBackoffMock{client} + } else { + updateClient = client + } + resolver := sendconfig.NewDefaultUpdateStrategyResolver(sendconfig.Config{ InMemory: tc.inMemory, }, logrus.New()) - strategy := resolver.ResolveUpdateStrategy(client) - require.IsType(t, tc.expectedStrategy, strategy) + strategy := resolver.ResolveUpdateStrategy(updateClient) + require.Equal(t, tc.expectedStrategyType, strategy.Type()) assert.True(t, client.adminAPIClientWasCalled) assert.Equal(t, tc.expectKonnectRuntimeGroupCall, client.konnectRuntimeGroupWasCalled) }) diff --git a/test/e2e/konnect_test.go b/test/e2e/konnect_test.go index 08c68ca415..232e6c4c16 100644 --- a/test/e2e/konnect_test.go +++ b/test/e2e/konnect_test.go @@ -296,7 +296,7 @@ func createKonnectClientSecretAndConfigMap(ctx context.Context, t *testing.T, en } // createKonnectAdminAPIClient creates an *kong.Client that will communicate with Konnect Runtime Group's Admin API. -func createKonnectAdminAPIClient(t *testing.T, rgID, cert, key string) *adminapi.Client { +func createKonnectAdminAPIClient(t *testing.T, rgID, cert, key string) *adminapi.KonnectClient { t.Helper() c, err := adminapi.NewKongClientForKonnectRuntimeGroup(adminapi.KonnectConfig{