Skip to content

Commit

Permalink
Merge pull request kubernetes#117740 from Richabanker/uvip-impl
Browse files Browse the repository at this point in the history
 Unknown Version Interoperability Proxy Impl
  • Loading branch information
k8s-ci-robot committed Jul 19, 2023
2 parents d1d86da + c1aef65 commit 66e99b3
Show file tree
Hide file tree
Showing 29 changed files with 2,166 additions and 55 deletions.
14 changes: 14 additions & 0 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
Expand All @@ -57,6 +58,7 @@ func createAggregatorConfig(
externalInformers kubeexternalinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
proxyTransport *http.Transport,
peerProxy utilpeerproxy.Interface,
pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
Expand All @@ -76,6 +78,16 @@ func createAggregatorConfig(
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
}

if peerProxy != nil {
originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
// Add peer proxy handler to aggregator-apiserver.
// wrap the peer proxy handler first.
apiHandler = peerProxy.WrapHandler(apiHandler)
return originalHandlerChainBuilder(apiHandler, c)
}
}

// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
Expand Down Expand Up @@ -104,6 +116,8 @@ func createAggregatorConfig(
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
PeerCAFile: commandOptions.PeerCAFile,
PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
Expand Down Expand Up @@ -258,6 +259,21 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
},
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
if err != nil {
return nil, nil, nil, err
}
}
}

clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
if err != nil {
return nil, nil, nil, err
Expand Down
57 changes: 49 additions & 8 deletions cmd/kube-apiserver/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package testing

import (
"context"
"crypto/rsa"
"crypto/x509"
"fmt"
"net"
Expand All @@ -38,12 +39,15 @@ import (
serveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
clientgotransport "k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
Expand Down Expand Up @@ -77,6 +81,14 @@ type TestServerInstanceOptions struct {
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager
// CA file used for requestheader authn during communication between:
// 1. kube-apiserver and peer when the local apiserver is not able to serve the request due
// to version skew
// 2. kube-apiserver and aggregated apiserver

// We specify this as on option to pass a common proxyCA to multiple apiservers to simulate
// an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections.
ProxyCA *ProxyCA
}

// TestServer return values supplied by kube-test-ApiServer
Expand All @@ -95,6 +107,16 @@ type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
Cleanup(func())
}

// ProxyCA contains the certificate authority certificate and key which is used to verify client connections
// to kube-apiservers. The clients can be :
// 1. aggregated apiservers
// 2. peer kube-apiservers
type ProxyCA struct {
ProxySigningCert *x509.Certificate
ProxySigningKey *rsa.PrivateKey
}

// NewDefaultTestServerOptions Default options for TestServer instances
Expand Down Expand Up @@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
reqHeaders := serveroptions.NewDelegatingAuthenticationOptions()
s.Authentication.RequestHeader = &reqHeaders.RequestHeader

// create certificates for aggregation and client-cert auth
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
var proxySigningKey *rsa.PrivateKey
var proxySigningCert *x509.Certificate

if instanceOptions.ProxyCA != nil {
// use provided proxyCA
proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey
proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert

} else {
// create certificates for aggregation and client-cert auth
proxySigningKey, err = testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}
}
proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt")
if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
Expand Down Expand Up @@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, err
}
s.Authentication.ClientCert.ClientCA = clientCACertFile
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
// TODO: set up a general clean up for testserver
if clientgotransport.DialerStopCh == wait.NeverStop {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
t.Cleanup(cancel)
clientgotransport.DialerStopCh = ctx.Done()
}
s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt")
}
}

s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device
Expand Down
53 changes: 53 additions & 0 deletions pkg/controlplane/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
openapicommon "k8s.io/kube-openapi/pkg/common"

"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/kubeapiserver"
Expand Down Expand Up @@ -193,3 +199,50 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := controlplane.DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}
25 changes: 25 additions & 0 deletions pkg/controlplane/apiserver/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/util/keyutil"
Expand Down Expand Up @@ -63,6 +64,16 @@ type Options struct {
ProxyClientCertFile string
ProxyClientKeyFile string

// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string

// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool

Expand Down Expand Up @@ -154,6 +165,20 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) {
"when it must call out during a request. This includes proxying requests to a user "+
"api-server and calling out to webhook admission plugins.")

fs.StringVar(&s.PeerCAFile, "peer-ca-file", s.PeerCAFile,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this file will be used to verify serving certificates of peer kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability.")

fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertiseIP, "peer-advertise-ip", s.PeerAdvertiseAddress.PeerAdvertiseIP,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this IP will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")

fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertisePort, "peer-advertise-port", s.PeerAdvertiseAddress.PeerAdvertisePort,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this port will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")

fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")

Expand Down
29 changes: 29 additions & 0 deletions pkg/controlplane/apiserver/options/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/pkg/api/legacyscheme"
)
Expand Down Expand Up @@ -69,6 +70,32 @@ func validateAPIPriorityAndFairness(options *Options) []error {
return nil
}

func validateUnknownVersionInteroperabilityProxyFeature() []error {
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
return nil
}
return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
}
return nil
}

func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
err := []error{}
if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if options.PeerCAFile != "" {
err = append(err, fmt.Errorf("--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertiseIP != "" {
err = append(err, fmt.Errorf("--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertisePort != "" {
err = append(err, fmt.Errorf("--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
}
return err
}

// Validate checks Options and return a slice of found errs.
func (s *Options) Validate() []error {
var errs []error
Expand All @@ -83,6 +110,8 @@ func (s *Options) Validate() []error {
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)

return errs
}
Loading

0 comments on commit 66e99b3

Please sign in to comment.