Skip to content

Commit

Permalink
WIP: Use proto if we have proto types
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Aug 20, 2021
1 parent 4e7f0c9 commit 8067a1d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return nil, err
}

client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
client, err := apiutil.RESTClientForGVK(ip.Scheme, gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
Expand Down
75 changes: 46 additions & 29 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,39 @@ package apiutil

import (
"fmt"
"sync"
"reflect"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)

var (
protobufScheme = runtime.NewScheme()
protobufSchemeLock sync.RWMutex
)

func init() {
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
panic(err)
}
}

// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
// be additional types that do support protobuf.
func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
protobufSchemeLock.Lock()
defer protobufSchemeLock.Unlock()
return addToScheme(protobufScheme)
}
// var (
// protobufScheme = runtime.NewScheme()
// protobufSchemeLock sync.RWMutex
// )

// func init() {
// // Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
// // For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
// // See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
// if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
// panic(err)
// }
// }

// // AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
// // be additional types that do support protobuf.
// func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
// protobufSchemeLock.Lock()
// defer protobufSchemeLock.Unlock()
// return addToScheme(protobufScheme)
// }

// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
Expand Down Expand Up @@ -117,8 +116,8 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
// baseConfig, if set, otherwise a default serializer will be set.
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
func RESTClientForGVK(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
return rest.RESTClientFor(createRestConfig(scheme, gvk, isUnstructured, baseConfig, codecs))
}

// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
Expand All @@ -135,7 +134,7 @@ func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _
}

// createRestConfig copies the base config and updates needed fields for a new rest config.
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
func createRestConfig(scheme *runtime.Scheme, gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
gv := gvk.GroupVersion()

cfg := rest.CopyConfig(baseConfig)
Expand All @@ -150,11 +149,9 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
}
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
if cfg.ContentType == "" && !isUnstructured {
protobufSchemeLock.RLock()
if protobufScheme.Recognizes(gvk) {
if haveProtoDefinition(scheme, gvk) {
cfg.ContentType = runtime.ContentTypeProtobuf
}
protobufSchemeLock.RUnlock()
}

if cfg.NegotiatedSerializer == nil {
Expand All @@ -169,3 +166,23 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf

return cfg
}

// protoMessage is implemented by protobuf messages (of all libraries)
type protoMessage interface {
ProtoMessage()
}

var protoMessageType = reflect.TypeOf(new(protoMessage)).Elem()

// haveProtoDefinition returns true if the go type for the specified gvk support protobuf encoding.
func haveProtoDefinition(scheme *runtime.Scheme, gvk schema.GroupVersionKind) bool {
gvkType, found := scheme.AllKnownTypes()[gvk]
if !found {
return false
}
// We normally get the raw struct e.g. v1.Pod, not &v1.Pod
if gvkType.Kind() == reflect.Struct {
gvkType = reflect.PtrTo(gvkType)
}
return gvkType.Implements(protoMessageType)
}
2 changes: 1 addition & 1 deletion pkg/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstruc
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
client, err := apiutil.RESTClientForGVK(c.scheme, gvk, isUnstructured, c.config, c.codecs)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8067a1d

Please sign in to comment.