diff --git a/api/v1alpha1/etcdcluster_types.go b/api/v1alpha1/etcdcluster_types.go
index 71cf65a6..bd7ef6d7 100644
--- a/api/v1alpha1/etcdcluster_types.go
+++ b/api/v1alpha1/etcdcluster_types.go
@@ -55,6 +55,7 @@ type EtcdClusterSpec struct {
 const (
 	EtcdConditionInitialized = "Initialized"
 	EtcdConditionReady       = "Ready"
+	EtcdConditionError       = "Error"
 )
 
 type EtcdCondType string
@@ -66,6 +67,7 @@ const (
 	EtcdCondTypeWaitingForFirstQuorum EtcdCondType = "WaitingForFirstQuorum"
 	EtcdCondTypeStatefulSetReady      EtcdCondType = "StatefulSetReady"
 	EtcdCondTypeStatefulSetNotReady   EtcdCondType = "StatefulSetNotReady"
+	EtcdCondTypeSplitbrain            EtcdCondType = "Splitbrain"
 )
 
 const (
@@ -74,6 +76,7 @@ const (
 	EtcdReadyCondNegMessage          EtcdCondMessage = "Cluster StatefulSet is not Ready"
 	EtcdReadyCondPosMessage         EtcdCondMessage = "Cluster StatefulSet is Ready"
 	EtcdReadyCondNegWaitingForQuorum EtcdCondMessage = "Waiting for first quorum to be established"
+	EtcdErrorCondSplitbrainMessage   EtcdCondMessage = "Etcd endpoints reporting more than one unique cluster ID"
 )
 
 // EtcdClusterStatus defines the observed state of EtcdCluster
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 1d3153cd..69ccd7d7 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -16,6 +16,14 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - endpoints
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - ""
   resources:
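The new `Error` condition type added above is set through the standard condition machinery later in the controller. For reference, a minimal sketch (not part of this PR, and assuming `Status.Conditions` is the usual `[]metav1.Condition` slice) of how a consumer could check for the splitbrain condition:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Conditions as the controller would populate them on splitbrain
	// detection (string literals mirror the constants added above).
	conds := []metav1.Condition{{
		Type:    "Error",      // EtcdConditionError
		Status:  metav1.ConditionTrue,
		Reason:  "Splitbrain", // EtcdCondTypeSplitbrain
		Message: "Etcd endpoints reporting more than one unique cluster ID",
	}}
	if meta.IsStatusConditionTrue(conds, "Error") {
		fmt.Println("cluster is in an error state; reconciliation is paused")
	}
}
```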
diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go
index 316d23ea..87aeab69 100644
--- a/internal/controller/etcdcluster_controller.go
+++ b/internal/controller/etcdcluster_controller.go
@@ -25,6 +25,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aenix-io/etcd-operator/internal/log"
@@ -47,6 +48,10 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
+const (
+	etcdDefaultTimeout = 5 * time.Second
+)
+
 // EtcdClusterReconciler reconciles a EtcdCluster object
 type EtcdClusterReconciler struct {
 	client.Client
@@ -56,6 +61,7 @@ type EtcdClusterReconciler struct {
 // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
+// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
 // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
 // +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
 // +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
@@ -80,13 +86,68 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return reconcile.Result{}, nil
 	}
 
+	state := observables{}
+
+	// create two services and the pdb
+	err = r.ensureUnconditionalObjects(ctx, instance)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	// fetch STS if exists
+	err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
+	if client.IgnoreNotFound(err) != nil {
+		return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
+	}
+	state.stsExists = state.statefulSet.UID != ""
+
+	// fetch endpoints
+	clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+	state.endpointsFound = clusterClient != nil && singleClients != nil
+
+	if !state.endpointsFound {
+		if !state.stsExists {
+			// TODO: happy path for new cluster creation
+			log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
+		}
+	}
+
+	// get status of every endpoint and member list from every endpoint
+	state.etcdStatuses = make([]etcdStatus, len(singleClients))
+	{
+		var wg sync.WaitGroup
+		ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
+		for i := range singleClients {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				state.etcdStatuses[i].fill(ctx, singleClients[i])
+			}(i)
+		}
+		wg.Wait()
+		cancel()
+	}
+	state.setClusterID()
+	if state.inSplitbrain() {
+		log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
+		factory.SetCondition(instance, factory.NewCondition(etcdaenixiov1alpha1.EtcdConditionError).
+			WithStatus(true).
+			WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain)).
+			WithMessage(string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage)).
+			Complete(),
+		)
+		return r.updateStatus(ctx, instance)
+	}
 	// fill conditions
 	if len(instance.Status.Conditions) == 0 {
 		factory.FillConditions(instance)
 	}
 
 	// ensure managed resources
-	if err = r.ensureClusterObjects(ctx, instance); err != nil {
+	if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
 		return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
 	}
 
@@ -138,8 +199,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	return r.updateStatus(ctx, instance)
 }
 
-// ensureClusterObjects creates or updates all objects owned by cluster CR
-func (r *EtcdClusterReconciler) ensureClusterObjects(
+// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
+func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
 	ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
 
 	if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
@@ -148,30 +209,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
 	}
 	log.Debug(ctx, "cluster state configmap reconciled")
 
-	if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile headless service failed")
-		return err
-	}
-	log.Debug(ctx, "headless service reconciled")
-
 	if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
 		log.Error(ctx, err, "reconcile statefulset failed")
 		return err
 	}
 	log.Debug(ctx, "statefulset reconciled")
 
-	if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile client service failed")
-		return err
-	}
-	log.Debug(ctx, "client service reconciled")
-
-	if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile pdb failed")
-		return err
-	}
-	log.Debug(ctx, "pdb reconciled")
-
 	return nil
 }
 
@@ -498,3 +541,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie
 
 	return nil
 }
+
+// ensureUnconditionalObjects creates the two services and the PDB
+// which can be created at the start of the reconciliation loop
+// without any risk of disrupting the etcd cluster
+func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
+	const concurrentOperations = 3
+	c := make(chan error)
+	defer close(c)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	var wg sync.WaitGroup
+	wg.Add(concurrentOperations)
+	wrapWithMsg := func(err error, msg string) error {
+		if err != nil {
+			return fmt.Errorf(msg+": %w", err)
+		}
+		return nil
+	}
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
+			"couldn't ensure client service"):
+		}
+	}(c)
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
+			"couldn't ensure headless service"):
+		}
+	}(c)
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
+			"couldn't ensure pod disruption budget"):
+		}
+	}(c)
+
+	for i := 0; i < concurrentOperations; i++ {
+		if err := <-c; err != nil {
+			cancel()
+
+			// let all goroutines select the ctx.Done() case to avoid races on closed channels
+			wg.Wait()
+			return err
+		}
+	}
+	return nil
+}
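`ensureUnconditionalObjects` fans three create-or-update calls out to goroutines, returns the first error, and uses cancellation to unblock the remaining senders before the deferred `close(c)` runs. A stripped-down, self-contained sketch of the same pattern (illustrative names, not code from this PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll runs ops concurrently, returns the first error, and lets
// cancellation drain the remaining senders before the channel closes.
func runAll(ctx context.Context, ops []func(context.Context) error) error {
	c := make(chan error)
	defer close(c)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var wg sync.WaitGroup
	wg.Add(len(ops))
	for _, op := range ops {
		go func(op func(context.Context) error) {
			defer wg.Done()
			select {
			case <-ctx.Done(): // a sibling failed; don't send on a soon-to-close channel
			case c <- op(ctx):
			}
		}(op)
	}
	for range ops {
		if err := <-c; err != nil {
			cancel()
			wg.Wait() // park the remaining goroutines on ctx.Done() before close(c)
			return err
		}
	}
	return nil
}

func main() {
	err := runAll(context.Background(), []func(context.Context) error{
		func(context.Context) error { return nil },
		func(context.Context) error { return errors.New("boom") },
		func(context.Context) error { return nil },
	})
	fmt.Println(err) // "boom"
}
```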
diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go
new file mode 100644
index 00000000..4725171b
--- /dev/null
+++ b/internal/controller/factory/etcd_client.go
@@ -0,0 +1,63 @@
+package factory
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/aenix-io/etcd-operator/api/v1alpha1"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
+	cfg, err := configFromCluster(ctx, cluster, cli)
+	if err != nil {
+		return nil, nil, err
+	}
+	if len(cfg.Endpoints) == 0 {
+		return nil, nil, nil
+	}
+	eps := cfg.Endpoints
+	clusterClient, err := clientv3.New(cfg)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
+	}
+	singleClients := make([]*clientv3.Client, len(eps))
+	for i, ep := range eps {
+		cfg.Endpoints = []string{ep}
+		singleClients[i], err = clientv3.New(cfg)
+		if err != nil {
+			return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
+		}
+	}
+	return clusterClient, singleClients, nil
+}
+
+func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
+	ep := v1.Endpoints{}
+	err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
+	if client.IgnoreNotFound(err) != nil {
+		return clientv3.Config{}, err
+	}
+	if err != nil {
+		return clientv3.Config{Endpoints: []string{}}, nil
+	}
+
+	names := map[string]struct{}{}
+	urls := make([]string, 0, 8)
+	for _, v := range ep.Subsets {
+		for _, addr := range v.Addresses {
+			names[addr.Hostname] = struct{}{}
+		}
+		for _, addr := range v.NotReadyAddresses {
+			names[addr.Hostname] = struct{}{}
+		}
+	}
+	for name := range names {
+		urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
+	}
+
+	return clientv3.Config{Endpoints: urls}, nil
+}
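A hypothetical call site (not in this PR) for `NewEtcdClientSet`, showing how the return values pair up: a nil cluster client means the headless service has no addresses yet, and each `clientv3.Client` holds a gRPC connection that a long-lived caller would want to `Close()`:

```go
package controller

import (
	"context"

	"github.com/aenix-io/etcd-operator/api/v1alpha1"
	"github.com/aenix-io/etcd-operator/internal/controller/factory"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// probeEndpoints is an illustrative helper (not part of this PR) showing
// the intended lifecycle of the client set: one cluster-level client plus
// one client per endpoint, each backed by its own connection.
func probeEndpoints(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) error {
	clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, cluster, cli)
	if err != nil {
		return err
	}
	if clusterClient == nil {
		return nil // headless service has no addresses yet
	}
	defer clusterClient.Close()
	for _, c := range singleClients {
		defer c.Close()
	}
	// ... query each endpoint individually, e.g. c.Status(ctx, c.Endpoints()[0])
	return nil
}
```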
diff --git a/internal/controller/observables.go b/internal/controller/observables.go
new file mode 100644
index 00000000..adbaba3f
--- /dev/null
+++ b/internal/controller/observables.go
@@ -0,0 +1,80 @@
+package controller
+
+import (
+	"context"
+	"sync"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+)
+
+// etcdStatus holds the details of the status that an etcd endpoint
+// can return about itself, i.e. its own status and its perceived
+// member list
+type etcdStatus struct {
+	endpointStatus      *clientv3.StatusResponse
+	endpointStatusError error
+	memberList          *clientv3.MemberListResponse
+	memberListError     error
+}
+
+// observables stores observations that the operator can make about
+// states of objects in kubernetes
+type observables struct {
+	statefulSet    appsv1.StatefulSet
+	stsExists      bool
+	endpointsFound bool
+	etcdStatuses   []etcdStatus
+	clusterID      uint64
+	_              int
+	_              []corev1.PersistentVolumeClaim
+}
+
+// setClusterID populates the clusterID field based on etcdStatuses
+func (o *observables) setClusterID() {
+	for i := range o.etcdStatuses {
+		if o.etcdStatuses[i].endpointStatus != nil {
+			o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId
+			return
+		}
+	}
+}
+
+// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
+// If more than one unique ID is reported, cluster is in splitbrain.
+func (o *observables) inSplitbrain() bool {
+	for i := range o.etcdStatuses {
+		if o.etcdStatuses[i].endpointStatus != nil {
+			if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// fill takes a single-endpoint client and populates the fields of etcdStatus
+// with the endpoint's status and its perceived member list.
+func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
+	}()
+	s.memberList, s.memberListError = c.MemberList(ctx)
+	wg.Wait()
+}
+
+// TODO: make a real function
+func (o *observables) _() int {
+	if o.etcdStatuses != nil {
+		for i := range o.etcdStatuses {
+			if o.etcdStatuses[i].memberList != nil {
+				return len(o.etcdStatuses[i].memberList.Members)
+			}
+		}
+	}
+	return 0
+}
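How `setClusterID` and `inSplitbrain` interact is easiest to see with fabricated statuses: the first reachable endpoint's cluster ID becomes the reference, unreachable endpoints (nil status) are skipped, and any disagreeing ID reports splitbrain. A sketch, written as it might appear in a test in the same package (the IDs are made up):

```go
package controller

import (
	"testing"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// status builds a fake endpoint status reporting the given cluster ID.
func status(id uint64) etcdStatus {
	return etcdStatus{endpointStatus: &clientv3.StatusResponse{
		Header: &etcdserverpb.ResponseHeader{ClusterId: id},
	}}
}

func TestInSplitbrain(t *testing.T) {
	// Two endpoints agree, one reports a different cluster ID: splitbrain.
	o := observables{etcdStatuses: []etcdStatus{status(0xA), status(0xA), status(0xB)}}
	o.setClusterID()
	if !o.inSplitbrain() {
		t.Fatal("expected splitbrain to be detected")
	}

	// Unreachable endpoints (nil status) are ignored by both methods.
	o = observables{etcdStatuses: []etcdStatus{{}, status(0xA), status(0xA)}}
	o.setClusterID()
	if o.inSplitbrain() {
		t.Fatal("did not expect splitbrain")
	}
}
```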
"github.com/onsi/gomega" @@ -95,7 +96,7 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) - By("wait for statefulset is ready", func() { + Eventually(func() error { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.readyReplicas}=3", @@ -103,8 +104,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred()) @@ -144,7 +145,7 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) - By("wait for statefulset is ready", func() { + Eventually(func() error { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.readyReplicas}=3", @@ -152,8 +153,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred()) @@ -192,7 +193,7 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) - By("wait for statefulset is ready", func() { + Eventually(func() error { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.availableReplicas}=3", @@ -200,8 +201,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred()) @@ -217,8 +218,10 @@ var _ = Describe("etcd-operator", Ordered, func() { auth := clientv3.NewAuth(client) By("check root role is created", func() { - _, err = auth.RoleGet(ctx, "root") - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + _, err = auth.RoleGet(ctx, "root") + return err + }, time.Second*20, time.Second*2).Should(Succeed()) }) By("check root user is created and has root role", func() {