diff --git a/cmd/controller-manager/main.go b/cmd/controller-manager/main.go
index baedda0793..88ad688c7c 100644
--- a/cmd/controller-manager/main.go
+++ b/cmd/controller-manager/main.go
@@ -48,7 +48,6 @@ var (
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
- resyncDuration = 30 * time.Second
waitDuration = 5 * time.Second
)
@@ -62,6 +61,7 @@ func init() {
flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)")
flag.DurationVar(&tikvFailoverPeriod, "tikv-failover-period", time.Duration(5*time.Minute), "TiKV failover period default(5m)")
flag.DurationVar(&tidbFailoverPeriod, "tidb-failover-period", time.Duration(5*time.Minute), "TiDB failover period")
+ flag.DurationVar(&controller.ResyncDuration, "resync-duration", time.Duration(30*time.Second), "Resync time of informer")
flag.BoolVar(&controller.TestMode, "test-mode", false, "whether tidb-operator run in test mode")
flag.Parse()
@@ -104,18 +104,18 @@ func main() {
var informerFactory informers.SharedInformerFactory
var kubeInformerFactory kubeinformers.SharedInformerFactory
if controller.ClusterScoped {
- informerFactory = informers.NewSharedInformerFactory(cli, resyncDuration)
- kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, resyncDuration)
+ informerFactory = informers.NewSharedInformerFactory(cli, controller.ResyncDuration)
+ kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, controller.ResyncDuration)
} else {
options := []informers.SharedInformerOption{
informers.WithNamespace(ns),
}
- informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, resyncDuration, options...)
+ informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, controller.ResyncDuration, options...)
kubeoptions := []kubeinformers.SharedInformerOption{
kubeinformers.WithNamespace(ns),
}
- kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, resyncDuration, kubeoptions...)
+ kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, controller.ResyncDuration, kubeoptions...)
}
rl := resourcelock.EndpointsLock{
diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 03dd812406..c97b8c5dc7 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -16,6 +16,7 @@ package controller
import (
"fmt"
"math"
+ "time"
"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
@@ -33,6 +34,8 @@ var (
ClusterScoped bool
// TestMode defines whether tidb operator run in test mode, test mode is only open when test
TestMode bool
+ // ResyncDuration is the resync time of informer
+ ResyncDuration time.Duration
)
const (
diff --git a/pkg/controller/pvc_control.go b/pkg/controller/pvc_control.go
index 1cf74c0b19..b48b71e857 100644
--- a/pkg/controller/pvc_control.go
+++ b/pkg/controller/pvc_control.go
@@ -216,7 +216,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
return fpc.PVCIndexer.Delete(pvc)
}
-// Update updates the annotation, labels and spec of pvc
+// UpdatePVC updates the annotation, labels and spec of pvc
func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.inc()
if fpc.updatePVCTracker.errorReady() {
diff --git a/pkg/manager/member/pd_scaler_test.go b/pkg/manager/member/pd_scaler_test.go
index 8e38954176..b0b7b2074b 100644
--- a/pkg/manager/member/pd_scaler_test.go
+++ b/pkg/manager/member/pd_scaler_test.go
@@ -40,6 +40,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading bool
hasPVC bool
hasDeferAnn bool
+ annoIsNil bool
pvcDeleteErr bool
statusSyncFailed bool
err bool
@@ -61,19 +62,18 @@ func TestPDScalerScaleOut(t *testing.T) {
scaler, _, pvcIndexer, pvcControl := newFakePDScaler()
- pvc1 := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
- pvc2 := pvc1.DeepCopy()
- pvc1.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
- pvc2.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas+1)
+ pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
+ pvc.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
+ if !test.annoIsNil {
+ pvc.Annotations = map[string]string{}
+ }
+
if test.hasDeferAnn {
- pvc1.Annotations = map[string]string{}
- pvc1.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
- pvc2.Annotations = map[string]string{}
- pvc2.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
+ pvc.Annotations = map[string]string{}
+ pvc.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
}
if test.hasPVC {
- pvcIndexer.Add(pvc1)
- pvcIndexer.Add(pvc2)
+ pvcIndexer.Add(pvc)
}
if test.pvcDeleteErr {
@@ -102,6 +102,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -113,6 +114,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: true,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -124,6 +126,19 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
+ annoIsNil: true,
+ pvcDeleteErr: false,
+ statusSyncFailed: false,
+ err: false,
+ changed: true,
+ },
+ {
+ name: "pvc annotation is not nil but doesn't contain defer deletion annotation",
+ update: normalPDMember,
+ pdUpgrading: false,
+ hasPVC: false,
+ hasDeferAnn: false,
+ annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -135,6 +150,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: true,
statusSyncFailed: false,
err: true,
@@ -155,6 +171,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -168,6 +185,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -186,6 +204,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -197,6 +216,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: true,
err: true,
diff --git a/pkg/manager/member/tikv_scaler.go b/pkg/manager/member/tikv_scaler.go
index 4160d2edd8..a803e79c9e 100644
--- a/pkg/manager/member/tikv_scaler.go
+++ b/pkg/manager/member/tikv_scaler.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
+ podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)
type tikvScaler struct {
@@ -78,6 +79,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
resetReplicas(newSet, oldSet)
return err
}
+
for _, store := range tc.Status.TiKV.Stores {
if store.PodName == podName {
state := store.State
@@ -135,9 +137,48 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}
}
- // store not found in TidbCluster status,
- // this can happen when TiKV joins cluster but we haven't synced its status
- // so return error to wait another round for safety
+ // When the store is not found in the TidbCluster status, there are two possible situations:
+ // 1. The TiKV has joined the cluster but we haven't synced its status yet.
+ //    In this case, return an error and wait another round for safety.
+ //
+ // 2. The TiKV pod has never been successfully registered in the cluster, for example it is stuck pending.
+ //    In this case, we should delete the TiKV pod immediately to avoid blocking subsequent operations.
+ if !podutil.IsPodReady(pod) {
+ pvcName := ordinalPVCName(v1alpha1.TiKVMemberType, setName, ordinal)
+ pvc, err := tsd.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
+ if err != nil {
+ resetReplicas(newSet, oldSet)
+ return err
+ }
+ safeTimeDeadline := pod.CreationTimestamp.Add(5 * controller.ResyncDuration)
+ if time.Now().Before(safeTimeDeadline) {
+ // Wait for 5 resync periods to guard against the following situation:
+ //
+ // the tikv pod runs for a while without its status being synced, and then becomes not ready.
+ // Waiting 5 resync periods gives the status of this tikv pod enough time to be synced.
+ // If, after that, there is still no information about this tikv in the TidbCluster status,
+ // we can be sure that this tikv has never been added to the tidb cluster,
+ // so we can scale in this tikv pod safely.
+ resetReplicas(newSet, oldSet)
+ return fmt.Errorf("TiKV %s/%s is not ready, wait for some resync periods to sync its status", ns, podName)
+ }
+ if pvc.Annotations == nil {
+ pvc.Annotations = map[string]string{}
+ }
+ now := time.Now().Format(time.RFC3339)
+ pvc.Annotations[label.AnnPVCDeferDeleting] = now
+ _, err = tsd.pvcControl.UpdatePVC(tc, pvc)
+ if err != nil {
+ glog.Errorf("pod %s not ready, tikv scale in: failed to set pvc %s/%s annotation: %s to %s",
+ podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
+ resetReplicas(newSet, oldSet)
+ return err
+ }
+ glog.Infof("pod %s not ready, tikv scale in: set pvc %s/%s annotation: %s to %s",
+ podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
+ decreaseReplicas(newSet, oldSet)
+ return nil
+ }
resetReplicas(newSet, oldSet)
return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
}
diff --git a/pkg/manager/member/tikv_scaler_test.go b/pkg/manager/member/tikv_scaler_test.go
index c9af96c765..22387ecba3 100644
--- a/pkg/manager/member/tikv_scaler_test.go
+++ b/pkg/manager/member/tikv_scaler_test.go
@@ -39,6 +39,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC bool
hasDeferAnn bool
pvcDeleteErr bool
+ annoIsNil bool
errExpectFn func(*GomegaWithT, error)
changed bool
}
@@ -57,19 +58,18 @@ func TestTiKVScalerScaleOut(t *testing.T) {
scaler, _, pvcIndexer, _, pvcControl := newFakeTiKVScaler()
- pvc1 := newPVCForStatefulSet(oldSet, v1alpha1.TiKVMemberType)
- pvc2 := pvc1.DeepCopy()
- pvc1.Name = ordinalPVCName(v1alpha1.TiKVMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
- pvc2.Name = ordinalPVCName(v1alpha1.TiKVMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
+ pvc := newPVCForStatefulSet(oldSet, v1alpha1.TiKVMemberType)
+ pvc.Name = ordinalPVCName(v1alpha1.TiKVMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
+ if !test.annoIsNil {
+ pvc.Annotations = map[string]string{}
+ }
+
if test.hasDeferAnn {
- pvc1.Annotations = map[string]string{}
- pvc1.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
- pvc2.Annotations = map[string]string{}
- pvc2.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
+ pvc.Annotations = map[string]string{}
+ pvc.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
}
if test.hasPVC {
- pvcIndexer.Add(pvc1)
- pvcIndexer.Add(pvc2)
+ pvcIndexer.Add(pvc)
}
if test.pvcDeleteErr {
@@ -91,6 +91,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
tikvUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
errExpectFn: errExpectNil,
changed: true,
@@ -100,6 +101,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
tikvUpgrading: true,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
errExpectFn: errExpectNil,
changed: false,
@@ -109,6 +111,17 @@ func TestTiKVScalerScaleOut(t *testing.T) {
tikvUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
+ annoIsNil: true,
+ pvcDeleteErr: false,
+ errExpectFn: errExpectNil,
+ changed: true,
+ },
+ {
+ name: "pvc annotation is not nil but doesn't contain defer deletion annotation",
+ tikvUpgrading: false,
+ hasPVC: true,
+ hasDeferAnn: false,
+ annoIsNil: false,
pvcDeleteErr: false,
errExpectFn: errExpectNil,
changed: true,
@@ -138,11 +151,15 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr bool
hasPVC bool
storeIDSynced bool
+ isPodReady bool
+ hasSynced bool
pvcUpdateErr bool
errExpectFn func(*GomegaWithT, error)
changed bool
}
+ controller.ResyncDuration = 0
+
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
tc := newTidbClusterForPD()
@@ -159,11 +176,21 @@
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
- Name: tikvPodName(tc.GetName(), 4),
- Namespace: corev1.NamespaceDefault,
+ Name: tikvPodName(tc.GetName(), 4),
+ Namespace: corev1.NamespaceDefault,
+ CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
}
+ readyPodFunc(pod)
+ if !test.isPodReady {
+ notReadyPodFunc(pod)
+ }
+
+ if !test.hasSynced {
+ pod.CreationTimestamp = metav1.Time{Time: time.Now().Add(1 * time.Hour)}
+ }
+
scaler, pdControl, pvcIndexer, podIndexer, pvcControl := newFakeTiKVScaler()
if test.hasPVC {
@@ -199,16 +226,31 @@ func TestTiKVScalerScaleIn(t *testing.T) {
tests := []testcase{
{
- name: "ordinal store is up, delete store success",
+ name: "store is up, delete store failed",
tikvUpgrading: false,
storeFun: normalStoreFun,
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
},
+ {
+ name: "store state is up, delete store success",
+ tikvUpgrading: false,
+ storeFun: normalStoreFun,
+ delStoreErr: false,
+ hasPVC: true,
+ storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
+ pvcUpdateErr: false,
+ errExpectFn: errExpectRequeue,
+ changed: false,
+ },
{
name: "tikv is upgrading",
tikvUpgrading: true,
@@ -216,35 +258,57 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNil,
changed: false,
},
{
- name: "store id not match",
tikvUpgrading: false,
- storeFun: normalStoreFun,
+ storeFun: func(tc *v1alpha1.TidbCluster) {
+ tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{}
+ },
delStoreErr: false,
hasPVC: true,
- storeIDSynced: false,
+ storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
},
{
- name: "status.TiKV.Stores is empty",
+ name: "tikv pod is not ready now, not sure if the status has been synced",
tikvUpgrading: false,
storeFun: func(tc *v1alpha1.TidbCluster) {
- normalStoreFun(tc)
tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{}
},
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: false,
+ hasSynced: false,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
},
+ {
+ name: "tikv pod is not ready now, make sure the status has been synced",
+ tikvUpgrading: false,
+ storeFun: func(tc *v1alpha1.TidbCluster) {
+ tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{}
+ },
+ delStoreErr: false,
+ hasPVC: true,
+ storeIDSynced: true,
+ isPodReady: false,
+ hasSynced: true,
+ pvcUpdateErr: false,
+ errExpectFn: errExpectNil,
+ changed: true,
+ },
{
name: "podName not match",
tikvUpgrading: false,
@@ -257,6 +321,8 @@
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
@@ -273,6 +339,8 @@
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
@@ -289,31 +357,37 @@
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
- errExpectFn: errExpectNotNil,
+ errExpectFn: errExpectRequeue,
changed: false,
},
{
- name: "store state is up, delete store success",
+ name: "store state is tombstone",
tikvUpgrading: false,
- storeFun: normalStoreFun,
+ storeFun: tombstoneStoreFun,
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
- errExpectFn: errExpectRequeue,
- changed: false,
+ errExpectFn: errExpectNil,
+ changed: true,
},
{
- name: "store state is tombstone",
+ name: "store state is tombstone and store id not match",
tikvUpgrading: false,
- storeFun: tombstoneStoreFun,
+ storeFun: normalStoreFun,
delStoreErr: false,
hasPVC: true,
- storeIDSynced: true,
+ storeIDSynced: false,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
- errExpectFn: errExpectNil,
- changed: true,
+ errExpectFn: errExpectNotNil,
+ changed: false,
},
{
name: "store state is tombstone, id is not integer",
@@ -327,6 +401,8 @@
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
@@ -338,21 +414,12 @@
delStoreErr: false,
hasPVC: false,
storeIDSynced: true,
+ isPodReady: true,
+ hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNotNil,
changed: false,
},
- {
- name: "store state is tombstone, don't have pvc",
- tikvUpgrading: false,
- storeFun: tombstoneStoreFun,
- delStoreErr: false,
- hasPVC: true,
- storeIDSynced: true,
- pvcUpdateErr: true,
- errExpectFn: errExpectNotNil,
- changed: false,
- },
}
for i := range tests {
@@ -393,6 +460,24 @@ func tombstoneStoreFun(tc *v1alpha1.TidbCluster) {
}
}
+func readyPodFunc(pod *corev1.Pod) {
+ pod.Status.Conditions = []corev1.PodCondition{
+ {
+ Type: corev1.PodReady,
+ Status: corev1.ConditionTrue,
+ },
+ }
+}
+
+func notReadyPodFunc(pod *corev1.Pod) {
+ pod.Status.Conditions = []corev1.PodCondition{
+ {
+ Type: corev1.PodReady,
+ Status: corev1.ConditionFalse,
+ },
+ }
+}
+
func errExpectRequeue(g *GomegaWithT, err error) {
g.Expect(controller.IsRequeueError(err)).To(Equal(true))
}
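
---

The new branch in `tikv_scaler.go` is the core of this change. Below is a condensed, self-contained sketch of that decision path for a store that never appears in the TidbCluster status. The operator's helpers (`resetReplicas`, `decreaseReplicas`, the PVC lister and control) are replaced with simple stand-ins and the annotation key is a placeholder for `label.AnnPVCDeferDeleting`, so treat it as an illustration of the control flow rather than the actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// resyncDuration stands in for controller.ResyncDuration (set by --resync-duration).
var resyncDuration = 30 * time.Second

// annPVCDeferDeleting is a placeholder for label.AnnPVCDeferDeleting.
const annPVCDeferDeleting = "pvc-defer-deleting"

// scaleInUnregisteredTiKV mirrors the new branch: the store was never seen in
// the TidbCluster status, so the decision depends on pod readiness and pod age.
func scaleInUnregisteredTiKV(podReady bool, podCreated time.Time, pvcAnnotations map[string]string) (map[string]string, bool, error) {
	if podReady {
		// A ready pod with an unknown store is probably just not synced yet:
		// keep the replica and retry in a later round.
		return pvcAnnotations, false, fmt.Errorf("TiKV not found in cluster, wait for the status to be synced")
	}
	// A not-ready pod is only scaled in after 5 resync periods have passed since
	// it was created, so a pod that registers late is not deleted by mistake.
	safeTimeDeadline := podCreated.Add(5 * resyncDuration)
	if time.Now().Before(safeTimeDeadline) {
		return pvcAnnotations, false, fmt.Errorf("TiKV is not ready, wait for some resync periods to sync its status")
	}
	// Mark the PVC for deferred deletion; the replica count can then be decreased.
	if pvcAnnotations == nil {
		pvcAnnotations = map[string]string{}
	}
	pvcAnnotations[annPVCDeferDeleting] = time.Now().Format(time.RFC3339)
	return pvcAnnotations, true, nil
}

func main() {
	ann, scaledIn, err := scaleInUnregisteredTiKV(false, time.Now().Add(-time.Hour), nil)
	fmt.Println(scaledIn, err, ann)
}
```

The `changed: true` expectation in the `"tikv pod is not ready now, make sure the status has been synced"` test case corresponds to the path that reaches the annotation write and returns nil.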
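The `--resync-duration` flag replaces the hard-coded `resyncDuration` constant so that the informer resync period and the 5× safety deadline above stay in step. The following is a minimal sketch of the same wiring (the flag, the package-level variable, and a shared informer factory); the clientset construction is elided — a nil clientset is enough for the factory call — and the names mirror the diff.

```go
package main

import (
	"flag"
	"fmt"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// ResyncDuration mirrors controller.ResyncDuration: a package-level setting
// shared by the informer factories and the TiKV scaler's safety deadline.
var ResyncDuration time.Duration

func main() {
	flag.DurationVar(&ResyncDuration, "resync-duration", 30*time.Second, "Resync time of informer")
	flag.Parse()

	// kubeCli would normally be built from an in-cluster or kubeconfig rest.Config.
	var kubeCli kubernetes.Interface
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeCli, ResyncDuration)
	_ = kubeInformerFactory

	// The scaler's deadline is derived from the same value: 5 resync periods.
	fmt.Println("safety window:", 5*ResyncDuration)
}
```

Setting the variable to zero, as `controller.ResyncDuration = 0` does in `TestTiKVScalerScaleIn`, makes that 5× deadline expire immediately, which is what lets the not-ready branch be exercised without waiting in the tests.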
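The test helpers `readyPodFunc`/`notReadyPodFunc` only rewrite the pod's `PodReady` condition, which is exactly what `podutil.IsPodReady` inspects in the scaler. A small standalone check of that assumption:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

func main() {
	pod := &corev1.Pod{}

	// notReadyPodFunc in the diff: a PodReady condition with status False.
	pod.Status.Conditions = []corev1.PodCondition{
		{Type: corev1.PodReady, Status: corev1.ConditionFalse},
	}
	fmt.Println(podutil.IsPodReady(pod)) // false -> the scaler takes the "unregistered TiKV" branch

	// readyPodFunc: the same condition flipped to True.
	pod.Status.Conditions[0].Status = corev1.ConditionTrue
	fmt.Println(podutil.IsPodReady(pod)) // true -> the scaler requeues and waits for the status sync
}
```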