support force upgrade when pd cluster is unavailable (#631)
DanielZhangQD authored Jul 8, 2019
1 parent d15453d commit ed45eb6
Showing 6 changed files with 164 additions and 70 deletions.
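
Before this change, pd_upgrader.go tried to detect an unavailable PD cluster on its own by counting pods of the current revision stuck in image-pull failures; this commit removes that heuristic and makes force upgrade an explicit opt-in: the user sets the annotation tidb.pingcap.com/force-upgrade: "true" on the TidbCluster, and the PD member manager then forces the StatefulSet rollout even when PD status cannot be synced. A minimal sketch of how a caller might opt a cluster in (assumed usage, not part of the diff below; markForceUpgrade is a hypothetical helper):

	import (
		"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
		"github.com/pingcap/tidb-operator/pkg/label"
	)

	// markForceUpgrade sets the annotation that the new needForceUpgrade
	// helper (added to pkg/manager/member/utils.go below) checks for.
	func markForceUpgrade(tc *v1alpha1.TidbCluster) {
		if tc.Annotations == nil {
			tc.Annotations = map[string]string{}
		}
		tc.Annotations[label.AnnForceUpgradeKey] = label.AnnForceUpgradeVal // "tidb.pingcap.com/force-upgrade" = "true"
	}

From the command line, the equivalent would presumably be kubectl annotate tidbcluster <name> tidb.pingcap.com/force-upgrade=true.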
8 changes: 7 additions & 1 deletion pkg/label/label.go
@@ -15,9 +15,10 @@ package label

import (
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"strings"
)

const (
@@ -52,6 +53,11 @@ const (
AnnPVCPodScheduling = "tidb.pingcap.com/pod-scheduling"
// AnnTiDBPartition is pod annotation which TiDB pod should upgrade to
AnnTiDBPartition string = "tidb.pingcap.com/tidb-partition"
// AnnForceUpgradeKey is the tc annotation key indicating whether a force upgrade should be done
AnnForceUpgradeKey = "tidb.pingcap.com/force-upgrade"

// AnnForceUpgradeVal is the tc annotation value indicating whether a force upgrade should be done
AnnForceUpgradeVal = "true"

// PDLabelVal is PD label value
PDLabelVal string = "pd"
14 changes: 13 additions & 1 deletion pkg/manager/member/pd_member_manager.go
@@ -200,6 +200,16 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu
glog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", ns, tcName, err)
}

if !tc.Status.PD.Synced {
force := needForceUpgrade(tc)
if force {
tc.Status.PD.Phase = v1alpha1.UpgradePhase
setUpgradePartition(newPDSet, 0)
errSTS := pmm.updateStatefulSet(tc, newPDSet, oldPDSet)
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd needs force upgrade, %v", ns, tcName, errSTS)
}
}

if !templateEqual(newPDSet.Spec.Template, oldPDSet.Spec.Template) || tc.Status.PD.Phase == v1alpha1.UpgradePhase {
if err := pmm.pdUpgrader.Upgrade(tc, oldPDSet, newPDSet); err != nil {
return err
@@ -227,7 +237,9 @@ }
}
}

// TODO FIXME equal is false every time
return pmm.updateStatefulSet(tc, newPDSet, oldPDSet)
}
func (pmm *pdMemberManager) updateStatefulSet(tc *v1alpha1.TidbCluster, newPDSet, oldPDSet *apps.StatefulSet) error {
if !statefulSetEqual(*newPDSet, *oldPDSet) {
set := *oldPDSet
set.Spec.Template = newPDSet.Spec.Template
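
The new branch above runs before the regular upgrade check: when tc.Status.PD.Synced is false and the force-upgrade annotation is present, the manager flips the phase to UpgradePhase, drops the StatefulSet rolling-update partition to 0 so every ordinal may roll, pushes the new StatefulSet, and returns a requeue error so the sync is retried. setUpgradePartition itself is not shown in this diff; a plausible shape, inferred from the call site and the apps/v1beta1 API imported by this file (an assumption, not code from this commit):

	func setUpgradePartition(set *apps.StatefulSet, upgradeOrdinal int32) {
		// On a rolling update, only pods with ordinal >= Partition are updated,
		// so a partition of 0 allows the whole StatefulSet to roll.
		set.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateStatefulSetStrategy{
			Partition: &upgradeOrdinal,
		}
	}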
127 changes: 127 additions & 0 deletions pkg/manager/member/pd_member_manager_test.go
@@ -639,6 +639,133 @@ func TestPDMemberManagerUpgrade(t *testing.T) {
}
}

func TestPDMemberManagerSyncPDSts(t *testing.T) {
g := NewGomegaWithT(t)
type testcase struct {
name string
modify func(cluster *v1alpha1.TidbCluster)
pdHealth *pdapi.HealthInfo
err bool
statusChange func(*apps.StatefulSet)
expectStatefulSetFn func(*GomegaWithT, *apps.StatefulSet, error)
expectTidbClusterFn func(*GomegaWithT, *v1alpha1.TidbCluster)
}

testFn := func(test *testcase, t *testing.T) {
tc := newTidbClusterForPD()
ns := tc.Namespace
tcName := tc.Name

pmm, fakeSetControl, _, fakePDControl, _, _, _ := newFakePDMemberManager()
pdClient := controller.NewFakePDClient(fakePDControl, tc)

pdClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
return test.pdHealth, nil
})
pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) {
return &metapb.Cluster{Id: uint64(1)}, nil
})

fakeSetControl.SetStatusChange(test.statusChange)

err := pmm.Sync(tc)
g.Expect(controller.IsRequeueError(err)).To(BeTrue())

_, err = pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())

test.modify(tc)
pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) {
return &metapb.Cluster{Id: uint64(1)}, fmt.Errorf("cannot get cluster")
})
err = pmm.syncPDStatefulSetForTidbCluster(tc)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}

if test.expectStatefulSetFn != nil {
set, err := pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
test.expectStatefulSetFn(g, set, err)
}
if test.expectTidbClusterFn != nil {
test.expectTidbClusterFn(g, tc)
}
}
tests := []testcase{
{
name: "force upgrade",
modify: func(cluster *v1alpha1.TidbCluster) {
cluster.Spec.PD.Image = "pd-test-image:v2"
cluster.Spec.PD.Replicas = 1
cluster.ObjectMeta.Annotations = make(map[string]string)
cluster.ObjectMeta.Annotations["tidb.pingcap.com/force-upgrade"] = "true"
},
pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
{Name: "pd1", MemberID: uint64(1), ClientUrls: []string{"http://pd1:2379"}, Health: false},
{Name: "pd2", MemberID: uint64(2), ClientUrls: []string{"http://pd2:2379"}, Health: false},
{Name: "pd3", MemberID: uint64(3), ClientUrls: []string{"http://pd3:2379"}, Health: false},
}},
err: true,
statusChange: func(set *apps.StatefulSet) {
set.Status.Replicas = *set.Spec.Replicas
set.Status.CurrentRevision = "pd-1"
set.Status.UpdateRevision = "pd-1"
observedGeneration := int64(1)
set.Status.ObservedGeneration = &observedGeneration
},
expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(set.Spec.Template.Spec.Containers[0].Image).To(Equal("pd-test-image:v2"))
g.Expect(*set.Spec.Replicas).To(Equal(int32(1)))
g.Expect(*set.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(0)))
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase))
},
},
{
name: "non force upgrade",
modify: func(cluster *v1alpha1.TidbCluster) {
cluster.Spec.PD.Image = "pd-test-image:v2"
cluster.Spec.PD.Replicas = 1
},
pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
{Name: "pd1", MemberID: uint64(1), ClientUrls: []string{"http://pd1:2379"}, Health: false},
{Name: "pd2", MemberID: uint64(2), ClientUrls: []string{"http://pd2:2379"}, Health: false},
{Name: "pd3", MemberID: uint64(3), ClientUrls: []string{"http://pd3:2379"}, Health: false},
}},
err: true,
statusChange: func(set *apps.StatefulSet) {
set.Status.Replicas = *set.Spec.Replicas
set.Status.CurrentRevision = "pd-1"
set.Status.UpdateRevision = "pd-1"
observedGeneration := int64(1)
set.Status.ObservedGeneration = &observedGeneration
},
expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(set.Spec.Template.Spec.Containers[0].Image).To(Equal("pd-test-image"))
g.Expect(*set.Spec.Replicas).To(Equal(int32(3)))
g.Expect(*set.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase))
},
},
}
for i := range tests {
t.Logf("begin: %s", tests[i].name)
testFn(&tests[i], t)
t.Logf("end: %s", tests[i].name)
}
}

func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetControl, *controller.FakeServiceControl, *pdapi.FakePDControl, cache.Indexer, cache.Indexer, *controller.FakePodControl) {
cli := fake.NewSimpleClientset()
kubeCli := kubefake.NewSimpleClientset()
41 changes: 3 additions & 38 deletions pkg/manager/member/pd_upgrader.go
@@ -18,7 +18,6 @@ import (

"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -42,14 +41,6 @@ func NewPDUpgrader(pdControl pdapi.PDControlInterface,
}

func (pu *pdUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
force, err := pu.needForceUpgrade(tc)
if err != nil {
return err
}
if force {
return pu.forceUpgrade(tc, oldSet, newSet)
}

return pu.gracefulUpgrade(tc, oldSet, newSet)
}

@@ -97,35 +88,6 @@ func (pu *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Sta
return nil
}

func (pu *pdUpgrader) needForceUpgrade(tc *v1alpha1.TidbCluster) (bool, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
instanceName := tc.GetLabels()[label.InstanceLabelKey]
selector, err := label.New().Instance(instanceName).PD().Selector()
if err != nil {
return false, err
}
pdPods, err := pu.podLister.Pods(ns).List(selector)
if err != nil {
return false, err
}

imagePullFailedCount := 0
for _, pod := range pdPods {
revisionHash, exist := pod.Labels[apps.ControllerRevisionHashLabelKey]
if !exist {
return false, fmt.Errorf("tidbcluster: [%s/%s]'s pod:[%s] doesn't have label: %s", ns, tcName, pod.GetName(), apps.ControllerRevisionHashLabelKey)
}
if revisionHash == tc.Status.PD.StatefulSet.CurrentRevision {
if imagePullFailed(pod) {
imagePullFailedCount++
}
}
}

return imagePullFailedCount >= int(tc.Status.PD.StatefulSet.Replicas)/2+1, nil
}

func (pu *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
@@ -161,6 +123,9 @@ func NewFakePDUpgrader() Upgrader {
}

func (fpu *fakePDUpgrader) Upgrade(tc *v1alpha1.TidbCluster, _ *apps.StatefulSet, _ *apps.StatefulSet) error {
if !tc.Status.PD.Synced {
return fmt.Errorf("tidbcluster: pd status sync failed, cannot be upgraded")
}
tc.Status.PD.Phase = v1alpha1.UpgradePhase
return nil
}
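
With needForceUpgrade removed from the upgrader, Upgrade is now purely the graceful path, and the fake upgrader mirrors the real precondition by refusing to run while PD status is unsynced. A quick illustrative use of the fake (assumed, building on test helpers visible elsewhere in this commit):

	tc := newTidbClusterForPD() // helper used in pd_member_manager_test.go
	tc.Status.PD.Synced = false
	err := NewFakePDUpgrader().Upgrade(tc, nil, nil) // the StatefulSet arguments are ignored by the fake
	// err is non-nil: "tidbcluster: pd status sync failed, cannot be upgraded"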
30 changes: 0 additions & 30 deletions pkg/manager/member/pd_upgrader_test.go
@@ -167,36 +167,6 @@ func TestPDUpgraderUpgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(func() *int32 { i := int32(3); return &i }()))
},
},
{
name: "force upgrade",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Synced = false
},
changePods: func(pods []*corev1.Pod) {
pods[1].Status = corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
{
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{Reason: ErrImagePull},
},
},
}}
pods[0].Status = corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
{
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{Reason: ErrImagePull},
},
},
}}
},
transferLeaderErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(func() *int32 { i := int32(0); return &i }()))
},
},
{
name: "error when transfer leader",
changeFn: func(tc *v1alpha1.TidbCluster) {
14 changes: 14 additions & 0 deletions pkg/manager/member/utils.go
@@ -18,7 +18,9 @@ import (
"fmt"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -214,3 +216,15 @@ func CombineAnnotations(a, b map[string]string) map[string]string {
}
return a
}

// needForceUpgrade checks whether a force upgrade is necessary
func needForceUpgrade(tc *v1alpha1.TidbCluster) bool {
// Check if annotation 'tidb.pingcap.com/force-upgrade: "true"' is set
if tc.Annotations != nil {
forceVal, ok := tc.Annotations[label.AnnForceUpgradeKey]
if ok && (forceVal == label.AnnForceUpgradeVal) {
return true
}
}
return false
}
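
needForceUpgrade is deliberately strict: the annotation must be present and its value must be exactly "true". A few illustrative cases (not part of the commit):

	tc := &v1alpha1.TidbCluster{}
	needForceUpgrade(tc) // false: no annotations at all
	tc.Annotations = map[string]string{label.AnnForceUpgradeKey: "True"}
	needForceUpgrade(tc) // false: the value must be exactly "true"
	tc.Annotations[label.AnnForceUpgradeKey] = label.AnnForceUpgradeVal
	needForceUpgrade(tc) // true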
