Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ Use application/vnd.kubernetes.protobuf as content-type if possible #1149

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return nil, err
}

client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
Expand Down
38 changes: 35 additions & 3 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,41 @@ package apiutil

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)

var (
protobufScheme = runtime.NewScheme()
protobufSchemeLock sync.RWMutex
)

func init() {
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
panic(err)
}
}

// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
// be additional types that do support protobuf.
func AddToProtobufScheme(builder runtime.SchemeBuilder) error {
protobufSchemeLock.Lock()
defer protobufSchemeLock.Unlock()
return builder.AddToScheme(protobufScheme)
}

// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
func NewDiscoveryRESTMapper(c *rest.Config) (meta.RESTMapper, error) {
Expand Down Expand Up @@ -93,16 +117,16 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
// baseConfig, if set, otherwise a default serializer will be set.
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
cfg := createRestConfig(gvk, baseConfig)
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
cfg := createRestConfig(gvk, isUnstructured, baseConfig)
if cfg.NegotiatedSerializer == nil {
cfg.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: codecs}
}
return rest.RESTClientFor(cfg)
}

//createRestConfig copies the base config and updates needed fields for a new rest config
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config) *rest.Config {
gv := gvk.GroupVersion()

cfg := rest.CopyConfig(baseConfig)
Expand All @@ -115,5 +139,13 @@ func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *res
if cfg.UserAgent == "" {
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
}
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
if cfg.ContentType == "" && !isUnstructured {
protobufSchemeLock.RLock()
if protobufScheme.Recognizes(gvk) {
cfg.ContentType = runtime.ContentTypeProtobuf
}
protobufSchemeLock.RUnlock()
}
return cfg
}
12 changes: 7 additions & 5 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ func New(config *rest.Config, options Options) (Client, error) {
}

clientcache := &clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),

structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
}

rawMetaClient, err := metadata.NewForConfig(config)
Expand Down
27 changes: 19 additions & 8 deletions pkg/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand All @@ -43,20 +44,22 @@ type clientCache struct {
// codecs are used to create a REST client for a gvk
codecs serializer.CodecFactory

// resourceByType caches type metadata
resourceByType map[schema.GroupVersionKind]*resourceMeta
mu sync.RWMutex
// structuredResourceByType caches structured type metadata
structuredResourceByType map[schema.GroupVersionKind]*resourceMeta
// unstructuredResourceByType caches unstructured type metadata
unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta
mu sync.RWMutex
}

// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
// If the object is a list, the resource represents the item's type instead.
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList bool) (*resourceMeta, error) {
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool) (*resourceMeta, error) {
if strings.HasSuffix(gvk.Kind, "List") && isList {
// if this was a list, treat it as a request for the item's resource
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

client, err := apiutil.RESTClientForGVK(gvk, c.config, c.codecs)
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
if err != nil {
return nil, err
}
Expand All @@ -75,10 +78,18 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
return nil, err
}

_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

// It's better to do creation work twice than to not let multiple
// people make requests at once
c.mu.RLock()
r, known := c.resourceByType[gvk]
resourceByType := c.structuredResourceByType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this so that it's under the lock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if isUnstructured {
resourceByType = c.unstructuredResourceByType
}
r, known := resourceByType[gvk]
c.mu.RUnlock()

if known {
Expand All @@ -88,11 +99,11 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
// Initialize a new Client
c.mu.Lock()
defer c.mu.Unlock()
r, err = c.newResource(gvk, meta.IsListType(obj))
r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured)
if err != nil {
return nil, err
}
c.resourceByType[gvk] = r
resourceByType[gvk] = r
return r, err
}

Expand Down