Skip to content

Commit

Permalink
Implementation of Ready status condition (#26)
Browse files Browse the repository at this point in the history
Update cluster Ready status according to StatefulSet status and update
ConfigMap cluster state to `existing` after first time STS is ready
fixes #24
  • Loading branch information
sircthulhu authored Mar 18, 2024
1 parent ae9f1e5 commit 9e35e26
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 29 deletions.
9 changes: 9 additions & 0 deletions api/v1alpha1/etcdcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ const (
EtcdConditionReady = "Ready"
)

type EtcdCondType string

const (
EtcdCondTypeInitStarted EtcdCondType = "InitializationStarted"
EtcdCondTypeInitComplete EtcdCondType = "InitializationComplete"
EtcdCondTypeStatefulSetReady EtcdCondType = "StatefulSetReady"
EtcdCondTypeStatefulSetNotReady EtcdCondType = "StatefulSetNotReady"
)

// EtcdClusterStatus defines the observed state of EtcdCluster
type EtcdClusterStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
Expand Down
109 changes: 81 additions & 28 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"slices"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -71,10 +74,10 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if len(instance.Status.Conditions) == 0 {
instance.Status.Conditions = append(instance.Status.Conditions, metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
Status: "False",
Status: metav1.ConditionFalse,
ObservedGeneration: instance.Generation,
LastTransitionTime: metav1.Now(),
Reason: "InitializationStarted",
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitStarted),
Message: "Cluster initialization has started",
})
}
Expand All @@ -84,41 +87,54 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}()

if err := r.ensureClusterObjects(ctx, instance); err != nil {
// check sts condition
isClusterReady := false
sts := &appsv1.StatefulSet{}
err = r.Get(ctx, client.ObjectKey{
Namespace: instance.Namespace,
Name: instance.Name,
}, sts)
if err == nil {
isClusterReady = sts.Status.ReadyReplicas == *sts.Spec.Replicas
}

if err := r.ensureClusterObjects(ctx, instance, isClusterReady); err != nil {
return ctrl.Result{}, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)
}

if initIdx := slices.IndexFunc(instance.Status.Conditions, func(condition metav1.Condition) bool {
return condition.Type == etcdaenixiov1alpha1.EtcdConditionInitialized
}); initIdx != -1 {
instance.Status.Conditions[initIdx].Status = "True"
instance.Status.Conditions[initIdx].LastTransitionTime = metav1.Now()
instance.Status.Conditions[initIdx].Reason = "InitializationComplete"
instance.Status.Conditions[initIdx].Message = "Cluster initialization is complete"
r.updateClusterState(instance, metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitComplete),
Message: "Cluster initialization is complete",
})
if isClusterReady {
r.updateClusterState(instance, metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeStatefulSetReady),
Message: "Cluster StatefulSet is Ready",
})
} else {
instance.Status.Conditions = append(instance.Status.Conditions, metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
Status: "True",
ObservedGeneration: instance.Generation,
r.updateClusterState(instance, metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionReady,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: "InitializationComplete",
Message: "Cluster initialization is complete",
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeStatefulSetNotReady),
Message: "Cluster StatefulSet is not Ready",
})
}

// at this point we should have cluster that can be bootstrapped. We should check if the cluster is ready

// 4. ping cluster to check quorum and number of replica)
// 5. if cluster is ready, change configmap ETCD_INITIAL_CLUSTER_STATE to existing
// 6. mark CR as ready or not ready

return ctrl.Result{}, nil
}

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
func (r *EtcdClusterReconciler) ensureClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
// 1. create or update configmap <name>-cluster-state
if err := r.ensureClusterStateConfigMap(ctx, cluster); err != nil {
if err := r.ensureClusterStateConfigMap(ctx, cluster, isClusterInitialized); err != nil {
return err
}
if err := r.ensureClusterService(ctx, cluster); err != nil {
Expand Down Expand Up @@ -182,14 +198,21 @@ func (r *EtcdClusterReconciler) ensureClusterService(ctx context.Context, cluste

// ensureClusterStateConfigMap creates or updates cluster state configmap.
func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
configMap := &corev1.ConfigMap{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: r.getClusterStateConfigMapName(cluster),
}, configMap)
// configmap exists, skip editing.
if err == nil {
if isClusterInitialized {
// update cluster state to existing
configMap.Data["ETCD_INITIAL_CLUSTER_STATE"] = "existing"
if err = r.Update(ctx, configMap); err != nil {
return fmt.Errorf("cannot update cluster state configmap: %w", err)
}
}
return nil
}

Expand All @@ -212,6 +235,7 @@ func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
}
return nil
}

return fmt.Errorf("cannot get cluster state configmap: %w", err)
}

Expand Down Expand Up @@ -321,10 +345,20 @@ func (r *EtcdClusterReconciler) ensureClusterStatefulSet(
MountPath: "/var/run/etcd",
},
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz?serializable=false",
Port: intstr.FromInt32(2379),
},
},
InitialDelaySeconds: 1,
PeriodSeconds: 5,
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Path: "/livez",
Port: intstr.FromInt32(2379),
},
},
Expand All @@ -334,7 +368,7 @@ func (r *EtcdClusterReconciler) ensureClusterStatefulSet(
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Path: "/readyz",
Port: intstr.FromInt32(2379),
},
},
Expand Down Expand Up @@ -387,9 +421,28 @@ func (r *EtcdClusterReconciler) getClusterStateConfigMapName(cluster *etcdaenixi
return cluster.Name + "-cluster-state"
}

// updateClusterState patches status condition in cluster using merge by Type
func (r *EtcdClusterReconciler) updateClusterState(cluster *etcdaenixiov1alpha1.EtcdCluster, state metav1.Condition) {
if initIdx := slices.IndexFunc(cluster.Status.Conditions, func(condition metav1.Condition) bool {
return condition.Type == state.Type
}); initIdx != -1 {
cluster.Status.Conditions[initIdx].Status = state.Status
cluster.Status.Conditions[initIdx].LastTransitionTime = state.LastTransitionTime
cluster.Status.Conditions[initIdx].ObservedGeneration = cluster.Generation
cluster.Status.Conditions[initIdx].Reason = state.Reason
cluster.Status.Conditions[initIdx].Message = state.Message
} else {
state.ObservedGeneration = cluster.Generation
cluster.Status.Conditions = append(cluster.Status.Conditions, state)
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&etcdaenixiov1alpha1.EtcdCluster{}).
For(&etcdaenixiov1alpha1.EtcdCluster{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.Service{}).
Complete(r)
}
36 changes: 35 additions & 1 deletion internal/controller/etcdcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ var _ = Describe("EtcdCluster Controller", func() {

err = k8sClient.Get(ctx, typeNamespacedName, etcdcluster)
Expect(err).NotTo(HaveOccurred())
Expect(etcdcluster.Status.Conditions).To(HaveLen(1))
Expect(etcdcluster.Status.Conditions).To(HaveLen(2))
Expect(etcdcluster.Status.Conditions[0].Type).To(Equal(etcdaenixiov1alpha1.EtcdConditionInitialized))
Expect(etcdcluster.Status.Conditions[0].Status).To(Equal(metav1.ConditionStatus("True")))
Expect(etcdcluster.Status.Conditions[1].Type).To(Equal(etcdaenixiov1alpha1.EtcdConditionReady))
Expect(etcdcluster.Status.Conditions[1].Status).To(Equal(metav1.ConditionStatus("False")))

// check that ConfigMap is created
cm := &v1.ConfigMap{}
Expand All @@ -115,5 +117,37 @@ var _ = Describe("EtcdCluster Controller", func() {
err = k8sClient.Get(ctx, typeNamespacedName, sts)
Expect(err).NotTo(HaveOccurred(), "cluster statefulset should exist")
})

It("should successfully reconcile the resource twice and mark as ready", func() {
By("Reconciling the created resource twice (second time after marking sts as ready)")
controllerReconciler := &EtcdClusterReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

// check that StatefulSet is created
sts := &appsv1.StatefulSet{}
err = k8sClient.Get(ctx, typeNamespacedName, sts)
Expect(err).NotTo(HaveOccurred(), "cluster statefulset should exist")
// mark sts as ready
sts.Status.ReadyReplicas = int32(etcdcluster.Spec.Replicas)
sts.Status.Replicas = int32(etcdcluster.Spec.Replicas)
Expect(k8sClient.Status().Update(ctx, sts)).To(Succeed())
// reconcile and check EtcdCluster status
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// check EtcdCluster status
err = k8sClient.Get(ctx, typeNamespacedName, etcdcluster)
Expect(err).NotTo(HaveOccurred())
Expect(etcdcluster.Status.Conditions[1].Type).To(Equal(etcdaenixiov1alpha1.EtcdConditionReady))
Expect(string(etcdcluster.Status.Conditions[1].Status)).To(Equal("True"))
})
})
})

0 comments on commit 9e35e26

Please sign in to comment.