Skip to content

Commit

Permalink
Add quicker retry to get kube cluster details if the cluster was offl…
Browse files Browse the repository at this point in the history
…ine (#43903)

* Add quicker retry to get kube cluster details if the cluster goes offline.

* Add kubedetails close call on error.

Although this error will never actually happen in practice.
  • Loading branch information
AntonAM committed Jul 9, 2024
1 parent fade3d2 commit a8630b2
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 17 deletions.
37 changes: 34 additions & 3 deletions lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/tools/clientcmd"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/cloud/azure"
"github.com/gravitational/teleport/lib/cloud/gcp"
Expand Down Expand Up @@ -96,6 +97,9 @@ type clusterDetailsConfig struct {
component KubeServiceType
}

const (
	// defaultRefreshPeriod is the interval between cluster details refreshes
	// while the cluster is online and refreshes succeed.
	defaultRefreshPeriod = 5 * time.Minute
	// backoffRefreshStep is both the first retry delay and the linear backoff
	// increment used while the cluster is offline or refreshes are failing.
	backoffRefreshStep = 10 * time.Second
)

// newClusterDetails creates a proxied kubeDetails structure given a dynamic cluster.
func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDetails, err error) {
creds := cfg.kubeCreds
Expand Down Expand Up @@ -144,23 +148,50 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
gvkSupportedResources: gvkSupportedRes,
}

// If the cluster is online and there are no errors, we refresh details infrequently (every 5 minutes),
// but if the cluster is offline, we refresh details more often so we notice sooner when it comes back online.
firstPeriod := defaultRefreshPeriod
if isClusterOffline {
firstPeriod = backoffRefreshStep
}
refreshDelay, err := retryutils.NewLinear(retryutils.LinearConfig{
First: firstPeriod,
Step: backoffRefreshStep,
Max: defaultRefreshPeriod,
Jitter: retryutils.NewSeventhJitter(),
Clock: cfg.clock,
})
if err != nil {
k.Close()
return nil, trace.Wrap(err)
}

k.wg.Add(1)
// Start the periodic update of the codec factory and the list of supported types for RBAC.
go func() {
defer k.wg.Done()
ticker := cfg.clock.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.Chan():
case <-refreshDelay.After():
codecFactory, rbacSupportedTypes, gvkSupportedResources, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
// If this is the first time we get an error, reset the retry mechanism so it starts refreshing details sooner, with linear backoff.
if refreshDelay.First == defaultRefreshPeriod {
refreshDelay.First = backoffRefreshStep
refreshDelay.Reset()
} else {
refreshDelay.Inc()
}
cfg.log.WithError(err).Error("Failed to update cluster schema")
continue
}


Check failure on line 192 in lib/kube/proxy/cluster_details.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/gravitational/teleport) --custom-order (gci)

Check failure on line 192 in lib/kube/proxy/cluster_details.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

File is not `goimports`-ed (goimports)
// Restore the details refresh delay to the default value, in case the cluster was previously offline.
refreshDelay.First = defaultRefreshPeriod
k.rwMu.Lock()
k.kubeCodecs = codecFactory
k.rbacSupportedTypes = rbacSupportedTypes
Expand Down
148 changes: 148 additions & 0 deletions lib/kube/proxy/cluster_details_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package proxy

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"

"github.com/gravitational/teleport/api/types"
)

// TestNewClusterDetails checks how often the cluster schema is refreshed after
// newClusterDetails returns: seldom (defaultRefreshPeriod) while discovery
// succeeds, and quickly (backoffRefreshStep) after discovery has failed.
func TestNewClusterDetails(t *testing.T) {
	t.Parallel()

	// Use a cancellable context instead of a bare context.Background() so the
	// periodic refresh goroutines started by newClusterDetails are told to stop
	// once the test (including all subtests) has finished, rather than leaking.
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	log := logrus.New().WithContext(ctx)

	// getClusterDetailsConfig builds a config wired to a fake kube client and
	// the given fake clock, returning the client so the test can inspect it.
	getClusterDetailsConfig := func(c clockwork.FakeClock) (clusterDetailsConfig, *clusterDetailsClientSet) {
		client := &clusterDetailsClientSet{}
		return clusterDetailsConfig{
			kubeCreds: &staticKubeCreds{
				kubeClient: client,
			},
			cluster: &types.KubernetesClusterV3{},
			log:     log,
			clock:   c,
		}, client
	}

	t.Run("normal operation", func(t *testing.T) {
		clock := clockwork.NewFakeClock()
		config, client := getClusterDetailsConfig(clock)
		details, err := newClusterDetails(ctx, config)

		require.NoError(t, err)
		require.NotNil(t, details)
		require.Equal(t, 1, client.GetCalledTimes())

		// Wait until the refresh goroutine is blocked on the fake clock.
		clock.BlockUntil(1)

		// Advancing by short period doesn't cause another details refresh, since in normal state refresh interval
		// is long.
		clock.Advance(backoffRefreshStep + time.Second)
		clock.BlockUntil(1)
		require.Equal(t, 1, client.GetCalledTimes())

		// Advancing by the default interval period causes another details refresh.
		clock.Advance(defaultRefreshPeriod + time.Second)
		clock.BlockUntil(1)
		require.Equal(t, 2, client.GetCalledTimes())
	})

	t.Run("first time has failed, second time it's restored", func(t *testing.T) {
		clock := clockwork.NewFakeClock()
		config, client := getClusterDetailsConfig(clock)
		client.discoveryErr = errors.New("error")
		details, err := newClusterDetails(ctx, config)

		require.NoError(t, err)
		require.NotNil(t, details)
		require.True(t, details.isClusterOffline)
		require.Equal(t, 1, client.GetCalledTimes())

		// Wait until the refresh goroutine is blocked on the fake clock before
		// changing the error returned by the fake client.
		clock.BlockUntil(1)

		client.discoveryErr = nil

		// Advancing by short interval causes details refresh because we're in a bad state, and trying to
		// refresh details more often.
		clock.Advance(backoffRefreshStep + time.Second)
		clock.BlockUntil(1)

		require.Equal(t, 2, client.GetCalledTimes())
		require.False(t, details.isClusterOffline)

		// After we've restored normal state advancing by short interval doesn't cause details refresh.
		clock.Advance(backoffRefreshStep + time.Second)
		clock.BlockUntil(1)
		require.Equal(t, 2, client.GetCalledTimes())

		// Advancing by the default interval period causes another details refresh.
		clock.Advance(defaultRefreshPeriod + time.Second)
		clock.BlockUntil(1)
		require.Equal(t, 3, client.GetCalledTimes())
	})
}

// clusterDetailsClientSet is a fake Kubernetes client used by
// TestNewClusterDetails. It serves as both the client set and its discovery
// client, and records how many times discovery was requested.
type clusterDetailsClientSet struct {
	// The interfaces are embedded so the type satisfies them; the test
	// constructs the struct without setting them, so only the methods
	// defined explicitly on this type are expected to be called.
	kubernetes.Interface
	discovery.DiscoveryInterface

	// discoveryErr, when non-nil, is returned by ServerGroupsAndResources.
	discoveryErr error
	// calledTimes counts calls to ServerGroupsAndResources.
	calledTimes atomic.Int32
}

// Discovery returns the receiver itself, which doubles as the discovery client.
func (c *clusterDetailsClientSet) Discovery() discovery.DiscoveryInterface {
	return c
}

// ServerGroupsAndResources counts the invocation and then either fails with
// the configured discoveryErr or reports the single fake API resource list.
func (c *clusterDetailsClientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
	c.calledTimes.Add(1)

	if failure := c.discoveryErr; failure != nil {
		return nil, nil, failure
	}

	resourceLists := []*metav1.APIResourceList{&fakeAPIResource}
	return nil, resourceLists, nil
}

// ServerVersion always succeeds and reports a fixed Kubernetes v1.29.0 server.
func (c *clusterDetailsClientSet) ServerVersion() (*version.Info, error) {
	info := version.Info{
		GitVersion: "v1.29.0",
		Major:      "1",
		Minor:      "29",
	}
	return &info, nil
}

// GetCalledTimes reports how many times ServerGroupsAndResources has been
// called so far.
func (c *clusterDetailsClientSet) GetCalledTimes() int {
	calls := c.calledTimes.Load()
	return int(calls)
}
8 changes: 4 additions & 4 deletions lib/kube/proxy/kube_creds.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type kubeCreds interface {
getTransportConfig() *transport.Config
getTargetAddr() string
getKubeRestConfig() *rest.Config
getKubeClient() *kubernetes.Clientset
getKubeClient() kubernetes.Interface
getTransport() http.RoundTripper
wrapTransport(http.RoundTripper) (http.RoundTripper, error)
close() error
Expand All @@ -63,7 +63,7 @@ type staticKubeCreds struct {
transportConfig *transport.Config
// targetAddr is a kubernetes API address.
targetAddr string
kubeClient *kubernetes.Clientset
kubeClient kubernetes.Interface
// clientRestCfg is the Kubernetes Rest config for the cluster.
clientRestCfg *rest.Config
transport http.RoundTripper
Expand All @@ -85,7 +85,7 @@ func (s *staticKubeCreds) getTargetAddr() string {
return s.targetAddr
}

func (s *staticKubeCreds) getKubeClient() *kubernetes.Clientset {
func (s *staticKubeCreds) getKubeClient() kubernetes.Interface {
return s.kubeClient
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (d *dynamicKubeCreds) getTargetAddr() string {
return d.staticCreds.targetAddr
}

func (d *dynamicKubeCreds) getKubeClient() *kubernetes.Clientset {
func (d *dynamicKubeCreds) getKubeClient() kubernetes.Interface {
d.RLock()
defer d.RUnlock()
return d.staticCreds.kubeClient
Expand Down
22 changes: 12 additions & 10 deletions lib/kube/proxy/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ func (c *clientSet) Discovery() discovery.DiscoveryInterface {
return c
}

func (c *clientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return nil, []*metav1.APIResourceList{
var fakeAPIResource = metav1.APIResourceList{
GroupVersion: "extensions/v1beta1",
APIResources: []metav1.APIResource{
{
GroupVersion: "extensions/v1beta1",
APIResources: []metav1.APIResource{
{
Name: "ingresses",
Kind: "Ingress",
Namespaced: true,
},
},
Name: "ingresses",
Kind: "Ingress",
Namespaced: true,
},
},
}

func (c *clientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return nil, []*metav1.APIResourceList{
&fakeAPIResource,
}, nil
}

0 comments on commit a8630b2

Please sign in to comment.