Skip to content

Commit

Permalink
Adding two types of client for unstructured and typed
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Hurley committed Sep 18, 2018
1 parent 10ed9c2 commit 00c5c79
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 175 deletions.
62 changes: 35 additions & 27 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var _ = Describe("Informer Cache", func() {
It("should be able to list objects that haven't been watched previously", func() {
By("listing all services in the cluster")
listObj := &kcorev1.ServiceList{}
Expect(informerCache.List(context.Background(), listObj)).To(Succeed())
Expect(informerCache.List(context.Background(), nil, listObj)).To(Succeed())

By("verifying that the returned list contains the Kubernetes service")
// NB: kubernetes default service is automatically created in testenv.
Expand Down Expand Up @@ -147,10 +147,10 @@ var _ = Describe("Informer Cache", func() {
By("listing pods with a particular label")
// NB: each pod has a "test-label": <pod-name>
out := kcorev1.PodList{}
Expect(informerCache.List(context.Background(), &out,
client.InNamespace(testNamespaceTwo),
client.MatchingLabels(map[string]string{"test-label": "test-pod-2"}),
)).To(Succeed())
lo := &client.ListOptions{}
lo.InNamespace(testNamespaceTwo)
lo.MatchingLabels(map[string]string{"test-label": "test-pod-2"})
Expect(informerCache.List(context.Background(), lo, &out)).To(Succeed())

By("verifying the returned pods have the correct label")
Expect(out.Items).NotTo(BeEmpty())
Expand All @@ -167,9 +167,9 @@ var _ = Describe("Informer Cache", func() {
// NB: each pod has a "test-label": <pod-name>
out := kcorev1.PodList{}
labels := map[string]string{"test-label": "test-pod-2"}
Expect(informerCache.List(context.Background(), &out,
client.MatchingLabels(labels),
)).To(Succeed())
lo := &client.ListOptions{}
lo.MatchingLabels(labels)
Expect(informerCache.List(context.Background(), lo, &out)).To(Succeed())

By("verifying multiple pods with the same label in different namespaces are returned")
Expect(out.Items).NotTo(BeEmpty())
Expand All @@ -184,9 +184,9 @@ var _ = Describe("Informer Cache", func() {
It("should be able to list objects by namespace", func() {
By("listing pods in test-namespace-1")
listObj := &kcorev1.PodList{}
Expect(informerCache.List(context.Background(),
client.InNamespace(testNamespaceOne),
listObj)).To(Succeed())
lo := &client.ListOptions{}
lo.InNamespace(testNamespaceOne)
Expect(informerCache.List(context.Background(), lo, listObj)).To(Succeed())

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expand Down Expand Up @@ -231,7 +231,8 @@ var _ = Describe("Informer Cache", func() {
Version: "v1",
Kind: "ServiceList",
})
Expect(informerCache.List(context.Background(), nil, listObj)).To(Succeed())
err := informerCache.List(context.Background(), nil, listObj)
Expect(err).To(Succeed())

By("verifying that the returned list contains the Kubernetes service")
// NB: kubernetes default service is automatically created in testenv.
Expand Down Expand Up @@ -271,8 +272,11 @@ var _ = Describe("Informer Cache", func() {
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(), client.InNamespace(testNamespaceTwo).
MatchingLabels(map[string]string{"test-label": "test-pod-2"}), &out)).To(Succeed())
lo := &client.ListOptions{}
lo.InNamespace(testNamespaceTwo)
lo.MatchingLabels(map[string]string{"test-label": "test-pod-2"})
err := informerCache.List(context.Background(), lo, &out)
Expect(err).To(Succeed())

By("verifying the returned pods have the correct label")
Expect(out.Items).NotTo(BeEmpty())
Expand All @@ -294,8 +298,10 @@ var _ = Describe("Informer Cache", func() {
Kind: "PodList",
})
labels := map[string]string{"test-label": "test-pod-2"}
Expect(informerCache.List(context.Background(),
client.MatchingLabels(labels), &out)).To(Succeed())
lo := &client.ListOptions{}
lo.MatchingLabels(labels)
err := informerCache.List(context.Background(), lo, &out)
Expect(err).To(Succeed())

By("verifying multiple pods with the same label in different namespaces are returned")
Expect(out.Items).NotTo(BeEmpty())
Expand All @@ -315,9 +321,10 @@ var _ = Describe("Informer Cache", func() {
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(),
client.InNamespace(testNamespaceOne),
listObj)).To(Succeed())
lo := &client.ListOptions{}
lo.InNamespace(testNamespaceOne)
err := informerCache.List(context.Background(), lo, listObj)
Expect(err).To(Succeed())

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expand Down Expand Up @@ -470,9 +477,9 @@ var _ = Describe("Informer Cache", func() {

By("listing Pods with restartPolicyOnFailure")
listObj := &kcorev1.PodList{}
Expect(informer.List(context.Background(), listObj,
client.MatchingField("spec.restartPolicy", "OnFailure"),
)).To(Succeed())
lo := &client.ListOptions{}
lo.MatchingField("spec.restartPolicy", "OnFailure")
Expect(informer.List(context.Background(), lo, listObj)).To(Succeed())

By("verifying that the returned pods have correct restart policy")
Expect(listObj.Items).NotTo(BeEmpty())
Expand Down Expand Up @@ -524,7 +531,7 @@ var _ = Describe("Informer Cache", func() {
By("verifying the object is received on the channel")
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})
}, 3)

It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
Expand Down Expand Up @@ -565,16 +572,17 @@ var _ = Describe("Informer Cache", func() {
Version: "v1",
Kind: "PodList",
})
Expect(informer.List(context.Background(),
client.MatchingField("spec.restartPolicy", "OnFailure"),
listObj)).To(Succeed())
lo := &client.ListOptions{}
lo.MatchingField("spec.restartPolicy", "OnFailure")
err = informer.List(context.Background(), lo, listObj)
Expect(err).To(Succeed())

By("verifying that the returned pods have correct restart policy")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.GetName()).To(Equal("test-pod-3"))
})
}, 3)
})
})
})
5 changes: 0 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt

// List implements Reader
func (ip *informerCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
itemsPtr, err := apimeta.GetItemsPtr(out)
if err != nil {
return nil
}

gvk, err := apiutil.GVKForObject(out, ip.Scheme)
if err != nil {
return err
Expand Down
107 changes: 43 additions & 64 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"reflect"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -59,16 +60,26 @@ func New(config *rest.Config, options Options) (Client, error) {
}
}

dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}

c := &client{
cache: clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[reflect.Type]*resourceMeta),
unstructuredResourceByGVK: make(map[schema.GroupVersionKind]*resourceMeta),
typedClient: typedClient{
cache: clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[reflect.Type]*resourceMeta),
},
paramCodec: runtime.NewParameterCodec(options.Scheme),
},
unstructuredClient: unstructuredClient{
client: dynamicClient,
restMapper: options.Mapper,
},
paramCodec: runtime.NewParameterCodec(options.Scheme),
}

return c, nil
Expand All @@ -79,85 +90,53 @@ var _ Client = &client{}
// client is a client.Client that reads and writes directly from/to an API server. It lazily initializes
// new clients at the time they are used, and caches the client.
type client struct {
cache clientCache
paramCodec runtime.ParameterCodec
typedClient typedClient
unstructuredClient unstructuredClient
}

// Create implements client.Client
func (c *client) Create(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Create(ctx, obj)
}
return o.Post().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Body(obj).
Do().
Into(obj)
return c.typedClient.Create(ctx, obj)
}

// Update implements client.Client
func (c *client) Update(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Update(ctx, obj)
}
return o.Put().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Body(obj).
Do().
Into(obj)
return c.typedClient.Update(ctx, obj)
}

// Delete implements client.Client
func (c *client) Delete(ctx context.Context, obj runtime.Object, opts ...DeleteOptionFunc) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Delete(ctx, obj, opts...)
}

deleteOpts := DeleteOptions{}
return o.Delete().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Body(deleteOpts.ApplyOptions(opts).AsDeleteOptions()).
Do().
Error()
return c.typedClient.Delete(ctx, obj, opts...)
}

// Get implements client.Client
func (c *client) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Get(ctx, key, obj)
}
return r.Get().
NamespaceIfScoped(key.Namespace, r.isNamespaced()).
Resource(r.resource()).
Name(key.Name).Do().Into(obj)
return c.typedClient.Get(ctx, key, obj)
}

// List implements client.Client
func (c *client) List(ctx context.Context, opts *ListOptions, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
_, ok := obj.(*unstructured.UnstructuredList)
if ok {
return c.unstructuredClient.List(ctx, opts, obj)
}
namespace := ""
if opts != nil {
namespace = opts.Namespace
}
return r.Get().
NamespaceIfScoped(namespace, r.isNamespaced()).
Resource(r.resource()).
Body(obj).
VersionedParams(opts.AsListOptions(), c.paramCodec).
Do().
Into(obj)
return c.typedClient.List(ctx, opts, obj)
}

// Status implements client.StatusClient
Expand All @@ -175,7 +154,7 @@ var _ StatusWriter = &statusWriter{}

// Update implements client.StatusWriter
func (sw *statusWriter) Update(_ context.Context, obj runtime.Object) error {
o, err := sw.client.cache.getObjMeta(obj)
o, err := sw.client.typedClient.cache.getObjMeta(obj)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 00c5c79

Please sign in to comment.