-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding ability for clients, cache and watcher to work with unstructured #101
Merged
k8s-ci-robot
merged 8 commits into
kubernetes-sigs:master
from
shawn-hurley:unstructured
Sep 20, 2018
Merged
Changes from 7 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f06aa3e
Adding ability for clients, cache and watcher to work with unstructured
dbc79c9
Adding delegating reader for get and list requests for unstructured objs
07715f6
Adding test cases for getting and listing unstructured types.
247e85a
Adding unstructured clients testing.
7744200
Adding informer cache tests.
10ed9c2
Adding ability to use dynamic list for unstructured list watcher.
DirectXMan12 00c5c79
Adding two types of client for unstructured and typed
7471369
Status Writer working with unstructured as well as typed objects
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package internal | ||
|
||
import ( | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/api/meta" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
) | ||
|
||
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. | ||
// It uses a standard parameter codec constructed based on the given generated Scheme. | ||
type InformersMap struct { | ||
// we abstract over the details of structured vs unstructured with the specificInformerMaps | ||
|
||
structured *specificInformersMap | ||
unstructured *specificInformersMap | ||
|
||
// Scheme maps runtime.Objects to GroupVersionKinds | ||
Scheme *runtime.Scheme | ||
} | ||
|
||
// NewInformersMap creates a new InformersMap that can create informers for | ||
// both structured and unstructured objects. | ||
func NewInformersMap(config *rest.Config, | ||
scheme *runtime.Scheme, | ||
mapper meta.RESTMapper, | ||
resync time.Duration) *InformersMap { | ||
|
||
return &InformersMap{ | ||
structured: newStructuredInformersMap(config, scheme, mapper, resync), | ||
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync), | ||
|
||
Scheme: scheme, | ||
} | ||
} | ||
|
||
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. | ||
func (m *InformersMap) Start(stop <-chan struct{}) error { | ||
go m.structured.Start(stop) | ||
go m.unstructured.Start(stop) | ||
<-stop | ||
return nil | ||
} | ||
|
||
// WaitForCacheSync waits until all the caches have been synced. | ||
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool { | ||
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...) | ||
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...) | ||
|
||
return cache.WaitForCacheSync(stop, syncedFuncs...) | ||
} | ||
|
||
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns | ||
// the Informer from the map. | ||
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { | ||
_, isUnstructured := obj.(*unstructured.Unstructured) | ||
_, isUnstructuredList := obj.(*unstructured.UnstructuredList) | ||
isUnstructured = isUnstructured || isUnstructuredList | ||
|
||
if isUnstructured { | ||
return m.unstructured.Get(gvk, obj) | ||
} | ||
|
||
return m.structured.Get(gvk, obj) | ||
} | ||
|
||
// newStructuredInformersMap creates a new InformersMap for structured objects. | ||
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap { | ||
return newSpecificInformersMap(config, scheme, mapper, resync, createStructuredListWatch) | ||
} | ||
|
||
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects. | ||
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration) *specificInformersMap { | ||
return newSpecificInformersMap(config, scheme, mapper, resync, createUnstructuredListWatch) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,24 +27,31 @@ import ( | |
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/runtime/serializer" | ||
"k8s.io/apimachinery/pkg/watch" | ||
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
|
||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil" | ||
) | ||
|
||
// NewInformersMap returns a new InformersMap | ||
func NewInformersMap(config *rest.Config, | ||
// clientListWatcherFunc knows how to create a ListWatcher | ||
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) | ||
|
||
// newSpecificInformersMap returns a new specificInformersMap (like | ||
// the generical InformersMap, except that it doesn't implement WaitForCacheSync). | ||
func newSpecificInformersMap(config *rest.Config, | ||
scheme *runtime.Scheme, | ||
mapper meta.RESTMapper, | ||
resync time.Duration) *InformersMap { | ||
ip := &InformersMap{ | ||
config: config, | ||
Scheme: scheme, | ||
mapper: mapper, | ||
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), | ||
codecs: serializer.NewCodecFactory(scheme), | ||
paramCodec: runtime.NewParameterCodec(scheme), | ||
resync: resync, | ||
resync time.Duration, createListWatcher createListWatcherFunc) *specificInformersMap { | ||
ip := &specificInformersMap{ | ||
config: config, | ||
Scheme: scheme, | ||
mapper: mapper, | ||
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), | ||
codecs: serializer.NewCodecFactory(scheme), | ||
paramCodec: runtime.NewParameterCodec(scheme), | ||
resync: resync, | ||
createListWatcher: createListWatcher, | ||
} | ||
return ip | ||
} | ||
|
@@ -58,9 +65,9 @@ type MapEntry struct { | |
Reader CacheReader | ||
} | ||
|
||
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. | ||
//It uses a standard parameter codec constructed based on the given generated Scheme. | ||
type InformersMap struct { | ||
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. | ||
// It uses a standard parameter codec constructed based on the given generated Scheme. | ||
type specificInformersMap struct { | ||
// Scheme maps runtime.Objects to GroupVersionKinds | ||
Scheme *runtime.Scheme | ||
|
||
|
@@ -90,10 +97,16 @@ type InformersMap struct { | |
|
||
// start is true if the informers have been started | ||
started bool | ||
|
||
// createClient knows how to create a client and a list object, | ||
// and allows for abstracting over the particulars of structured vs | ||
// unstructured objects. | ||
createListWatcher createListWatcherFunc | ||
} | ||
|
||
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. | ||
func (ip *InformersMap) Start(stop <-chan struct{}) error { | ||
// It doesn't return start because it can't return an error, and it's not a runnable directly. | ||
func (ip *specificInformersMap) Start(stop <-chan struct{}) { | ||
func() { | ||
ip.mu.Lock() | ||
defer ip.mu.Unlock() | ||
|
@@ -110,21 +123,20 @@ func (ip *InformersMap) Start(stop <-chan struct{}) error { | |
ip.started = true | ||
}() | ||
<-stop | ||
return nil | ||
} | ||
|
||
// WaitForCacheSync waits until all the caches have been synced | ||
func (ip *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool { | ||
// HasSyncedFuncs returns all the HasSynced functions for the informers in this map. | ||
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { | ||
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK)) | ||
for _, informer := range ip.informersByGVK { | ||
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced) | ||
} | ||
return cache.WaitForCacheSync(stop, syncedFuncs...) | ||
return syncedFuncs | ||
} | ||
|
||
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns | ||
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns | ||
// the Informer from the map. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need a note that structured and unstructured are treated distinctly |
||
func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { | ||
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { | ||
// Return the informer if it is found | ||
i, ok := func() (*MapEntry, bool) { | ||
ip.mu.RLock() | ||
|
@@ -154,7 +166,7 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M | |
|
||
// Create a NewSharedIndexInformer and add it to the map. | ||
var lw *cache.ListWatch | ||
lw, err := ip.newListWatch(gvk) | ||
lw, err := ip.createListWatcher(gvk, ip) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -191,22 +203,18 @@ func (ip *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*M | |
} | ||
|
||
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer. | ||
func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWatch, error) { | ||
// Construct a RESTClient for the groupVersionKind that we will use to | ||
// talk to the apiserver. | ||
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { | ||
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the | ||
// groupVersionKind to the Resource API we will use. | ||
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Get a listObject for listing that the ListWatch can DeepCopy | ||
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs) | ||
if err != nil { | ||
return nil, err | ||
} | ||
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") | ||
listObj, err := ip.Scheme.New(listGVK) | ||
if err != nil { | ||
|
@@ -228,3 +236,29 @@ func (ip *InformersMap) newListWatch(gvk schema.GroupVersionKind) (*cache.ListWa | |
}, | ||
}, nil | ||
} | ||
|
||
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { | ||
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the | ||
// groupVersionKind to the Resource API we will use. | ||
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) | ||
if err != nil { | ||
return nil, err | ||
} | ||
dynamicClient, err := dynamic.NewForConfig(ip.config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Create a new ListWatch for the obj | ||
return &cache.ListWatch{ | ||
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { | ||
return dynamicClient.Resource(mapping.Resource).List(opts) | ||
}, | ||
// Setup the watch function | ||
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { | ||
// Watch needs to be set to true separately | ||
opts.Watch = true | ||
return dynamicClient.Resource(mapping.Resource).Watch(opts) | ||
}, | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming GVKForObject has special handling for
unstructured
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that it does. unstructured does conform to the runtime.Object interface and has the methods to retrieve the GVK from that interface at least that is my understanding.
Is there an edge case that I am not considering?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. No, was just curious.