Skip to content

Commit

Permalink
Merge pull request #101726 from ankeesler/exec-plugin-integration-test
Browse files Browse the repository at this point in the history
test/integration/client: add TestExecPluginRotationViaInformer
  • Loading branch information
k8s-ci-robot authored May 18, 2021
2 parents 2d58b72 + a14cd8e commit 5c58620
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 50 deletions.
261 changes: 211 additions & 50 deletions test/integration/client/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,9 +41,11 @@ import (
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"

"k8s.io/client-go/util/connrotation"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
Expand All @@ -50,7 +54,8 @@ import (

// These constants are used to communicate behavior to the testdata/exec-plugin.sh test fixture.
const (
outputEnvVar = "EXEC_PLUGIN_OUTPUT"
outputEnvVar = "EXEC_PLUGIN_OUTPUT"
outputFileEnvVar = "EXEC_PLUGIN_OUTPUT_FILE"
)

type roundTripperFunc func(*http.Request) (*http.Response, error)
Expand Down Expand Up @@ -412,32 +417,53 @@ type informerSpy struct {
deletes []interface{}
}

func (es *informerSpy) OnAdd(obj interface{}) {
es.mu.Lock()
defer es.mu.Unlock()
es.adds = append(es.adds, obj)
func (is *informerSpy) OnAdd(obj interface{}) {
is.mu.Lock()
defer is.mu.Unlock()
is.adds = append(is.adds, obj)
}

func (es *informerSpy) OnUpdate(old, new interface{}) {
es.mu.Lock()
defer es.mu.Unlock()
es.updates = append(es.updates, oldNew{old: old, new: new})
func (is *informerSpy) OnUpdate(old, new interface{}) {
is.mu.Lock()
defer is.mu.Unlock()
is.updates = append(is.updates, oldNew{old: old, new: new})
}

func (es *informerSpy) OnDelete(obj interface{}) {
es.mu.Lock()
defer es.mu.Unlock()
es.deletes = append(es.deletes, obj)
func (is *informerSpy) OnDelete(obj interface{}) {
is.mu.Lock()
defer is.mu.Unlock()
is.deletes = append(is.deletes, obj)
}

// waitForEvents waits for adds, updates, and deletes to be filled with at least one event.
func (es *informerSpy) waitForEvents(t *testing.T) {
if err := wait.PollImmediate(time.Millisecond*250, time.Second*20, func() (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()
return len(es.adds) > 0 && len(es.updates) > 0 && len(es.deletes) > 0, nil
}); err != nil {
t.Fatalf("failed to wait for events: %v", err)
func (is *informerSpy) clear() {
is.mu.Lock()
defer is.mu.Unlock()
is.adds = []interface{}{}
is.updates = []oldNew{}
is.deletes = []interface{}{}
}

// waitForEvents waits for adds, updates, and deletes to be populated with at least one event.
func (is *informerSpy) waitForEvents(t *testing.T, wantEvents bool) {
t.Helper()

err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) {
is.mu.Lock()
defer is.mu.Unlock()
return len(is.adds) > 0 && len(is.updates) > 0 && len(is.deletes) > 0, nil
})
if wantEvents {
if err != nil {
t.Fatalf("wanted events, but got error: %v", err)
}
} else {
if !errors.Is(err, wait.ErrWaitTimeout) {
if err != nil {
t.Fatalf("wanted no events, but got error: %v", err)
} else {
t.Fatalf("wanted no events, but got some: %s", spew.Sprintf("%#v", is))
}
}
}
}

Expand Down Expand Up @@ -504,26 +530,128 @@ func TestExecPluginViaInformer(t *testing.T) {
if test.clientConfigFunc != nil {
test.clientConfigFunc(clientConfig)
}
client := clientset.NewForConfigOrDie(clientConfig)

informerSpy := startConfigMapInformer(ctx, t, client, ns.Name)
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, client.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t)

// Validate that the informer was called correctly.
if diff := cmp.Diff([]interface{}{createdCM}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" {
t.Errorf("unexpected add event(s), -want, +got:\n%s", diff)
}
if diff := cmp.Diff([]oldNew{{createdCM, updatedCM}}, informerSpy.updates, oldNewComparer); diff != "" {
t.Errorf("unexpected update event(s), -want, +got:\n%s", diff)
}
if diff := cmp.Diff([]interface{}{deletedCM}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" {
t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff)
}
informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name)
waitForInformerSync(ctx, t, informer, true, "")
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t, true)
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)
})
}
}

type execPlugin struct {
t *testing.T
outputFile *os.File
}

func newExecPlugin(t *testing.T) *execPlugin {
t.Helper()
outputFile, err := ioutil.TempFile("", "kubernetes-client-exec-test-plugin-output-file-*")
if err != nil {
t.Fatal(err)
}
return &execPlugin{t: t, outputFile: outputFile}
}

func (e *execPlugin) config() *clientcmdapi.ExecConfig {
return &clientcmdapi.ExecConfig{
Command: "testdata/exec-plugin.sh",
// TODO(ankeesler): move to v1 once exec plugins go GA.
APIVersion: "client.authentication.k8s.io/v1beta1",
Env: []clientcmdapi.ExecEnvVar{
{
Name: outputFileEnvVar,
Value: e.outputFile.Name(),
},
},
}
}

func (e *execPlugin) rotateToken(newToken string, lifetime time.Duration) {
e.t.Helper()

expirationTimestamp := metav1.NewTime(time.Now().Add(lifetime)).Format(time.RFC3339Nano)
newOutput := fmt.Sprintf(`{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1beta1",
"status": {
"expirationTimestamp": %q,
"token": %q
}
}`, expirationTimestamp, newToken)
if err := os.WriteFile(e.outputFile.Name(), []byte(newOutput), 0644); err != nil {
e.t.Fatal(err)
}
}

func TestExecPluginRotationViaInformer(t *testing.T) {
t.Parallel()

result, clientAuthorizedToken, _, _ := startTestServer(t)
const clientUnauthorizedToken = "invalid-token"
const tokenLifetime = time.Second * 5

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

adminClient := clientset.NewForConfigOrDie(result.ClientConfig)
ns := createNamespace(ctx, t, adminClient)

clientDialer := connrotation.NewDialer((&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext)

execPlugin := newExecPlugin(t)

clientConfig := rest.AnonymousClientConfig(result.ClientConfig)
clientConfig.ExecProvider = execPlugin.config()
clientConfig.Dial = clientDialer.DialContext
clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
// This makes it helpful to see what is happening with the informer's client.
return transport.NewDebuggingRoundTripper(rt, transport.DebugCurlCommand, transport.DebugURLTiming)
})

// Initialize informer spy wth invalid token.
// Make sure informer never syncs because it can't authenticate.
execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime)
informer, informerSpy := startConfigMapInformer(ctx, t, clientset.NewForConfigOrDie(clientConfig), ns.Name)
waitForInformerSync(ctx, t, informer, false, "")
createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t, false)

// Rotate token to valid token.
// Make sure informer sees events because it now has a valid token with which it can authenticate.
execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime)
waitForInformerSync(ctx, t, informer, true, "")
informerSpy.clear()
createdCM, updatedCM, deletedCM := createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t, true)
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)

// Rotate token to something invalid and clip watch connection.
// Informer should recreate connection with invalid token.
// Make sure informer does not see events since it is using the invalid token.
execPlugin.rotateToken(clientUnauthorizedToken, tokenLifetime)
time.Sleep(tokenLifetime) // wait for old token to expire to make sure the watch is restarted with clientUnauthorizedToken
clientDialer.CloseAll()
waitForInformerSync(ctx, t, informer, true, "")
informerSpy.clear()
createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t, false)

// Rotate token to valid token.
// Make sure informer sees events because it now has a valid token with which it can authenticate.
lastSyncResourceVersion := informer.LastSyncResourceVersion()
execPlugin.rotateToken(clientAuthorizedToken, tokenLifetime)
waitForInformerSync(ctx, t, informer, true, lastSyncResourceVersion)
informerSpy.clear()
createdCM, updatedCM, deletedCM = createUpdateDeleteConfigMap(ctx, t, adminClient.CoreV1().ConfigMaps(ns.Name))
informerSpy.waitForEvents(t, true)
assertInformerEvents(t, informerSpy, createdCM, updatedCM, deletedCM)
}

func startTestServer(t *testing.T) (result *kubeapiservertesting.TestServer, clientAuthorizedToken string, clientCertFileName string, clientKeyFileName string) {
certDir, err := ioutil.TempDir("", "kubernetes-client-exec-test-cert-dir-*")
if err != nil {
Expand Down Expand Up @@ -634,22 +762,39 @@ func createNamespace(ctx context.Context, t *testing.T, client clientset.Interfa
return ns
}

func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) *informerSpy {
func startConfigMapInformer(ctx context.Context, t *testing.T, client clientset.Interface, namespace string) (cache.SharedIndexInformer, *informerSpy) {
t.Helper()

var informerSpy informerSpy
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(namespace))
informerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(&informerSpy)
informerFactory.Start(ctx.Done())
synced := informerFactory.WaitForCacheSync(ctx.Done())
if len(synced) != 1 {
t.Fatalf("expected only 1 synced type, got %v", synced)
cmInformer := informerFactory.Core().V1().ConfigMaps().Informer()
cmInformer.AddEventHandler(&informerSpy)
if err := cmInformer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// t.Logf("watch error handler: failure in reflector %#v: %v", r, err) // Uncomment for more verbose logging
}); err != nil {
t.Fatalf("could not set watch error handler: %v", err)
}
if cmSynced, ok := synced[reflect.TypeOf(&corev1.ConfigMap{})]; !(cmSynced && ok) {
t.Fatalf("expected ConfigMaps to be synced, got %v", synced)
informerFactory.Start(ctx.Done())

return cmInformer, &informerSpy
}

func waitForInformerSync(ctx context.Context, t *testing.T, informer cache.SharedIndexInformer, wantSynced bool, lastSyncResourceVersion string) {
t.Helper()

syncCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if gotSynced := cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced); wantSynced != gotSynced {
t.Fatalf("wanted sync %t, got sync %t", wantSynced, gotSynced)
}

return &informerSpy
if len(lastSyncResourceVersion) != 0 {
if err := wait.PollImmediate(time.Second, time.Second*30, func() (bool, error) {
return informer.LastSyncResourceVersion() != lastSyncResourceVersion, nil
}); err != nil {
t.Fatalf("informer never changed resource versions from %q: %v", lastSyncResourceVersion, err)
}
}
}

func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.ConfigMapInterface) (created, updated, deleted *corev1.ConfigMap) {
Expand All @@ -658,21 +803,37 @@ func createUpdateDeleteConfigMap(ctx context.Context, t *testing.T, cms v1.Confi
var err error
created, err = cms.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm"}}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
t.Fatal("could not create ConfigMap:", err)
}

updated = created.DeepCopy()
updated.Annotations = map[string]string{"tuna": "fish"}
updated, err = cms.Update(ctx, updated, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
t.Fatal("could not update ConfigMap:", err)
}

if err := cms.Delete(ctx, updated.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
t.Fatal("could not delete ConfigMap:", err)
}

deleted = updated.DeepCopy()

return created, updated, deleted
}

func assertInformerEvents(t *testing.T, informerSpy *informerSpy, created, updated, deleted interface{}) {
t.Helper()

// Validate that the informer was called correctly.
if diff := cmp.Diff([]interface{}{created}, informerSpy.adds, objectMetaSansResourceVersionComparer); diff != "" {
t.Errorf("unexpected add event(s), -want, +got:\n%s", diff)
}
if diff := cmp.Diff([]oldNew{{created, updated}}, informerSpy.updates, oldNewComparer); diff != "" {
t.Errorf("unexpected update event(s), -want, +got:\n%s", diff)
}
if diff := cmp.Diff([]interface{}{deleted}, informerSpy.deletes, objectMetaSansResourceVersionComparer); diff != "" {
t.Errorf("unexpected deleted event(s), -want, +got:\n%s", diff)
}

}
5 changes: 5 additions & 0 deletions test/integration/client/testdata/exec-plugin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ set -o errexit
set -o nounset
set -o pipefail

if [[ -n "${EXEC_PLUGIN_OUTPUT_FILE-""}" ]]; then
cat "$EXEC_PLUGIN_OUTPUT_FILE"
exit
fi

echo "$EXEC_PLUGIN_OUTPUT"

0 comments on commit 5c58620

Please sign in to comment.