From 9dbed1adc47ea0e48c3b1d8d89f9776418d7ae21 Mon Sep 17 00:00:00 2001 From: cuisongliu Date: Tue, 22 Aug 2023 13:50:54 +0800 Subject: [PATCH] refactor(main): using new runtime interface (#3691) * refactor(main): using new runtime interface Signed-off-by: cuisongliu * feature(main): using up and down interface Signed-off-by: cuisongliu * feature(main): using up and down interface Signed-off-by: cuisongliu * feature(main): using up and down interface Signed-off-by: cuisongliu --------- Signed-off-by: cuisongliu --- pkg/apply/gen.go | 2 +- pkg/apply/processor/create.go | 6 +-- pkg/apply/processor/install.go | 2 +- pkg/apply/processor/scale.go | 11 +--- pkg/client-go/kubernetes/expansion_test.go | 2 +- pkg/runtime/runtime.go | 63 +++++++++++----------- 6 files changed, 37 insertions(+), 49 deletions(-) diff --git a/pkg/apply/gen.go b/pkg/apply/gen.go index 091aa6299d0..166506100ff 100644 --- a/pkg/apply/gen.go +++ b/pkg/apply/gen.go @@ -56,7 +56,7 @@ func NewClusterFromGenArgs(cmd *cobra.Command, args *RunArgs, imageNames []strin if err != nil { return nil, err } - return rtInterface.GetKubeadmConfig() + return rtInterface.GetConfig() } func genImageInfo(imageName string) (*v1beta1.MountImage, error) { diff --git a/pkg/apply/processor/create.go b/pkg/apply/processor/create.go index 44c107224f7..fa4e8f33f8b 100644 --- a/pkg/apply/processor/create.go +++ b/pkg/apply/processor/create.go @@ -151,11 +151,7 @@ func (c *CreateProcessor) Init(_ *v2.Cluster) error { func (c *CreateProcessor) Join(cluster *v2.Cluster) error { logger.Info("Executing pipeline Join in CreateProcessor.") - err := c.Runtime.JoinMasters(cluster.GetMasterIPAndPortList()[1:]) - if err != nil { - return err - } - err = c.Runtime.JoinNodes(cluster.GetNodeIPAndPortList()) + err := c.Runtime.ScaleUp(cluster.GetMasterIPAndPortList()[1:], cluster.GetNodeIPAndPortList()) if err != nil { return err } diff --git a/pkg/apply/processor/install.go b/pkg/apply/processor/install.go index cc119daf3a4..391ff29bf54 100644 --- a/pkg/apply/processor/install.go +++ b/pkg/apply/processor/install.go @@ -184,7 +184,7 @@ func (c *InstallProcessor) UpgradeIfNeed(cluster *v2.Cluster) error { if version == "" { continue } - err := c.Runtime.UpgradeCluster(version) + err := c.Runtime.Upgrade(version) if err != nil { logger.Error("upgrade cluster failed") return err diff --git a/pkg/apply/processor/scale.go b/pkg/apply/processor/scale.go index 24733575bff..ed247010a54 100644 --- a/pkg/apply/processor/scale.go +++ b/pkg/apply/processor/scale.go @@ -116,13 +116,10 @@ func (c *ScaleProcessor) RunGuest(cluster *v2.Cluster) error { func (c *ScaleProcessor) Delete(cluster *v2.Cluster) error { logger.Info("Executing pipeline Delete in ScaleProcessor.") - err := c.Runtime.DeleteMasters(c.MastersToDelete) + err := c.Runtime.ScaleDown(c.MastersToDelete, c.NodesToDelete) if err != nil { return err } - if err = c.Runtime.DeleteNodes(c.NodesToDelete); err != nil { - return err - } if len(c.MastersToDelete) > 0 { return c.Runtime.SyncNodeIPVS(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()) } @@ -131,11 +128,7 @@ func (c *ScaleProcessor) Delete(cluster *v2.Cluster) error { func (c *ScaleProcessor) Join(cluster *v2.Cluster) error { logger.Info("Executing pipeline Join in ScaleProcessor.") - err := c.Runtime.JoinMasters(c.MastersToJoin) - if err != nil { - return err - } - err = c.Runtime.JoinNodes(c.NodesToJoin) + err := c.Runtime.ScaleUp(c.MastersToJoin, c.NodesToJoin) if err != nil { return err } diff --git a/pkg/client-go/kubernetes/expansion_test.go b/pkg/client-go/kubernetes/expansion_test.go index b689140073f..3c71b82089a 100644 --- a/pkg/client-go/kubernetes/expansion_test.go +++ b/pkg/client-go/kubernetes/expansion_test.go @@ -46,7 +46,7 @@ func TestGetKubeadmConfig(t *testing.T) { ke := NewKubeExpansion(tt.args.client) got, err := ke.FetchKubeadmConfig(context.Background()) if (err != nil) != tt.wantErr { - t.Errorf("GetKubeadmConfig() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetConfig() error = %v, wantErr %v", err, tt.wantErr) return } t.Logf("%+v", got) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index b12068ce4f2..8a799c2ced8 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -57,7 +57,7 @@ func (k *KubeadmRuntime) Init() error { return k.pipeline("init", pipeline) } -func (k *KubeadmRuntime) GetKubeadmConfig() ([]byte, error) { +func (k *KubeadmRuntime) GetConfig() ([]byte, error) { k.KubeadmConfig = k.ClusterFileKubeConfig if err := k.ConvertInitConfigConversion(); err != nil { return nil, err @@ -80,49 +80,48 @@ func (k *KubeadmRuntime) GetKubeadmConfig() ([]byte, error) { type Interface interface { Init() error Reset() error - JoinNodes(newNodesIPList []string) error - DeleteNodes(nodeIPList []string) error - JoinMasters(newMastersIPList []string) error - DeleteMasters(mastersIPList []string) error + ScaleUp(newMasterIPList []string, newNodeIPList []string) error + ScaleDown(deleteMastersIPList []string, deleteNodesIPList []string) error SyncNodeIPVS(mastersIPList, nodeIPList []string) error + Upgrade(version string) error + GetConfig() ([]byte, error) + UpdateCert(certs []string) error - UpgradeCluster(version string) error - GetKubeadmConfig() ([]byte, error) } func (k *KubeadmRuntime) Reset() error { logger.Info("start to delete Cluster: master %s, node %s", k.getMasterIPList(), k.getNodeIPList()) return k.reset() } - -func (k *KubeadmRuntime) JoinNodes(newNodesIPList []string) error { - if len(newNodesIPList) != 0 { - logger.Info("%s will be added as worker", newNodesIPList) - } - if err := k.joinNodes(newNodesIPList); err != nil { - return err - } - return k.copyNodeKubeConfig(newNodesIPList) -} -func (k *KubeadmRuntime) DeleteNodes(nodesIPList []string) error { - if len(nodesIPList) != 0 { - logger.Info("worker %s will be deleted", nodesIPList) +func (k *KubeadmRuntime) ScaleUp(newMasterIPList []string, newNodeIPList []string) error { + if len(newMasterIPList) != 0 { + logger.Info("%s will be added as master", newMasterIPList) + if err := k.joinMasters(newMasterIPList); err != nil { + return err + } + } + if len(newNodeIPList) != 0 { + logger.Info("%s will be added as worker", newNodeIPList) + if err := k.joinNodes(newNodeIPList); err != nil { + return err + } + return k.copyNodeKubeConfig(newNodeIPList) } - return k.deleteNodes(nodesIPList) + return nil } -func (k *KubeadmRuntime) JoinMasters(newMastersIPList []string) error { - if len(newMastersIPList) != 0 { - logger.Info("%s will be added as master", newMastersIPList) +func (k *KubeadmRuntime) ScaleDown(deleteMastersIPList []string, deleteNodesIPList []string) error { + if len(deleteMastersIPList) != 0 { + logger.Info("master %s will be deleted", deleteMastersIPList) + if err := k.deleteMasters(deleteMastersIPList); err != nil { + return err + } } - return k.joinMasters(newMastersIPList) -} - -func (k *KubeadmRuntime) DeleteMasters(mastersIPList []string) error { - if len(mastersIPList) != 0 { - logger.Info("master %s will be deleted", mastersIPList) + if len(deleteNodesIPList) != 0 { + logger.Info("worker %s will be deleted", deleteNodesIPList) + return k.deleteNodes(deleteNodesIPList) } - return k.deleteMasters(mastersIPList) + return nil } func newKubeadmRuntime(cluster *v2.Cluster, kubeadm *KubeadmConfig) (Interface, error) { @@ -168,7 +167,7 @@ func (k *KubeadmRuntime) Validate() error { return nil } -func (k *KubeadmRuntime) UpgradeCluster(version string) error { +func (k *KubeadmRuntime) Upgrade(version string) error { currVersion := k.getKubeVersionFromImage() v0, err := semver.NewVersion(currVersion)