Skip to content

Commit

Permalink
restructure code a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
xrstf committed Mar 19, 2022
1 parent 4fc672a commit 06daf10
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
20 changes: 7 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -169,26 +168,19 @@ func watchKubernetes(ctx context.Context, log logrus.FieldLogger, args []string,
fmt.Println(clientset)
}

log.Debug("Creating REST mapper...")

mapper, cache, err := kubeutil.CreateRESTMapper(config, log)
resolver, err := kubeutil.NewResolver(config, log)
if err != nil {
log.Fatalf("Failed to create Kubernetes REST mapper: %v", err)
}

dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create dynamic Kubernetes client: %v", err)
}

// validate resource kinds
log.Debug("Resolving resource kinds...")

kinds := map[string]schema.GroupVersionKind{}
for _, resourceKind := range resourceKinds {
log.Debugf("Resolving %s...", resourceKind)

parsed, err := kubeutil.MappingFor(mapper, cache, resourceKind)
parsed, err := resolver.Resolve(resourceKind)
if err != nil {
log.Fatalf("Unknown resource kind %q: %v", resourceKind, err)
}
Expand All @@ -207,13 +199,15 @@ func watchKubernetes(ctx context.Context, log logrus.FieldLogger, args []string,
log.Debug("Starting to watch resources...")

wg := sync.WaitGroup{}
w := watcher.NewWatcher(printer, appOpts.namespaces, resourceNames)

for _, gvk := range kinds {
dynamicInterface, err := kubeutil.GetDynamicInterface(gvk, dynamicClient, mapper)
dynamicInterface, err := resolver.ResourceInterfaceFor(gvk)
if err != nil {
log.Fatalf("Failed to create dynamic interface for %q resources: %v", gvk.Kind, err)
}

w, err := dynamicInterface.Watch(ctx, metav1.ListOptions{
wi, err := dynamicInterface.Watch(ctx, metav1.ListOptions{
LabelSelector: appOpts.labels,
})
if err != nil {
Expand All @@ -222,7 +216,7 @@ func watchKubernetes(ctx context.Context, log logrus.FieldLogger, args []string,

wg.Add(1)
go func() {
watcher.NewWatcher(printer, appOpts.namespaces, resourceNames).Watch(ctx, w)
w.Watch(ctx, wi)
wg.Done()
}()
}
Expand Down
54 changes: 42 additions & 12 deletions pkg/kubernetes/discovery.go → pkg/kubernetes/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ import (
"k8s.io/client-go/restmapper"
)

func CreateRESTMapper(config *rest.Config, log logrus.FieldLogger) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error) {
type Resolver struct {
mapper meta.RESTMapper
dynamicClient dynamic.Interface
cache discovery.CachedDiscoveryInterface
log logrus.FieldLogger
}

func NewResolver(config *rest.Config, log logrus.FieldLogger) (*Resolver, error) {
var (
discoveryClient discovery.DiscoveryInterface
cache discovery.CachedDiscoveryInterface
Expand All @@ -31,7 +38,7 @@ func CreateRESTMapper(config *rest.Config, log logrus.FieldLogger) (meta.RESTMap

discoveryClient, err = discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, nil, err
return nil, err
}

cache = memory.NewMemCacheClient(discoveryClient)
Expand All @@ -43,7 +50,7 @@ func CreateRESTMapper(config *rest.Config, log logrus.FieldLogger) (meta.RESTMap

client, err := disk.NewCachedDiscoveryClientForConfig(config, discoveryCacheDir, httpCacheDir, 6*time.Hour)
if err != nil {
return nil, nil, err
return nil, err
}

discoveryClient = client
Expand All @@ -53,7 +60,17 @@ func CreateRESTMapper(config *rest.Config, log logrus.FieldLogger) (meta.RESTMap
mapper := restmapper.NewDeferredDiscoveryRESTMapper(cache)
fancyMapper := restmapper.NewShortcutExpander(mapper, discoveryClient)

return fancyMapper, cache, nil
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create dynamic Kubernetes client: %v", err)
}

return &Resolver{
mapper: fancyMapper,
dynamicClient: dynamicClient,
cache: cache,
log: log,
}, nil
}

// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive
Expand All @@ -70,26 +87,39 @@ func computeDiscoverCacheDir(parentDir, host string) string {
return filepath.Join(parentDir, safeHost)
}

func GetDynamicInterface(gvk schema.GroupVersionKind, dynamicClient dynamic.Interface, mapper meta.RESTMapper) (dynamic.ResourceInterface, error) {
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
func (r *Resolver) ResourceInterfaceFor(gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) {
mapping, err := r.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, fmt.Errorf("failed to determine mapping: %w", err)
}

return dynamicClient.Resource(mapping.Resource), nil
return r.dynamicClient.Resource(mapping.Resource), nil
}

func (r *Resolver) InvalidateCache() {
r.cache.Invalidate()
}

func (r *Resolver) ResolveWithoutRetry(resourceOrKindArg string) (*meta.RESTMapping, error) {
mapping, err := mappingFor(r.mapper, resourceOrKindArg)
if meta.IsNoMatchError(err) {
return nil, nil
}

return mapping, err
}

func MappingFor(restMapper meta.RESTMapper, cache discovery.CachedDiscoveryInterface, resourceOrKindArg string) (*meta.RESTMapping, error) {
mapping, err := mappingFor(restMapper, resourceOrKindArg)
func (r *Resolver) Resolve(resourceOrKindArg string) (*meta.RESTMapping, error) {
mapping, err := mappingFor(r.mapper, resourceOrKindArg)
if meta.IsNoMatchError(err) {
cache.Invalidate()
r.cache.Invalidate()

// try again
mapping, err = mappingFor(restMapper, resourceOrKindArg)
mapping, err = mappingFor(r.mapper, resourceOrKindArg)
}

if meta.IsNoMatchError(err) {
return nil, fmt.Errorf("the server doesn't have a resource type %q", resourceOrKindArg)
return nil, nil
}

return mapping, err
Expand Down

0 comments on commit 06daf10

Please sign in to comment.