Skip to content

Commit

Permalink
Add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
richabanker committed Jul 19, 2023
1 parent cd5f3d9 commit c1aef65
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 8 deletions.
57 changes: 49 additions & 8 deletions cmd/kube-apiserver/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package testing

import (
"context"
"crypto/rsa"
"crypto/x509"
"fmt"
"net"
Expand All @@ -38,12 +39,15 @@ import (
serveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
clientgotransport "k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
Expand Down Expand Up @@ -77,6 +81,14 @@ type TestServerInstanceOptions struct {
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager
// CA file used for requestheader authn during communication between:
// 1. kube-apiserver and peer when the local apiserver is not able to serve the request due
// to version skew
// 2. kube-apiserver and aggregated apiserver

// We specify this as on option to pass a common proxyCA to multiple apiservers to simulate
// an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections.
ProxyCA *ProxyCA
}

// TestServer return values supplied by kube-test-ApiServer
Expand All @@ -95,6 +107,16 @@ type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
Cleanup(func())
}

// ProxyCA contains the certificate authority certificate and key which is used to verify client connections
// to kube-apiservers. The clients can be :
// 1. aggregated apiservers
// 2. peer kube-apiservers
type ProxyCA struct {
ProxySigningCert *x509.Certificate
ProxySigningKey *rsa.PrivateKey
}

// NewDefaultTestServerOptions Default options for TestServer instances
Expand Down Expand Up @@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
reqHeaders := serveroptions.NewDelegatingAuthenticationOptions()
s.Authentication.RequestHeader = &reqHeaders.RequestHeader

// create certificates for aggregation and client-cert auth
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
var proxySigningKey *rsa.PrivateKey
var proxySigningCert *x509.Certificate

if instanceOptions.ProxyCA != nil {
// use provided proxyCA
proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey
proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert

} else {
// create certificates for aggregation and client-cert auth
proxySigningKey, err = testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}
}
proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt")
if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
Expand Down Expand Up @@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, err
}
s.Authentication.ClientCert.ClientCA = clientCACertFile
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
// TODO: set up a general clean up for testserver
if clientgotransport.DialerStopCh == wait.NeverStop {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
t.Cleanup(cancel)
clientgotransport.DialerStopCh = ctx.Done()
}
s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt")
}
}

s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device
Expand Down
27 changes: 27 additions & 0 deletions test/integration/apiserver/peerproxy/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package peerproxy

import (
"testing"

"k8s.io/kubernetes/test/integration/framework"
)

func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}
244 changes: 244 additions & 0 deletions test/integration/apiserver/peerproxy/peer_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package peerproxy

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
kubefeatures "k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
)

func TestPeerProxiedRequest(t *testing.T) {

ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)

// ensure to stop cert reloading after shutdown
transport.DialerStopCh = ctx.Done()

// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()

// create sharedetcd
etcd := framework.SharedEtcd()

// create certificates for aggregation and client-cert auth
proxyCA, err := createProxyCertContent()
require.NoError(t, err)

// start test server with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-a")
serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
EnableCertAuth: true,
ProxyCA: &proxyCA},
[]string{}, etcd)
defer serverA.TearDownFn()

// start another test server with some api disabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-b")
serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
EnableCertAuth: true,
ProxyCA: &proxyCA},
[]string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd)
defer serverB.TearDownFn()

kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
require.NoError(t, err)

kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
require.NoError(t, err)

// create jobs resource using serverA
job := createJobResource()
_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)

klog.Infof("\nServerA has created jobs\n")

// List jobs using ServerB
// This request should be proxied to ServerA since ServerB does not have batch API enabled
jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
require.NoError(t, err)
assert.NotEmpty(t, jobsB)
assert.Equal(t, job.Name, jobsB.Items[0].Name)
}

func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {

ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)

// ensure to stop cert reloading after shutdown
transport.DialerStopCh = ctx.Done()

// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()

// create sharedetcd
etcd := framework.SharedEtcd()

// create certificates for aggregation and client-cert auth
proxyCA, err := createProxyCertContent()
require.NoError(t, err)

// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated
// once it is shutdown
controlplane.IdentityLeaseDurationSeconds = 10
controlplane.IdentityLeaseGCPeriod = time.Second
controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second

// start serverA with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-a")
serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
setupStorageVersionGC(ctx, kubeClientSetA, informersA)
// reset lease duration to default value for serverB and serverC since we will not be
// shutting these down
controlplane.IdentityLeaseDurationSeconds = 3600

// start serverB with some api disabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-b")
serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{
fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd)
defer serverB.TearDownFn()
kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
setupStorageVersionGC(ctx, kubeClientSetB, informersB)

// start serverC with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-c")
serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
defer serverC.TearDownFn()

// create jobs resource using serverA
job := createJobResource()
_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)
klog.Infof("\nServerA has created jobs\n")

// shutdown serverA
serverA.TearDownFn()

var jobsB *v1.JobList
// list jobs using ServerB which it should proxy to ServerC and get back valid response
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, nil
}
if jobsB != nil {
return true, nil
}
return false, nil
})
klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
require.NoError(t, err)
assert.NotEmpty(t, jobsB)
assert.Equal(t, job.Name, jobsB.Items[0].Name)
}

func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
leaseInformer := informers.Coordination().V1().Leases()
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
go leaseInformer.Informer().Run(ctx.Done())
go storageVersionInformer.Informer().Run(ctx.Done())

controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
go controller.Run(ctx)
}

func createProxyCertContent() (kastesting.ProxyCA, error) {
result := kastesting.ProxyCA{}
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}

result = kastesting.ProxyCA{
ProxySigningCert: proxySigningCert,
ProxySigningKey: proxySigningKey,
}
return result, nil
}

func createJobResource() *v1.Job {
return &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "default",
},
Spec: v1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
Image: "test",
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}
}

0 comments on commit c1aef65

Please sign in to comment.