Fix tikv scale in failure in some cases (#726) (#742)
sre-bot authored and weekface committed Aug 9, 2019
1 parent c0cf0cd commit a5a995f
Showing 6 changed files with 207 additions and 58 deletions.
10 changes: 5 additions & 5 deletions cmd/controller-manager/main.go
@@ -48,7 +48,6 @@ var (
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
resyncDuration = 30 * time.Second
waitDuration = 5 * time.Second
)

@@ -62,6 +61,7 @@ func init() {
flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)")
flag.DurationVar(&tikvFailoverPeriod, "tikv-failover-period", time.Duration(5*time.Minute), "TiKV failover period default(5m)")
flag.DurationVar(&tidbFailoverPeriod, "tidb-failover-period", time.Duration(5*time.Minute), "TiDB failover period")
flag.DurationVar(&controller.ResyncDuration, "resync-duration", time.Duration(30*time.Second), "Resync time of informer")
flag.BoolVar(&controller.TestMode, "test-mode", false, "whether tidb-operator run in test mode")

flag.Parse()
@@ -104,18 +104,18 @@ func main() {
var informerFactory informers.SharedInformerFactory
var kubeInformerFactory kubeinformers.SharedInformerFactory
if controller.ClusterScoped {
informerFactory = informers.NewSharedInformerFactory(cli, resyncDuration)
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, resyncDuration)
informerFactory = informers.NewSharedInformerFactory(cli, controller.ResyncDuration)
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, controller.ResyncDuration)
} else {
options := []informers.SharedInformerOption{
informers.WithNamespace(ns),
}
informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, resyncDuration, options...)
informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, controller.ResyncDuration, options...)

kubeoptions := []kubeinformers.SharedInformerOption{
kubeinformers.WithNamespace(ns),
}
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, resyncDuration, kubeoptions...)
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, controller.ResyncDuration, kubeoptions...)
}

rl := resourcelock.EndpointsLock{
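Note: the change above replaces the file-local `resyncDuration` constant with the shared, flag-configurable `controller.ResyncDuration`, so other packages (including the TiKV scaler changed below) can read the same resync period. The following is a minimal, self-contained sketch of that flag-to-informer wiring pattern; the package layout and the fake clientset are illustrative assumptions, not operator code.

```go
package main

import (
	"flag"
	"fmt"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// resyncDuration plays the role of controller.ResyncDuration: a package-level
// value bound to a flag once and then shared by every informer factory.
var resyncDuration time.Duration

func main() {
	flag.DurationVar(&resyncDuration, "resync-duration", 30*time.Second, "Resync time of informer")
	flag.Parse()

	// A fake clientset stands in for the real Kubernetes client in this sketch.
	kubeCli := fake.NewSimpleClientset()

	// The resync period now comes from the flag instead of a hard-coded constant.
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeCli, resyncDuration)
	_ = kubeInformerFactory
	fmt.Println("informer resync period:", resyncDuration)
}
```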
3 changes: 3 additions & 0 deletions pkg/controller/controller_utils.go
@@ -16,6 +16,7 @@ package controller
import (
"fmt"
"math"
"time"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
@@ -33,6 +34,8 @@ var (
ClusterScoped bool
// TestMode defines whether tidb-operator runs in test mode; test mode is only enabled during tests
TestMode bool
// ResyncDuration is the resync time of informer
ResyncDuration time.Duration
)

const (
2 changes: 1 addition & 1 deletion pkg/controller/pvc_control.go
@@ -216,7 +216,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
return fpc.PVCIndexer.Delete(pvc)
}

// Update updates the annotation, labels and spec of pvc
// UpdatePVC updates the annotation, labels and spec of pvc
func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.inc()
if fpc.updatePVCTracker.errorReady() {
40 changes: 30 additions & 10 deletions pkg/manager/member/pd_scaler_test.go
@@ -40,6 +40,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading bool
hasPVC bool
hasDeferAnn bool
annoIsNil bool
pvcDeleteErr bool
statusSyncFailed bool
err bool
@@ -61,19 +62,18 @@

scaler, _, pvcIndexer, pvcControl := newFakePDScaler()

pvc1 := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
pvc2 := pvc1.DeepCopy()
pvc1.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
pvc2.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas+1)
pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
pvc.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
if !test.annoIsNil {
pvc.Annotations = map[string]string{}
}

if test.hasDeferAnn {
pvc1.Annotations = map[string]string{}
pvc1.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
pvc2.Annotations = map[string]string{}
pvc2.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
pvc.Annotations = map[string]string{}
pvc.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
}
if test.hasPVC {
pvcIndexer.Add(pvc1)
pvcIndexer.Add(pvc2)
pvcIndexer.Add(pvc)
}

if test.pvcDeleteErr {
@@ -102,6 +102,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -113,6 +114,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: true,
hasPVC: true,
hasDeferAnn: false,
annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -124,6 +126,19 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
changed: true,
},
{
name: "pvc annotation is not nil but doesn't contain defer deletion annotation",
update: normalPDMember,
pdUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -135,6 +150,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
annoIsNil: false,
pvcDeleteErr: true,
statusSyncFailed: false,
err: true,
@@ -155,6 +171,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -168,6 +185,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -186,6 +204,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -197,6 +216,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: true,
err: true,
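The test changes above thread a new `annoIsNil` flag through the table-driven cases, so each case controls whether the PVC starts with a nil annotations map or an empty one before the defer-deleting annotation is applied. Below is a rough, self-contained sketch of that pattern; `pvcStub` and the annotation key value are placeholders for illustration, not the operator's test helpers.

```go
package main

import (
	"fmt"
	"time"
)

// pvcStub is a stand-in for corev1.PersistentVolumeClaim: only the
// annotations map matters for this pattern.
type pvcStub struct {
	Annotations map[string]string
}

// annPVCDeferDeleting is an illustrative key; the operator uses label.AnnPVCDeferDeleting.
const annPVCDeferDeleting = "example.com/pvc-defer-deleting"

func main() {
	tests := []struct {
		name        string
		annoIsNil   bool
		hasDeferAnn bool
	}{
		{name: "annotations nil, no defer annotation", annoIsNil: true, hasDeferAnn: false},
		{name: "annotations empty, no defer annotation", annoIsNil: false, hasDeferAnn: false},
		{name: "defer-deleting annotation set", annoIsNil: false, hasDeferAnn: true},
	}

	for _, test := range tests {
		pvc := &pvcStub{}
		if !test.annoIsNil {
			pvc.Annotations = map[string]string{}
		}
		if test.hasDeferAnn {
			pvc.Annotations = map[string]string{}
			pvc.Annotations[annPVCDeferDeleting] = time.Now().Format(time.RFC3339)
		}
		fmt.Printf("%s: annotations=%v\n", test.name, pvc.Annotations)
	}
}
```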
47 changes: 44 additions & 3 deletions pkg/manager/member/tikv_scaler.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

type tikvScaler struct {
@@ -78,6 +79,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
resetReplicas(newSet, oldSet)
return err
}

for _, store := range tc.Status.TiKV.Stores {
if store.PodName == podName {
state := store.State
@@ -135,9 +137,48 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}
}

// store not found in TidbCluster status,
// this can happen when TiKV joins cluster but we haven't synced its status
// so return error to wait another round for safety
// When the store is not found in the TidbCluster status, there are two possible situations:
// 1. The TiKV pod has joined the cluster but we haven't synced its status yet.
// In this situation, return an error and wait another round for safety.
//
// 2. The TiKV pod has never been successfully registered in the cluster, for example because it stays pending.
// In this situation we should delete this TiKV pod immediately to avoid blocking the subsequent operations.
if !podutil.IsPodReady(pod) {
pvcName := ordinalPVCName(v1alpha1.TiKVMemberType, setName, ordinal)
pvc, err := tsd.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
if err != nil {
resetReplicas(newSet, oldSet)
return err
}
safeTimeDeadline := pod.CreationTimestamp.Add(5 * controller.ResyncDuration)
if time.Now().Before(safeTimeDeadline) {
// Wait for 5 resync periods to ensure that the following situation does not occur:
//
// The tikv pod runs for a while without syncing its status, and then the pod becomes not ready.
// Here we wait for 5 resync periods to ensure that the status of this tikv pod has been synced.
// After this period of time, if there is still no information about this tikv in TidbCluster status,
// then we can be sure that this tikv has never been added to the tidb cluster.
// So we can scale in this tikv pod safely.
resetReplicas(newSet, oldSet)
return fmt.Errorf("TiKV %s/%s is not ready, wait for some resync periods to synced its status", ns, podName)
}
if pvc.Annotations == nil {
pvc.Annotations = map[string]string{}
}
now := time.Now().Format(time.RFC3339)
pvc.Annotations[label.AnnPVCDeferDeleting] = now
_, err = tsd.pvcControl.UpdatePVC(tc, pvc)
if err != nil {
glog.Errorf("pod %s not ready, tikv scale in: failed to set pvc %s/%s annotation: %s to %s",
podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
resetReplicas(newSet, oldSet)
return err
}
glog.Infof("pod %s not ready, tikv scale in: set pvc %s/%s annotation: %s to %s",
podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
decreaseReplicas(newSet, oldSet)
return nil
}
resetReplicas(newSet, oldSet)
return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
}
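The core of the fix is the new branch above: when a TiKV store is absent from the TidbCluster status and its pod is not ready, the scaler no longer returns an error indefinitely. It waits up to five resync periods after the pod's creation; past that deadline it marks the pod's PVC with the defer-deleting annotation and decreases the StatefulSet replicas. A rough, dependency-free sketch of that decision logic follows; the types and function names are invented for illustration and are not the operator's API.

```go
package main

import (
	"fmt"
	"time"
)

// scaleInAction mirrors the two outcomes of the new not-ready branch:
// wait for another sync round, or defer-delete the PVC and scale in.
type scaleInAction int

const (
	actionWait scaleInAction = iota
	actionDeferDeletePVCAndScaleIn
)

// decideScaleIn sketches the decision made when the store is absent from
// the TidbCluster status and the pod is not ready: keep waiting until the
// pod is older than 5 resync periods, then allow the scale-in to proceed.
func decideScaleIn(podCreated time.Time, resync time.Duration, now time.Time) (scaleInAction, error) {
	safeTimeDeadline := podCreated.Add(5 * resync)
	if now.Before(safeTimeDeadline) {
		// The pod may simply not have had its status synced yet; give it
		// a few more resync periods before concluding it never joined.
		return actionWait, fmt.Errorf("pod is not ready, wait for some resync periods to sync its status")
	}
	// Past the deadline with no store record: the pod never registered
	// with the cluster, so mark its PVC for deferred deletion and scale in.
	return actionDeferDeletePVCAndScaleIn, nil
}

func main() {
	created := time.Now().Add(-10 * time.Minute)
	action, err := decideScaleIn(created, 30*time.Second, time.Now())
	fmt.Println(action == actionDeferDeletePVCAndScaleIn, err)
}
```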