Skip to content
This repository has been archived by the owner on Oct 24, 2023. It is now read-only.

Commit

Permalink
feat: Make cordon drain timeout configurable with --upgrade (#1276)
Browse files (browse the repository at this point in the history)
  • Loading branch information
aramase authored and jackfrancis committed May 10, 2019
1 parent 3a5985c commit 8656825
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 90 deletions.
29 changes: 19 additions & 10 deletions cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ type upgradeCmd struct {
authProvider

// user input
resourceGroupName string
apiModelPath string
deploymentDirectory string
upgradeVersion string
location string
timeoutInMinutes int
force bool
resourceGroupName string
apiModelPath string
deploymentDirectory string
upgradeVersion string
location string
timeoutInMinutes int
cordonDrainTimeoutInMinutes int
force bool

// derived
containerService *api.ContainerService
Expand All @@ -49,6 +50,7 @@ type upgradeCmd struct {
nameSuffix string
agentPoolsToUpgrade map[string]bool
timeout *time.Duration
cordonDrainTimeout *time.Duration
}

func newUpgradeCmd() *cobra.Command {
Expand All @@ -70,6 +72,7 @@ func newUpgradeCmd() *cobra.Command {
f.StringVar(&uc.deploymentDirectory, "deployment-dir", "", "the location of the output from `generate`")
f.StringVarP(&uc.upgradeVersion, "upgrade-version", "k", "", "desired kubernetes version (required)")
f.IntVar(&uc.timeoutInMinutes, "vm-timeout", -1, "how long to wait for each vm to be upgraded in minutes")
f.IntVar(&uc.cordonDrainTimeoutInMinutes, "cordon-drain-timeout", -1, "how long to wait for each vm to be cordoned in minutes")
f.BoolVarP(&uc.force, "force", "f", false, "force upgrading the cluster to desired version. Allows same version upgrades and downgrades.")
addAuthFlags(uc.getAuthArgs(), f)

Expand Down Expand Up @@ -102,6 +105,11 @@ func (uc *upgradeCmd) validate(cmd *cobra.Command) error {
uc.timeout = &timeout
}

if uc.cordonDrainTimeoutInMinutes != -1 {
cordonDrainTimeout := time.Duration(uc.cordonDrainTimeoutInMinutes) * time.Minute
uc.cordonDrainTimeout = &cordonDrainTimeout
}

if uc.upgradeVersion == "" {
cmd.Usage()
return errors.New("--upgrade-version must be specified")
Expand Down Expand Up @@ -238,9 +246,10 @@ func (uc *upgradeCmd) run(cmd *cobra.Command, args []string) error {
Translator: &i18n.Translator{
Locale: uc.locale,
},
Logger: log.NewEntry(log.New()),
Client: uc.client,
StepTimeout: uc.timeout,
Logger: log.NewEntry(log.New()),
Client: uc.client,
StepTimeout: uc.timeout,
CordonDrainTimeout: uc.cordonDrainTimeout,
}

upgradeCluster.ClusterTopology = kubernetesupgrade.ClusterTopology{}
Expand Down
135 changes: 73 additions & 62 deletions cmd/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,56 +37,61 @@ func TestUpgradeCommandShouldBeValidated(t *testing.T) {
}{
{
uc: &upgradeCmd{
resourceGroupName: "",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "1.8.9",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "1.8.9",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
},
expectedErr: errors.New("--resource-group must be specified"),
},
{
uc: &upgradeCmd{
resourceGroupName: "test",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "1.8.9",
location: "",
timeoutInMinutes: 60,
resourceGroupName: "test",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "1.8.9",
location: "",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
},
expectedErr: errors.New("--location must be specified"),
},
{
uc: &upgradeCmd{
resourceGroupName: "test",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "",
location: "southcentralus",
timeoutInMinutes: 60,
resourceGroupName: "test",
apiModelPath: "./not/used",
deploymentDirectory: "",
upgradeVersion: "",
location: "southcentralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
},
expectedErr: errors.New("--upgrade-version must be specified"),
},
{
uc: &upgradeCmd{
resourceGroupName: "test",
apiModelPath: "",
deploymentDirectory: "",
upgradeVersion: "1.9.0",
location: "southcentralus",
timeoutInMinutes: 60,
resourceGroupName: "test",
apiModelPath: "",
deploymentDirectory: "",
upgradeVersion: "1.9.0",
location: "southcentralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
},
expectedErr: errors.New("--api-model must be specified"),
},
{
uc: &upgradeCmd{
resourceGroupName: "test",
apiModelPath: "./somefile",
deploymentDirectory: "aDir/anotherDir",
upgradeVersion: "1.9.0",
location: "southcentralus",
timeoutInMinutes: 60,
resourceGroupName: "test",
apiModelPath: "./somefile",
deploymentDirectory: "aDir/anotherDir",
upgradeVersion: "1.9.0",
location: "southcentralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
},
expectedErr: errors.New("ambiguous, please specify only one of --api-model and --deployment-dir"),
},
Expand Down Expand Up @@ -137,11 +142,12 @@ func TestUpgradeShouldFailForSameVersion(t *testing.T) {
})
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,

client: &armhelpers.MockAKSEngineClient{},
}
Expand All @@ -162,11 +168,12 @@ func TestUpgradeShouldFailForInvalidUpgradePath(t *testing.T) {
})
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,

client: &armhelpers.MockAKSEngineClient{},
}
Expand All @@ -186,11 +193,12 @@ func TestUpgradeShouldSuceedForValidUpgradePath(t *testing.T) {
})
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,

client: &armhelpers.MockAKSEngineClient{},
}
Expand All @@ -206,13 +214,14 @@ func TestUpgradeShouldSuceedForValidUpgradePath(t *testing.T) {
func TestUpgradeFailWithPathWhenAzureDeployJsonIsInvalid(t *testing.T) {
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.13.3",
location: "centralus",
timeoutInMinutes: 60,
force: true,
client: &armhelpers.MockAKSEngineClient{},
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.13.3",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,
force: true,
client: &armhelpers.MockAKSEngineClient{},
}

containerServiceMock := api.CreateMockContainerService("testcluster", "1.13.2", 3, 2, false)
Expand All @@ -228,11 +237,12 @@ func TestUpgradeForceSameVersionShouldSucceed(t *testing.T) {
})
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.13",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,

client: &armhelpers.MockAKSEngineClient{},
}
Expand All @@ -253,11 +263,12 @@ func TestUpgradeForceDowngradeShouldSetVersionOnContainerService(t *testing.T) {
})
g := NewGomegaWithT(t)
upgradeCmd := &upgradeCmd{
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.12",
location: "centralus",
timeoutInMinutes: 60,
resourceGroupName: "rg",
apiModelPath: "./not/used",
upgradeVersion: "1.10.12",
location: "centralus",
timeoutInMinutes: 60,
cordonDrainTimeoutInMinutes: 60,

client: &armhelpers.MockAKSEngineClient{},
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/operations/kubernetesupgrade/upgradeagentnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
)

const (
interval = time.Second * 1
retry = time.Second * 5
cordonDrainTimeout = time.Minute * 20
interval = time.Second * 1
retry = time.Second * 5
)

// Compiler to verify QueueMessageProcessor implements OperationsProcessor
Expand All @@ -42,6 +41,7 @@ type UpgradeAgentNode struct {
Client armhelpers.AKSEngineClient
kubeConfig string
timeout time.Duration
cordonDrainTimeout time.Duration
}

// DeleteNode takes state/resources of the master/agent node from ListNodeResources
Expand Down Expand Up @@ -69,7 +69,7 @@ func (kan *UpgradeAgentNode) DeleteNode(vmName *string, drain bool) error {
}
// Cordon and drain the node
if drain {
err = operations.SafelyDrainNodeWithClient(client, kan.logger, nodeName, cordonDrainTimeout)
err = operations.SafelyDrainNodeWithClient(client, kan.logger, nodeName, kan.cordonDrainTimeout)
if err != nil {
kan.logger.Warningf("Error draining agent VM %s. Proceeding with deletion. Error: %v", *vmName, err)
// Proceed with deletion anyways
Expand Down
11 changes: 6 additions & 5 deletions pkg/operations/kubernetesupgrade/upgradecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ type UpgradeCluster struct {
Translator *i18n.Translator
Logger *logrus.Entry
ClusterTopology
Client armhelpers.AKSEngineClient
StepTimeout *time.Duration
UpgradeWorkFlow UpgradeWorkFlow
Force bool
Client armhelpers.AKSEngineClient
StepTimeout *time.Duration
CordonDrainTimeout *time.Duration
UpgradeWorkFlow UpgradeWorkFlow
Force bool
}

// MasterVMNamePrefix is the prefix for all master VM names for Kubernetes clusters
Expand Down Expand Up @@ -108,7 +109,7 @@ func (uc *UpgradeCluster) getUpgradeWorkflow(kubeConfig string, aksEngineVersion
return uc.UpgradeWorkFlow
}
u := &Upgrader{}
u.Init(uc.Translator, uc.Logger, uc.ClusterTopology, uc.Client, kubeConfig, uc.StepTimeout, aksEngineVersion)
u.Init(uc.Translator, uc.Logger, uc.ClusterTopology, uc.Client, kubeConfig, uc.StepTimeout, uc.CordonDrainTimeout, aksEngineVersion)
return u
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/operations/kubernetesupgrade/upgradecluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ var _ = Describe("Upgrade Kubernetes cluster tests", func() {
}

u := &Upgrader{}
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, TestAKSEngineVersion)
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, nil, TestAKSEngineVersion)

vmname, err := u.getLastVMNameInVMSS(ctx, "resourcegroup", "scalesetName")
Expect(vmname).To(Equal("aks-agentnode1-123456-vmss000005"))
Expand All @@ -604,7 +604,7 @@ var _ = Describe("Upgrade Kubernetes cluster tests", func() {
mockClient.MakeFakeVirtualMachineScaleSetVMWithGivenName("Kubernetes:1.9.10", ""),
}
}
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, TestAKSEngineVersion)
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, nil, TestAKSEngineVersion)

vmname, err = u.getLastVMNameInVMSS(ctx, "resourcegroup", "scalesetName")
Expect(vmname).To(Equal(""))
Expand All @@ -613,7 +613,7 @@ var _ = Describe("Upgrade Kubernetes cluster tests", func() {
mockClient.FakeListVirtualMachineScaleSetVMsResult = func() []compute.VirtualMachineScaleSetVM {
return []compute.VirtualMachineScaleSetVM{}
}
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, TestAKSEngineVersion)
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, &mockClient, "", nil, nil, TestAKSEngineVersion)

vmname, err = u.getLastVMNameInVMSS(ctx, "resourcegroup", "scalesetName")
Expect(vmname).To(Equal(""))
Expand All @@ -626,7 +626,7 @@ var _ = Describe("Upgrade Kubernetes cluster tests", func() {
mockClient := &armhelpers.MockAKSEngineClient{MockKubernetesClient: &armhelpers.MockKubernetesClient{}}
mockClient.MockKubernetesClient.FailGetNode = true

u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, nil, "", nil, TestAKSEngineVersion)
u.Init(&i18n.Translator{}, log.NewEntry(log.New()), ClusterTopology{}, nil, "", nil, nil, TestAKSEngineVersion)
err := u.copyCustomPropertiesToNewNode(mockClient.MockKubernetesClient, "oldNodeName", "newNodeName")
Expect(err).Should(HaveOccurred())

Expand Down
Loading

0 comments on commit 8656825

Please sign in to comment.