Skip to content

Commit

Permalink
feat: sync CR status
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsxu committed Feb 15, 2022
1 parent 9165c3b commit 69fc89a
Showing 1 changed file with 92 additions and 11 deletions.
103 changes: 92 additions & 11 deletions controllers/zookeepercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrl_log "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -55,7 +59,7 @@ type ZookeeperClusterReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Logger = ctrl_log.FromContext(ctx)
r.Logger.Info("starting reconcile")
r.Logger.Info("Starting reconcile")

zk := &zookeeperv1alpha1.ZookeeperCluster{}

Expand All @@ -64,25 +68,102 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, client.IgnoreNotFound(err)
}

svcHeadless := r.createHeadlessService(zk)
if err := r.Create(ctx, svcHeadless); err != nil {
r.Logger.Error(err, "creating headless service")
r.Logger.Info("Creating headless service")
desiredServiceHeadless := r.createHeadlessService(zk)
actualServiceHeadless := &corev1.Service{}

if err := r.Get(ctx, types.NamespacedName{
Namespace: desiredServiceHeadless.Namespace,
Name: desiredServiceHeadless.Name,
}, actualServiceHeadless); err != nil {
// Not Found. Create
if apierrors.IsNotFound(err) {
if err = r.Create(ctx, desiredServiceHeadless); err != nil {
r.Logger.Error(err, "creating headless service")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
} else {
// TODO: Found. Update
}

r.Logger.Info("Creating statefulset")
desiredStatefulSet := r.createStatefulSet(zk)
actualStatefulSet := &appsv1.StatefulSet{}

if err := r.Get(ctx, types.NamespacedName{
Namespace: desiredStatefulSet.Namespace,
Name: desiredStatefulSet.Name,
}, actualStatefulSet); err != nil {
// Not Found. Create
if apierrors.IsNotFound(err) {
if err = r.Create(ctx, desiredStatefulSet); err != nil {
r.Logger.Error(err, "creating statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
} else {
// TODO: Found. Update
}

r.Logger.Info("Creating client service")
desiredServiceClient := r.createService(zk)
actualServiceClient := &corev1.Service{}

if err := r.Get(ctx, types.NamespacedName{
Namespace: desiredServiceClient.Namespace,
Name: desiredServiceClient.Name,
}, actualServiceClient); err != nil {
if apierrors.IsNotFound(err) {
if err = r.Create(ctx, desiredServiceClient); err != nil {
r.Logger.Error(err, "creating client service")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
} else {
// TODO: Found. Update
}

r.Logger.Info("Updating zk status")
pods := &corev1.PodList{}
if err := r.List(ctx, pods, &client.ListOptions{
Namespace: zk.Namespace,
LabelSelector: labels.SelectorFromSet(map[string]string{"app": zk.Name}),
}); err != nil {
r.Logger.Error(err, "list pods")
return ctrl.Result{}, err
}

sts := r.createStatefulSet(zk)
if err := r.Create(ctx, sts); err != nil {
r.Logger.Error(err, "creating statefulset")
if err := r.Get(ctx, types.NamespacedName{
Namespace: desiredStatefulSet.Namespace,
Name: desiredStatefulSet.Name,
}, actualStatefulSet); err != nil {
r.Logger.Error(err, "get client service")
return ctrl.Result{}, err
}

svcClient := r.createService(zk)
if err := r.Create(ctx, svcClient); err != nil {
r.Logger.Error(err, "creating client service")
zk.Status.ReadyReplicas = int32(len(pods.Items))
if len(pods.Items) > 0 && len(pods.Items[0].Status.HostIP) > 0 {
zk.Status.Address = fmt.Sprintf("%s:%d", pods.Items[0].Status.HostIP, actualServiceClient.Spec.Ports[0].NodePort)
}

if err := r.Status().Update(ctx, zk); err != nil {
r.Logger.Error(err, "update zk status")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
if zk.Spec.Replicas == int32(len(pods.Items)) {
r.Logger.Info("Stop reconciling")
return ctrl.Result{}, nil
}

return ctrl.Result{RequeueAfter: time.Second}, nil
}

func (r *ZookeeperClusterReconciler) createHeadlessService(zk *zookeeperv1alpha1.ZookeeperCluster) *corev1.Service {
Expand Down

0 comments on commit 69fc89a

Please sign in to comment.