Skip to content

Commit

Permalink
Merge pull request #2147 from inteon/watch
Browse files Browse the repository at this point in the history
✨ Improve unstructured serialisation
  • Loading branch information
k8s-ci-robot committed Jan 27, 2023
2 parents dd6bafd + fa6aa54 commit 595f569
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 34 deletions.
19 changes: 3 additions & 16 deletions pkg/client/apiutil/apimachinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
Expand Down Expand Up @@ -153,19 +154,6 @@ func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
return rest.RESTClientFor(createRestConfig(gvk, isUnstructured, baseConfig, codecs))
}

// serializerWithDecodedGVK is a CodecFactory that overrides the DecoderToVersion of a WithoutConversionCodecFactory
// in order to avoid clearing the GVK from the decoded object.
//
// See https://github.com/kubernetes/kubernetes/issues/80609.
type serializerWithDecodedGVK struct {
serializer.WithoutConversionCodecFactory
}

// DecoderToVersion returns an decoder that does not do conversion.
func (f serializerWithDecodedGVK) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder {
return serializer
}

// createRestConfig copies the base config and updates needed fields for a new rest config.
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config {
gv := gvk.GroupVersion()
Expand All @@ -190,9 +178,8 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf
}

if isUnstructured {
// If the object is unstructured, we need to preserve the GVK information.
// Use our own custom serializer.
cfg.NegotiatedSerializer = serializerWithDecodedGVK{serializer.WithoutConversionCodecFactory{CodecFactory: codecs}}
// If the object is unstructured, we use the client-go dynamic serializer.
cfg = dynamic.ConfigFor(cfg)
} else {
cfg.NegotiatedSerializer = serializerWithTargetZeroingDecode{NegotiatedSerializer: serializer.WithoutConversionCodecFactory{CodecFactory: codecs}}
}
Expand Down
20 changes: 6 additions & 14 deletions pkg/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

Expand All @@ -33,16 +32,11 @@ func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return &watchingClient{client: client, dynamic: dynamicClient}, nil
return &watchingClient{client: client}, nil
}

type watchingClient struct {
*client
dynamic dynamic.Interface
}

func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) {
Expand Down Expand Up @@ -82,20 +76,18 @@ func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialO
}

func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
gvk := obj.GroupVersionKind()
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")

r, err := w.client.unstructuredClient.resources.getResource(obj)
if err != nil {
return nil, err
}

listOpts := w.listOpts(opts...)

if listOpts.Namespace != "" && r.isNamespaced() {
return w.dynamic.Resource(r.mapping.Resource).Namespace(listOpts.Namespace).Watch(ctx, *listOpts.AsListOptions())
}
return w.dynamic.Resource(r.mapping.Resource).Watch(ctx, *listOpts.AsListOptions())
return r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), w.client.unstructuredClient.paramCodec).
Watch(ctx)
}

func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) {
Expand Down
17 changes: 13 additions & 4 deletions pkg/client/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("ClientWithWatch", func() {
Expect(cl).NotTo(BeNil())
})

watchSuite := func(through client.ObjectList, expectedType client.Object) {
watchSuite := func(through client.ObjectList, expectedType client.Object, checkGvk bool) {
cl, err := client.NewWithWatch(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())
Expand All @@ -99,10 +99,19 @@ var _ = Describe("ClientWithWatch", func() {
Expect(metaObject.GetName()).To(Equal(dep.Name))
Expect(metaObject.GetUID()).To(Equal(dep.UID))

if checkGvk {
runtimeObject := event.Object
gvk := runtimeObject.GetObjectKind().GroupVersionKind()
Expect(gvk).To(Equal(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
}))
}
}

It("should receive a create event when watching the typed object", func() {
watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{})
watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{}, false)
})

It("should receive a create event when watching the unstructured object", func() {
Expand All @@ -112,12 +121,12 @@ var _ = Describe("ClientWithWatch", func() {
Kind: "Deployment",
Version: "v1",
})
watchSuite(u, &unstructured.Unstructured{})
watchSuite(u, &unstructured.Unstructured{}, true)
})

It("should receive a create event when watching the metadata object", func() {
m := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}}
watchSuite(m, &metav1.PartialObjectMetadata{})
watchSuite(m, &metav1.PartialObjectMetadata{}, false)
})
})

Expand Down

0 comments on commit 595f569

Please sign in to comment.