Skip to content

Commit

Permalink
Use application/vnd.kubernetes.protobuf as content-type if possible
Browse files Browse the repository at this point in the history
Signed-off-by: Siyu Wang <FillZpp.pub@gmail.com>
  • Loading branch information
FillZpp committed Oct 19, 2020
1 parent f52b618 commit 00b4b34
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return nil, err
}

client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
Expand Down
24 changes: 21 additions & 3 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,24 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)

var (
protobufScheme = runtime.NewScheme()
)

func init() {
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
panic(err)
}
}

// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
func NewDiscoveryRESTMapper(c *rest.Config) (meta.RESTMapper, error) {
Expand Down Expand Up @@ -71,16 +85,16 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
// baseConfig, if set, otherwise a default serializer will be set.
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
cfg := createRestConfig(gvk, baseConfig)
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
cfg := createRestConfig(gvk, isUnstructured, baseConfig)
if cfg.NegotiatedSerializer == nil {
cfg.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: codecs}
}
return rest.RESTClientFor(cfg)
}

//createRestConfig copies the base config and updates needed fields for a new rest config
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config) *rest.Config {
gv := gvk.GroupVersion()

cfg := rest.CopyConfig(baseConfig)
Expand All @@ -93,5 +107,9 @@ func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *res
if cfg.UserAgent == "" {
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
}
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
if cfg.ContentType == "" && !isUnstructured && protobufScheme.Recognizes(gvk) {
cfg.ContentType = runtime.ContentTypeProtobuf
}
return cfg
}
12 changes: 7 additions & 5 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func New(config *rest.Config, options Options) (Client, error) {
}

clientcache := &clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),

structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
}

c := &client{
Expand Down
27 changes: 19 additions & 8 deletions pkg/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand All @@ -43,20 +44,22 @@ type clientCache struct {
// codecs are used to create a REST client for a gvk
codecs serializer.CodecFactory

// resourceByType caches type metadata
resourceByType map[schema.GroupVersionKind]*resourceMeta
mu sync.RWMutex
// structuredResourceByType caches structured type metadata
structuredResourceByType map[schema.GroupVersionKind]*resourceMeta
// unstructuredResourceByType caches unstructured type metadata
unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta
mu sync.RWMutex
}

// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
// If the object is a list, the resource represents the item's type instead.
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList bool) (*resourceMeta, error) {
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool) (*resourceMeta, error) {
if strings.HasSuffix(gvk.Kind, "List") && isList {
// if this was a list, treat it as a request for the item's resource
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

client, err := apiutil.RESTClientForGVK(gvk, c.config, c.codecs)
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
if err != nil {
return nil, err
}
Expand All @@ -75,10 +78,18 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
return nil, err
}

_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

// It's better to do creation work twice than to not let multiple
// people make requests at once
c.mu.RLock()
r, known := c.resourceByType[gvk]
resourceByType := c.structuredResourceByType
if isUnstructured {
resourceByType = c.unstructuredResourceByType
}
r, known := resourceByType[gvk]
c.mu.RUnlock()

if known {
Expand All @@ -88,11 +99,11 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
// Initialize a new Client
c.mu.Lock()
defer c.mu.Unlock()
r, err = c.newResource(gvk, meta.IsListType(obj))
r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured)
if err != nil {
return nil, err
}
c.resourceByType[gvk] = r
resourceByType[gvk] = r
return r, err
}

Expand Down

0 comments on commit 00b4b34

Please sign in to comment.