diff --git a/pkg/label/label.go b/pkg/label/label.go index 17ea6c6d95..1c1099437c 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -15,9 +15,10 @@ package label import ( "fmt" + "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "strings" ) const ( @@ -52,6 +53,11 @@ const ( AnnPVCPodScheduling = "tidb.pingcap.com/pod-scheduling" // AnnTiDBPartition is pod annotation which TiDB pod chould upgrade to AnnTiDBPartition string = "tidb.pingcap.com/tidb-partition" + // AnnForceUpgradeKey is tc annotation key to indicate whether force upgrade should be done + AnnForceUpgradeKey = "tidb.pingcap.com/force-upgrade" + + // AnnForceUpgradeVal is tc annotation value to indicate whether force upgrade should be done + AnnForceUpgradeVal = "true" // PDLabelVal is PD label value PDLabelVal string = "pd" diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go index ead59f4f54..3810fb3775 100644 --- a/pkg/manager/member/pd_member_manager.go +++ b/pkg/manager/member/pd_member_manager.go @@ -200,6 +200,16 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu glog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", ns, tcName, err) } + if !tc.Status.PD.Synced { + force := needForceUpgrade(tc) + if force { + tc.Status.PD.Phase = v1alpha1.UpgradePhase + setUpgradePartition(newPDSet, 0) + errSTS := pmm.updateStatefulSet(tc, newPDSet, oldPDSet) + return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd needs force upgrade, %v", ns, tcName, errSTS) + } + } + if !templateEqual(newPDSet.Spec.Template, oldPDSet.Spec.Template) || tc.Status.PD.Phase == v1alpha1.UpgradePhase { if err := pmm.pdUpgrader.Upgrade(tc, oldPDSet, newPDSet); err != nil { return err @@ -227,7 +237,9 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu } } - // TODO FIXME equal is false every time + return pmm.updateStatefulSet(tc, newPDSet, oldPDSet) +} 
+func (pmm *pdMemberManager) updateStatefulSet(tc *v1alpha1.TidbCluster, newPDSet, oldPDSet *apps.StatefulSet) error { if !statefulSetEqual(*newPDSet, *oldPDSet) { set := *oldPDSet set.Spec.Template = newPDSet.Spec.Template diff --git a/pkg/manager/member/pd_member_manager_test.go b/pkg/manager/member/pd_member_manager_test.go index dfd0b22b99..a7120ecee2 100644 --- a/pkg/manager/member/pd_member_manager_test.go +++ b/pkg/manager/member/pd_member_manager_test.go @@ -639,6 +639,133 @@ func TestPDMemberManagerUpgrade(t *testing.T) { } } +func TestPDMemberManagerSyncPDSts(t *testing.T) { + g := NewGomegaWithT(t) + type testcase struct { + name string + modify func(cluster *v1alpha1.TidbCluster) + pdHealth *pdapi.HealthInfo + err bool + statusChange func(*apps.StatefulSet) + expectStatefulSetFn func(*GomegaWithT, *apps.StatefulSet, error) + expectTidbClusterFn func(*GomegaWithT, *v1alpha1.TidbCluster) + } + + testFn := func(test *testcase, t *testing.T) { + tc := newTidbClusterForPD() + ns := tc.Namespace + tcName := tc.Name + + pmm, fakeSetControl, _, fakePDControl, _, _, _ := newFakePDMemberManager() + pdClient := controller.NewFakePDClient(fakePDControl, tc) + + pdClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) { + return test.pdHealth, nil + }) + pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) { + return &metapb.Cluster{Id: uint64(1)}, nil + }) + + fakeSetControl.SetStatusChange(test.statusChange) + + err := pmm.Sync(tc) + g.Expect(controller.IsRequeueError(err)).To(BeTrue()) + + _, err = pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName)) + g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName)) + g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName)) + g.Expect(err).NotTo(HaveOccurred()) + + test.modify(tc) + 
pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) { + return &metapb.Cluster{Id: uint64(1)}, fmt.Errorf("cannot get cluster") + }) + err = pmm.syncPDStatefulSetForTidbCluster(tc) + if test.err { + g.Expect(err).To(HaveOccurred()) + } else { + g.Expect(err).NotTo(HaveOccurred()) + } + + if test.expectStatefulSetFn != nil { + set, err := pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName)) + test.expectStatefulSetFn(g, set, err) + } + if test.expectTidbClusterFn != nil { + test.expectTidbClusterFn(g, tc) + } + } + tests := []testcase{ + { + name: "force upgrade", + modify: func(cluster *v1alpha1.TidbCluster) { + cluster.Spec.PD.Image = "pd-test-image:v2" + cluster.Spec.PD.Replicas = 1 + cluster.ObjectMeta.Annotations = make(map[string]string) + cluster.ObjectMeta.Annotations["tidb.pingcap.com/force-upgrade"] = "true" + }, + pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{ + {Name: "pd1", MemberID: uint64(1), ClientUrls: []string{"http://pd1:2379"}, Health: false}, + {Name: "pd2", MemberID: uint64(2), ClientUrls: []string{"http://pd2:2379"}, Health: false}, + {Name: "pd3", MemberID: uint64(3), ClientUrls: []string{"http://pd3:2379"}, Health: false}, + }}, + err: true, + statusChange: func(set *apps.StatefulSet) { + set.Status.Replicas = *set.Spec.Replicas + set.Status.CurrentRevision = "pd-1" + set.Status.UpdateRevision = "pd-1" + observedGeneration := int64(1) + set.Status.ObservedGeneration = &observedGeneration + }, + expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(set.Spec.Template.Spec.Containers[0].Image).To(Equal("pd-test-image:v2")) + g.Expect(*set.Spec.Replicas).To(Equal(int32(1))) + g.Expect(*set.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(0))) + }, + expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) { + g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase)) + }, + }, + 
{ + name: "non force upgrade", + modify: func(cluster *v1alpha1.TidbCluster) { + cluster.Spec.PD.Image = "pd-test-image:v2" + cluster.Spec.PD.Replicas = 1 + }, + pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{ + {Name: "pd1", MemberID: uint64(1), ClientUrls: []string{"http://pd1:2379"}, Health: false}, + {Name: "pd2", MemberID: uint64(2), ClientUrls: []string{"http://pd2:2379"}, Health: false}, + {Name: "pd3", MemberID: uint64(3), ClientUrls: []string{"http://pd3:2379"}, Health: false}, + }}, + err: true, + statusChange: func(set *apps.StatefulSet) { + set.Status.Replicas = *set.Spec.Replicas + set.Status.CurrentRevision = "pd-1" + set.Status.UpdateRevision = "pd-1" + observedGeneration := int64(1) + set.Status.ObservedGeneration = &observedGeneration + }, + expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(set.Spec.Template.Spec.Containers[0].Image).To(Equal("pd-test-image")) + g.Expect(*set.Spec.Replicas).To(Equal(int32(3))) + g.Expect(*set.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3))) + }, + expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) { + g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase)) + }, + }, + } + for i := range tests { + t.Logf("begin: %s", tests[i].name) + testFn(&tests[i], t) + t.Logf("end: %s", tests[i].name) + } +} + func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetControl, *controller.FakeServiceControl, *pdapi.FakePDControl, cache.Indexer, cache.Indexer, *controller.FakePodControl) { cli := fake.NewSimpleClientset() kubeCli := kubefake.NewSimpleClientset() diff --git a/pkg/manager/member/pd_upgrader.go b/pkg/manager/member/pd_upgrader.go index 96cc49c54a..bea952f130 100644 --- a/pkg/manager/member/pd_upgrader.go +++ b/pkg/manager/member/pd_upgrader.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" 
"github.com/pingcap/tidb-operator/pkg/controller" - "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/pkg/pdapi" apps "k8s.io/api/apps/v1beta1" corelisters "k8s.io/client-go/listers/core/v1" @@ -42,14 +41,6 @@ func NewPDUpgrader(pdControl pdapi.PDControlInterface, } func (pu *pdUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error { - force, err := pu.needForceUpgrade(tc) - if err != nil { - return err - } - if force { - return pu.forceUpgrade(tc, oldSet, newSet) - } - return pu.gracefulUpgrade(tc, oldSet, newSet) } @@ -97,35 +88,6 @@ func (pu *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Sta return nil } -func (pu *pdUpgrader) needForceUpgrade(tc *v1alpha1.TidbCluster) (bool, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() - instanceName := tc.GetLabels()[label.InstanceLabelKey] - selector, err := label.New().Instance(instanceName).PD().Selector() - if err != nil { - return false, err - } - pdPods, err := pu.podLister.Pods(ns).List(selector) - if err != nil { - return false, err - } - - imagePullFailedCount := 0 - for _, pod := range pdPods { - revisionHash, exist := pod.Labels[apps.ControllerRevisionHashLabelKey] - if !exist { - return false, fmt.Errorf("tidbcluster: [%s/%s]'s pod:[%s] doesn't have label: %s", ns, tcName, pod.GetName(), apps.ControllerRevisionHashLabelKey) - } - if revisionHash == tc.Status.PD.StatefulSet.CurrentRevision { - if imagePullFailed(pod) { - imagePullFailedCount++ - } - } - } - - return imagePullFailedCount >= int(tc.Status.PD.StatefulSet.Replicas)/2+1, nil -} - func (pu *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet) error { ns := tc.GetNamespace() tcName := tc.GetName() @@ -161,6 +123,9 @@ func NewFakePDUpgrader() Upgrader { } func (fpu *fakePDUpgrader) Upgrade(tc *v1alpha1.TidbCluster, _ *apps.StatefulSet, _ *apps.StatefulSet) error { + if !tc.Status.PD.Synced { + return 
fmt.Errorf("tidbcluster: pd status sync failed,can not to be upgraded") + } tc.Status.PD.Phase = v1alpha1.UpgradePhase return nil } diff --git a/pkg/manager/member/pd_upgrader_test.go b/pkg/manager/member/pd_upgrader_test.go index c4cabe4986..88db2d860c 100644 --- a/pkg/manager/member/pd_upgrader_test.go +++ b/pkg/manager/member/pd_upgrader_test.go @@ -167,36 +167,6 @@ func TestPDUpgraderUpgrade(t *testing.T) { g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(func() *int32 { i := int32(3); return &i }())) }, }, - { - name: "force upgrade", - changeFn: func(tc *v1alpha1.TidbCluster) { - tc.Status.PD.Synced = false - }, - changePods: func(pods []*corev1.Pod) { - pods[1].Status = corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ - { - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{Reason: ErrImagePull}, - }, - }, - }} - pods[0].Status = corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ - { - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{Reason: ErrImagePull}, - }, - }, - }} - }, - transferLeaderErr: false, - errExpectFn: func(g *GomegaWithT, err error) { - g.Expect(err).NotTo(HaveOccurred()) - }, - expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) { - g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase)) - g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(func() *int32 { i := int32(0); return &i }())) - }, - }, { name: "error when transfer leader", changeFn: func(tc *v1alpha1.TidbCluster) { diff --git a/pkg/manager/member/utils.go b/pkg/manager/member/utils.go index a9c27d7506..7a1d061635 100644 --- a/pkg/manager/member/utils.go +++ b/pkg/manager/member/utils.go @@ -18,7 +18,9 @@ import ( "fmt" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" + "github.com/pingcap/tidb-operator/pkg/label" apps "k8s.io/api/apps/v1beta1" 
 corev1 "k8s.io/api/core/v1"
 apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -214,3 +216,15 @@ func CombineAnnotations(a, b map[string]string) map[string]string {
 	}
 	return a
 }
+
+// needForceUpgrade checks whether force upgrade is necessary
+func needForceUpgrade(tc *v1alpha1.TidbCluster) bool {
+	// Check if annotation 'tidb.pingcap.com/force-upgrade: "true"' is set
+	if tc.Annotations != nil {
+		forceVal, ok := tc.Annotations[label.AnnForceUpgradeKey]
+		if ok && (forceVal == label.AnnForceUpgradeVal) {
+			return true
+		}
+	}
+	return false
+}