Skip to content

Commit

Permalink
Add impl for uvip
Browse files Browse the repository at this point in the history
  • Loading branch information
richabanker committed Jul 19, 2023
1 parent c684de5 commit cd5f3d9
Show file tree
Hide file tree
Showing 26 changed files with 1,846 additions and 47 deletions.
14 changes: 14 additions & 0 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
Expand All @@ -57,6 +58,7 @@ func createAggregatorConfig(
externalInformers kubeexternalinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
proxyTransport *http.Transport,
peerProxy utilpeerproxy.Interface,
pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
Expand All @@ -76,6 +78,16 @@ func createAggregatorConfig(
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
}

if peerProxy != nil {
originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
// Add peer proxy handler to aggregator-apiserver.
// wrap the peer proxy handler first.
apiHandler = peerProxy.WrapHandler(apiHandler)
return originalHandlerChainBuilder(apiHandler, c)
}
}

// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
Expand Down Expand Up @@ -104,6 +116,8 @@ func createAggregatorConfig(
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
PeerCAFile: commandOptions.PeerCAFile,
PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
Expand Down Expand Up @@ -258,6 +259,21 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
},
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
if err != nil {
return nil, nil, nil, err
}
}
}

clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
if err != nil {
return nil, nil, nil, err
Expand Down
53 changes: 53 additions & 0 deletions pkg/controlplane/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
openapicommon "k8s.io/kube-openapi/pkg/common"

"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/kubeapiserver"
Expand Down Expand Up @@ -193,3 +199,50 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := controlplane.DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}
25 changes: 25 additions & 0 deletions pkg/controlplane/apiserver/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/util/keyutil"
Expand Down Expand Up @@ -63,6 +64,16 @@ type Options struct {
ProxyClientCertFile string
ProxyClientKeyFile string

// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string

// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool

Expand Down Expand Up @@ -154,6 +165,20 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) {
"when it must call out during a request. This includes proxying requests to a user "+
"api-server and calling out to webhook admission plugins.")

fs.StringVar(&s.PeerCAFile, "peer-ca-file", s.PeerCAFile,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this file will be used to verify serving certificates of peer kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability.")

fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertiseIP, "peer-advertise-ip", s.PeerAdvertiseAddress.PeerAdvertiseIP,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this IP will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")

fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertisePort, "peer-advertise-port", s.PeerAdvertiseAddress.PeerAdvertisePort,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this port will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")

fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")

Expand Down
29 changes: 29 additions & 0 deletions pkg/controlplane/apiserver/options/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/pkg/api/legacyscheme"
)
Expand Down Expand Up @@ -69,6 +70,32 @@ func validateAPIPriorityAndFairness(options *Options) []error {
return nil
}

func validateUnknownVersionInteroperabilityProxyFeature() []error {
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
return nil
}
return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
}
return nil
}

func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
err := []error{}
if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if options.PeerCAFile != "" {
err = append(err, fmt.Errorf("--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertiseIP != "" {
err = append(err, fmt.Errorf("--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertisePort != "" {
err = append(err, fmt.Errorf("--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
}
return err
}

// Validate checks Options and return a slice of found errs.
func (s *Options) Validate() []error {
var errs []error
Expand All @@ -83,6 +110,8 @@ func (s *Options) Validate() []error {
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)

return errs
}
82 changes: 82 additions & 0 deletions pkg/controlplane/apiserver/options/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ import (

kubeapiserveradmission "k8s.io/apiserver/pkg/admission"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/features"

peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
)

Expand Down Expand Up @@ -80,6 +85,83 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) {
}
}

func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) {
tests := []struct {
name string
featureEnabled bool
errShouldContain string
peerCAFile string
peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
}{
{
name: "feature disabled but peerCAFile set",
featureEnabled: false,
peerCAFile: "foo",
errShouldContain: "--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
{
name: "feature disabled but peerAdvertiseIP set",
featureEnabled: false,
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertiseIP: "1.2.3.4"},
errShouldContain: "--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
{
name: "feature disabled but peerAdvertisePort set",
featureEnabled: false,
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertisePort: "1"},
errShouldContain: "--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
options := &Options{
PeerCAFile: test.peerCAFile,
PeerAdvertiseAddress: test.peerAdvertiseAddress,
}
if test.featureEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.UnknownVersionInteroperabilityProxy, true)()
}
var errMessageGot string
if errs := validateUnknownVersionInteroperabilityProxyFlags(options); len(errs) > 0 {
errMessageGot = errs[0].Error()
}
if !strings.Contains(errMessageGot, test.errShouldContain) {
t.Errorf("Expected error message to contain: %q, but got: %q", test.errShouldContain, errMessageGot)
}

})
}
}

func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) {
const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled"
tests := []struct {
name string
featuresEnabled []featuregate.Feature
}{
{
name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI",
featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
for _, feature := range test.featuresEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)()
}
var errMessageGot string
if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 {
errMessageGot = errs[0].Error()
}
if !strings.Contains(errMessageGot, conflict) {
t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot)
}
})
}
}

func TestValidateOptions(t *testing.T) {
testCases := []struct {
name string
Expand Down
Loading

0 comments on commit cd5f3d9

Please sign in to comment.