Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(main): using new runtime interface #3691

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/apply/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewClusterFromGenArgs(cmd *cobra.Command, args *RunArgs, imageNames []strin
if err != nil {
return nil, err
}
return rtInterface.GetKubeadmConfig()
return rtInterface.GetConfig()
}

func genImageInfo(imageName string) (*v1beta1.MountImage, error) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/apply/processor/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ func (c *CreateProcessor) Init(_ *v2.Cluster) error {

func (c *CreateProcessor) Join(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Join in CreateProcessor.")
err := c.Runtime.JoinMasters(cluster.GetMasterIPAndPortList()[1:])
if err != nil {
return err
}
err = c.Runtime.JoinNodes(cluster.GetNodeIPAndPortList())
err := c.Runtime.ScaleUp(cluster.GetMasterIPAndPortList()[1:], cluster.GetNodeIPAndPortList())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/processor/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *InstallProcessor) UpgradeIfNeed(cluster *v2.Cluster) error {
if version == "" {
continue
}
err := c.Runtime.UpgradeCluster(version)
err := c.Runtime.Upgrade(version)
if err != nil {
logger.Error("upgrade cluster failed")
return err
Expand Down
11 changes: 2 additions & 9 deletions pkg/apply/processor/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,10 @@ func (c *ScaleProcessor) RunGuest(cluster *v2.Cluster) error {

func (c *ScaleProcessor) Delete(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Delete in ScaleProcessor.")
err := c.Runtime.DeleteMasters(c.MastersToDelete)
err := c.Runtime.ScaleDown(c.MastersToDelete, c.NodesToDelete)
if err != nil {
return err
}
if err = c.Runtime.DeleteNodes(c.NodesToDelete); err != nil {
return err
}
if len(c.MastersToDelete) > 0 {
return c.Runtime.SyncNodeIPVS(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList())
}
Expand All @@ -131,11 +128,7 @@ func (c *ScaleProcessor) Delete(cluster *v2.Cluster) error {

func (c *ScaleProcessor) Join(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Join in ScaleProcessor.")
err := c.Runtime.JoinMasters(c.MastersToJoin)
if err != nil {
return err
}
err = c.Runtime.JoinNodes(c.NodesToJoin)
err := c.Runtime.ScaleUp(c.MastersToJoin, c.NodesToJoin)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client-go/kubernetes/expansion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGetKubeadmConfig(t *testing.T) {
ke := NewKubeExpansion(tt.args.client)
got, err := ke.FetchKubeadmConfig(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("GetKubeadmConfig() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
t.Logf("%+v", got)
Expand Down
63 changes: 31 additions & 32 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (k *KubeadmRuntime) Init() error {
return k.pipeline("init", pipeline)
}

func (k *KubeadmRuntime) GetKubeadmConfig() ([]byte, error) {
func (k *KubeadmRuntime) GetConfig() ([]byte, error) {
k.KubeadmConfig = k.ClusterFileKubeConfig
if err := k.ConvertInitConfigConversion(); err != nil {
return nil, err
Expand All @@ -80,49 +80,48 @@ func (k *KubeadmRuntime) GetKubeadmConfig() ([]byte, error) {
type Interface interface {
Init() error
Reset() error
JoinNodes(newNodesIPList []string) error
DeleteNodes(nodeIPList []string) error
JoinMasters(newMastersIPList []string) error
DeleteMasters(mastersIPList []string) error
ScaleUp(newMasterIPList []string, newNodeIPList []string) error
ScaleDown(deleteMastersIPList []string, deleteNodesIPList []string) error
SyncNodeIPVS(mastersIPList, nodeIPList []string) error
Upgrade(version string) error
GetConfig() ([]byte, error)

UpdateCert(certs []string) error
UpgradeCluster(version string) error
GetKubeadmConfig() ([]byte, error)
}

func (k *KubeadmRuntime) Reset() error {
logger.Info("start to delete Cluster: master %s, node %s", k.getMasterIPList(), k.getNodeIPList())
return k.reset()
}

func (k *KubeadmRuntime) JoinNodes(newNodesIPList []string) error {
if len(newNodesIPList) != 0 {
logger.Info("%s will be added as worker", newNodesIPList)
}
if err := k.joinNodes(newNodesIPList); err != nil {
return err
}
return k.copyNodeKubeConfig(newNodesIPList)
}
func (k *KubeadmRuntime) DeleteNodes(nodesIPList []string) error {
if len(nodesIPList) != 0 {
logger.Info("worker %s will be deleted", nodesIPList)
func (k *KubeadmRuntime) ScaleUp(newMasterIPList []string, newNodeIPList []string) error {
if len(newMasterIPList) != 0 {
logger.Info("%s will be added as master", newMasterIPList)
if err := k.joinMasters(newMasterIPList); err != nil {
return err
}
}
if len(newNodeIPList) != 0 {
logger.Info("%s will be added as worker", newNodeIPList)
if err := k.joinNodes(newNodeIPList); err != nil {
return err
}
return k.copyNodeKubeConfig(newNodeIPList)
}
return k.deleteNodes(nodesIPList)
return nil
}

func (k *KubeadmRuntime) JoinMasters(newMastersIPList []string) error {
if len(newMastersIPList) != 0 {
logger.Info("%s will be added as master", newMastersIPList)
func (k *KubeadmRuntime) ScaleDown(deleteMastersIPList []string, deleteNodesIPList []string) error {
if len(deleteMastersIPList) != 0 {
logger.Info("master %s will be deleted", deleteMastersIPList)
if err := k.deleteMasters(deleteMastersIPList); err != nil {
return err
}
}
return k.joinMasters(newMastersIPList)
}

func (k *KubeadmRuntime) DeleteMasters(mastersIPList []string) error {
if len(mastersIPList) != 0 {
logger.Info("master %s will be deleted", mastersIPList)
if len(deleteNodesIPList) != 0 {
logger.Info("worker %s will be deleted", deleteNodesIPList)
return k.deleteNodes(deleteNodesIPList)
}
return k.deleteMasters(mastersIPList)
return nil
}

func newKubeadmRuntime(cluster *v2.Cluster, kubeadm *KubeadmConfig) (Interface, error) {
Expand Down Expand Up @@ -168,7 +167,7 @@ func (k *KubeadmRuntime) Validate() error {
return nil
}

func (k *KubeadmRuntime) UpgradeCluster(version string) error {
func (k *KubeadmRuntime) Upgrade(version string) error {
currVersion := k.getKubeVersionFromImage()

v0, err := semver.NewVersion(currVersion)
Expand Down