Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Move cluster-specific code out of the manager #950

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ This example implements a *new* Kubernetes resource, ChaosPod, and creates a cus
5. Adds ChaosPod webhooks to manager
6. Starts the manager

### multicluster

This example implements a simplistic controller that watches pods in one cluster (`referencecluster`) and creates
an identical pod for each pod observed in a different cluster (`mirrorcluster`).

* `main.go`: Initialization code
* `main_test.go`: Tests that verify the reconciliation
* `reconciler/reconciler.go`: The actual reconciliation logic

## Deploying and Running

To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html).
To install and run the provided examples, see the Kubebuilder [Quick Start](https://book.kubebuilder.io/quick-start.html).
74 changes: 74 additions & 0 deletions examples/multicluster/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"os"

"k8s.io/client-go/rest"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/examples/multicluster/reconciler"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
)

func main() {
log := ctrl.Log.WithName("pod-mirror-controller")

referenceClusterCfg, err := config.GetConfigWithContext("reference-cluster")
if err != nil {
log.Error(err, "failed to get reference cluster config")
os.Exit(1)
}

mirrorClusterCfg, err := config.GetConfigWithContext("mirror-cluster")
if err != nil {
log.Error(err, "failed to get mirror cluster config")
os.Exit(1)
}
ctrl.SetupSignalHandler()

if err := run(referenceClusterCfg, mirrorClusterCfg, ctrl.SetupSignalHandler()); err != nil {
log.Error(err, "failed to run controller")
os.Exit(1)
}

log.Info("Finished gracefully")
}

func run(referenceClusterConfig, mirrorClusterConfig *rest.Config, stop <-chan struct{}) error {
mgr, err := ctrl.NewManager(referenceClusterConfig, ctrl.Options{})
if err != nil {
return fmt.Errorf("failed to construct manager: %w", err)
}
clusterConnector, err := clusterconnector.New(mirrorClusterConfig, mgr, "mirror_cluster")
if err != nil {
return fmt.Errorf("failed to construct clusterconnector: %w", err)
}

if err := reconciler.Add(mgr, clusterConnector); err != nil {
return fmt.Errorf("failed to construct reconciler: %w", err)
}

if err := mgr.Start(stop); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}

return nil
}
129 changes: 129 additions & 0 deletions examples/multicluster/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"context"
"fmt"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
)

func TestRun(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t, "MirrorPod reconciler Integration Suite", []Reporter{printer.NewlineReporter{}})
}

var _ = Describe("clusterconnector.ClusterConnector", func() {
var stop chan struct{}
var referenceClusterCfg *rest.Config
var mirrorClusterCfg *rest.Config
var referenceClusterTestEnv *envtest.Environment
var mirrorClusterTestEnv *envtest.Environment
var referenceClusterClient client.Client
var mirrorClusterClient client.Client

Describe("multi-cluster-controller", func() {
BeforeEach(func() {
stop = make(chan struct{})
referenceClusterTestEnv = &envtest.Environment{}
mirrorClusterTestEnv = &envtest.Environment{}

var err error
referenceClusterCfg, err = referenceClusterTestEnv.Start()
Expect(err).NotTo(HaveOccurred())

mirrorClusterCfg, err = mirrorClusterTestEnv.Start()
Expect(err).NotTo(HaveOccurred())

referenceClusterClient, err = client.New(referenceClusterCfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

mirrorClusterClient, err = client.New(mirrorClusterCfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

go func() {
run(referenceClusterCfg, mirrorClusterCfg, stop)
}()
})

AfterEach(func() {
close(stop)
Expect(referenceClusterTestEnv.Stop()).To(Succeed())
Expect(mirrorClusterTestEnv.Stop()).To(Succeed())
})

It("Should reconcile pods", func() {
ctx := context.Background()
referencePod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "satan",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "fancy-one",
Image: "nginx",
}},
},
}
Expect(referenceClusterClient.Create(ctx, referencePod)).NotTo(HaveOccurred())
name := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name}

By("Setting a finalizer", func() {
Eventually(func() error {
updatedPod := &corev1.Pod{}
if err := referenceClusterClient.Get(ctx, name, updatedPod); err != nil {
return err
}
if n := len(updatedPod.Finalizers); n != 1 {
return fmt.Errorf("expected exactly one finalizer, got %d", n)
}
return nil
}).Should(Succeed())

})

By("Creating a pod in the mirror cluster", func() {
Eventually(func() error {
return mirrorClusterClient.Get(ctx, name, &corev1.Pod{})
}).Should(Succeed())

})

By("Recreating a manually deleted pod in the mirror cluster", func() {
Expect(mirrorClusterClient.Delete(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: name.Namespace, Name: name.Name}}),
).NotTo(HaveOccurred())

Eventually(func() error {
return mirrorClusterClient.Get(ctx, name, &corev1.Pod{})
}).Should(Succeed())

})

By("Cleaning up after the reference pod got deleted", func() {
Expect(referenceClusterClient.Delete(ctx, referencePod)).NotTo(HaveOccurred())

Eventually(func() bool {
return apierrors.IsNotFound(mirrorClusterClient.Get(ctx, name, &corev1.Pod{}))
}).Should(BeTrue())

Eventually(func() bool {
return apierrors.IsNotFound(referenceClusterClient.Get(ctx, name, &corev1.Pod{}))
}).Should(BeTrue())
})
})

})

})
125 changes: 125 additions & 0 deletions examples/multicluster/reconciler/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/clusterconnector"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

func Add(mgr ctrl.Manager, mirrorCluster clusterconnector.ClusterConnector) error {
return ctrl.NewControllerManagedBy(mgr).
// Watch Pods in the reference cluster
For(&corev1.Pod{}).
// Watch pods in the mirror cluster
Watches(
source.NewKindWithCache(&corev1.Pod{}, mirrorCluster.GetCache()),
&handler.EnqueueRequestForObject{},
).
Complete(&reconciler{
referenceClusterClient: mgr.GetClient(),
mirrorClusterClient: mirrorCluster.GetClient(),
})
}

type reconciler struct {
referenceClusterClient client.Client
mirrorClusterClient client.Client
}

func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, r.reconcile(req)
}

const podFinalizerName = "pod-finalzer.mirror.org/v1"

func (r *reconciler) reconcile(req reconcile.Request) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

referencePod := &corev1.Pod{}
if err := r.referenceClusterClient.Get(ctx, req.NamespacedName, referencePod); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get pod from reference clsuster: %w", err)
}

if referencePod.DeletionTimestamp != nil && sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) {
mirrorClusterPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Namespace: referencePod.Namespace,
Name: referencePod.Name,
}}
if err := r.mirrorClusterClient.Delete(ctx, mirrorClusterPod); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete pod in mirror cluster: %w", err)
}

referencePod.Finalizers = sets.NewString(referencePod.Finalizers...).Delete(podFinalizerName).UnsortedList()
if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil {
return fmt.Errorf("failed to update pod in refernce cluster after removing finalizer: %w", err)
}

return nil
}

if !sets.NewString(referencePod.Finalizers...).Has(podFinalizerName) {
referencePod.Finalizers = append(referencePod.Finalizers, podFinalizerName)
if err := r.referenceClusterClient.Update(ctx, referencePod); err != nil {
return fmt.Errorf("failed to update pod after adding finalizer: %w", err)
}
}

// Check if pod already exists
podName := types.NamespacedName{Namespace: referencePod.Namespace, Name: referencePod.Name}
podExists := true
if err := r.mirrorClusterClient.Get(ctx, podName, &corev1.Pod{}); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to check in mirror cluster if pod exists: %w", err)
}
podExists = false
}
if podExists {
return nil
}

mirrorPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: referencePod.Namespace,
Name: referencePod.Name,
},
Spec: *referencePod.Spec.DeepCopy(),
}
if err := r.mirrorClusterClient.Create(ctx, mirrorPod); err != nil {
return fmt.Errorf("failed to create mirror pod: %w", err)
}

return nil
}
Loading