Skip to content
This repository has been archived by the owner on Jul 14, 2019. It is now read-only.

Commit

Permalink
Fixes to update for master and worker nodes (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
oneilcin authored and davidewatson committed Sep 20, 2018
1 parent 64f808a commit d90167e
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 50 deletions.
18 changes: 6 additions & 12 deletions cloud/ssh/actuators/machine/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"github.com/golang/glog"
clustercommon "sigs.k8s.io/cluster-api/pkg/apis/cluster/common"

"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh"
"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh/providerconfig/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh"
"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh/providerconfig/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
apierrors "sigs.k8s.io/cluster-api/pkg/errors"
Expand Down Expand Up @@ -270,20 +270,14 @@ func (a *Actuator) Update(c *clusterv1.Cluster, goalMachine *clusterv1.Machine)
if util.IsMaster(currentMachine) {
glog.Infof("Performing an in-place upgrade for master %s.", currentMachineName)
// TODO: should we support custom CAs here?
if err = a.updateMasterInplace(c, currentMachine, goalMachine); err != nil {
if err = a.updateMasterInPlace(c, currentMachine, goalMachine); err != nil {
glog.Errorf("master in-place update failed for %s: %v", currentMachineName, err)
return err
}
} else {
glog.Infof("Deleting machine %s for update.", currentMachineName)
if err = a.Delete(c, currentMachine); err != nil {
glog.Errorf("deleting machine %s for update failed: %v", currentMachineName, err)
return err
}

glog.Infof("Re-creating machine %s for update. ", currentMachineName)
if err = a.Create(c, goalMachine); err != nil {
glog.Errorf("creating machine %s for update failed: %v", goalMachineName, err)
glog.Infof("Performing upgrade for worker %s.", currentMachineName)
if err = a.updateWorkerInPlace(c, currentMachine, goalMachine); err != nil {
glog.Errorf("worker update failed for %s: v%", currentMachineName, err)
return err
}
}
Expand Down
149 changes: 112 additions & 37 deletions cloud/ssh/actuators/machine/actuator_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ const (
deleteEventAction = "Delete"
noEventAction = ""
// TODO should this move to the cluster controller?
apiServerPort = 443
apiServerPort = 443
upgradeControlPlaneCmd = "sudo curl -o /usr/bin/kubeadm -sSL https://dl.k8s.io/release/v%[1]s/bin/linux/amd64/kubeadm && " +
"sudo chmod a+rx /usr/bin/kubeadm && " +
"sudo kubeadm upgrade apply v%[1]s -y"
upgradeMasterPackagesCmd = "sudo kubectl drain %[1]s --ignore-daemonsets --kubeconfig /etc/kubernetes/admin.conf && " +
"sudo apt-get update && " +
"sudo apt-get upgrade -y kubelet=%[2]s-00 kubeadm=%[2]s-00 && " +
"sudo systemctl restart kubelet && sudo kubectl uncordon %[1]s --kubeconfig /etc/kubernetes/admin.conf"
getNodeCmd = "sudo kubectl get no -o go-template='{{range .items}}{{.metadata.name}}:{{.metadata.annotations.machine}}{{\"\\n\"}}{{end}}' --kubeconfig /etc/kubernetes/admin.conf"
drainWorkerCmd = "sudo kubectl drain %[1]s --ignore-daemonsets --kubeconfig /etc/kubernetes/admin.conf"
uncordonWorkerCmd = "sudo kubectl uncordon %[1]s --kubeconfig /etc/kubernetes/admin.conf"
upgradeWorkerPackagesCmd = "sudo apt-get update && " +
"sudo apt-get upgrade -y kubelet=%[1]s-00 kubeadm=%[1]s-00 && " +
"sudo kubeadm upgrade node config --kubelet-version $(kubelet --version | cut -d ' ' -f 2) && " +
"sudo systemctl restart kubelet"
)

func (a *Actuator) machineProviderConfig(providerConfig clusterv1.ProviderConfig) (*v1alpha1.SSHMachineProviderConfig, error) {
Expand Down Expand Up @@ -115,13 +129,33 @@ func (a *Actuator) getMetadata(c *clusterv1.Cluster, m *clusterv1.Machine, machi
return &metadata, nil
}

func (a *Actuator) createKubeconfigSecret(c *clusterv1.Cluster, m *clusterv1.Machine, sshClient ssh.SSHProviderClientInterface) error {
// getNodeForMachine returns the name of the Kubernetes node whose "machine"
// annotation is exactly "<m.Namespace>/<m.Name>".
//
// It runs getNodeCmd on the master over SSH (pre-filtered with grep), which
// prints one "<node name>:<machine annotation>" line per node, and then
// verifies each returned line in Go.
//
// The Go-side check matters for two reasons:
//   - grep does a substring match, so "default/worker-1" would also select
//     the line for "default/worker-10"; only an exact annotation match is
//     accepted here.
//   - strings.Split never returns an empty slice, so a plain length check
//     cannot detect empty or malformed output; an explicit error is returned
//     instead of an empty or garbage node name.
//
// Errors from obtaining the master SSH client or from running the command are
// returned unchanged.
func (a *Actuator) getNodeForMachine(c *clusterv1.Cluster, m *clusterv1.Machine) (string, error) {
	masterSSHClient, err := a.getMasterSSHClient(c, m)
	if err != nil {
		glog.Error("Error getting master sshClient")
		return "", err
	}

	machineKey := m.Namespace + "/" + m.Name
	nodeCmd := getNodeCmd + " | grep " + machineKey
	glog.Infof("nodeCmd = %s", nodeCmd)
	output, err := masterSSHClient.ProcessCMDWithOutput(nodeCmd)
	if err != nil {
		glog.Errorf("Error getting node: cmd = %s, error = %s", nodeCmd, err)
		return "", err
	}

	// Each line is "<node>:<annotation>"; split on the first colon only and
	// require an exact annotation match with a non-empty node name.
	for _, line := range strings.Split(string(output), "\n") {
		fields := strings.SplitN(strings.TrimSpace(line), ":", 2)
		if len(fields) == 2 && fields[0] != "" && fields[1] == machineKey {
			return fields[0], nil
		}
	}
	return "", fmt.Errorf("no node found with machine annotation %q", machineKey)
}

func (a *Actuator) createKubeconfigSecret(c *clusterv1.Cluster, m *clusterv1.Machine, sshMasterClient ssh.SSHProviderClientInterface) error {
if a.kubeClient == nil {
return fmt.Errorf("kubeclient is nil, should not happen")
}

glog.Infof("Getting kubeconfig from master, machine %s cluster %s", m.Name, c.Name)
output, err := sshClient.GetKubeConfigBytes()
output, err := sshMasterClient.GetKubeConfigBytes()
if err != nil {
glog.Errorf("Error getting kubeconfig from master for machine %s cluster %s error: %v", m.Name, c.Name, err)
return err
Expand Down Expand Up @@ -164,27 +198,34 @@ func (a *Actuator) createKubeconfigSecret(c *clusterv1.Cluster, m *clusterv1.Mac
return err
}

func (a *Actuator) getKubeadmTokenOnMaster(c *clusterv1.Cluster, m *clusterv1.Machine) (string, error) {
if m.ObjectMeta.DeletionTimestamp != nil {
// No need to create token on a delete.
return "", nil
}
// getMasterSSHClient builds an SSH client that targets the cluster master.
//
// The SSH username, port and key secret name are taken from the provider
// config of machine m; the host is taken from the first API endpoint recorded
// in the cluster status rather than from the machine itself.
//
// NOTE(review): c.Status.APIEndpoints[0] is indexed without a length check, so
// this panics if the cluster has no API endpoint recorded yet — confirm that
// every caller runs only after the endpoint has been populated.
func (a *Actuator) getMasterSSHClient(c *clusterv1.Cluster, m *clusterv1.Machine) (ssh.SSHProviderClientInterface, error) {
machineConfig, err := a.machineProviderConfig(m.Spec.ProviderConfig)
if err != nil {
return "", err
return nil, err
}
privateKey, passPhrase, err := a.getPrivateKey(c, m.Namespace, machineConfig.SSHConfig.SecretName)
if err != nil {
return "", err
return nil, err
}
// init master ip address
masterSSHConfig := v1alpha1.SSHConfig{Username: machineConfig.SSHConfig.Username,
Host: c.Status.APIEndpoints[0].Host,
Port: machineConfig.SSHConfig.Port,
}

masterSSHClient := ssh.NewSSHProviderClient(privateKey, passPhrase, masterSSHConfig)
return masterSSHClient, nil
}

func (a *Actuator) getKubeadmTokenOnMaster(c *clusterv1.Cluster, m *clusterv1.Machine) (string, error) {
if m.ObjectMeta.DeletionTimestamp != nil {
// No need to create token on a delete.
return "", nil
}
masterSSHClient, err := a.getMasterSSHClient(c, m)
if err != nil {
glog.Error("Error getting master sshClient")
return "", err
}
glog.Infof("Creating token on master, machine %s cluster %s", m.Name, c.Name)
// TODO use kubeadm ttl option and try without full path
output, err := masterSSHClient.ProcessCMDWithOutput("sudo /usr/bin/kubeadm token create")
Expand Down Expand Up @@ -224,66 +265,100 @@ func (a *Actuator) updateClusterObjectEndpoint(c *clusterv1.Cluster, m *clusterv
return nil
}

func (a *Actuator) updateMasterInplace(c *clusterv1.Cluster, oldMachine *clusterv1.Machine, newMachine *clusterv1.Machine) error {
// updateMasterInPlace upgrades a master machine over SSH without re-creating it.
//
// It builds an SSH client from newMachine's provider config and then compares
// oldMachine and newMachine versions:
//   - when Versions.ControlPlane differs, it runs upgradeControlPlaneCmd
//     formatted with the new control plane version;
//   - when Versions.Kubelet differs, it looks up the node name for the machine
//     via getNodeForMachine and runs upgradeMasterPackagesCmd formatted with
//     that node name and the new kubelet version.
//
// Returns the first error encountered, or nil when nothing failed.
//
// NOTE(review): the error returned by getNodeForMachine is replaced with a
// generic errors.New message, so the underlying cause is not propagated to
// the caller.
func (a *Actuator) updateMasterInPlace(c *clusterv1.Cluster, oldMachine *clusterv1.Machine, newMachine *clusterv1.Machine) error {
glog.Infof("updating master node %s", oldMachine.Name)
machineConfig, err := a.machineProviderConfig(newMachine.Spec.ProviderConfig)
if err != nil {
return err
}

privateKey, passPhrase, err := a.getPrivateKey(c, newMachine.Namespace, machineConfig.SSHConfig.SecretName)
if err != nil {
return err
}

sshClient := ssh.NewSSHProviderClient(privateKey, passPhrase, machineConfig.SSHConfig)

// Upgrade ControlPlane items
// Perform kubeadm upgrade on the ControlPlane
if oldMachine.Spec.Versions.ControlPlane != newMachine.Spec.Versions.ControlPlane {
glog.Infof("Updating master node %s; controlplane version from %s to %s.", oldMachine.Name, oldMachine.Spec.Versions.ControlPlane, newMachine.Spec.Versions.ControlPlane)
cmd := fmt.Sprintf(upgradeControlPlaneCmd, newMachine.Spec.Versions.ControlPlane)
glog.Infof("updateControlPlaneCmd = %s", cmd)

cmd := fmt.Sprintf(
"curl -sSL https://dl.k8s.io/release/v%s/bin/linux/amd64/kubeadm | sudo tee /usr/bin/kubeadm > /dev/null; "+
"sudo chmod a+rx /usr/bin/kubeadm", newMachine.Spec.Versions.ControlPlane)
err := sshClient.ProcessCMD(cmd)
if err != nil {
glog.Errorf("could not install kubeadm binary: %v", err)
glog.Errorf("Could not perform kubeadm upgrade on ControlPlane: %v", err)
return err
}
}
// Upgrade ControlPlane packages (kubelet)
if oldMachine.Spec.Versions.Kubelet != newMachine.Spec.Versions.Kubelet {
glog.Infof("updating master node %s; kubelet version from %s to %s.", oldMachine.Name, oldMachine.Spec.Versions.Kubelet, newMachine.Spec.Versions.Kubelet)

node, err := a.getNodeForMachine(c, newMachine)
if err != nil {
return errors.New("updateMasterInPlace Error getting node name for machine")
}
cmd := fmt.Sprintf(upgradeMasterPackagesCmd, node, newMachine.Spec.Versions.Kubelet)
glog.Infof("upgradeMasterPackagesCmd = %s", cmd)

// Upgrade control plane.
cmd = fmt.Sprintf("sudo kubeadm upgrade apply %s -y", "v"+newMachine.Spec.Versions.ControlPlane)
err = sshClient.ProcessCMD(cmd)
if err != nil {
glog.Errorf("failed to upgrade to new version %s: %v", newMachine.Spec.Versions.ControlPlane, err)
glog.Errorf("Could not upgrade Kubelet version: %s-00 on ControlPlane %s: %s", newMachine.Spec.Versions.Kubelet, newMachine.Name, err)
return err
}
}
glog.Infof("updating master node %s; done.", oldMachine.Name)

return nil
}

// updateWorkerInPlace upgrades the kubelet packages on a worker machine over
// SSH without re-creating the machine.
//
// Two SSH clients are used: one built from newMachine's provider config (the
// worker itself) and one obtained from getMasterSSHClient (the master).
// When Versions.Kubelet differs between oldMachine and newMachine it:
//  1. resolves the node name for the machine via getNodeForMachine;
//  2. runs drainWorkerCmd for that node on the master;
//  3. runs upgradeWorkerPackagesCmd with the new kubelet version on the worker;
//  4. runs uncordonWorkerCmd for that node on the master.
//
// Returns the first error encountered, or nil when nothing failed.
//
// NOTE(review): a failure in step 3 returns before step 4, so the node is left
// drained/cordoned until someone uncordons it — confirm this is intended.
// NOTE(review): the error returned by getNodeForMachine is replaced with a
// generic errors.New message, so the underlying cause is not propagated.
func (a *Actuator) updateWorkerInPlace(c *clusterv1.Cluster, oldMachine *clusterv1.Machine, newMachine *clusterv1.Machine) error {
glog.Infof("updating worker node %s", oldMachine.Name)
machineConfig, err := a.machineProviderConfig(newMachine.Spec.ProviderConfig)
if err != nil {
return err
}
privateKey, passPhrase, err := a.getPrivateKey(c, newMachine.Namespace, machineConfig.SSHConfig.SecretName)
if err != nil {
return err
}
sshClient := ssh.NewSSHProviderClient(privateKey, passPhrase, machineConfig.SSHConfig)
masterSSHClient, err := a.getMasterSSHClient(c, newMachine)
if err != nil {
glog.Error("updateWorkerInPlace Error getting master sshClient")
return err
}

// Upgrade Kubelet.
// Upgrade Worker packages (kubelet)
if oldMachine.Spec.Versions.Kubelet != newMachine.Spec.Versions.Kubelet {
glog.Infof("updating master node %s; kubelet version from %s to %s.", oldMachine.Name, oldMachine.Spec.Versions.Kubelet, newMachine.Spec.Versions.Kubelet)
cmd := fmt.Sprintf("sudo kubectl drain %s --kubeconfig /etc/kubernetes/admin.conf --ignore-daemonsets", newMachine.Name)
// The errors are intentionally ignored as master has static pods.
_ = sshClient.ProcessCMD(cmd)
glog.Infof("updating worker node %s; kubelet version from %s to %s.", oldMachine.Name, oldMachine.Spec.Versions.Kubelet, newMachine.Spec.Versions.Kubelet)

// Upgrade kubelet to desired version.
cmd = fmt.Sprintf("sudo apt install kubelet=%s-00", newMachine.Spec.Versions.Kubelet)
err = sshClient.ProcessCMD(cmd)
node, err := a.getNodeForMachine(c, newMachine)
if err != nil {
return errors.New("updateWorkerInPlace Error getting node name for machine")
}
drainCmd := fmt.Sprintf(drainWorkerCmd, node)
glog.Infof("drainWorkerCmd = %s", drainCmd)
err = masterSSHClient.ProcessCMD(drainCmd)
if err != nil {
glog.Errorf("could not apt install Kubelet version: %s-00", newMachine.Spec.Versions.Kubelet+"-00: %v", err)
glog.Errorf("Failed to drain worker node %s for machine %s: %s", node, newMachine.Name, err)
return err
}

cmd = fmt.Sprintf("sudo kubectl uncordon %s --kubeconfig /etc/kubernetes/admin.conf", newMachine.Name)
err = sshClient.ProcessCMD(cmd)
upgradePkgCmd := fmt.Sprintf(upgradeWorkerPackagesCmd, newMachine.Spec.Versions.Kubelet)
glog.Infof("upgradeWorkerPackagesCmd = %s", upgradePkgCmd)
err = sshClient.ProcessCMD(upgradePkgCmd)
if err != nil {
glog.Errorf("Could not upgrade Kubelet version: %s-00 on Worker %s: %s", newMachine.Spec.Versions.Kubelet, newMachine.Name, err)
return err
}
uncordonCmd := fmt.Sprintf(uncordonWorkerCmd, node)
glog.Infof("uncordonWorkerCmd = %s", uncordonCmd)
err = masterSSHClient.ProcessCMD(uncordonCmd)
if err != nil {
glog.Errorf("failed to uncordon the node: %s: %v", newMachine.Name, err)
glog.Errorf("Failed to uncordon worker node %s for machine %s: %s", node, newMachine.Name, err)
return err
}
}

glog.Infof("updating master node %s; done.", oldMachine.Name)
glog.Infof("updating worker node %s; done.", oldMachine.Name)

return nil
}
Expand Down
3 changes: 2 additions & 1 deletion cloud/ssh/sshproviderclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"os"

"github.com/golang/glog"
"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh/providerconfig/v1alpha1"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
"github.com/samsung-cnct/cluster-api-provider-ssh/cloud/ssh/providerconfig/v1alpha1"
)

const (
Expand All @@ -17,6 +17,7 @@ const (

type SSHProviderClientInterface interface {
ProcessCMD(cmd string) error
ProcessCMDWithOutput(cmd string) ([]byte, error)
WritePublicKeys(machineSSHConfig v1alpha1.SSHConfig) error
DeletePublicKeys(machineSSHConfig v1alpha1.SSHConfig) error
GetKubeConfig() (string, error)
Expand Down

0 comments on commit d90167e

Please sign in to comment.