From aede84b40f9a37c8628d024914492fc9caadd30c Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 9 May 2020 21:44:03 -0400 Subject: [PATCH] Add multicluster controller example --- examples/README.md | 11 +- examples/multicluster/main.go | 74 ++++++++++ examples/multicluster/main_test.go | 129 ++++++++++++++++++ .../multicluster/reconciler/reconciler.go | 125 +++++++++++++++++ 4 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 examples/multicluster/main.go create mode 100644 examples/multicluster/main_test.go create mode 100644 examples/multicluster/reconciler/reconciler.go diff --git a/examples/README.md b/examples/README.md index 2110ae214e..a378d0d22f 100644 --- a/examples/README.md +++ b/examples/README.md @@ -31,6 +31,15 @@ This example implements a *new* Kubernetes resource, ChaosPod, and creates a cus 5. Adds ChaosPod webhooks to manager 6. Starts the manager +### multicluster + +This example implements a simplistic controller that watches pods in one cluster (`referencecluster`) and creates +an identical pod for each pod observed in a different cluster (`mirrorcluster`). + +* `main.go`: Initialization code +* `main_test.go`: Tests that verify the reconciliation +* `reconciler/reconciler.go`: The actual reconciliation logic + ## Deploying and Running -To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html). \ No newline at end of file +To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html). diff --git a/examples/multicluster/main.go b/examples/multicluster/main.go new file mode 100644 index 0000000000..5338a6eb97 --- /dev/null +++ b/examples/multicluster/main.go @@ -0,0 +1,74 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + + "k8s.io/client-go/rest" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/examples/multicluster/reconciler" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" +) + +func main() { + log := ctrl.Log.WithName("pod-mirror-controller") + + referenceClusterCfg, err := config.GetConfigWithContext("reference-cluster") + if err != nil { + log.Error(err, "failed to get reference cluster config") + os.Exit(1) + } + + mirrorClusterCfg, err := config.GetConfigWithContext("mirror-cluster") + if err != nil { + log.Error(err, "failed to get mirror cluster config") + os.Exit(1) + } + ctrl.SetupSignalHandler() + + if err := run(referenceClusterCfg, mirrorClusterCfg, ctrl.SetupSignalHandler()); err != nil { + log.Error(err, "failed to run controller") + os.Exit(1) + } + + log.Info("Finished gracefully") +} + +func run(referenceClusterConfig, mirrorClusterConfig *rest.Config, stop <-chan struct{}) error { + mgr, err := ctrl.NewManager(referenceClusterConfig, ctrl.Options{}) + if err != nil { + return fmt.Errorf("failed to construct manager: %w", err) + } + clusterConnector, err := clusterconnector.New(mirrorClusterConfig, mgr, "mirror_cluster") + if err != nil { + return fmt.Errorf("failed to construct clusterconnector: %w", err) + } + + if err := reconciler.Add(mgr, clusterConnector); err != nil { + return fmt.Errorf("failed to construct reconciler: %w", err) + } + + if err := mgr.Start(stop); err != nil { + return fmt.Errorf("failed to start manager: %w", err) + } + + return nil +} diff --git a/examples/multicluster/main_test.go b/examples/multicluster/main_test.go new file mode 100644 index 0000000000..63012567c5 --- /dev/null +++ b/examples/multicluster/main_test.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "fmt" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +func TestRun(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecsWithDefaultAndCustomReporters(t, "MirrorPod reconciler Integration Suite", []Reporter{printer.NewlineReporter{}}) +} + +var _ = Describe("clusterconnector.ClusterConnector", func() { + var stop chan struct{} + var referenceClusterCfg *rest.Config + var mirrorClusterCfg *rest.Config + var referenceClusterTestEnv *envtest.Environment + var mirrorClusterTestEnv *envtest.Environment + var referenceClusterClient client.Client + var mirrorClusterClient client.Client + + Describe("multi-cluster-controller", func() { + BeforeEach(func() { + stop = make(chan struct{}) + referenceClusterTestEnv = &envtest.Environment{} + mirrorClusterTestEnv = &envtest.Environment{} + + var err error + referenceClusterCfg, err = referenceClusterTestEnv.Start() + Expect(err).NotTo(HaveOccurred()) + + mirrorClusterCfg, err = mirrorClusterTestEnv.Start() + Expect(err).NotTo(HaveOccurred()) + + referenceClusterClient, err = client.New(referenceClusterCfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + mirrorClusterClient, err = client.New(mirrorClusterCfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + go func() { + run(referenceClusterCfg, mirrorClusterCfg, stop) + }() + }) + + AfterEach(func() { + close(stop) + Expect(referenceClusterTestEnv.Stop()).To(Succeed()) + Expect(mirrorClusterTestEnv.Stop()).To(Succeed()) + }) + + It("Should reconcile pods", func() { + ctx := context.Background() + referencePod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "satan", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "fancy-one", + Image: "nginx", + }}, + }, + } + Expect(referenceClusterClient.Create(ctx, referencePod)).NotTo(HaveOccurred()) + name := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name} + + By("Setting a finalizer", func() { + Eventually(func() error { + updatedPod := &corev1.Pod{} + if err := referenceClusterClient.Get(ctx, name, updatedPod); err != nil { + return err + } + if n := len(updatedPod.Finalizers); n != 1 { + return fmt.Errorf("expected exactly one finalizer, got %d", n) + } + return nil + }).Should(Succeed()) + + }) + + By("Creating a pod in the mirror cluster", func() { + Eventually(func() error { + return mirrorClusterClient.Get(ctx, name, &corev1.Pod{}) + }).Should(Succeed()) + + }) + + By("Recreating a manually deleted pod in the mirror cluster", func() { + Expect(mirrorClusterClient.Delete(ctx, + &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: name.Namespace, Name: name.Name}}), + ).NotTo(HaveOccurred()) + + Eventually(func() error { + return mirrorClusterClient.Get(ctx, name, &corev1.Pod{}) + }).Should(Succeed()) + + }) + + By("Cleaning up after the reference pod got deleted", func() { + Expect(referenceClusterClient.Delete(ctx, referencePod)).NotTo(HaveOccurred()) + + Eventually(func() bool { + return apierrors.IsNotFound(mirrorClusterClient.Get(ctx, name, &corev1.Pod{})) + }).Should(BeTrue()) + + Eventually(func() bool { + return apierrors.IsNotFound(referenceClusterClient.Get(ctx, name, &corev1.Pod{})) + }).Should(BeTrue()) + }) + }) + + }) + +}) diff --git a/examples/multicluster/reconciler/reconciler.go b/examples/multicluster/reconciler/reconciler.go new file mode 100644 index 0000000000..a8304c4552 --- /dev/null +++ b/examples/multicluster/reconciler/reconciler.go @@ -0,0 +1,125 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/clusterconnector" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func Add(mgr ctrl.Manager, mirrorCluster clusterconnector.ClusterConnector) error { + return ctrl.NewControllerManagedBy(mgr). + // Watch Pods in the reference cluster + For(&corev1.Pod{}). + // Watch pods in the mirror cluster + Watches( + source.NewKindWithCache(&corev1.Pod{}, mirrorCluster.GetCache()), + &handler.EnqueueRequestForObject{}, + ). + Complete(&reconciler{ + referenceClusterClient: mgr.GetClient(), + mirrorClusterClient: mirrorCluster.GetClient(), + }) +} + +type reconciler struct { + referenceClusterClient client.Client + mirrorClusterClient client.Client +} + +func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + return reconcile.Result{}, r.reconcile(req) +} + +const podFinalizerName = "pod-finalzer.mirror.org/v1" + +func (r *reconciler) reconcile(req reconcile.Request) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + referencePod := &corev1.Pod{} + if err := r.referenceClusterClient.Get(ctx, req.NamespacedName, referencePod); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get pod from reference clsuster: %w", err) + } + + if referencePod.DeletionTimestamp != nil && sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) { + mirrorClusterPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Namespace: referencePod.Namespace, + Name: referencePod.Name, + }} + if err := r.mirrorClusterClient.Delete(ctx, mirrorClusterPod); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete pod in mirror cluster: %w", err) + } + + referencePod.Finalizers = sets.NewString(referencePod.Finalizers...).Delete(podFinalizerName).UnsortedList() + if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil { + return fmt.Errorf("failed to update pod in refernce cluster after removing finalizer: %w", err) + } + + return nil + } + + if !sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) { + referencePod.Finalizers = append(referencePod.Finalizers, podFinalizerName) + if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil { + return fmt.Errorf("failed to update pod after adding finalizer: %w", err) + } + } + + // Check if pod already exists + podName := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name} + podExists := true + if err := r.mirrorClusterClient.Get(ctx, podName, &corev1.Pod{}); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to check in mirror cluster if pod exists: %w", err) + } + podExists = false + } + if podExists { + return nil + } + + mirrorPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: referencePod.Namespace, + Name: referencePod.Name, + }, + Spec: *referencePod.Spec.DeepCopy(), + } + if err := r.mirrorClusterClient.Create(ctx, mirrorPod); err != nil { + return fmt.Errorf("failed to create mirror pod: %w", err) + } + + return nil +}