diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index eb6fe6b6d..b0b26208e 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -19,13 +19,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"net/url"
 	"reflect"
 	"strings"
 	"time"
 
 	api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
-	"github.com/coreos/etcd-operator/pkg/backup/backupapi"
 	"github.com/coreos/etcd-operator/pkg/debug"
 	"github.com/coreos/etcd-operator/pkg/garbagecollection"
 	"github.com/coreos/etcd-operator/pkg/generated/clientset/versioned"
@@ -33,6 +31,7 @@ import (
 	"github.com/coreos/etcd-operator/pkg/util/k8sutil"
 	"github.com/coreos/etcd-operator/pkg/util/retryutil"
 
+	"github.com/pborman/uuid"
 	"github.com/sirupsen/logrus"
 	"k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -87,8 +86,6 @@ type Cluster struct {
 	// process runs in.
 	members etcdutil.MemberSet
 
-	bm *backupManager
-
 	tlsConfig *tls.Config
 
 	gc *garbagecollection.GC
@@ -134,7 +131,6 @@ func New(config Config, cl *api.EtcdCluster) *Cluster {
 }
 
 func (c *Cluster) setup() error {
-	var err error
 	var shouldCreateCluster bool
 	switch c.status.Phase {
 	case api.ClusterPhaseNone:
@@ -159,19 +155,6 @@ func (c *Cluster) setup() error {
 		}
 	}
 
-	if c.cluster.Spec.Backup != nil {
-		c.bm, err = newBackupManager(c.config, c.cluster, c.logger)
-		if err != nil {
-			return err
-		}
-		if !shouldCreateCluster {
-			err := c.bm.upgradeIfNeeded()
-			if err != nil {
-				return err
-			}
-		}
-	}
-
 	if shouldCreateCluster {
 		return c.create()
 	}
@@ -188,21 +171,7 @@ func (c *Cluster) create() error {
 
 	c.gc.CollectCluster(c.cluster.Name, c.cluster.UID)
 
-	if c.bm != nil {
-		if err := c.bm.setup(); err != nil {
-			return err
-		}
-	}
-
-	if c.cluster.Spec.Restore == nil {
-		// Note: For restore case, we don't need to create seed member,
-		// and will go through reconcile loop and disaster recovery.
-		if err := c.prepareSeedMember(); err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return c.prepareSeedMember()
 }
 
 func (c *Cluster) prepareSeedMember() error {
@@ -305,12 +274,8 @@ func (c *Cluster) run() {
 				continue
 			}
 			if len(running) == 0 {
-				c.logger.Warningf("all etcd pods are dead. Trying to recover from a previous backup")
-				rerr = c.disasterRecovery(nil)
-				if rerr != nil {
-					c.logger.Errorf("fail to do disaster recovery: %v", rerr)
-				}
-				// On normal recovery case, we need backoff. On error case, this could be either backoff or leading to cluster delete.
+				// TODO: how to handle this case?
+				c.logger.Warningf("all etcd pods are dead.")
 				break
 			}
 
@@ -327,10 +292,6 @@ func (c *Cluster) run() {
 				c.logger.Errorf("failed to reconcile: %v", rerr)
 				break
 			}
-
-			if err := c.updateLocalBackupStatus(); err != nil {
-				c.logger.Warningf("failed to update local backup service status: %v", err)
-			}
 			c.updateMemberStatus(c.members)
 			if err := c.updateCRStatus(); err != nil {
 				c.logger.Warningf("periodic update CR status failed: %v", err)
@@ -365,34 +326,6 @@ func (c *Cluster) handleUpdateEvent(event *clusterEvent) error {
 	// TODO: we can't handle another upgrade while an upgrade is in progress
 
 	c.logSpecUpdate(*oldSpec, event.cluster.Spec)
-
-	ob, nb := oldSpec.Backup, event.cluster.Spec.Backup
-	if !isBackupPolicyEqual(ob, nb) {
-		err := c.updateBackupPolicy(ob, nb)
-		if err != nil {
-			return fmt.Errorf("failed to update backup policy: %v", err)
-		}
-	}
-	return nil
-}
-
-func (c *Cluster) updateBackupPolicy(ob, nb *api.BackupPolicy) error {
-	var err error
-	switch {
-	case ob == nil && nb != nil:
-		c.bm, err = newBackupManager(c.config, c.cluster, c.logger)
-		if err != nil {
-			return err
-		}
-		return c.bm.setup()
-	case ob != nil && nb == nil:
-		c.bm.deleteBackupSidecar()
-		c.bm = nil
-	case ob != nil && nb != nil:
-		return c.bm.updateSidecar(c.cluster)
-	default:
-		panic("unexpected backup spec comparison")
-	}
 	return nil
 }
 
@@ -400,14 +333,10 @@ func isSpecEqual(s1, s2 api.ClusterSpec) bool {
 	if s1.Size != s2.Size || s1.Paused != s2.Paused || s1.Version != s2.Version {
 		return false
 	}
-	return isBackupPolicyEqual(s1.Backup, s2.Backup)
+	return true
 }
 
-func isBackupPolicyEqual(b1, b2 *api.BackupPolicy) bool {
-	return reflect.DeepEqual(b1, b2)
-}
-
-func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
+func (c *Cluster) startSeedMember() error {
 	m := &etcdutil.Member{
 		Name:         etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter),
 		Namespace:    c.cluster.Namespace,
@@ -415,7 +344,7 @@ func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
 		SecureClient: c.isSecureClient(),
 	}
 	ms := etcdutil.NewMemberSet(m)
-	if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil {
+	if err := c.createPod(ms, m, "new"); err != nil {
 		return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err)
 	}
 	c.memberCounter++
@@ -439,12 +368,7 @@ func (c *Cluster) isSecureClient() bool {
 
 // bootstrap creates the seed etcd member for a new cluster.
 func (c *Cluster) bootstrap() error {
-	return c.startSeedMember(false)
-}
-
-// recover recovers the cluster by creating a seed etcd member from a backup.
-func (c *Cluster) recover() error {
-	return c.startSeedMember(true)
+	return c.startSeedMember()
 }
 
 func (c *Cluster) Update(cl *api.EtcdCluster) {
@@ -456,14 +380,6 @@ func (c *Cluster) Update(cl *api.EtcdCluster) {
 
 func (c *Cluster) delete() {
 	c.gc.CollectCluster(c.cluster.Name, garbagecollection.NullUID)
-
-	if c.bm == nil {
-		return
-	}
-
-	if err := c.bm.cleanup(); err != nil {
-		c.logger.Errorf("cluster deletion: backup manager failed to cleanup: %v", err)
-	}
 }
 
 func (c *Cluster) setupServices() error {
@@ -475,18 +391,8 @@ func (c *Cluster) setupServices() error {
 	return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
 }
 
-func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
-	var pod *v1.Pod
-	if state == "new" {
-		var backupURL *url.URL
-		if needRecovery {
-			serviceAddr := k8sutil.BackupServiceAddr(c.cluster.Name)
-			backupURL = backupapi.NewBackupURL("http", serviceAddr, c.cluster.Spec.Version, -1)
-		}
-		pod = k8sutil.NewSeedMemberPod(c.cluster.Name, members, m, c.cluster.Spec, c.cluster.AsOwner(), backupURL)
-	} else {
-		pod = k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, "", c.cluster.Spec, c.cluster.AsOwner())
-	}
+func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error {
+	pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
 	_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
 	return err
 }
@@ -572,20 +478,6 @@ func (c *Cluster) updateCRStatus() error {
 	return nil
 }
 
-func (c *Cluster) updateLocalBackupStatus() error {
-	if c.bm == nil {
-		return nil
-	}
-
-	bs, err := c.bm.getStatus()
-	if err != nil {
-		return err
-	}
-	c.status.BackupServiceStatus = backupServiceStatusToTPRBackupServiceStatu(bs)
-
-	return nil
-}
-
 func (c *Cluster) reportFailedStatus() {
 	retryInterval := 5 * time.Second
 
diff --git a/pkg/cluster/reconcile.go b/pkg/cluster/reconcile.go
index 97bb22763..5e6821a59 100644
--- a/pkg/cluster/reconcile.go
+++ b/pkg/cluster/reconcile.go
@@ -29,6 +29,9 @@ import (
 	"k8s.io/api/core/v1"
 )
 
+// ErrLostQuorum indicates that the etcd cluster lost its quorum.
+var ErrLostQuorum = errors.New("lost quorum")
+
 // reconcile reconciles cluster current state to desired state specified by spec.
 // - it tries to reconcile the cluster to desired size.
 // - if the cluster needs for upgrade, it tries to upgrade old member one by one.
@@ -68,7 +71,7 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
 // 1. Remove all pods from running set that does not belong to member set.
 // 2. L consist of remaining pods of runnings
 // 3. If L = members, the current state matches the membership state. END.
-// 4. If len(L) < len(members)/2 + 1, quorum lost. Go to recovery process.
+// 4. If len(L) < len(members)/2 + 1, return quorum lost error.
 // 5. Add one missing member. END.
 func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
 	c.logger.Infof("running members: %s", running)
@@ -90,8 +93,8 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
 	}
 
 	if L.Size() < c.members.Size()/2+1 {
-		c.logger.Infof("Disaster recovery")
-		return c.disasterRecovery(L)
+		c.logger.Infof("lost quorum")
+		return ErrLostQuorum
 	}
 
 	c.logger.Infof("removing one dead member")
@@ -139,7 +142,7 @@ func (c *Cluster) addOneMember() error {
 	newMember.ID = resp.Member.ID
 	c.members.Add(newMember)
 
-	if err := c.createPod(c.members, newMember, "existing", false); err != nil {
+	if err := c.createPod(c.members, newMember, "existing"); err != nil {
 		return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err)
 	}
 	c.memberCounter++
@@ -213,51 +216,6 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
 	return nil
 }
 
-func (c *Cluster) disasterRecovery(left etcdutil.MemberSet) error {
-	c.status.SetRecoveringCondition()
-
-	if c.cluster.Spec.SelfHosted != nil {
-		return errors.New("self-hosted cluster cannot be recovered from disaster")
-	}
-
-	if c.cluster.Spec.Backup == nil {
-		return newFatalError("fail to do disaster recovery: no backup policy has been defined")
-	}
-
-	backupNow := false
-	if len(left) > 0 {
-		c.logger.Infof("pods are still running (%v). Will try to make a latest backup from one of them.", left)
-		err := c.bm.requestBackup()
-		if err != nil {
-			c.logger.Errorln(err)
-		} else {
-			backupNow = true
-		}
-	}
-	if backupNow {
-		c.logger.Info("made a latest backup")
-	} else {
-		// We don't return error if backupnow failed. Instead, we ask if there is previous backup.
-		// If so, we can still continue. Otherwise, it's fatal error.
-		exist, err := c.bm.checkBackupExist(c.cluster.Spec.Version)
-		if err != nil {
-			c.logger.Errorln(err)
-			return err
-		}
-		if !exist {
-			return newFatalError("no backup exist for disaster recovery")
-		}
-	}
-
-	for _, m := range left {
-		err := c.removePod(m.Name)
-		if err != nil {
-			return err
-		}
-	}
-	return c.recover()
-}
-
 func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool {
 	return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil
 }