Skip to content

Commit

Permalink
Use proto only if we have proto types
Browse files Browse the repository at this point in the history
We should not try to use proto if the scheme does not have the
protobuf representation in the go type; we won't be able to
deserialize it.
  • Loading branch information
justinsb committed Nov 13, 2021
1 parent 4e7f0c9 commit 602bb3c
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return nil, err
}

client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
client, err := apiutil.RESTClientForGVK(ip.Scheme, gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
Expand Down
149 changes: 132 additions & 17 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package apiutil

import (
"fmt"
"reflect"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -29,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)
Expand All @@ -39,17 +39,8 @@ var (
protobufSchemeLock sync.RWMutex
)

func init() {
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
panic(err)
}
}

// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
// be additional types that do support protobuf.
// be additional types where we do want to support protobuf.
func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
protobufSchemeLock.Lock()
defer protobufSchemeLock.Unlock()
Expand Down Expand Up @@ -117,8 +108,8 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
// baseConfig, if set, otherwise a default serializer will be set.
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
func RESTClientForGVK(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
return rest.RESTClientFor(createRestConfig(scheme, gvk, isUnstructured, baseConfig, codecs))
}

// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
Expand All @@ -135,7 +126,7 @@ func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _
}

// createRestConfig copies the base config and updates needed fields for a new rest config.
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
func createRestConfig(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
gv := gvk.GroupVersion()

cfg := rest.CopyConfig(baseConfig)
Expand All @@ -150,11 +141,9 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
}
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
if cfg.ContentType == "" && !isUnstructured {
protobufSchemeLock.RLock()
if protobufScheme.Recognizes(gvk) {
if canUseProtobuf(scheme, gvk) {
cfg.ContentType = runtime.ContentTypeProtobuf
}
protobufSchemeLock.RUnlock()
}

if cfg.NegotiatedSerializer == nil {
Expand All @@ -169,3 +158,129 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf

return cfg
}

// canUseProtobuf returns true if we should use protobuf encoding.
// We need two things: (1) the apiserver must support protobuf for the type,
// and (2) we must have a proto-compatible receiving go type.
// Because it's hard to know in general if apiserver supports proto for a given GVK,
// we maintain a list of built-in apigroups which do support proto;
// additional schemas can be added as proto-safe using AddToProtobufScheme.
// We check if we have a proto-compatible type by interface casting.
func canUseProtobuf(scheme *runtime.Scheme, gvk schema.GroupVersionKind) bool {
// Check that the client supports proto for this type
gvkType, found := scheme.AllKnownTypes()[gvk]
if !found {
// We aren't going to be able to deserialize proto without type information.
return false
}
if !implementsProto(gvkType) {
// We don't have proto information, can't parse proto
return false
}

// Check that the apiserver also supports proto for this type
serverSupportsProto := false

// Check for built-in types well-known to support proto
serverSupportsProto = isWellKnownKindThatSupportsProto(gvk)

// Check for additional types explicitly marked as supporting proto
if !serverSupportsProto {
protobufSchemeLock.RLock()
serverSupportsProto = protobufScheme.Recognizes(gvk)
protobufSchemeLock.RUnlock()
}

return serverSupportsProto
}

// isWellKnownKindThatSupportsProto returns true if the gvk is a well-known Kind that supports proto encoding.
func isWellKnownKindThatSupportsProto(gvk schema.GroupVersionKind) bool {
// corev1
if gvk.Group == "" && gvk.Version == "v1" {
return true
}

// extensions v1beta1
if gvk.Group == "extensions" && gvk.Version == "v1beta1" {
return true
}

// Generated with `kubectl api-resources -oname | grep "\." | sort | cut -f2- -d. | sort | uniq | awk '{print "case \"" $0 "\": return true" }'` (before adding any CRDs)
switch gvk.Group {
case "admissionregistration.k8s.io":
return true
case "apiextensions.k8s.io":
return true
case "apiregistration.k8s.io":
return true
case "apps":
return true
case "authentication.k8s.io":
return true
case "authorization.k8s.io":
return true
case "autoscaling":
return true
case "batch":
return true
case "certificates.k8s.io":
return true
case "coordination.k8s.io":
return true
case "discovery.k8s.io":
return true
case "events.k8s.io":
return true
case "flowcontrol.apiserver.k8s.io":
return true
case "networking.k8s.io":
return true
case "node.k8s.io":
return true
case "policy":
return true
case "rbac.authorization.k8s.io":
return true
case "scheduling.k8s.io":
return true
case "storage.k8s.io":
return true
}
return false
}

var (
memoizeImplementsProto = make(map[reflect.Type]bool)
memoizeImplementsProtoLock sync.RWMutex
)

// protoMessage is implemented by protobuf messages (of all libraries).
type protoMessage interface {
ProtoMessage()
}

var protoMessageType = reflect.TypeOf(new(protoMessage)).Elem()

// implementsProto returns true if the local go type supports protobuf deserialization.
func implementsProto(t reflect.Type) bool {
memoizeImplementsProtoLock.RLock()
result, found := memoizeImplementsProto[t]
memoizeImplementsProtoLock.RUnlock()

if found {
return result
}

// We normally get the raw struct e.g. v1.Pod, not &v1.Pod
if t.Kind() == reflect.Struct {
return implementsProto(reflect.PtrTo(t))
}

result = t.Implements(protoMessageType)
memoizeImplementsProtoLock.Lock()
memoizeImplementsProto[t] = result
memoizeImplementsProtoLock.Unlock()

return result
}
138 changes: 138 additions & 0 deletions pkg/client/apiutil/apimachinery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package apiutil contains utilities for working with raw Kubernetes
// API machinery, such as creating RESTMappers and raw REST clients,
// and extracting the GVK of an object.
package apiutil

import (
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)

var exampleSchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}

type ExampleCRD struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}

func (e *ExampleCRD) DeepCopyObject() runtime.Object {
panic("not implemented")
}

func TestCanUseProtobuf(t *testing.T) {
exampleCRDScheme := runtime.NewScheme()

builder := &scheme.Builder{GroupVersion: exampleSchemeGroupVersion}
builder.Register(&ExampleCRD{})
if err := builder.AddToScheme(exampleCRDScheme); err != nil {
t.Fatalf("AddToScheme failed: %v", err)
}

schemes := map[string]*runtime.Scheme{
"kubernetes": kubernetesscheme.Scheme,
"empty": runtime.NewScheme(),
"example.com": exampleCRDScheme,
}
grid := []struct {
scheme string
gvk schema.GroupVersionKind
wantType string
wantCanUseProtobuf bool
}{
{
scheme: "kubernetes",
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
wantType: "v1.Pod",
wantCanUseProtobuf: true,
},
{
scheme: "kubernetes",
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Pod"},
wantType: "",
wantCanUseProtobuf: false,
},
{
scheme: "empty",
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
wantType: "",
wantCanUseProtobuf: false,
},
{
scheme: "example.com",
gvk: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"},
wantType: "",
wantCanUseProtobuf: false,
},
{
scheme: "example.com",
gvk: schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ExampleCRD"},
wantType: "apiutil.ExampleCRD",
wantCanUseProtobuf: false,
},
}

for _, g := range grid {
t.Run(fmt.Sprintf("%#v", g), func(t *testing.T) {
scheme := schemes[g.scheme]
if scheme == nil {
t.Errorf("scheme %q not found", g.scheme)
}

gotType := ""
if t := scheme.AllKnownTypes()[g.gvk]; t != nil {
gotType = t.String()
}
if gotType != g.wantType {
t.Errorf("unexpected type got %v, want %v", gotType, g.wantType)

}
gotCanUseProtobuf := canUseProtobuf(scheme, g.gvk)
if gotCanUseProtobuf != g.wantCanUseProtobuf {
t.Errorf("canUseProtobuf(%#v, %#v) got %v, want %v", g.scheme, g.gvk, gotCanUseProtobuf, g.wantCanUseProtobuf)
}
})
}
}

func TestCanUseProtobufForAllBuiltins(t *testing.T) {
emptyScheme := runtime.NewScheme()

allKnownTypes := kubernetesscheme.Scheme.AllKnownTypes()
for gvk := range allKnownTypes {
// Ignore internal bookkeeping types
if gvk.Version == "__internal" || gvk.Group == "internal.apiserver.k8s.io" {
continue
}

if !canUseProtobuf(kubernetesscheme.Scheme, gvk) {
t.Errorf("canUseProtobuf for built-in GVK %#v returned false, expected built-ins to support proto", gvk)
}

// If we don't have the type in the scheme, double check we don't try to use proto.
if canUseProtobuf(emptyScheme, gvk) {
t.Errorf("canUseProtobuf for built-in GVK %#v returned true with empty scheme, but empty scheme cannot support proto", gvk)
}
}
}
2 changes: 1 addition & 1 deletion pkg/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstruc
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
client, err := apiutil.RESTClientForGVK(c.scheme, gvk, isUnstructured, c.config, c.codecs)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 602bb3c

Please sign in to comment.