TiKV upgrade bug fix #626

Merged Jul 9, 2019 (31 commits)
Commits
e9c31e8
fix evict schedulers have not been deleted
xiaojingchen Jun 21, 2019
facc381
aaaa
weekface Jun 27, 2019
9c379e0
stash
weekface Jul 2, 2019
f5951eb
bb
weekface Jul 2, 2019
41903c8
sss
weekface Jul 2, 2019
e31fbc4
fix
xiaojingchen Jul 3, 2019
8fc19a1
change main
xiaojingchen Jul 3, 2019
9b9e001
remove useless log
xiaojingchen Jul 3, 2019
44b5f04
remove useless code
xiaojingchen Jul 3, 2019
d1a3d19
remove useless code
xiaojingchen Jul 3, 2019
ad3522a
remove useless code
xiaojingchen Jul 3, 2019
d74e56d
fix lint error
xiaojingchen Jul 3, 2019
b0f5b7c
Merge branch 'master' into weekface/aaaa
xiaojingchen Jul 3, 2019
f0a493b
address comment
xiaojingchen Jul 3, 2019
8c76ac4
fix
xiaojingchen Jul 4, 2019
8c6f549
add a lack argument
xiaojingchen Jul 4, 2019
e86fb93
add test mode
xiaojingchen Jul 4, 2019
41c7a79
fix bug
xiaojingchen Jul 4, 2019
589f096
add log
xiaojingchen Jul 4, 2019
bb5aab9
fix CheckUpgrade logic
xiaojingchen Jul 4, 2019
e6e2e4c
remove useless logs
xiaojingchen Jul 4, 2019
c4ace86
add logs and add one tidb version
xiaojingchen Jul 5, 2019
e027be7
fix regions peers in same rack bug
xiaojingchen Jul 5, 2019
7f7681b
Merge branch 'master' into tikv-upgrade-fix
weekface Jul 5, 2019
3cbd0ab
add debug log
xiaojingchen Jul 8, 2019
5e6a36d
Merge branch 'tikv-upgrade-fix' of https://github.com/xiaojingchen/ti…
xiaojingchen Jul 8, 2019
f4b14e2
fix CheckUpgrade's replicas error
xiaojingchen Jul 8, 2019
35888fc
change log
xiaojingchen Jul 9, 2019
189b1b1
Merge branch 'master' into tikv-upgrade-fix
xiaojingchen Jul 9, 2019
2d9e88e
fix lint
xiaojingchen Jul 9, 2019
fa34393
Merge branch 'tikv-upgrade-fix' of https://github.com/xiaojingchen/ti…
xiaojingchen Jul 9, 2019
@@ -40,6 +40,9 @@ spec:
- -tikv-failover-period={{ .Values.controllerManager.tikvFailoverPeriod | default "5m" }}
- -tidb-failover-period={{ .Values.controllerManager.tidbFailoverPeriod | default "5m" }}
- -v={{ .Values.controllerManager.logLevel }}
{{- if .Values.testMode }}
- -test-mode={{ .Values.testMode }}
{{- end}}
env:
- name: NAMESPACE
valueFrom:
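Note: the hunk above appears to come from the operator chart's controller-manager deployment template. The `-test-mode` argument is rendered only when the chart value `testMode` is set, so installs that leave it unset keep the flag's default of `false` (see the flag definition below).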
1 change: 1 addition & 0 deletions cmd/controller-manager/main.go
@@ -62,6 +62,7 @@ func init() {
flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)")
flag.DurationVar(&tikvFailoverPeriod, "tikv-failover-period", time.Duration(5*time.Minute), "TiKV failover period default(5m)")
flag.DurationVar(&tidbFailoverPeriod, "tidb-failover-period", time.Duration(5*time.Minute), "TiDB failover period")
flag.BoolVar(&controller.TestMode, "test-mode", false, "whether tidb-operator runs in test mode")

flag.Parse()
}
2 changes: 2 additions & 0 deletions pkg/controller/controller_utils.go
@@ -31,6 +31,8 @@ var (
DefaultStorageClassName string
// ClusterScoped controls whether operator should manage kubernetes cluster wide TiDB clusters
ClusterScoped bool
// TestMode defines whether tidb-operator runs in test mode; it should only be enabled by the test harnesses
TestMode bool
)

const (
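Taken together, the hunks above plumb a test-only switch from the chart value through a CLI flag into a package-level variable. A minimal sketch of the same pattern, collapsed into one file (the names `TestMode` and `-test-mode` follow the diff; everything else is illustrative):

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// TestMode mirrors pkg/controller.TestMode: a package-level switch that only
// the e2e/stability harnesses enable, so timing-sensitive assertions can
// observe transient state before the operator cleans it up.
var TestMode bool

func init() {
	flag.BoolVar(&TestMode, "test-mode", false, "whether tidb-operator runs in test mode")
}

func main() {
	flag.Parse()
	if TestMode {
		// Linger briefly so a test harness can catch intermediate state,
		// as endEvictLeader does below with its 5s sleep.
		time.Sleep(5 * time.Second)
	}
	fmt.Println("test mode:", TestMode)
}
```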
29 changes: 12 additions & 17 deletions pkg/manager/member/tikv_upgrader.go
@@ -122,6 +122,10 @@ func (tku *tikvUpgrader) upgradeTiKVPod(tc *v1alpha1.TidbCluster, ordinal int32,
return err
}
_, evicting := upgradePod.Annotations[EvictLeaderBeginTime]
if !evicting {
glog.Infof("start to evict leader:index:%d,upgradePodName:%s,storeID:%d", ordinal, upgradePodName, storeID)
return tku.beginEvictLeader(tc, storeID, upgradePod)
}

if tku.readyToUpgrade(upgradePod, store) {
err := tku.endEvictLeader(tc, ordinal)
@@ -132,9 +136,6 @@ func (tku *tikvUpgrader) upgradeTiKVPod(tc *v1alpha1.TidbCluster, ordinal int32,
return nil
}

if !evicting {
return tku.beginEvictLeader(tc, storeID, upgradePod)
}
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s tikv pod: [%s] is evicting leader", ns, tcName, upgradePodName)
}
}
@@ -173,29 +174,23 @@ func (tku *tikvUpgrader) beginEvictLeader(tc *v1alpha1.TidbCluster, storeID uint
}

func (tku *tikvUpgrader) endEvictLeader(tc *v1alpha1.TidbCluster, ordinal int32) error {
// wait 5 seconds before deleting the evict scheduler so the auto test can catch this info
if controller.TestMode {
time.Sleep(5 * time.Second)
}
store := tku.getStoreByOrdinal(tc, ordinal)
storeID, err := strconv.ParseUint(store.ID, 10, 64)
if err != nil {
return err
}
upgradedPodName := tikvPodName(tc.GetName(), ordinal)
upgradedPod, err := tku.podLister.Pods(tc.GetNamespace()).Get(upgradedPodName)
if err != nil {
return err
}
_, evicting := upgradedPod.Annotations[EvictLeaderBeginTime]
if evicting {
delete(upgradedPod.Annotations, EvictLeaderBeginTime)
_, err = tku.podControl.UpdatePod(tc, upgradedPod)
if err != nil {
return err
}
}
err = controller.GetPDClient(tku.pdControl, tc).EndEvictLeader(storeID)

err = tku.pdControl.GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName()).EndEvictLeader(storeID)
if err != nil {
return err
}

glog.Infof("successed to remove evict leader,ordinal:%d,storeID:%d", ordinal, storeID)

return nil
}

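The heart of the fix is the reordering in `upgradeTiKVPod`: the `!evicting` branch now runs before `readyToUpgrade`, so an evict-leader scheduler is always registered before the pod's readiness is evaluated. `endEvictLeader` also stops stripping the `EvictLeaderBeginTime` annotation (the pod is recreated during the upgrade anyway, which is presumably why that bookkeeping was dropped) and, in test mode, sleeps 5 seconds so the harness can observe the scheduler before it is deleted. A control-flow sketch of the reordered step, with the two booleans as hypothetical inputs:

```go
package main

import "fmt"

// upgradeStep condenses the reordered logic of upgradeTiKVPod: start eviction
// first, finish it (and upgrade the pod) once the store is ready, otherwise
// requeue and keep waiting.
func upgradeStep(evicting, readyToUpgrade bool) string {
	if !evicting {
		return "begin evict leader"
	}
	if readyToUpgrade {
		return "end evict leader, then upgrade the pod"
	}
	return "requeue: still evicting leaders"
}

func main() {
	fmt.Println(upgradeStep(false, false)) // begin evict leader
	fmt.Println(upgradeStep(true, false))  // requeue: still evicting leaders
	fmt.Println(upgradeStep(true, true))   // end evict leader, then upgrade the pod
}
```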
24 changes: 14 additions & 10 deletions pkg/manager/member/tikv_upgrader_test.go
@@ -128,7 +128,13 @@ func TestTiKVUpgraderUpgrade(t *testing.T) {
changeOldSet: func(oldSet *apps.StatefulSet) {
SetLastAppliedConfigAnnotation(oldSet)
},
changePods: nil,
changePods: func(pods []*corev1.Pod) {
for _, pod := range pods {
if pod.GetName() == tikvPodName(upgradeTcName, 2) {
pod.Annotations = map[string]string{EvictLeaderBeginTime: time.Now().Add(-1 * time.Minute).Format(time.RFC3339)}
}
}
},
beginEvictLeaderErr: false,
endEvictLeaderErr: false,
updatePodErr: false,
@@ -138,10 +144,6 @@
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
if pods[tikvPodName(upgradeTcName, 2)].Annotations != nil {
_, exist := pods[tikvPodName(upgradeTcName, 2)].Annotations[EvictLeaderBeginTime]
g.Expect(exist).To(BeFalse())
}
},
},
{
@@ -163,7 +165,13 @@
oldSet.Status.UpdatedReplicas = 1
oldSet.Spec.UpdateStrategy.RollingUpdate.Partition = func() *int32 { i := int32(2); return &i }()
},
changePods: nil,
changePods: func(pods []*corev1.Pod) {
for _, pod := range pods {
if pod.GetName() == tikvPodName(upgradeTcName, 1) {
pod.Annotations = map[string]string{EvictLeaderBeginTime: time.Now().Add(-1 * time.Minute).Format(time.RFC3339)}
}
}
},
beginEvictLeaderErr: false,
endEvictLeaderErr: false,
updatePodErr: false,
@@ -172,10 +180,6 @@
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
if pods[tikvPodName(upgradeTcName, 1)].Annotations != nil {
_, exist := pods[tikvPodName(upgradeTcName, 1)].Annotations[EvictLeaderBeginTime]
g.Expect(exist).To(BeFalse())
}
},
},
{
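With the reordering above, eviction has always begun by the time a pod is considered for upgrade, so both fixtures now pre-seed the pod under upgrade with an `EvictLeaderBeginTime` annotation, and the old assertions that the annotation had been stripped are removed because `endEvictLeader` no longer deletes it.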
91 changes: 84 additions & 7 deletions tests/actions.go
@@ -14,6 +14,7 @@
package tests

import (
"context"
"crypto/tls"
"database/sql"
"encoding/json"
@@ -128,6 +129,8 @@ type OperatorActions interface {
ScaleTidbClusterOrDie(info *TidbClusterConfig)
CheckScaleInSafely(info *TidbClusterConfig) error
CheckScaledCorrectly(info *TidbClusterConfig, podUIDsBeforeScale map[string]types.UID) error
CheckUpgradeOrDie(ctx context.Context, info *TidbClusterConfig)
CheckUpgrade(ctx context.Context, info *TidbClusterConfig) error
UpgradeTidbCluster(info *TidbClusterConfig) error
UpgradeTidbClusterOrDie(info *TidbClusterConfig)
DeployAdHocBackup(info *TidbClusterConfig) error
@@ -222,6 +225,7 @@ type OperatorConfig struct {
WebhookConfigName string
Context *apimachinery.CertContext
ImagePullPolicy corev1.PullPolicy
TestMode bool
}

type TidbClusterConfig struct {
@@ -345,6 +349,7 @@ func (oi *OperatorConfig) OperatorHelmSetString(m map[string]string) string {
"controllerManager.replicas": "2",
"scheduler.replicas": "2",
"imagePullPolicy": string(oi.ImagePullPolicy),
"testMode": strconv.FormatBool(oi.TestMode),
}
if oi.SchedulerTag != "" {
set["scheduler.kubeSchedulerImageTag"] = oi.SchedulerTag
@@ -672,7 +677,7 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterConfig) error

ns := info.Namespace
tcName := info.ClusterName
if err := wait.Poll(oa.pollInterval, 30*time.Minute, func() (bool, error) {
if err := wait.Poll(oa.pollInterval, 120*time.Minute, func() (bool, error) {
var tc *v1alpha1.TidbCluster
var err error
if tc, err = oa.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}); err != nil {
@@ -737,7 +742,7 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterConfig) error
return true, nil
}); err != nil {
glog.Errorf("check tidb cluster status failed: %s", err.Error())
return fmt.Errorf("failed to waiting for tidbcluster %s/%s ready in 30 minutes", ns, tcName)
return fmt.Errorf("failed to waiting for tidbcluster %s/%s ready in 120 minutes", ns, tcName)
}

return nil
@@ -891,11 +896,6 @@ func (oa *operatorActions) SetPartitionAnnotation(tcName string, nameSpace strin
}

func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterConfig) error {
// record tikv leader count in webhook first
err := webhook.GetAllKVLeaders(oa.cli, info.Namespace, info.ClusterName)
if err != nil {
return err
}
oa.EmitEvent(info, "UpgradeTidbCluster")

cmd := oa.getHelmUpgradeClusterCmd(info, nil)
@@ -913,6 +913,83 @@ func (oa *operatorActions) UpgradeTidbClusterOrDie(info *TidbClusterConfig) {
}
}

func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterConfig) error {
ns := info.Namespace
tcName := info.ClusterName

findStoreFn := func(tc *v1alpha1.TidbCluster, podName string) string {
for storeID, store := range tc.Status.TiKV.Stores {
if store.PodName == podName {
return storeID
}
}

return ""
}

for {
tc, err := oa.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get tidbcluster: %s/%s, %v", ns, tcName, err)
continue
}
pdClient := pdapi.NewDefaultPDControl().GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName())

replicas := tc.TiKVRealReplicas()
for i := replicas - 1; i >= 0; i-- {
if err := wait.PollImmediate(1*time.Second, 10*time.Minute, func() (done bool, err error) {
schedulers, err := pdClient.GetEvictLeaderSchedulers()
if err != nil {
glog.Errorf("failed to get evict leader schedulers, %v", err)
return false, nil
}
glog.V(4).Infof("index:%d,schedulers:%v,error:%v", i, schedulers, err)
if len(schedulers) > 1 {
return true, fmt.Errorf("there are too many evict leader schedulers: %v", schedulers)
}
if len(schedulers) == 0 {
return false, nil
}
podName := fmt.Sprintf("%s-tikv-%d", tcName, i)
scheduler := fmt.Sprintf("evict-leader-scheduler-%s", findStoreFn(tc, podName))
if schedulers[0] == scheduler {
glog.Infof("index: %d,the schedulers: %s = %s", i, schedulers[0], scheduler)
return true, nil
}
glog.Errorf("index: %d,the scheduler: %s != %s", i, schedulers[0], scheduler)
return false, nil
}); err != nil {
glog.Errorf("failed to check upgrade %s/%s, %v", ns, tcName, err)
return err
}
}
if err := wait.PollImmediate(1*time.Second, 6*time.Minute, func() (done bool, err error) {
schedulers, err := pdClient.GetEvictLeaderSchedulers()
if err != nil {
glog.Errorf("failed to get evict leader schedulers, %v", err)
return false, nil
}
if len(schedulers) == 0 {
return true, nil
}
glog.Errorf("schedulers: %v is not empty", schedulers)
return false, nil
}); err != nil {
glog.Errorf("failed to wait all schedulers deleted %s/%s, %v", ns, tcName, err)
return err
}
break
}

return nil
}

func (oa *operatorActions) CheckUpgradeOrDie(ctx context.Context, info *TidbClusterConfig) {
if err := oa.CheckUpgrade(ctx, info); err != nil {
slack.NotifyAndPanic(err)
}
}

func (oa *operatorActions) DeployMonitor(info *TidbClusterConfig) error { return nil }
func (oa *operatorActions) CleanMonitor(info *TidbClusterConfig) error { return nil }

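The new `CheckUpgrade` walks the TiKV ordinals from highest to lowest and, for each one, polls PD until exactly one evict-leader scheduler exists and it belongs to the store backing the pod currently being upgraded; it then waits up to six minutes for all schedulers to disappear, which is precisely the leak this PR fixes. The check leans on PD's naming convention for these schedulers; a distilled sketch of the comparison (hypothetical helper name):

```go
package main

import "fmt"

// expectedEvictScheduler reproduces the naming convention relied on above:
// PD registers one evict-leader scheduler per store, suffixed with the store ID.
func expectedEvictScheduler(storeID string) string {
	return fmt.Sprintf("evict-leader-scheduler-%s", storeID)
}

func main() {
	// e.g. if pod demo-tikv-2 maps to store "4" via tc.Status.TiKV.Stores:
	fmt.Println(expectedEvictScheduler("4")) // evict-leader-scheduler-4
}
```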
1 change: 1 addition & 0 deletions tests/cmd/e2e/main.go
@@ -56,6 +56,7 @@ func main() {
WebhookSecretName: "webhook-secret",
WebhookConfigName: "webhook-config",
ImagePullPolicy: v1.PullIfNotPresent,
TestMode: true,
}

ns := os.Getenv("NAMESPACE")
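The e2e harness enables the new knob unconditionally, so every e2e run exercises the test-mode path; the stability harness below does the same.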
12 changes: 8 additions & 4 deletions tests/cmd/stability/main.go
@@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
@@ -32,7 +33,7 @@
)

var cfg *tests.Config
var context *apimachinery.CertContext
var certCtx *apimachinery.CertContext
var upgradeVersions []string

func main() {
@@ -46,11 +47,11 @@
ns := os.Getenv("NAMESPACE")

var err error
context, err = apimachinery.SetupServerCert(ns, tests.WebhookServiceName)
certCtx, err = apimachinery.SetupServerCert(ns, tests.WebhookServiceName)
if err != nil {
panic(err)
}
go tests.StartValidatingAdmissionWebhookServerOrDie(context)
go tests.StartValidatingAdmissionWebhookServerOrDie(certCtx)

c := cron.New()
if err := c.AddFunc("0 0 10 * * *", func() {
Expand Down Expand Up @@ -158,17 +159,20 @@ func run() {
}

// upgrade
oa.RegisterWebHookAndServiceOrDie(context, ocfg)
oa.RegisterWebHookAndServiceOrDie(certCtx, ocfg)
ctx, cancel := context.WithCancel(context.Background())
for idx, cluster := range clusters {
assignedNodes := oa.GetTidbMemberAssignedNodesOrDie(cluster)
cluster.UpgradeAll(upgradeVersion)
oa.UpgradeTidbClusterOrDie(cluster)
oa.CheckUpgradeOrDie(ctx, cluster)
if idx == 0 {
oa.CheckManualPauseTiDBOrDie(cluster)
}
oa.CheckTidbClusterStatusOrDie(cluster)
oa.CheckTidbMemberAssignedNodesOrDie(cluster, assignedNodes)
}
cancel()

// configuration change
for _, cluster := range clusters {
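The rename from `context` to `certCtx` is not cosmetic: `run()` now calls `context.WithCancel` for `CheckUpgradeOrDie`, and a package-level variable named `context` would collide with the newly imported standard package. A minimal reproduction of the clash, with a placeholder type standing in for the webhook cert context:

```go
package main

import (
	"context"
	"fmt"
)

// Were this variable still named "context", it would be redeclared against
// the import above and the package would no longer compile.
var certCtx interface{}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fmt.Println(ctx.Err(), certCtx) // <nil> <nil>
}
```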
2 changes: 2 additions & 0 deletions tests/cmd/stability/stability.go
@@ -22,6 +22,7 @@ func newOperatorConfig() *tests.OperatorConfig {
WebhookSecretName: "webhook-secret",
WebhookConfigName: "webhook-config",
ImagePullPolicy: v1.PullAlways,
TestMode: true,
}
}

@@ -70,6 +71,7 @@ func newTidbClusterConfig(ns, clusterName string) *tests.TidbClusterConfig {
TiKVGrpcConcurrency: 4,
TiDBTokenLimit: 1000,
PDLogLevel: "info",
TopologyKey: topologyKey,
SubValues: tests.GetAffinityConfigOrDie(clusterName, ns, topologyKey, []string{topologyKey}),
}
}
2 changes: 1 addition & 1 deletion tests/config.go
@@ -78,7 +78,7 @@ func NewConfig() (*Config, error) {
flag.StringVar(&cfg.configFile, "config", "", "Config file")
flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory")
flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service")
flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0-rc.1,v3.0.0-rc.2", "tidb versions")
flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0-rc.1,v3.0.0-rc.2,v3.0.0", "tidb versions")
flag.StringVar(&cfg.OperatorTag, "operator-tag", "master", "operator tag used to choose charts")
flag.StringVar(&cfg.OperatorImage, "operator-image", "pingcap/tidb-operator:latest", "operator image")
flag.StringVar(&cfg.UpgradeOperatorTag, "upgrade-operator-tag", "", "upgrade operator tag used to choose charts")
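Appending `v3.0.0` to the default version list gives the test runs one more upgrade hop (rc.1 to rc.2 to GA), so the new upgrade checks are exercised across consecutive upgrades rather than a single one.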
6 changes: 5 additions & 1 deletion tests/dt.go
@@ -181,6 +181,10 @@ func (oa *operatorActions) CheckDataRegionDisasterTolerance(cluster *TidbCluster
// regionRacks is map of rackName and the peerID
regionRacks := map[string]uint64{}
for _, peer := range region.Peers {
if len(region.Peers) != 3 {
glog.Infof("cluster[%s] region[%d]'s peers not equal 3,[%v]. May be the failover happened", cluster.ClusterName, region.ID, region.Peers)
continue
}
storeID := strconv.FormatUint(peer.StoreId, 10)
nodeName, err := oa.getNodeByStoreId(storeID, cluster)
if err != nil {
Expand All @@ -189,7 +193,7 @@ func (oa *operatorActions) CheckDataRegionDisasterTolerance(cluster *TidbCluster
rackName := rackNodeMap[nodeName]
// if the rack have more than one peer of the region, return error
if otherID, exist := regionRacks[rackName]; exist {
return fmt.Errorf("region[%d]'s peer: [%d]and[%d] are in same rack:[%s]", region.ID, otherID, peer.Id, rackName)
return fmt.Errorf("cluster[%s] region[%d]'s peer: [%d]and[%d] are in same rack:[%s]", cluster.ClusterName, region.ID, otherID, peer.Id, rackName)
}
// add a new pair of rack and peer
regionRacks[rackName] = peer.Id
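Finally, the disaster-tolerance check now skips regions whose replica count is not 3, since a failover in progress can legitimately leave a region with a different number of peers; previously such regions could be misreported as rack-placement violations. Note the guard tests a loop-invariant condition inside the peer loop; an equivalent sketch with the guard hoisted to region level (illustrative types, not the operator's own):

```go
package main

import "fmt"

type peer struct{ ID, StoreID uint64 }

type region struct {
	ID    uint64
	Peers []peer
}

// checkRegions applies the same tolerance rule with the replica-count guard
// hoisted out of the peer loop: only regions with exactly 3 peers are
// inspected for duplicate racks.
func checkRegions(regions []region) {
	for _, r := range regions {
		if len(r.Peers) != 3 {
			fmt.Printf("region[%d] has %d peers, failover may be in progress; skipping\n", r.ID, len(r.Peers))
			continue
		}
		// ... map each peer's store to a rack and flag duplicates, as in the diff ...
	}
}

func main() {
	checkRegions([]region{{ID: 7, Peers: []peer{{1, 1}, {2, 2}}}})
}
```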