This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

pkg/cluster: remove disaster recovery/restore logic from pkg/cluster
fanminshi committed Nov 7, 2017
1 parent 18a37e5 commit 472ee98
Showing 2 changed files with 17 additions and 167 deletions.
128 changes: 10 additions & 118 deletions pkg/cluster/cluster.go
@@ -19,20 +19,19 @@ import (
"encoding/json"
"fmt"
"math"
"net/url"
"reflect"
"strings"
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/backup/backupapi"
"github.com/coreos/etcd-operator/pkg/debug"
"github.com/coreos/etcd-operator/pkg/garbagecollection"
"github.com/coreos/etcd-operator/pkg/generated/clientset/versioned"
"github.com/coreos/etcd-operator/pkg/util/etcdutil"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
"github.com/coreos/etcd-operator/pkg/util/retryutil"

"github.com/pborman/uuid"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -87,8 +86,6 @@ type Cluster struct {
// process runs in.
members etcdutil.MemberSet

bm *backupManager

tlsConfig *tls.Config

gc *garbagecollection.GC
@@ -134,7 +131,6 @@ func New(config Config, cl *api.EtcdCluster) *Cluster {
}

func (c *Cluster) setup() error {
var err error
var shouldCreateCluster bool
switch c.status.Phase {
case api.ClusterPhaseNone:
@@ -159,19 +155,6 @@ func (c *Cluster) setup() error {
}
}

if c.cluster.Spec.Backup != nil {
c.bm, err = newBackupManager(c.config, c.cluster, c.logger)
if err != nil {
return err
}
if !shouldCreateCluster {
err := c.bm.upgradeIfNeeded()
if err != nil {
return err
}
}
}

if shouldCreateCluster {
return c.create()
}
@@ -188,21 +171,7 @@ func (c *Cluster) create() error {

c.gc.CollectCluster(c.cluster.Name, c.cluster.UID)

if c.bm != nil {
if err := c.bm.setup(); err != nil {
return err
}
}

if c.cluster.Spec.Restore == nil {
// Note: For restore case, we don't need to create seed member,
// and will go through reconcile loop and disaster recovery.
if err := c.prepareSeedMember(); err != nil {
return err
}
}

return nil
return c.prepareSeedMember()
}

func (c *Cluster) prepareSeedMember() error {
@@ -305,12 +274,8 @@ func (c *Cluster) run() {
continue
}
if len(running) == 0 {
c.logger.Warningf("all etcd pods are dead. Trying to recover from a previous backup")
rerr = c.disasterRecovery(nil)
if rerr != nil {
c.logger.Errorf("fail to do disaster recovery: %v", rerr)
}
// On normal recovery case, we need backoff. On error case, this could be either backoff or leading to cluster delete.
// TODO: how to handle this case?
c.logger.Warningf("all etcd pods are dead.")
break
}

@@ -327,10 +292,6 @@
c.logger.Errorf("failed to reconcile: %v", rerr)
break
}

if err := c.updateLocalBackupStatus(); err != nil {
c.logger.Warningf("failed to update local backup service status: %v", err)
}
c.updateMemberStatus(c.members)
if err := c.updateCRStatus(); err != nil {
c.logger.Warningf("periodic update CR status failed: %v", err)
@@ -365,57 +326,25 @@ func (c *Cluster) handleUpdateEvent(event *clusterEvent) error {
// TODO: we can't handle another upgrade while an upgrade is in progress

c.logSpecUpdate(*oldSpec, event.cluster.Spec)

ob, nb := oldSpec.Backup, event.cluster.Spec.Backup
if !isBackupPolicyEqual(ob, nb) {
err := c.updateBackupPolicy(ob, nb)
if err != nil {
return fmt.Errorf("failed to update backup policy: %v", err)
}
}
return nil
}

func (c *Cluster) updateBackupPolicy(ob, nb *api.BackupPolicy) error {
var err error
switch {
case ob == nil && nb != nil:
c.bm, err = newBackupManager(c.config, c.cluster, c.logger)
if err != nil {
return err
}
return c.bm.setup()
case ob != nil && nb == nil:
c.bm.deleteBackupSidecar()
c.bm = nil
case ob != nil && nb != nil:
return c.bm.updateSidecar(c.cluster)
default:
panic("unexpected backup spec comparison")
}
return nil
}

func isSpecEqual(s1, s2 api.ClusterSpec) bool {
if s1.Size != s2.Size || s1.Paused != s2.Paused || s1.Version != s2.Version {
return false
}
return isBackupPolicyEqual(s1.Backup, s2.Backup)
return true
}

func isBackupPolicyEqual(b1, b2 *api.BackupPolicy) bool {
return reflect.DeepEqual(b1, b2)
}

func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
func (c *Cluster) startSeedMember() error {
m := &etcdutil.Member{
Name: etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter),
Namespace: c.cluster.Namespace,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
}
ms := etcdutil.NewMemberSet(m)
if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil {
if err := c.createPod(ms, m, "new"); err != nil {
return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err)
}
c.memberCounter++
@@ -439,12 +368,7 @@ func (c *Cluster) isSecureClient() bool {

// bootstrap creates the seed etcd member for a new cluster.
func (c *Cluster) bootstrap() error {
return c.startSeedMember(false)
}

// recover recovers the cluster by creating a seed etcd member from a backup.
func (c *Cluster) recover() error {
return c.startSeedMember(true)
return c.startSeedMember()
}

func (c *Cluster) Update(cl *api.EtcdCluster) {
@@ -456,14 +380,6 @@ func (c *Cluster) Update(cl *api.EtcdCluster) {

func (c *Cluster) delete() {
c.gc.CollectCluster(c.cluster.Name, garbagecollection.NullUID)

if c.bm == nil {
return
}

if err := c.bm.cleanup(); err != nil {
c.logger.Errorf("cluster deletion: backup manager failed to cleanup: %v", err)
}
}

func (c *Cluster) setupServices() error {
@@ -475,18 +391,8 @@ func (c *Cluster) setupServices() error {
return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
var pod *v1.Pod
if state == "new" {
var backupURL *url.URL
if needRecovery {
serviceAddr := k8sutil.BackupServiceAddr(c.cluster.Name)
backupURL = backupapi.NewBackupURL("http", serviceAddr, c.cluster.Spec.Version, -1)
}
pod = k8sutil.NewSeedMemberPod(c.cluster.Name, members, m, c.cluster.Spec, c.cluster.AsOwner(), backupURL)
} else {
pod = k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, "", c.cluster.Spec, c.cluster.AsOwner())
}
func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error {
pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
return err
}
@@ -572,20 +478,6 @@ func (c *Cluster) updateCRStatus() error {
return nil
}

func (c *Cluster) updateLocalBackupStatus() error {
if c.bm == nil {
return nil
}

bs, err := c.bm.getStatus()
if err != nil {
return err
}
c.status.BackupServiceStatus = backupServiceStatusToTPRBackupServiceStatu(bs)

return nil
}

func (c *Cluster) reportFailedStatus() {
retryInterval := 5 * time.Second

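With the restore branch gone, the seed member and members added later are built through the same k8sutil.NewEtcdPod call: only the state string ("new" vs "existing") distinguishes them, and the token argument is now a fresh uuid.New() value rather than an empty string or a backup URL. A rough sketch of the post-patch call sites, condensed from the hunks above rather than additional code in this commit:

	// Seed member: single-member set, state "new".
	m := &etcdutil.Member{
		Name:      etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter),
		Namespace: c.cluster.Namespace,
	}
	if err := c.createPod(etcdutil.NewMemberSet(m), m, "new"); err != nil {
		return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err)
	}

	// Member added during reconciliation: full member set, state "existing".
	if err := c.createPod(c.members, newMember, "existing"); err != nil {
		return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err)
	}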
56 changes: 7 additions & 49 deletions pkg/cluster/reconcile.go
@@ -29,6 +29,9 @@ import (
"k8s.io/api/core/v1"
)

// ErrLostQuorum indicates that the etcd cluster lost its quorum.
var ErrLostQuorum = errors.New("lost quorum")

// reconcile reconciles cluster current state to desired state specified by spec.
// - it tries to reconcile the cluster to desired size.
// - if the cluster needs for upgrade, it tries to upgrade old member one by one.
@@ -68,7 +71,7 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
// 1. Remove all pods from running set that does not belong to member set.
// 2. L consist of remaining pods of runnings
// 3. If L = members, the current state matches the membership state. END.
// 4. If len(L) < len(members)/2 + 1, quorum lost. Go to recovery process.
// 4. If len(L) < len(members)/2 + 1, return quorum lost error.
// 5. Add one missing member. END.
func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
c.logger.Infof("running members: %s", running)
@@ -90,8 +93,8 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
}

if L.Size() < c.members.Size()/2+1 {
c.logger.Infof("Disaster recovery")
return c.disasterRecovery(L)
c.logger.Infof("lost quorum")
return ErrLostQuorum
}

c.logger.Infof("removing one dead member")
@@ -139,7 +142,7 @@ func (c *Cluster) addOneMember() error {
newMember.ID = resp.Member.ID
c.members.Add(newMember)

if err := c.createPod(c.members, newMember, "existing", false); err != nil {
if err := c.createPod(c.members, newMember, "existing"); err != nil {
return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err)
}
c.memberCounter++
@@ -213,51 +216,6 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
return nil
}

func (c *Cluster) disasterRecovery(left etcdutil.MemberSet) error {
c.status.SetRecoveringCondition()

if c.cluster.Spec.SelfHosted != nil {
return errors.New("self-hosted cluster cannot be recovered from disaster")
}

if c.cluster.Spec.Backup == nil {
return newFatalError("fail to do disaster recovery: no backup policy has been defined")
}

backupNow := false
if len(left) > 0 {
c.logger.Infof("pods are still running (%v). Will try to make a latest backup from one of them.", left)
err := c.bm.requestBackup()
if err != nil {
c.logger.Errorln(err)
} else {
backupNow = true
}
}
if backupNow {
c.logger.Info("made a latest backup")
} else {
// We don't return error if backupnow failed. Instead, we ask if there is previous backup.
// If so, we can still continue. Otherwise, it's fatal error.
exist, err := c.bm.checkBackupExist(c.cluster.Spec.Version)
if err != nil {
c.logger.Errorln(err)
return err
}
if !exist {
return newFatalError("no backup exist for disaster recovery")
}
}

for _, m := range left {
err := c.removePod(m.Name)
if err != nil {
return err
}
}
return c.recover()
}

func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool {
return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil
}
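reconcileMembers now reports quorum loss to its caller through ErrLostQuorum instead of starting a restore itself; within this commit the run() loop simply logs the reconcile error and breaks out. A minimal sketch of how a caller could distinguish quorum loss from a transient reconcile failure (the escalate helper is hypothetical and not part of the operator):

	if err := c.reconcile(pods); err != nil {
		if err == ErrLostQuorum {
			// Adding or removing one member at a time cannot regain quorum;
			// hand the cluster off to whatever restore path lives outside pkg/cluster.
			c.escalate(err)
		} else {
			c.logger.Errorf("failed to reconcile: %v", err)
		}
	}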
