Skip to content

Commit

Permalink
Add support for fetching all resources of a specific category in migr…
Browse files Browse the repository at this point in the history
…ation.KubernetesSource

Signed-off-by: Alper Rifat Ulucinar <ulucinar@users.noreply.github.com>
  • Loading branch information
ulucinar committed Jun 8, 2023
1 parent 57978a6 commit d1727da
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 31 deletions.
113 changes: 87 additions & 26 deletions pkg/migration/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ var (
// KubernetesSource is a source implementation to read resources from Kubernetes
// cluster.
type KubernetesSource struct {
registry *Registry
categories []Category
index int
items []UnstructuredWithMetadata
dynamicClient dynamic.Interface
cachedDiscoveryClient discovery.CachedDiscoveryInterface
restMapper meta.RESTMapper
categoryExpander restmapper.CategoryExpander
cacheDir string
}

Expand All @@ -50,22 +53,40 @@ func WithCacheDir(cacheDir string) KubernetesSourceOption {
}
}

// WithRegistry configures a KubernetesSource to use the specified registry
// for determining the GVKs of resources which will be read from the
// Kubernetes API server.
func WithRegistry(r *Registry) KubernetesSourceOption {
return func(s *KubernetesSource) {
s.registry = r
}
}

// WithCategories configures a KubernetesSource so that it will fetch
// all resources belonging to the specified categories.
func WithCategories(c []Category) KubernetesSourceOption {
return func(s *KubernetesSource) {
s.categories = c
}
}

// NewKubernetesSourceFromKubeConfig initializes a new KubernetesSource using
// the specified kube config file and KubernetesSourceOptions.
func NewKubernetesSourceFromKubeConfig(r *Registry, kubeconfigPath string, opts ...KubernetesSourceOption) (*KubernetesSource, error) {
func NewKubernetesSourceFromKubeConfig(kubeconfigPath string, opts ...KubernetesSourceOption) (*KubernetesSource, error) {
ks := &KubernetesSource{}
for _, o := range opts {
o(ks)
}
dynamicClient, err := InitializeDynamicClient(kubeconfigPath)
var err error
ks.dynamicClient, err = InitializeDynamicClient(kubeconfigPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize a Kubernetes dynamic client from kubeconfig: %s", kubeconfigPath)
}
cachedDiscoveryClient, err := InitializeDiscoveryClient(kubeconfigPath, ks.cacheDir)
ks.cachedDiscoveryClient, err = InitializeDiscoveryClient(kubeconfigPath, ks.cacheDir)
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize a Kubernetes discovery client from kubeconfig: %s", kubeconfigPath)
}
return NewKubernetesSource(r, dynamicClient, cachedDiscoveryClient)
return ks, errors.Wrap(ks.init(), errKubernetesSourceInit)
}

// NewKubernetesSource returns a KubernetesSource
Expand All @@ -76,52 +97,92 @@ func NewKubernetesSourceFromKubeConfig(r *Registry, kubeconfigPath string, opts
// Group: "ec2.aws.upbound.io",
// Version: "v1beta1",
// Kind: "VPC",
func NewKubernetesSource(r *Registry, dynamicClient dynamic.Interface, discoveryClient discovery.CachedDiscoveryInterface) (*KubernetesSource, error) {
func NewKubernetesSource(dynamicClient dynamic.Interface, discoveryClient discovery.CachedDiscoveryInterface, opts ...KubernetesSourceOption) (*KubernetesSource, error) {
ks := &KubernetesSource{
dynamicClient: dynamicClient,
cachedDiscoveryClient: discoveryClient,
restMapper: restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient),
}
return ks, errors.Wrap(ks.init(r), errKubernetesSourceInit)
for _, o := range opts {
o(ks)
}
return ks, errors.Wrap(ks.init(), errKubernetesSourceInit)
}

func (ks *KubernetesSource) init(r *Registry) error {
if err := ks.getResources(r.claimTypes, CategoryClaim); err != nil {
func (ks *KubernetesSource) init() error {
ks.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(ks.cachedDiscoveryClient)
ks.categoryExpander = restmapper.NewDiscoveryCategoryExpander(ks.cachedDiscoveryClient)

for _, c := range ks.categories {
if err := ks.getCategoryResources(c); err != nil {
return errors.Wrapf(err, "cannot get resources of the category: %s", c)
}
}

if ks.registry == nil {
return nil
}
if err := ks.getGVKResources(ks.registry.claimTypes, CategoryClaim); err != nil {
return errors.Wrap(err, "cannot get claims")
}
if err := ks.getResources(r.compositeTypes, CategoryComposite); err != nil {
if err := ks.getGVKResources(ks.registry.compositeTypes, CategoryComposite); err != nil {
return errors.Wrap(err, "cannot get composites")
}
if err := ks.getResources(r.GetCompositionGVKs(), CategoryComposition); err != nil {
if err := ks.getGVKResources(ks.registry.GetCompositionGVKs(), CategoryComposition); err != nil {
return errors.Wrap(err, "cannot get compositions")
}
return errors.Wrap(ks.getResources(r.GetManagedResourceGVKs(), CategoryManaged), "cannot get managed resources")
return errors.Wrap(ks.getGVKResources(ks.registry.GetManagedResourceGVKs(), CategoryManaged), "cannot get managed resources")
}

func (ks *KubernetesSource) getResources(gvks []schema.GroupVersionKind, category Category) error {
func (ks *KubernetesSource) getCategoryResources(c Category) error {
grs, _ := ks.categoryExpander.Expand(c.String())
for _, gr := range grs {
gvrs, err := ks.restMapper.ResourcesFor(schema.GroupVersionResource{
Group: gr.Group,
Resource: gr.Resource,
})
if err != nil {
return errors.Wrapf(err, "cannot discover GVRs for GroupResource: %s", gr.String())
}
for _, gvr := range gvrs {
if err := ks.getResourcesFor(gvr, c); err != nil {
return errors.Wrapf(err, "cannot get resources of the category: %s", c.String())
}
}
}
return nil
}

func (ks *KubernetesSource) getGVKResources(gvks []schema.GroupVersionKind, category Category) error {
for _, gvk := range gvks {
m, err := ks.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return errors.Wrapf(err, "cannot get REST mappings for GVK: %s", gvk.String())
}
ri := ks.dynamicClient.Resource(m.Resource)
unstructuredList, err := ri.List(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "cannot list resources of GVR: %s", m.Resource.String())
}
for _, u := range unstructuredList.Items {
ks.items = append(ks.items, UnstructuredWithMetadata{
Object: u,
Metadata: Metadata{
Path: string(u.GetUID()),
Category: category,
},
})
if err := ks.getResourcesFor(m.Resource, category); err != nil {
return errors.Wrapf(err, "cannot get resources for GVK: %s", gvk.String())
}
}
return nil
}

func (ks *KubernetesSource) getResourcesFor(gvr schema.GroupVersionResource, category Category) error {
ri := ks.dynamicClient.Resource(gvr)
unstructuredList, err := ri.List(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "cannot list resources of GVR: %s", gvr.String())
}
for _, u := range unstructuredList.Items {
ks.items = append(ks.items, UnstructuredWithMetadata{
Object: u,
Metadata: Metadata{
Path: string(u.GetUID()),
Category: category,
},
})
}
return nil
}

// HasNext checks the next item
func (ks *KubernetesSource) HasNext() (bool, error) {
return ks.index < len(ks.items), nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/migration/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestNewKubernetesSource(t *testing.T) {
},
}

ks, err := NewKubernetesSource(r, dynamicClient, memory.NewMemCacheClient(client.Discovery()))
ks, err := NewKubernetesSource(dynamicClient, memory.NewMemCacheClient(client.Discovery()), WithRegistry(r))
if diff := cmp.Diff(tc.want.err, err); diff != "" {
t.Errorf("\nNext(...): -want, +got:\n%s", diff)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/migration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ const (
const (
categoryUnknown Category = ""
// CategoryClaim category for composite claim resources
CategoryClaim Category = "Claim"
CategoryClaim Category = "claim"
// CategoryComposite category for composite resources
CategoryComposite Category = "Composite"
CategoryComposite Category = "composite"
// CategoryComposition category for compositions
CategoryComposition Category = "Composition"
CategoryComposition Category = "composition"
// CategoryManaged category for managed resources
CategoryManaged Category = "Managed"
CategoryManaged Category = "managed"
)

// Plan represents a migration plan for migrating managed resources,
Expand Down Expand Up @@ -171,6 +171,11 @@ type Resource struct {
// Category specifies if a resource is a Claim, Composite or a Managed resource
type Category string

// String returns a string representing the receiver Category.
func (c Category) String() string {
return string(c)
}

// Metadata holds metadata for an object read from a Source
type Metadata struct {
// Path uniquely identifies the path for this object on its Source
Expand Down

0 comments on commit d1727da

Please sign in to comment.