Skip to content

Commit

Permalink
Add multicluster controller example
Browse files Browse the repository at this point in the history
  • Loading branch information
alvaroaleman committed May 15, 2020
1 parent 26660e7 commit aede84b
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 1 deletion.
11 changes: 10 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ This example implements a *new* Kubernetes resource, ChaosPod, and creates a cus
5. Adds ChaosPod webhooks to manager
6. Starts the manager

### multicluster

This example implements a simplistic controller that watches pods in one cluster (`referencecluster`) and creates
an identical pod for each pod observed in a different cluster (`mirrorcluster`).

* `main.go`: Initialization code
* `main_test.go`: Tests that verify the reconciliation
* `reconciler/reconciler.go`: The actual reconciliation logic

## Deploying and Running

To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html).
To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html).
74 changes: 74 additions & 0 deletions examples/multicluster/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"os"

"k8s.io/client-go/rest"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/examples/multicluster/reconciler"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
)

func main() {
log := ctrl.Log.WithName("pod-mirror-controller")

referenceClusterCfg, err := config.GetConfigWithContext("reference-cluster")
if err != nil {
log.Error(err, "failed to get reference cluster config")
os.Exit(1)
}

mirrorClusterCfg, err := config.GetConfigWithContext("mirror-cluster")
if err != nil {
log.Error(err, "failed to get mirror cluster config")
os.Exit(1)
}
ctrl.SetupSignalHandler()

if err := run(referenceClusterCfg, mirrorClusterCfg, ctrl.SetupSignalHandler()); err != nil {
log.Error(err, "failed to run controller")
os.Exit(1)
}

log.Info("Finished gracefully")
}

func run(referenceClusterConfig, mirrorClusterConfig *rest.Config, stop <-chan struct{}) error {
mgr, err := ctrl.NewManager(referenceClusterConfig, ctrl.Options{})
if err != nil {
return fmt.Errorf("failed to construct manager: %w", err)
}
clusterConnector, err := clusterconnector.New(mirrorClusterConfig, mgr, "mirror_cluster")
if err != nil {
return fmt.Errorf("failed to construct clusterconnector: %w", err)
}

if err := reconciler.Add(mgr, clusterConnector); err != nil {
return fmt.Errorf("failed to construct reconciler: %w", err)
}

if err := mgr.Start(stop); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}

return nil
}
129 changes: 129 additions & 0 deletions examples/multicluster/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"context"
"fmt"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
)

func TestRun(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t, "MirrorPod reconciler Integration Suite", []Reporter{printer.NewlineReporter{}})
}

var _ = Describe("clusterconnector.ClusterConnector", func() {
var stop chan struct{}
var referenceClusterCfg *rest.Config
var mirrorClusterCfg *rest.Config
var referenceClusterTestEnv *envtest.Environment
var mirrorClusterTestEnv *envtest.Environment
var referenceClusterClient client.Client
var mirrorClusterClient client.Client

Describe("multi-cluster-controller", func() {
BeforeEach(func() {
stop = make(chan struct{})
referenceClusterTestEnv = &envtest.Environment{}
mirrorClusterTestEnv = &envtest.Environment{}

var err error
referenceClusterCfg, err = referenceClusterTestEnv.Start()
Expect(err).NotTo(HaveOccurred())

mirrorClusterCfg, err = mirrorClusterTestEnv.Start()
Expect(err).NotTo(HaveOccurred())

referenceClusterClient, err = client.New(referenceClusterCfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

mirrorClusterClient, err = client.New(mirrorClusterCfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

go func() {
run(referenceClusterCfg, mirrorClusterCfg, stop)
}()
})

AfterEach(func() {
close(stop)
Expect(referenceClusterTestEnv.Stop()).To(Succeed())
Expect(mirrorClusterTestEnv.Stop()).To(Succeed())
})

It("Should reconcile pods", func() {
ctx := context.Background()
referencePod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "satan",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "fancy-one",
Image: "nginx",
}},
},
}
Expect(referenceClusterClient.Create(ctx, referencePod)).NotTo(HaveOccurred())
name := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name}

By("Setting a finalizer", func() {
Eventually(func() error {
updatedPod := &corev1.Pod{}
if err := referenceClusterClient.Get(ctx, name, updatedPod); err != nil {
return err
}
if n := len(updatedPod.Finalizers); n != 1 {
return fmt.Errorf("expected exactly one finalizer, got %d", n)
}
return nil
}).Should(Succeed())

})

By("Creating a pod in the mirror cluster", func() {
Eventually(func() error {
return mirrorClusterClient.Get(ctx, name, &corev1.Pod{})
}).Should(Succeed())

})

By("Recreating a manually deleted pod in the mirror cluster", func() {
Expect(mirrorClusterClient.Delete(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: name.Namespace, Name: name.Name}}),
).NotTo(HaveOccurred())

Eventually(func() error {
return mirrorClusterClient.Get(ctx, name, &corev1.Pod{})
}).Should(Succeed())

})

By("Cleaning up after the reference pod got deleted", func() {
Expect(referenceClusterClient.Delete(ctx, referencePod)).NotTo(HaveOccurred())

Eventually(func() bool {
return apierrors.IsNotFound(mirrorClusterClient.Get(ctx, name, &corev1.Pod{}))
}).Should(BeTrue())

Eventually(func() bool {
return apierrors.IsNotFound(referenceClusterClient.Get(ctx, name, &corev1.Pod{}))
}).Should(BeTrue())
})
})

})

})
125 changes: 125 additions & 0 deletions examples/multicluster/reconciler/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

func Add(mgr ctrl.Manager, mirrorCluster clusterconnector.ClusterConnector) error {
return ctrl.NewControllerManagedBy(mgr).
// Watch Pods in the reference cluster
For(&corev1.Pod{}).
// Watch pods in the mirror cluster
Watches(
source.NewKindWithCache(&corev1.Pod{}, mirrorCluster.GetCache()),
&handler.EnqueueRequestForObject{},
).
Complete(&reconciler{
referenceClusterClient: mgr.GetClient(),
mirrorClusterClient: mirrorCluster.GetClient(),
})
}

type reconciler struct {
referenceClusterClient client.Client
mirrorClusterClient client.Client
}

func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, r.reconcile(req)
}

const podFinalizerName = "pod-finalzer.mirror.org/v1"

func (r *reconciler) reconcile(req reconcile.Request) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

referencePod := &corev1.Pod{}
if err := r.referenceClusterClient.Get(ctx, req.NamespacedName, referencePod); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get pod from reference clsuster: %w", err)
}

if referencePod.DeletionTimestamp != nil && sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) {
mirrorClusterPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Namespace: referencePod.Namespace,
Name: referencePod.Name,
}}
if err := r.mirrorClusterClient.Delete(ctx, mirrorClusterPod); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete pod in mirror cluster: %w", err)
}

referencePod.Finalizers = sets.NewString(referencePod.Finalizers...).Delete(podFinalizerName).UnsortedList()
if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil {
return fmt.Errorf("failed to update pod in refernce cluster after removing finalizer: %w", err)
}

return nil
}

if !sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) {
referencePod.Finalizers = append(referencePod.Finalizers, podFinalizerName)
if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil {
return fmt.Errorf("failed to update pod after adding finalizer: %w", err)
}
}

// Check if pod already exists
podName := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name}
podExists := true
if err := r.mirrorClusterClient.Get(ctx, podName, &corev1.Pod{}); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to check in mirror cluster if pod exists: %w", err)
}
podExists = false
}
if podExists {
return nil
}

mirrorPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: referencePod.Namespace,
Name: referencePod.Name,
},
Spec: *referencePod.Spec.DeepCopy(),
}
if err := r.mirrorClusterClient.Create(ctx, mirrorPod); err != nil {
return fmt.Errorf("failed to create mirror pod: %w", err)
}

return nil
}

0 comments on commit aede84b

Please sign in to comment.