diff --git a/pkg/engine/doc.go b/pkg/engine/doc.go new file mode 100644 index 00000000..eba2add3 --- /dev/null +++ b/pkg/engine/doc.go @@ -0,0 +1,43 @@ +// Package engine holds code that drive the engine of Kusion. +// +// Kusion Engine is a middle layer between Spec and the actual infrastructure. +// The major function of this engine is to parse the Spec and to turn all actual infra resources into the desired state +// described in the Spec and almost all operations from the command line will invoke this engine to finish their jobs. +// +// It consists of 3 parts: +// +// 1. Operation Engine: this part is the entrypoint of the whole Kusion Engine and is responsible for kusion basic operations like Preview, +// Apply, Destroy, etc. The main workflow of this part is to parse resources in the Spec, figure out which resource should be modified +// according to specified operation type, and execute this operation to the real infra resources. During this workflow, +// the following two parts will be involved. +// +// 2. Runtime: it is an interface between the actual infrastructure and Kusion. All operations that trying to manipulate a resource +// should be delegated to one Runtime to make this operation effect in the actual infrastructure +// +// 3. State: state is a record of an operation's result. It is often used as a datasource for 3-way merge/diff in operations like Apply or Preview +// +// Let's get operation Preview as an example to demonstrate how the three parts cooperate in an actual operation. +// +// +-------------+ +// | Operation | +// | Preview | +// +-------------+ +// | +// | +// +-------------+ +// | Operation | +// +---------| Engine |----------+ +// | +-------------+ | +// | | +// +-------------+ +-------------+ +// | State | | Runtime | +// +-------------+ +-------------+ +// +// 1. parse resources in the Spec and convert into a DAG +// +// 2. Walk this DAG: +// a) get the latest state from the actual infra by the Runtime +// b) get last operation state from the State +// +// 3. Diff the two states(live state and prior state) and return the details of these diffs to cmd +package engine diff --git a/pkg/engine/models/resource.go b/pkg/engine/models/resource.go index 729574ee..a923539c 100644 --- a/pkg/engine/models/resource.go +++ b/pkg/engine/models/resource.go @@ -15,10 +15,10 @@ type Resource struct { Attributes map[string]interface{} `json:"attributes"` // DependsOn contains all resources this resource depends on - DependsOn []string `json:"dependsOn,omitempty" yaml:"dependsOn,omitempty"` + DependsOn []string `json:"dependsOn,omitempty"` // Extensions specifies arbitrary metadata of this resource - Extensions map[string]interface{} `json:"extensions"` + Extensions map[string]interface{} `json:"extensions,omitempty"` } func (r *Resource) ResourceKey() string { diff --git a/pkg/engine/models/spec.go b/pkg/engine/models/spec.go index e61404a5..8800c8d9 100644 --- a/pkg/engine/models/spec.go +++ b/pkg/engine/models/spec.go @@ -1,6 +1,6 @@ package models -// Spec represent the KCL compile result +// Spec represents desired state of resources in one stack and will be applied to the actual infrastructure by the Kusion Engine type Spec struct { Resources Resources `json:"resources"` } diff --git a/pkg/engine/operation/apply.go b/pkg/engine/operation/apply.go index 17181f28..8e0b8d1c 100644 --- a/pkg/engine/operation/apply.go +++ b/pkg/engine/operation/apply.go @@ -5,6 +5,12 @@ import ( "fmt" "sync" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/graph" + "kusionstack.io/kusion/pkg/engine/operation/parser" + "kusionstack.io/kusion/pkg/engine/operation/types" + "github.com/hashicorp/terraform/dag" "github.com/hashicorp/terraform/tfdiags" @@ -15,11 +21,11 @@ import ( ) type ApplyOperation struct { - Operation + opsmodels.Operation } type ApplyRequest struct { - Request `json:",inline" yaml:",inline"` + opsmodels.Request `json:",inline" yaml:",inline"` } type ApplyResponse struct { @@ -27,25 +33,31 @@ type ApplyResponse struct { } func NewApplyGraph(m *models.Spec, priorState *states.State) (*dag.AcyclicGraph, status.Status) { - manifestParser := NewManifestParser(m) - graph := &dag.AcyclicGraph{} - graph.Add(&RootNode{}) + specParser := parser.NewSpecParser(m) + g := &dag.AcyclicGraph{} + g.Add(&graph.RootNode{}) - s := manifestParser.Parse(graph) + s := specParser.Parse(g) if status.IsErr(s) { return nil, s } - deleteResourceParser := NewDeleteResourceParser(priorState.Resources) - s = deleteResourceParser.Parse(graph) + deleteResourceParser := parser.NewDeleteResourceParser(priorState.Resources) + s = deleteResourceParser.Parse(g) if status.IsErr(s) { return nil, s } - return graph, s + return g, s } -func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.Status) { - log.Infof("engine Apply start!") +// Apply means turn all actual infra resources into the desired state described in the request by invoking a specified Runtime. +// Like other operations, Apply has 3 main steps during the whole process. +// 1. parse resources and their relationship to build a DAG and should take care of those resources that will be deleted +// 2. walk this DAG and execute all graph nodes concurrently, besides the entire process should follow dependencies in this DAG +// 3. during the execution of each node, it will invoke different runtime according to the resource type +func (ao *ApplyOperation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.Status) { + log.Infof("engine: Apply start!") + o := ao.Operation defer func() { close(o.MsgCh) @@ -69,32 +81,32 @@ func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status. } // 1. init & build Indexes - priorState, resultState := initStates(o.StateStorage, &request.Request) + priorState, resultState := o.InitStates(&request.Request) priorStateResourceIndex := priorState.Resources.Index() // 2. build & walk DAG - graph, s := NewApplyGraph(request.Manifest, priorState) + applyGraph, s := NewApplyGraph(request.Spec, priorState) if status.IsErr(s) { return nil, s } - log.Infof("Apply Graph:%s", graph.String()) + log.Infof("Apply Graph:%s", applyGraph.String()) applyOperation := &ApplyOperation{ - Operation: Operation{ - OperationType: Apply, + Operation: opsmodels.Operation{ + OperationType: types.Apply, StateStorage: o.StateStorage, CtxResourceIndex: map[string]*models.Resource{}, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, Runtime: o.Runtime, MsgCh: o.MsgCh, - resultState: resultState, - lock: &sync.Mutex{}, + ResultState: resultState, + Lock: &sync.Mutex{}, }, } w := &dag.Walker{Callback: applyOperation.applyWalkFun} - w.Update(graph) + w.Update(applyGraph) // Wait if diags := w.Wait(); diags.HasErrors() { st = status.NewErrorStatus(diags.Err()) @@ -104,11 +116,12 @@ func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status. return &ApplyResponse{State: resultState}, nil } -func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { +func (ao *ApplyOperation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { var s status.Status if v == nil { return nil } + o := &ao.Operation defer func() { if e := recover(); e != nil { @@ -127,15 +140,15 @@ func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { } }() - if node, ok := v.(ExecutableNode); ok { - if rn, ok2 := v.(*ResourceNode); ok2 { - o.MsgCh <- Message{rn.Hashcode().(string), "", nil} + if node, ok := v.(graph.ExecutableNode); ok { + if rn, ok2 := v.(*graph.ResourceNode); ok2 { + o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string)} s = node.Execute(o) if status.IsErr(s) { - o.MsgCh <- Message{rn.Hashcode().(string), Failed, fmt.Errorf("node execte failed, status: %v", s)} + o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string), OpResult: opsmodels.Failed, OpErr: fmt.Errorf("node execte failed, status: %v", s)} } else { - o.MsgCh <- Message{rn.Hashcode().(string), Success, nil} + o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string), OpResult: opsmodels.Success} } } else { s = node.Execute(o) @@ -147,19 +160,19 @@ func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { return diags } -func validateRequest(request *Request) status.Status { +func validateRequest(request *opsmodels.Request) status.Status { var s status.Status if request == nil { return status.NewErrorStatusWithMsg(status.InvalidArgument, "request is nil") } - if request.Manifest == nil { + if request.Spec == nil { return status.NewErrorStatusWithMsg(status.InvalidArgument, "request.Spec is empty. If you want to delete all resources, please use command 'destroy'") } resourceKeyMap := make(map[string]bool) - for _, resource := range request.Manifest.Resources { + for _, resource := range request.Spec.Resources { key := resource.ResourceKey() if _, ok := resourceKeyMap[key]; ok { return status.NewErrorStatusWithMsg(status.InvalidArgument, fmt.Sprintf("Duplicate resource:%s in request.", key)) diff --git a/pkg/engine/operation/apply_test.go b/pkg/engine/operation/apply_test.go index 7264f4d7..c644bee7 100644 --- a/pkg/engine/operation/apply_test.go +++ b/pkg/engine/operation/apply_test.go @@ -9,6 +9,11 @@ import ( "sync" "testing" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/graph" + "kusionstack.io/kusion/pkg/engine/operation/types" + "bou.ke/monkey" _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" @@ -21,7 +26,7 @@ import ( func Test_validateRequest(t *testing.T) { type args struct { - request *Request + request *opsmodels.Request } tests := []struct { name string @@ -31,7 +36,7 @@ func Test_validateRequest(t *testing.T) { { name: "t1", args: args{ - request: &Request{}, + request: &opsmodels.Request{}, }, want: status.NewErrorStatusWithMsg(status.InvalidArgument, "request.Spec is empty. If you want to delete all resources, please use command 'destroy'"), @@ -39,8 +44,8 @@ func Test_validateRequest(t *testing.T) { { name: "t2", args: args{ - request: &Request{ - Manifest: &models.Spec{Resources: []models.Resource{}}, + request: &opsmodels.Request{ + Spec: &models.Spec{Resources: []models.Resource{}}, }, }, want: nil, @@ -57,14 +62,14 @@ func Test_validateRequest(t *testing.T) { func TestOperation_Apply(t *testing.T) { type fields struct { - OperationType Type + OperationType types.OperationType StateStorage states.StateStorage CtxResourceIndex map[string]*models.Resource PriorStateResourceIndex map[string]*models.Resource StateResourceIndex map[string]*models.Resource - Order *ChangeOrder + Order *opsmodels.ChangeOrder Runtime runtime.Runtime - MsgCh chan Message + MsgCh chan opsmodels.Message resultState *states.State lock *sync.Mutex } @@ -113,17 +118,17 @@ func TestOperation_Apply(t *testing.T) { { name: "apply test", fields: fields{ - OperationType: Apply, + OperationType: types.Apply, StateStorage: &states.FileSystemState{Path: filepath.Join("test_data", states.KusionState)}, Runtime: &runtime.KubernetesRuntime{}, - MsgCh: make(chan Message, 5), + MsgCh: make(chan opsmodels.Message, 5), }, - args: args{applyRequest: &ApplyRequest{Request{ + args: args{applyRequest: &ApplyRequest{opsmodels.Request{ Tenant: "fakeTenant", Stack: "fakeStack", Project: "fakeProject", Operator: "faker", - Manifest: mf, + Spec: mf, }}}, wantRsp: &ApplyResponse{rs}, wantSt: nil, @@ -132,25 +137,28 @@ func TestOperation_Apply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - o := &Operation{ + o := &opsmodels.Operation{ OperationType: tt.fields.OperationType, StateStorage: tt.fields.StateStorage, CtxResourceIndex: tt.fields.CtxResourceIndex, PriorStateResourceIndex: tt.fields.PriorStateResourceIndex, StateResourceIndex: tt.fields.StateResourceIndex, - Order: tt.fields.Order, + ChangeOrder: tt.fields.Order, Runtime: tt.fields.Runtime, MsgCh: tt.fields.MsgCh, - resultState: tt.fields.resultState, - lock: tt.fields.lock, + ResultState: tt.fields.resultState, + Lock: tt.fields.lock, + } + ao := &ApplyOperation{ + Operation: *o, } - monkey.Patch((*ResourceNode).Execute, func(rn *ResourceNode, operation *Operation) status.Status { - o.resultState = rs + monkey.Patch((*graph.ResourceNode).Execute, func(rn *graph.ResourceNode, operation *opsmodels.Operation) status.Status { + o.ResultState = rs return nil }) - gotRsp, gotSt := o.Apply(tt.args.applyRequest) + gotRsp, gotSt := ao.Apply(tt.args.applyRequest) assert.Equalf(t, tt.wantRsp.State.Stack, gotRsp.State.Stack, "Apply(%v)", tt.args.applyRequest) assert.Equalf(t, tt.wantSt, gotSt, "Apply(%v)", tt.args.applyRequest) }) diff --git a/pkg/engine/operation/destory.go b/pkg/engine/operation/destory.go index 40522e25..0821442f 100644 --- a/pkg/engine/operation/destory.go +++ b/pkg/engine/operation/destory.go @@ -5,6 +5,15 @@ import ( "fmt" "sync" + "kusionstack.io/kusion/pkg/engine/operation/graph" + + "github.com/hashicorp/terraform/tfdiags" + + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/parser" + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "github.com/hashicorp/terraform/dag" @@ -14,26 +23,30 @@ import ( ) type DestroyOperation struct { - Operation + opsmodels.Operation } type DestroyRequest struct { - Request `json:",inline" yaml:",inline"` + opsmodels.Request `json:",inline" yaml:",inline"` } func NewDestroyGraph(resource models.Resources) (*dag.AcyclicGraph, status.Status) { - graph := &dag.AcyclicGraph{} - graph.Add(&RootNode{}) - deleteResourceParser := NewDeleteResourceParser(resource) - s := deleteResourceParser.Parse(graph) + ag := &dag.AcyclicGraph{} + ag.Add(&graph.RootNode{}) + deleteResourceParser := parser.NewDeleteResourceParser(resource) + s := deleteResourceParser.Parse(ag) if status.IsErr(s) { return nil, s } - return graph, s + return ag, s } -func (o *Operation) Destroy(request *DestroyRequest) (st status.Status) { +// Destroy will delete all resources in this request. The whole process is similar to the operation Apply, +// but every node's execution is deleting the resource. +func (do *DestroyOperation) Destroy(request *DestroyRequest) (st status.Status) { + o := do.Operation + defer func() { close(o.MsgCh) if e := recover(); e != nil { @@ -55,33 +68,33 @@ func (o *Operation) Destroy(request *DestroyRequest) (st status.Status) { } // 1. init & build Indexes - _, resultState := initStates(o.StateStorage, &request.Request) + _, resultState := o.InitStates(&request.Request) // replace priorState.Resources with models.Resources, so we do Delete in all nodes - resources := request.Request.Manifest.Resources + resources := request.Request.Spec.Resources priorStateResourceIndex := resources.Index() // 2. build & walk DAG - graph, s := NewDestroyGraph(resources) + destroyGraph, s := NewDestroyGraph(resources) if status.IsErr(s) { return s } - do := &DestroyOperation{ - Operation: Operation{ - OperationType: Destroy, + newDo := &DestroyOperation{ + Operation: opsmodels.Operation{ + OperationType: types.Destroy, StateStorage: o.StateStorage, CtxResourceIndex: map[string]*models.Resource{}, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, Runtime: o.Runtime, MsgCh: o.MsgCh, - resultState: resultState, - lock: &sync.Mutex{}, + ResultState: resultState, + Lock: &sync.Mutex{}, }, } - w := &dag.Walker{Callback: do.applyWalkFun} - w.Update(graph) + w := &dag.Walker{Callback: newDo.destroyWalkFun} + w.Update(destroyGraph) // Wait if diags := w.Wait(); diags.HasErrors() { st = status.NewErrorStatus(diags.Err()) @@ -89,3 +102,10 @@ func (o *Operation) Destroy(request *DestroyRequest) (st status.Status) { } return nil } + +func (do *DestroyOperation) destroyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { + ao := &ApplyOperation{ + Operation: do.Operation, + } + return ao.applyWalkFun(v) +} diff --git a/pkg/engine/operation/destory_test.go b/pkg/engine/operation/destory_test.go index ec9d8c5a..9ee558e3 100644 --- a/pkg/engine/operation/destory_test.go +++ b/pkg/engine/operation/destory_test.go @@ -9,6 +9,11 @@ import ( "path/filepath" "testing" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/graph" + "kusionstack.io/kusion/pkg/engine/operation/types" + "bou.ke/monkey" "github.com/stretchr/testify/assert" @@ -34,27 +39,29 @@ func TestOperation_Destroy(t *testing.T) { DependsOn: nil, } mf := &models.Spec{Resources: []models.Resource{resourceState}} - o := &Operation{ - OperationType: Destroy, - StateStorage: &states.FileSystemState{Path: filepath.Join("test_data", states.KusionState)}, - Runtime: &runtime.KubernetesRuntime{}, + o := &DestroyOperation{ + opsmodels.Operation{ + OperationType: types.Destroy, + StateStorage: &states.FileSystemState{Path: filepath.Join("test_data", states.KusionState)}, + Runtime: &runtime.KubernetesRuntime{}, + }, } r := &DestroyRequest{ - Request{ + opsmodels.Request{ Tenant: tenant, Stack: stack, Project: project, Operator: operator, - Manifest: mf, + Spec: mf, }, } t.Run("destroy success", func(t *testing.T) { defer monkey.UnpatchAll() - monkey.Patch((*ResourceNode).Execute, func(rn *ResourceNode, operation *Operation) status.Status { + monkey.Patch((*graph.ResourceNode).Execute, func(rn *graph.ResourceNode, operation *opsmodels.Operation) status.Status { return nil }) - o.MsgCh = make(chan Message, 1) + o.MsgCh = make(chan opsmodels.Message, 1) go readMsgCh(o.MsgCh) st := o.Destroy(r) assert.Nil(t, st) @@ -62,18 +69,18 @@ func TestOperation_Destroy(t *testing.T) { t.Run("destroy failed", func(t *testing.T) { defer monkey.UnpatchAll() - monkey.Patch((*ResourceNode).Execute, func(rn *ResourceNode, operation *Operation) status.Status { + monkey.Patch((*graph.ResourceNode).Execute, func(rn *graph.ResourceNode, operation *opsmodels.Operation) status.Status { return status.NewErrorStatus(errors.New("mock error")) }) - o.MsgCh = make(chan Message, 1) + o.MsgCh = make(chan opsmodels.Message, 1) go readMsgCh(o.MsgCh) st := o.Destroy(r) assert.True(t, status.IsErr(st)) }) } -func readMsgCh(ch chan Message) { +func readMsgCh(ch chan opsmodels.Message) { for { select { case msg, ok := <-ch: diff --git a/pkg/engine/operation/diff.go b/pkg/engine/operation/diff.go index fb32efd8..afe4258f 100644 --- a/pkg/engine/operation/diff.go +++ b/pkg/engine/operation/diff.go @@ -5,8 +5,11 @@ import ( "fmt" "strings" + "kusionstack.io/kusion/pkg/engine/operation/utils" + + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + "github.com/gonvenience/wrap" - "github.com/gonvenience/ytbx" "github.com/pkg/errors" yamlv3 "gopkg.in/yaml.v3" @@ -15,7 +18,7 @@ import ( "kusionstack.io/kusion/pkg/kusionctl/cmd/diff" "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/pkg/util" - jsonUtil "kusionstack.io/kusion/pkg/util/json" + jsonutil "kusionstack.io/kusion/pkg/util/json" "kusionstack.io/kusion/third_party/dyff" ) @@ -24,7 +27,7 @@ type Diff struct { } type DiffRequest struct { - Request + opsmodels.Request } func (d *Diff) Diff(request *DiffRequest) (string, error) { @@ -37,10 +40,10 @@ func (d *Diff) Diff(request *DiffRequest) (string, error) { }() util.CheckNotNil(request, "request is nil") - util.CheckNotNil(request.Manifest, "resource is nil") + util.CheckNotNil(request.Spec, "resource is nil") // Get plan state resources - plan := request.Manifest + plan := request.Spec // Get the latest state resources latestState, err := d.StateStorage.GetLatestState( @@ -54,29 +57,29 @@ func (d *Diff) Diff(request *DiffRequest) (string, error) { return "", errors.Wrap(err, "GetLatestState failed") } if latestState == nil { - log.Infof("can't find states by request: %v.", jsonUtil.MustMarshal2String(request)) + log.Infof("can't find states by request: %v.", jsonutil.MustMarshal2String(request)) } // Get diff result return DiffWithRequestResourceAndState(plan, latestState) } func DiffWithRequestResourceAndState(plan *models.Spec, latest *states.State) (string, error) { - planString := jsonUtil.MustMarshal2String(plan.Resources) + planString := jsonutil.MustMarshal2String(plan.Resources) if latest == nil { return DiffReport("", planString, diff.OutputHuman) } else { latestResources := latest.Resources - priorString := jsonUtil.MustMarshal2String(latestResources) + priorString := jsonutil.MustMarshal2String(latestResources) return DiffReport(priorString, planString, diff.OutputHuman) } } func DiffReport(prior, plan, mode string) (string, error) { - from, err := LoadFile(prior, "Last State") + from, err := utils.LoadFile(prior, "Last State") if err != nil { return "", err } - to, err := LoadFile(plan, "Request State") + to, err := utils.LoadFile(plan, "Request State") if err != nil { return "", err } @@ -107,26 +110,6 @@ func buildReport(mode string, report dyff.Report) (string, error) { } } -// LoadFile processes the provided input location to load it as one of the -// supported document formats, or plain text if nothing else works. -func LoadFile(yaml, location string) (ytbx.InputFile, error) { - var ( - documents []*yamlv3.Node - data []byte - err error - ) - - data = []byte(yaml) - if documents, err = ytbx.LoadDocuments(data); err != nil { - return ytbx.InputFile{}, wrap.Errorf(err, "unable to parse data %v", data) - } - - return ytbx.InputFile{ - Location: location, - Documents: documents, - }, nil -} - // WriteReport writes a human-readable report to the provided writer func writeReport(report dyff.Report) (string, error) { reportWriter := &dyff.HumanReport{ diff --git a/pkg/engine/operation/doc.go b/pkg/engine/operation/doc.go new file mode 100644 index 00000000..5b3dd5c2 --- /dev/null +++ b/pkg/engine/operation/doc.go @@ -0,0 +1,2 @@ +// Package operation contains code for basic operations like Apply, Preview and Destroy +package operation diff --git a/pkg/engine/operation/executable_node.go b/pkg/engine/operation/executable_node.go deleted file mode 100644 index bed9b7fd..00000000 --- a/pkg/engine/operation/executable_node.go +++ /dev/null @@ -1,7 +0,0 @@ -package operation - -import "kusionstack.io/kusion/pkg/status" - -type ExecutableNode interface { - Execute(operation *Operation) status.Status -} diff --git a/pkg/engine/operation/graph/executable_node.go b/pkg/engine/operation/graph/executable_node.go new file mode 100644 index 00000000..556241b5 --- /dev/null +++ b/pkg/engine/operation/graph/executable_node.go @@ -0,0 +1,10 @@ +package graph + +import ( + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/status" +) + +type ExecutableNode interface { + Execute(operation *models.Operation) status.Status +} diff --git a/pkg/engine/operation/graph/node.go b/pkg/engine/operation/graph/node.go new file mode 100644 index 00000000..76502718 --- /dev/null +++ b/pkg/engine/operation/graph/node.go @@ -0,0 +1,32 @@ +package graph + +import "kusionstack.io/kusion/pkg/status" + +type baseNode struct { + ID string +} + +func NewBaseNode(id string) (*baseNode, status.Status) { + if id == "" { + return nil, status.NewErrorStatusWithMsg(status.InvalidArgument, "node id can not be nil") + } + return &baseNode{ID: id}, nil +} + +func (b *baseNode) Hashcode() interface{} { + return b.ID +} + +func (b *baseNode) Name() string { + return b.ID +} + +type RootNode struct{} + +func (r *RootNode) Hashcode() interface{} { + return "RootNode" +} + +func (r *RootNode) Name() string { + return "root" +} diff --git a/pkg/engine/operation/resource_node.go b/pkg/engine/operation/graph/resource_node.go similarity index 54% rename from pkg/engine/operation/resource_node.go rename to pkg/engine/operation/graph/resource_node.go index 3cd04302..bafba200 100644 --- a/pkg/engine/operation/resource_node.go +++ b/pkg/engine/operation/graph/resource_node.go @@ -1,4 +1,4 @@ -package operation +package graph import ( "context" @@ -6,24 +6,34 @@ import ( "reflect" "strings" + "kusionstack.io/kusion/pkg/util" + + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/pkg/status" - jsonUtil "kusionstack.io/kusion/pkg/util/json" + jsonutil "kusionstack.io/kusion/pkg/util/json" ) type ResourceNode struct { - BaseNode - Action ActionType + *baseNode + Action types.ActionType state *models.Resource } var _ ExecutableNode = (*ResourceNode)(nil) -func (rn *ResourceNode) Execute(operation *Operation) status.Status { +const ( + ImplicitRefPrefix = "$kusion_path." +) + +func (rn *ResourceNode) Execute(operation *opsmodels.Operation) status.Status { log.Debugf("execute node:%s", rn.ID) - if operation.OperationType == Apply { + if operation.OperationType == types.Apply { // replace implicit references value := reflect.ValueOf(rn.state.Attributes) _, implicitValue, s := ParseImplicitRef(value, operation.CtxResourceIndex, ImplicitReplaceFun) @@ -48,17 +58,17 @@ func (rn *ResourceNode) Execute(operation *Operation) status.Status { // 4. compute ActionType of current resource node between planState and liveState switch operation.OperationType { - case Destroy, DestroyPreview: - rn.Action = Delete - case Apply, ApplyPreview: + case types.Destroy, types.DestroyPreview: + rn.Action = types.Delete + case types.Apply, types.ApplyPreview: if liveState == nil { - rn.Action = Create + rn.Action = types.Create } else if planedState == nil { - rn.Action = Delete + rn.Action = types.Delete } else if reflect.DeepEqual(liveState, planedState) { // TODO: need a better comparable func - rn.Action = UnChange + rn.Action = types.UnChange } else { - rn.Action = Update + rn.Action = types.Update } default: return status.NewErrorStatus(fmt.Errorf("unknown operation: %v", operation.OperationType)) @@ -66,16 +76,16 @@ func (rn *ResourceNode) Execute(operation *Operation) status.Status { // 5. apply or return switch operation.OperationType { - case ApplyPreview, DestroyPreview: + case types.ApplyPreview, types.DestroyPreview: fillResponseChangeSteps(operation, rn, priorState, planedState, liveState) - case Apply, Destroy: + case types.Apply, types.Destroy: switch rn.Action { - case Create, Delete, Update: + case types.Create, types.Delete, types.Update: s := rn.applyResource(operation, priorState, planedState) if status.IsErr(s) { return s } - case UnChange: + case types.UnChange: log.Infof("PriorAttributes and PlanAttributes are equal.") default: return status.NewErrorStatus(fmt.Errorf("unknown action:%s", rn.Action.PrettyString())) @@ -87,21 +97,21 @@ func (rn *ResourceNode) Execute(operation *Operation) status.Status { return nil } -func (rn *ResourceNode) applyResource(operation *Operation, priorState, planedState *models.Resource) status.Status { - log.Infof("operation:%v, prior:%v, plan:%v, live:%v", rn.Action, - jsonUtil.Marshal2String(priorState), jsonUtil.Marshal2String(planedState)) +func (rn *ResourceNode) applyResource(operation *opsmodels.Operation, priorState, planedState *models.Resource) status.Status { + log.Infof("operation:%v, prior:%v, plan:%v, live:%v", rn.Action, jsonutil.Marshal2String(priorState), + jsonutil.Marshal2String(planedState)) var res *models.Resource var s status.Status switch rn.Action { - case Create, Update: + case types.Create, types.Update: res, s = operation.Runtime.Apply(context.Background(), priorState, planedState) - log.Debugf("apply resource:%s, result: %v", planedState.ID, jsonUtil.Marshal2String(res)) + log.Debugf("apply resource:%s, result: %v", planedState.ID, jsonutil.Marshal2String(res)) if s != nil { log.Debugf("apply status: %v", s.String()) } - case Delete: + case types.Delete: s = operation.Runtime.Delete(context.Background(), priorState) if s != nil { log.Debugf("delete state: %v", s.String()) @@ -132,27 +142,31 @@ func (rn *ResourceNode) State() *models.Resource { return rn.state } -func NewResourceNode(key string, state *models.Resource, action ActionType) *ResourceNode { - return &ResourceNode{BaseNode: BaseNode{ID: key}, Action: action, state: state} +func NewResourceNode(key string, state *models.Resource, action types.ActionType) (*ResourceNode, status.Status) { + node, s := NewBaseNode(key) + if status.IsErr(s) { + return nil, s + } + return &ResourceNode{baseNode: node, Action: action, state: state}, nil } // save change steps in DAG walking order so that we can preview a full applying list -func fillResponseChangeSteps(operation *Operation, rn *ResourceNode, prior, plan, live interface{}) { - defer operation.lock.Unlock() - operation.lock.Lock() +func fillResponseChangeSteps(ops *opsmodels.Operation, rn *ResourceNode, prior, plan, live interface{}) { + defer ops.Lock.Unlock() + ops.Lock.Lock() - order := operation.Order + order := ops.ChangeOrder if order == nil { - order = &ChangeOrder{ + order = &opsmodels.ChangeOrder{ StepKeys: []string{}, - ChangeSteps: make(map[string]*ChangeStep), + ChangeSteps: make(map[string]*opsmodels.ChangeStep), } } if order.ChangeSteps == nil { - order.ChangeSteps = make(map[string]*ChangeStep) + order.ChangeSteps = make(map[string]*opsmodels.ChangeStep) } order.StepKeys = append(order.StepKeys, rn.ID) - order.ChangeSteps[rn.ID] = NewChangeStep(rn.ID, rn.Action, prior, plan, live) + order.ChangeSteps[rn.ID] = opsmodels.NewChangeStep(rn.ID, rn.Action, prior, plan, live) } var ImplicitReplaceFun = func(resourceIndex map[string]*models.Resource, refPath string) (reflect.Value, status.Status) { @@ -183,3 +197,73 @@ var ImplicitReplaceFun = func(resourceIndex map[string]*models.Resource, refPath } return reflect.ValueOf(valueMap), nil } + +func ParseImplicitRef(v reflect.Value, resourceIndex map[string]*models.Resource, + replaceFun func(resourceIndex map[string]*models.Resource, refPath string) (reflect.Value, status.Status), +) ([]string, reflect.Value, status.Status) { + var result []string + if !v.IsValid() { + return nil, v, status.NewErrorStatusWithMsg(status.InvalidArgument, "invalid implicit reference") + } + + switch v.Type().Kind() { + case reflect.Ptr, reflect.Interface: + if v.IsNil() { + return nil, v, nil + } + return ParseImplicitRef(v.Elem(), resourceIndex, replaceFun) + case reflect.String: + vStr := v.String() + if strings.HasPrefix(vStr, ImplicitRefPrefix) { + ref := strings.TrimPrefix(vStr, ImplicitRefPrefix) + util.CheckArgument(len(ref) > 0, + fmt.Sprintf("illegal implicit ref:%s. Implicit ref format: %sresourceKey.attribute", ref, ImplicitRefPrefix)) + split := strings.Split(ref, ".") + result = append(result, split[0]) + log.Infof("add implicit ref:%s", split[0]) + // replace v with output + tv, s := replaceFun(resourceIndex, ref) + if status.IsErr(s) { + return nil, v, s + } + v = tv + } + case reflect.Slice, reflect.Array: + if v.Len() == 0 { + return nil, v, nil + } + + vs := reflect.MakeSlice(v.Type(), 0, 0) + + for i := 0; i < v.Len(); i++ { + ref, tv, s := ParseImplicitRef(v.Index(i), resourceIndex, replaceFun) + if status.IsErr(s) { + return nil, tv, s + } + vs = reflect.Append(vs, tv) + if ref != nil { + result = append(result, ref...) + } + } + v = vs + case reflect.Map: + if v.Len() == 0 { + return nil, v, nil + } + makeMap := reflect.MakeMap(v.Type()) + + iter := v.MapRange() + for iter.Next() { + ref, tv, s := ParseImplicitRef(iter.Value(), resourceIndex, replaceFun) + if status.IsErr(s) { + return nil, tv, s + } + if ref != nil { + result = append(result, ref...) + } + makeMap.SetMapIndex(iter.Key(), tv) + } + v = makeMap + } + return result, v, nil +} diff --git a/pkg/engine/operation/resource_node_test.go b/pkg/engine/operation/graph/resource_node_test.go similarity index 78% rename from pkg/engine/operation/resource_node_test.go rename to pkg/engine/operation/graph/resource_node_test.go index e8d865a3..34e29aa2 100644 --- a/pkg/engine/operation/resource_node_test.go +++ b/pkg/engine/operation/graph/resource_node_test.go @@ -1,7 +1,7 @@ //go:build !arm64 // +build !arm64 -package operation +package graph import ( "context" @@ -9,6 +9,10 @@ import ( "sync" "testing" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "bou.ke/monkey" "github.com/hashicorp/terraform/dag" "github.com/stretchr/testify/assert" @@ -21,12 +25,12 @@ import ( func TestResourceNode_Execute(t *testing.T) { type fields struct { - BaseNode BaseNode - Action ActionType + BaseNode baseNode + Action types.ActionType state *models.Resource } type args struct { - operation Operation + operation opsmodels.Operation } const Jack = "jack" @@ -96,19 +100,19 @@ func TestResourceNode_Execute(t *testing.T) { { name: "update", fields: fields{ - BaseNode: BaseNode{ID: Jack}, - Action: Update, + BaseNode: baseNode{ID: Jack}, + Action: types.Update, state: newResourceState, }, - args: args{operation: Operation{ - OperationType: Apply, + args: args{operation: opsmodels.Operation{ + OperationType: types.Apply, StateStorage: states.NewFileSystemState(), CtxResourceIndex: priorStateResourceIndex, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, - MsgCh: make(chan Message), - resultState: states.NewState(), - lock: &sync.Mutex{}, + MsgCh: make(chan opsmodels.Message), + ResultState: states.NewState(), + Lock: &sync.Mutex{}, Runtime: &runtime.KubernetesRuntime{}, }}, want: nil, @@ -116,19 +120,19 @@ func TestResourceNode_Execute(t *testing.T) { { name: "delete", fields: fields{ - BaseNode: BaseNode{ID: Jack}, - Action: Delete, + BaseNode: baseNode{ID: Jack}, + Action: types.Delete, state: newResourceState, }, - args: args{operation: Operation{ - OperationType: Apply, + args: args{operation: opsmodels.Operation{ + OperationType: types.Apply, StateStorage: states.NewFileSystemState(), CtxResourceIndex: priorStateResourceIndex, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, - MsgCh: make(chan Message), - resultState: states.NewState(), - lock: &sync.Mutex{}, + MsgCh: make(chan opsmodels.Message), + ResultState: states.NewState(), + Lock: &sync.Mutex{}, Runtime: &runtime.KubernetesRuntime{}, }}, want: nil, @@ -136,19 +140,19 @@ func TestResourceNode_Execute(t *testing.T) { { name: "illegalRef", fields: fields{ - BaseNode: BaseNode{ID: Jack}, - Action: Update, + BaseNode: baseNode{ID: Jack}, + Action: types.Update, state: illegalResourceState, }, - args: args{operation: Operation{ - OperationType: Apply, + args: args{operation: opsmodels.Operation{ + OperationType: types.Apply, StateStorage: states.NewFileSystemState(), CtxResourceIndex: priorStateResourceIndex, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, - MsgCh: make(chan Message), - resultState: states.NewState(), - lock: &sync.Mutex{}, + MsgCh: make(chan opsmodels.Message), + ResultState: states.NewState(), + Lock: &sync.Mutex{}, Runtime: &runtime.KubernetesRuntime{}, }}, want: status.NewErrorStatusWithMsg(status.IllegalManifest, "can't find specified value in resource:jack by ref:jack.notExist"), @@ -157,7 +161,7 @@ func TestResourceNode_Execute(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { rn := &ResourceNode{ - BaseNode: tt.fields.BaseNode, + baseNode: &tt.fields.BaseNode, Action: tt.fields.Action, state: tt.fields.state, } diff --git a/pkg/engine/operation/manifest_parser.go b/pkg/engine/operation/manifest_parser.go deleted file mode 100644 index f4c17faf..00000000 --- a/pkg/engine/operation/manifest_parser.go +++ /dev/null @@ -1,154 +0,0 @@ -package operation - -import ( - "fmt" - "reflect" - "strings" - - "kusionstack.io/kusion/pkg/engine/models" - "kusionstack.io/kusion/pkg/log" - "kusionstack.io/kusion/pkg/status" - "kusionstack.io/kusion/pkg/util" - "kusionstack.io/kusion/pkg/util/json" - - "github.com/hashicorp/terraform/dag" -) - -type ManifestParser struct { - manifest *models.Spec -} - -func NewManifestParser(manifest *models.Spec) *ManifestParser { - return &ManifestParser{manifest: manifest} -} - -var _ Parser = (*ManifestParser)(nil) - -const ( - ImplicitRefPrefix = "$kusion_path." -) - -func (m *ManifestParser) Parse(graph *dag.AcyclicGraph) (s status.Status) { - util.CheckNotNil(graph, "dag is nil") - mf := m.manifest - util.CheckNotNil(mf, "models is nil") - if mf.Resources == nil { - sprintf := fmt.Sprintf("no resources in models:%s", json.Marshal2String(mf)) - return status.NewBaseStatus(status.Warning, status.NotFound, sprintf) - } - - root, err := graph.Root() - util.CheckNotError(err, "get graph root node error") - util.CheckNotNil(root, fmt.Sprintf("No root in this DAG:%s", json.Marshal2String(graph))) - resourceIndex := mf.Resources.Index() - for key, resourceState := range resourceIndex { - rn := NewResourceNode(key, resourceIndex[key], Update) - - // add this resource to dag at first time - if !graph.HasVertex(rn) { - graph.Add(rn) - graph.Connect(dag.BasicEdge(root, rn)) - } else { - // always get the latest vertex in this graph otherwise you will get subtle mistake in walking this graph - rn = GetVertex(graph, rn).(*ResourceNode) - graph.Connect(dag.BasicEdge(root, rn)) - } - // handle explicate dependency - refNodeKeys := resourceState.DependsOn - - // handle implicit dependency - v := reflect.ValueOf(resourceState.Attributes) - implicitRefKeys, _, s := ParseImplicitRef(v, nil, func(map[string]*models.Resource, string) (reflect.Value, status.Status) { - return v, nil - }) - if status.IsErr(s) { - return s - } - refNodeKeys = append(refNodeKeys, implicitRefKeys...) - - // Deduplicate - refNodeKeys = Deduplicate(refNodeKeys) - - // linkRefNodes - s = LinkRefNodes(graph, refNodeKeys, resourceIndex, rn, Update, nil) - if status.IsErr(s) { - return s - } - } - - if err = graph.Validate(); err != nil { - return status.NewErrorStatusWithMsg(status.IllegalManifest, "Found circle dependency in models:"+err.Error()) - } - graph.TransitiveReduction() - return s -} - -func ParseImplicitRef(v reflect.Value, resourceIndex map[string]*models.Resource, - replaceFun func(resourceIndex map[string]*models.Resource, refPath string) (reflect.Value, status.Status), -) ([]string, reflect.Value, status.Status) { - var result []string - if !v.IsValid() { - return nil, v, status.NewErrorStatusWithMsg(status.InvalidArgument, "invalid implicit reference") - } - - switch v.Type().Kind() { - case reflect.Ptr, reflect.Interface: - if v.IsNil() { - return nil, v, nil - } - return ParseImplicitRef(v.Elem(), resourceIndex, replaceFun) - case reflect.String: - vStr := v.String() - if strings.HasPrefix(vStr, ImplicitRefPrefix) { - ref := strings.TrimPrefix(vStr, ImplicitRefPrefix) - util.CheckArgument(len(ref) > 0, - fmt.Sprintf("illegal implicit ref:%s. Implicit ref format: %sresourceKey.attribute", ref, ImplicitRefPrefix)) - split := strings.Split(ref, ".") - result = append(result, split[0]) - log.Infof("add implicit ref:%s", split[0]) - // replace v with output - tv, s := replaceFun(resourceIndex, ref) - if status.IsErr(s) { - return nil, v, s - } - v = tv - } - case reflect.Slice, reflect.Array: - if v.Len() == 0 { - return nil, v, nil - } - - vs := reflect.MakeSlice(v.Type(), 0, 0) - - for i := 0; i < v.Len(); i++ { - ref, tv, s := ParseImplicitRef(v.Index(i), resourceIndex, replaceFun) - if status.IsErr(s) { - return nil, tv, s - } - vs = reflect.Append(vs, tv) - if ref != nil { - result = append(result, ref...) - } - } - v = vs - case reflect.Map: - if v.Len() == 0 { - return nil, v, nil - } - makeMap := reflect.MakeMap(v.Type()) - - iter := v.MapRange() - for iter.Next() { - ref, tv, s := ParseImplicitRef(iter.Value(), resourceIndex, replaceFun) - if status.IsErr(s) { - return nil, tv, s - } - if ref != nil { - result = append(result, ref...) - } - makeMap.SetMapIndex(iter.Key(), tv) - } - v = makeMap - } - return result, v, nil -} diff --git a/pkg/engine/operation/change.go b/pkg/engine/operation/models/change.go similarity index 86% rename from pkg/engine/operation/change.go rename to pkg/engine/operation/models/change.go index e00a9ce4..7eed5ee4 100644 --- a/pkg/engine/operation/change.go +++ b/pkg/engine/operation/models/change.go @@ -1,10 +1,14 @@ -package operation +package models import ( "bytes" "fmt" "strings" + "kusionstack.io/kusion/pkg/engine/operation/utils" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "github.com/AlecAivazis/survey/v2" @@ -19,11 +23,11 @@ import ( ) type ChangeStep struct { - ID string // the resource id - Action ActionType // the operation performed by this step. - Original interface{} // local stored resource - Modified interface{} // planed resource - Current interface{} // live resource + ID string // the resource id + Action types.ActionType // the operation performed by this step. + Original interface{} // local stored resource + Modified interface{} // planed resource + Current interface{} // live resource } // TODO: 3-way diff @@ -47,12 +51,12 @@ func (cs *ChangeStep) Diff() (string, error) { buf.WriteString(pretty.GreenBold("ID: ")) buf.WriteString(pretty.Green("%s\n", cs.ID)) } - if cs.Action != Undefined { + if cs.Action != types.Undefined { buf.WriteString(pretty.GreenBold("Plan: ")) buf.WriteString(pterm.Sprintf("%s\n", cs.Action.PrettyString())) } buf.WriteString(pretty.GreenBold("Diff: ")) - if len(strings.TrimSpace(reportString)) == 0 && cs.Action == UnChange { + if len(strings.TrimSpace(reportString)) == 0 && cs.Action == types.UnChange { buf.WriteString(pretty.Gray("")) } else { buf.WriteString("\n" + strings.TrimSpace(reportString)) @@ -61,7 +65,7 @@ func (cs *ChangeStep) Diff() (string, error) { return buf.String(), nil } -func NewChangeStep(id string, op ActionType, original, modified, current interface{}) *ChangeStep { +func NewChangeStep(id string, op types.ActionType, original, modified, current interface{}) *ChangeStep { return &ChangeStep{ ID: id, Action: op, @@ -74,10 +78,10 @@ func NewChangeStep(id string, op ActionType, original, modified, current interfa type ChangeStepFilterFunc func(*ChangeStep) bool var ( - CreateChangeStepFilter = func(c *ChangeStep) bool { return c.Action == Create } - UpdateChangeStepFilter = func(c *ChangeStep) bool { return c.Action == Update } - DeleteChangeStepFilter = func(c *ChangeStep) bool { return c.Action == Delete } - UnChangeChangeStepFilter = func(c *ChangeStep) bool { return c.Action == UnChange } + CreateChangeStepFilter = func(c *ChangeStep) bool { return c.Action == types.Create } + UpdateChangeStepFilter = func(c *ChangeStep) bool { return c.Action == types.Update } + DeleteChangeStepFilter = func(c *ChangeStep) bool { return c.Action == types.Delete } + UnChangeChangeStepFilter = func(c *ChangeStep) bool { return c.Action == types.UnChange } ) type Changes struct { @@ -238,12 +242,12 @@ func buildResourceStateMap(rs []*models.Resource) map[string]*models.Resource { } func diffToReport(oldData, newData interface{}) (*dyff.Report, error) { - from, err := LoadFile(yaml.MergeToOneYAML(oldData), "Old item") + from, err := utils.LoadFile(yaml.MergeToOneYAML(oldData), "Old item") if err != nil { return nil, err } - to, err := LoadFile(yaml.MergeToOneYAML(newData), "New item") + to, err := utils.LoadFile(yaml.MergeToOneYAML(newData), "New item") if err != nil { return nil, err } diff --git a/pkg/engine/operation/change_test.go b/pkg/engine/operation/models/change_test.go similarity index 91% rename from pkg/engine/operation/change_test.go rename to pkg/engine/operation/models/change_test.go index ebaa1dcb..c58070fc 100644 --- a/pkg/engine/operation/change_test.go +++ b/pkg/engine/operation/models/change_test.go @@ -1,9 +1,11 @@ -package operation +package models import ( "reflect" "testing" + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/projectstack" @@ -11,10 +13,10 @@ import ( ) var ( - TestChangeStepOpCreate = NewChangeStep("id", Create, nil, nil, nil) - TestChangeStepOpDelete = NewChangeStep("id", Delete, nil, nil, nil) - TestChangeStepOpUpdate = NewChangeStep("id", Update, nil, nil, nil) - TestChangeStepOpUnChange = NewChangeStep("id", UnChange, nil, nil, nil) + TestChangeStepOpCreate = NewChangeStep("id", types.Create, nil, nil, nil) + TestChangeStepOpDelete = NewChangeStep("id", types.Delete, nil, nil, nil) + TestChangeStepOpUpdate = NewChangeStep("id", types.Update, nil, nil, nil) + TestChangeStepOpUnChange = NewChangeStep("id", types.UnChange, nil, nil, nil) TestStepKeys = []string{"test-key-1", "test-key-2", "test-key-3", "test-key-4"} TestChangeSteps = map[string]*ChangeStep{ "test-key-1": TestChangeStepOpCreate, @@ -27,27 +29,27 @@ var ( func TestOpType_Ing(t *testing.T) { tests := []struct { name string - op ActionType + op types.ActionType want string }{ { name: "t1", - op: Create, + op: types.Create, want: "Creating", }, { name: "t2", - op: Delete, + op: types.Delete, want: "Deleting", }, { name: "t3", - op: Update, + op: types.Update, want: "Updating", }, { name: "t4", - op: UnChange, + op: types.UnChange, want: "Unchanged", }, } @@ -63,28 +65,28 @@ func TestOpType_Ing(t *testing.T) { func TestOpType_PrettyString(t *testing.T) { tests := []struct { name string - op ActionType + op types.ActionType want string }{ { name: "t1", - op: Create, - want: pretty.Green(Create.Ing()), + op: types.Create, + want: pretty.Green(types.Create.Ing()), }, { name: "t2", - op: Delete, - want: pretty.Red(Delete.Ing()), + op: types.Delete, + want: pretty.Red(types.Delete.Ing()), }, { name: "t3", - op: Update, - want: pretty.Blue(Update.Ing()), + op: types.Update, + want: pretty.Blue(types.Update.Ing()), }, { name: "t4", - op: UnChange, - want: pretty.Gray(UnChange.Ing()), + op: types.UnChange, + want: pretty.Gray(types.UnChange.Ing()), }, } for _, tt := range tests { @@ -99,7 +101,7 @@ func TestOpType_PrettyString(t *testing.T) { func TestChangeStep_Diff(t *testing.T) { type fields struct { ID string - Op ActionType + Op types.ActionType Old interface{} New interface{} } @@ -113,7 +115,7 @@ func TestChangeStep_Diff(t *testing.T) { name: "t1", fields: fields{ ID: "id", - Op: Create, + Op: types.Create, Old: nil, New: nil, }, diff --git a/pkg/engine/operation/models/doc.go b/pkg/engine/operation/models/doc.go new file mode 100644 index 00000000..3669447a --- /dev/null +++ b/pkg/engine/operation/models/doc.go @@ -0,0 +1,3 @@ +// Package models contains internal models of operations +// todo CLI imports this package directly. We need to make this pkg internal +package models diff --git a/pkg/engine/operation/operation.go b/pkg/engine/operation/models/operation_context.go similarity index 61% rename from pkg/engine/operation/operation.go rename to pkg/engine/operation/models/operation_context.go index f7d1375c..5629fcf4 100644 --- a/pkg/engine/operation/operation.go +++ b/pkg/engine/operation/models/operation_context.go @@ -1,9 +1,11 @@ -package operation +package models import ( "fmt" "sync" + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/util/kdump" @@ -15,21 +17,38 @@ import ( "kusionstack.io/kusion/pkg/util" ) +// Operation is the base model for all operations type Operation struct { - OperationType Type - StateStorage states.StateStorage - // CtxResourceIndex represents resources updated by func apply + // OperationType represents the OperationType of this operation + OperationType types.OperationType + + // StateStorage represents the storage where state will be saved during this operation + StateStorage states.StateStorage + + // CtxResourceIndex represents resources updated by this operation CtxResourceIndex map[string]*models.Resource - // PriorStateResourceIndex represents prior states.StateStorage state + + // PriorStateResourceIndex represents resource state saved during the last operation PriorStateResourceIndex map[string]*models.Resource + // StateResourceIndex represents resources that will be saved in states.StateStorage StateResourceIndex map[string]*models.Resource - // Order contains id to action of resource node in preview order - Order *ChangeOrder - Runtime runtime.Runtime - MsgCh chan Message - resultState *states.State - lock *sync.Mutex + + // ChangeOrder is resources' change order during this operation + ChangeOrder *ChangeOrder + + // Runtime is the resource infrastructure runtime of this operation + Runtime runtime.Runtime + + // MsgCh is used to send operation status like Success, Failed or Skip to Kusion CTl, + // and this message will be displayed in the terminal + MsgCh chan Message + + // Lock is the operation-wide mutex + Lock *sync.Mutex + + // ResultState is the final State build by this operation, and this State will be saved in the StateStorage + ResultState *states.State } type Message struct { @@ -43,7 +62,7 @@ type Request struct { Stack string `json:"stack"` Project string `json:"project"` Operator string `json:"operator"` - Manifest *models.Spec `json:"models"` + Spec *models.Spec `json:"spec"` } type OpResult string @@ -56,15 +75,15 @@ const ( ) // RefreshResourceIndex refresh resources in CtxResourceIndex & StateResourceIndex -func (o *Operation) RefreshResourceIndex(resourceKey string, resource *models.Resource, actionType ActionType) error { - o.lock.Lock() - defer o.lock.Unlock() +func (o *Operation) RefreshResourceIndex(resourceKey string, resource *models.Resource, actionType types.ActionType) error { + o.Lock.Lock() + defer o.Lock.Unlock() switch actionType { - case Delete: + case types.Delete: o.CtxResourceIndex[resourceKey] = nil o.StateResourceIndex[resourceKey] = nil - case Create, Update: + case types.Create, types.Update: o.CtxResourceIndex[resourceKey] = resource o.StateResourceIndex[resourceKey] = resource default: @@ -73,8 +92,8 @@ func (o *Operation) RefreshResourceIndex(resourceKey string, resource *models.Re return nil } -func initStates(storage states.StateStorage, request *Request) (*states.State, *states.State) { - latestState, err := storage.GetLatestState( +func (o *Operation) InitStates(request *Request) (*states.State, *states.State) { + latestState, err := o.StateStorage.GetLatestState( &states.StateQuery{ Tenant: request.Tenant, Stack: request.Stack, @@ -89,21 +108,21 @@ func initStates(storage states.StateStorage, request *Request) (*states.State, * resultState := states.NewState() resultState.Serial = latestState.Serial err = copier.Copy(resultState, request) - util.CheckNotError(err, "Copy request to resultState, request") + util.CheckNotError(err, "Copy request to ResultState, request") resultState.Resources = nil return latestState, resultState } func (o *Operation) UpdateState(resourceIndex map[string]*models.Resource) error { - o.lock.Lock() - defer o.lock.Unlock() + o.Lock.Lock() + defer o.Lock.Unlock() - state := o.resultState + state := o.ResultState state.Serial += 1 state.Resources = nil - res := []models.Resource{} + res := make([]models.Resource, 0, len(resourceIndex)) for key := range resourceIndex { // {key -> nil} represents Deleted action if resourceIndex[key] == nil { diff --git a/pkg/engine/operation/node.go b/pkg/engine/operation/node.go deleted file mode 100644 index 05f6bf43..00000000 --- a/pkg/engine/operation/node.go +++ /dev/null @@ -1,23 +0,0 @@ -package operation - -type BaseNode struct { - ID string -} - -func (b *BaseNode) Hashcode() interface{} { - return b.ID -} - -func (b *BaseNode) Name() string { - return b.ID -} - -type RootNode struct{} - -func (r *RootNode) Hashcode() interface{} { - return "RootNode" -} - -func (r *RootNode) Name() string { - return "root" -} diff --git a/pkg/engine/operation/delete_resource_parser.go b/pkg/engine/operation/parser/delete_resource_parser.go similarity index 64% rename from pkg/engine/operation/delete_resource_parser.go rename to pkg/engine/operation/parser/delete_resource_parser.go index fec37bd3..a90adb5d 100644 --- a/pkg/engine/operation/delete_resource_parser.go +++ b/pkg/engine/operation/parser/delete_resource_parser.go @@ -1,8 +1,12 @@ -package operation +package parser import ( "fmt" + "kusionstack.io/kusion/pkg/engine/operation/graph" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/log" @@ -21,16 +25,16 @@ func NewDeleteResourceParser(resources models.Resources) *DeleteResourceParser { return &DeleteResourceParser{resources: resources} } -func (d *DeleteResourceParser) Parse(graph *dag.AcyclicGraph) (s status.Status) { - util.CheckNotNil(graph, "graph is nil") - if len(graph.Vertices()) == 0 { - log.Infof("no vertices in dag when parsing deleted resources. dag:%s", json.Marshal2String(graph)) +func (d *DeleteResourceParser) Parse(g *dag.AcyclicGraph) (s status.Status) { + util.CheckNotNil(g, "graph is nil") + if len(g.Vertices()) == 0 { + log.Infof("no vertices in dag when parsing deleted resources. dag:%s", json.Marshal2String(g)) return status.NewErrorStatusWithMsg(status.InvalidArgument, "no vertices in dag when parsing deleted resources") } manifestGraphMap := make(map[string]interface{}) - for _, v := range graph.Vertices() { - if rn, ok := v.(*ResourceNode); ok { + for _, v := range g.Vertices() { + if rn, ok := v.(*graph.ResourceNode); ok { id := rn.Hashcode().(string) manifestGraphMap[id] = v } @@ -38,8 +42,8 @@ func (d *DeleteResourceParser) Parse(graph *dag.AcyclicGraph) (s status.Status) // diff resources to delete resourceIndex := d.resources.Index() - root, err := graph.Root() - util.CheckNotError(err, "root get error") + root, err := g.Root() + util.CheckNotError(err, "get dag root error") priorDependsOn := make(map[string][]string) for key, v := range resourceIndex { @@ -49,10 +53,13 @@ func (d *DeleteResourceParser) Parse(graph *dag.AcyclicGraph) (s status.Status) } for key, resource := range resourceIndex { - rn := NewResourceNode(key, resourceIndex[key], Delete) + rn, s := graph.NewResourceNode(key, resourceIndex[key], types.Delete) + if status.IsErr(s) { + return s + } rnID := rn.Hashcode().(string) - if !graph.HasVertex(rn) && manifestGraphMap[rnID] == nil { + if !g.HasVertex(rn) && manifestGraphMap[rnID] == nil { log.Infof("resource:%v not found in models. Mark as delete node", key) // we cannot delete this node if any node dependsOn this node for _, v := range priorDependsOn[rnID] { @@ -61,20 +68,20 @@ func (d *DeleteResourceParser) Parse(graph *dag.AcyclicGraph) (s status.Status) return status.NewErrorStatusWithMsg(status.Internal, msg) } } - graph.Add(rn) - graph.Connect(dag.BasicEdge(root, rn)) + g.Add(rn) + g.Connect(dag.BasicEdge(root, rn)) } - // always get the latest vertex in the graph. - rn = GetVertex(graph, rn).(*ResourceNode) - s = LinkRefNodes(graph, resource.DependsOn, resourceIndex, rn, Delete, manifestGraphMap) + // always get the latest vertex in the g. + rn = GetVertex(g, rn).(*graph.ResourceNode) + s = LinkRefNodes(g, resource.DependsOn, resourceIndex, rn, types.Delete, manifestGraphMap) if status.IsErr(s) { return s } } - if err = graph.Validate(); err != nil { + if err = g.Validate(); err != nil { return status.NewErrorStatusWithMsg(status.IllegalManifest, "Found circle dependency in models."+err.Error()) } - graph.TransitiveReduction() + g.TransitiveReduction() return s } diff --git a/pkg/engine/operation/delete_resource_parser_test.go b/pkg/engine/operation/parser/delete_resource_parser_test.go similarity index 85% rename from pkg/engine/operation/delete_resource_parser_test.go rename to pkg/engine/operation/parser/delete_resource_parser_test.go index 476b2d49..71167659 100644 --- a/pkg/engine/operation/delete_resource_parser_test.go +++ b/pkg/engine/operation/parser/delete_resource_parser_test.go @@ -1,9 +1,11 @@ -package operation +package parser import ( "strings" "testing" + "kusionstack.io/kusion/pkg/engine/operation/graph" + "kusionstack.io/kusion/pkg/engine/models" "github.com/hashicorp/terraform/dag" @@ -49,15 +51,15 @@ func TestDeleteResourceParser_Parse(t *testing.T) { }, } - graph := &dag.AcyclicGraph{} - graph.Add(&RootNode{}) + ag := &dag.AcyclicGraph{} + ag.Add(&graph.RootNode{}) deleteResourceParser := &DeleteResourceParser{ resources: resources, } - _ = deleteResourceParser.Parse(graph) - actual := strings.TrimSpace(graph.String()) + _ = deleteResourceParser.Parse(ag) + actual := strings.TrimSpace(ag.String()) expected := strings.TrimSpace(testGraphTransReductionMultiple) if actual != expected { diff --git a/pkg/engine/operation/parser.go b/pkg/engine/operation/parser/parser.go similarity index 55% rename from pkg/engine/operation/parser.go rename to pkg/engine/operation/parser/parser.go index 22253136..69540622 100644 --- a/pkg/engine/operation/parser.go +++ b/pkg/engine/operation/parser/parser.go @@ -1,8 +1,12 @@ -package operation +package parser import ( "fmt" + "kusionstack.io/kusion/pkg/engine/operation/graph" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/status" @@ -14,8 +18,8 @@ type Parser interface { Parse(dag *dag.AcyclicGraph) status.Status } -func LinkRefNodes(graph *dag.AcyclicGraph, refNodeKeys []string, resourceIndex map[string]*models.Resource, - rn dag.Vertex, defaultAction ActionType, manifestGraphMap map[string]interface{}, +func LinkRefNodes(ag *dag.AcyclicGraph, refNodeKeys []string, resourceIndex map[string]*models.Resource, + rn dag.Vertex, defaultAction types.ActionType, manifestGraphMap map[string]interface{}, ) status.Status { if len(refNodeKeys) == 0 { return nil @@ -25,33 +29,40 @@ func LinkRefNodes(graph *dag.AcyclicGraph, refNodeKeys []string, resourceIndex m return status.NewErrorStatusWithMsg(status.IllegalManifest, fmt.Sprintf("can't find resource by key:%s in models or state.", parentKey)) } - parentNode := NewResourceNode(parentKey, resourceIndex[parentKey], defaultAction) + parentNode, s := graph.NewResourceNode(parentKey, resourceIndex[parentKey], defaultAction) + if status.IsErr(s) { + return s + } + baseNode, s := graph.NewBaseNode(parentKey) + if status.IsErr(s) { + return s + } switch defaultAction { - case Delete: + case types.Delete: // if the parent node is a deleteNode, we will add an edge from child node to parent node. // if parent node is not a deleteNode and manifestGraph contains parent node, // we will add an edge from parent node to child node if manifestGraphMap[parentKey] == nil { - if graph.HasVertex(parentNode) { - parentNode = GetVertex(graph, &BaseNode{ID: parentKey}).(*ResourceNode) - graph.Connect(dag.BasicEdge(rn, parentNode)) + if ag.HasVertex(parentNode) { + parentNode = GetVertex(ag, baseNode).(*graph.ResourceNode) + ag.Connect(dag.BasicEdge(rn, parentNode)) } else { - graph.Add(parentNode) - graph.Connect(dag.BasicEdge(rn, parentNode)) + ag.Add(parentNode) + ag.Connect(dag.BasicEdge(rn, parentNode)) } } else { - parentNode = GetVertex(graph, &BaseNode{ID: parentKey}).(*ResourceNode) - graph.Connect(dag.BasicEdge(parentNode, rn)) + parentNode = GetVertex(ag, baseNode).(*graph.ResourceNode) + ag.Connect(dag.BasicEdge(parentNode, rn)) } default: - hasParent := graph.HasVertex(parentNode) + hasParent := ag.HasVertex(parentNode) if hasParent { - parentNode = GetVertex(graph, &BaseNode{ID: parentKey}).(*ResourceNode) - graph.Connect(dag.BasicEdge(parentNode, rn)) + parentNode = GetVertex(ag, baseNode).(*graph.ResourceNode) + ag.Connect(dag.BasicEdge(parentNode, rn)) } else { - graph.Add(parentNode) - graph.Connect(dag.BasicEdge(parentNode, rn)) + ag.Add(parentNode) + ag.Connect(dag.BasicEdge(parentNode, rn)) } } } diff --git a/pkg/engine/operation/parser/spec_parser.go b/pkg/engine/operation/parser/spec_parser.go new file mode 100644 index 00000000..7039eac9 --- /dev/null +++ b/pkg/engine/operation/parser/spec_parser.go @@ -0,0 +1,85 @@ +package parser + +import ( + "fmt" + "reflect" + + "kusionstack.io/kusion/pkg/engine/operation/graph" + + "kusionstack.io/kusion/pkg/engine/operation/types" + + "kusionstack.io/kusion/pkg/engine/models" + "kusionstack.io/kusion/pkg/status" + "kusionstack.io/kusion/pkg/util" + "kusionstack.io/kusion/pkg/util/json" + + "github.com/hashicorp/terraform/dag" +) + +type SpecParser struct { + spec *models.Spec +} + +func NewSpecParser(spec *models.Spec) *SpecParser { + return &SpecParser{spec: spec} +} + +var _ Parser = (*SpecParser)(nil) + +func (m *SpecParser) Parse(g *dag.AcyclicGraph) (s status.Status) { + util.CheckNotNil(g, "dag is nil") + mf := m.spec + util.CheckNotNil(mf, "models is nil") + if mf.Resources == nil { + sprintf := fmt.Sprintf("no resources in models:%s", json.Marshal2String(mf)) + return status.NewBaseStatus(status.Warning, status.NotFound, sprintf) + } + + root, err := g.Root() + util.CheckNotError(err, "get dag root error") + util.CheckNotNil(root, fmt.Sprintf("No root in this DAG:%s", json.Marshal2String(g))) + resourceIndex := mf.Resources.Index() + for key, resourceState := range resourceIndex { + rn, s := graph.NewResourceNode(key, resourceIndex[key], types.Update) + if status.IsErr(s) { + return s + } + + // add this resource to dag at first time + if !g.HasVertex(rn) { + g.Add(rn) + g.Connect(dag.BasicEdge(root, rn)) + } else { + // always get the latest vertex in this g otherwise you will get subtle mistake in walking this g + rn = GetVertex(g, rn).(*graph.ResourceNode) + g.Connect(dag.BasicEdge(root, rn)) + } + // handle explicate dependency + refNodeKeys := resourceState.DependsOn + + // handle implicit dependency + v := reflect.ValueOf(resourceState.Attributes) + implicitRefKeys, _, s := graph.ParseImplicitRef(v, nil, func(map[string]*models.Resource, string) (reflect.Value, status.Status) { + return v, nil + }) + if status.IsErr(s) { + return s + } + refNodeKeys = append(refNodeKeys, implicitRefKeys...) + + // Deduplicate + refNodeKeys = Deduplicate(refNodeKeys) + + // linkRefNodes + s = LinkRefNodes(g, refNodeKeys, resourceIndex, rn, types.Update, nil) + if status.IsErr(s) { + return s + } + } + + if err = g.Validate(); err != nil { + return status.NewErrorStatusWithMsg(status.IllegalManifest, "Found circle dependency in models:"+err.Error()) + } + g.TransitiveReduction() + return s +} diff --git a/pkg/engine/operation/manifest_parser_test.go b/pkg/engine/operation/parser/spec_parser_test.go similarity index 72% rename from pkg/engine/operation/manifest_parser_test.go rename to pkg/engine/operation/parser/spec_parser_test.go index c4b58414..2269ade4 100644 --- a/pkg/engine/operation/manifest_parser_test.go +++ b/pkg/engine/operation/parser/spec_parser_test.go @@ -1,14 +1,16 @@ -package operation +package parser import ( "strings" "testing" + "kusionstack.io/kusion/pkg/engine/operation/graph" + "github.com/hashicorp/terraform/dag" "kusionstack.io/kusion/pkg/engine/models" ) -func TestManifestParser_Parse(t *testing.T) { +func TestSpecParser_Parse(t *testing.T) { const Jack = "jack" const Pony = "pony" const Eric = "eric" @@ -25,7 +27,7 @@ func TestManifestParser_Parse(t *testing.T) { ID: Eric, Attributes: map[string]interface{}{ - "a": ImplicitRefPrefix + "jack.a", + "a": graph.ImplicitRefPrefix + "jack.a", }, DependsOn: []string{Pony}, }, @@ -39,15 +41,15 @@ func TestManifestParser_Parse(t *testing.T) { }, }} - graph := &dag.AcyclicGraph{} - graph.Add(&RootNode{}) + ag := &dag.AcyclicGraph{} + ag.Add(&graph.RootNode{}) - manifest := &ManifestParser{ - manifest: mf, + spec := &SpecParser{ + spec: mf, } - _ = manifest.Parse(graph) - actual := strings.TrimSpace(graph.String()) + _ = spec.Parse(ag) + actual := strings.TrimSpace(ag.String()) expected := strings.TrimSpace(testGraphTransReductionMultipleRootsStr) if actual != expected { diff --git a/pkg/engine/operation/preview.go b/pkg/engine/operation/preview.go index 0986e30c..c6151bb3 100644 --- a/pkg/engine/operation/preview.go +++ b/pkg/engine/operation/preview.go @@ -5,6 +5,11 @@ import ( "fmt" "sync" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/graph" + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "github.com/hashicorp/terraform/dag" @@ -16,18 +21,22 @@ import ( ) type PreviewOperation struct { - Operation + opsmodels.Operation } type PreviewRequest struct { - Request `json:",inline" yaml:",inline"` + opsmodels.Request `json:",inline" yaml:",inline"` } type PreviewResponse struct { - Order *ChangeOrder + Order *opsmodels.ChangeOrder } -func (o *Operation) Preview(request *PreviewRequest) (rsp *PreviewResponse, s status.Status) { +// Preview compute all changes between resources in request and the actual infrastructure. +// The whole process is similar to the operation Apply, but the execution of each node is mocked and will not actually invoke the Runtime +func (po *PreviewOperation) Preview(request *PreviewRequest) (rsp *PreviewResponse, s status.Status) { + o := po.Operation + defer func() { if e := recover(); e != nil { log.Error("preview panic:%v", e) @@ -50,20 +59,20 @@ func (o *Operation) Preview(request *PreviewRequest) (rsp *PreviewResponse, s st var ( priorState, resultState *states.State priorStateResourceIndex map[string]*models.Resource - graph *dag.AcyclicGraph + ag *dag.AcyclicGraph ) // 1. init & build Indexes - priorState, resultState = initStates(o.StateStorage, &request.Request) + priorState, resultState = po.InitStates(&request.Request) switch o.OperationType { - case ApplyPreview: + case types.ApplyPreview: priorStateResourceIndex = priorState.Resources.Index() - graph, s = NewApplyGraph(request.Manifest, priorState) - case DestroyPreview: - resources := request.Request.Manifest.Resources + ag, s = NewApplyGraph(request.Spec, priorState) + case types.DestroyPreview: + resources := request.Request.Spec.Resources priorStateResourceIndex = resources.Index() - graph, s = NewDestroyGraph(resources) + ag, s = NewDestroyGraph(resources) } if status.IsErr(s) { return nil, s @@ -73,27 +82,27 @@ func (o *Operation) Preview(request *PreviewRequest) (rsp *PreviewResponse, s st log.Info("walking DAG and preview resources ...") previewOperation := &PreviewOperation{ - Operation: Operation{ + Operation: opsmodels.Operation{ OperationType: o.OperationType, StateStorage: o.StateStorage, CtxResourceIndex: map[string]*models.Resource{}, PriorStateResourceIndex: priorStateResourceIndex, StateResourceIndex: priorStateResourceIndex, - Order: o.Order, - Runtime: o.Runtime, // preview need get the latest manifest from runtime - resultState: resultState, - lock: &sync.Mutex{}, + ChangeOrder: o.ChangeOrder, + Runtime: o.Runtime, // preview need get the latest spec from runtime + ResultState: resultState, + Lock: &sync.Mutex{}, }, } w := &dag.Walker{Callback: previewOperation.previewWalkFun} - w.Update(graph) + w.Update(ag) // Wait if diags := w.Wait(); diags.HasErrors() { return nil, status.NewErrorStatus(diags.Err()) } - return &PreviewResponse{Order: previewOperation.Order}, nil + return &PreviewResponse{Order: previewOperation.ChangeOrder}, nil } func (po *PreviewOperation) previewWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { @@ -118,10 +127,10 @@ func (po *PreviewOperation) previewWalkFun(v dag.Vertex) (diags tfdiags.Diagnost } }() - if node, ok := v.(ExecutableNode); ok { + if node, ok := v.(graph.ExecutableNode); ok { s = node.Execute(&po.Operation) if status.IsErr(s) { - diags = diags.Append(fmt.Errorf("node execute failed, status: %v", s)) + diags = diags.Append(fmt.Errorf("node execute failed.\n%v", s)) return diags } } diff --git a/pkg/engine/operation/preview_test.go b/pkg/engine/operation/preview_test.go index ce6abaa7..62a532d0 100644 --- a/pkg/engine/operation/preview_test.go +++ b/pkg/engine/operation/preview_test.go @@ -6,6 +6,10 @@ import ( "sync" "testing" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "kusionstack.io/kusion/pkg/engine/models" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/states" @@ -60,14 +64,14 @@ func (f *fakePreviewRuntime) Watch(ctx context.Context, resourceState *models.Re func TestOperation_Preview(t *testing.T) { type fields struct { - OperationType Type + OperationType types.OperationType StateStorage states.StateStorage CtxResourceIndex map[string]*models.Resource PriorStateResourceIndex map[string]*models.Resource StateResourceIndex map[string]*models.Resource - Order *ChangeOrder + Order *opsmodels.ChangeOrder Runtime runtime.Runtime - MsgCh chan Message + MsgCh chan opsmodels.Message resultState *states.State lock *sync.Mutex } @@ -84,19 +88,19 @@ func TestOperation_Preview(t *testing.T) { { name: "success-when-apply", fields: fields{ - OperationType: ApplyPreview, + OperationType: types.ApplyPreview, Runtime: &fakePreviewRuntime{}, StateStorage: &states.FileSystemState{Path: states.KusionState}, - Order: &ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*ChangeStep{}}, + Order: &opsmodels.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*opsmodels.ChangeStep{}}, }, args: args{ request: &PreviewRequest{ - Request: Request{ + Request: opsmodels.Request{ Tenant: "fake-tenant", Stack: "fake-stack", Project: "fake-project", Operator: "fake-operator", - Manifest: &models.Spec{ + Spec: &models.Spec{ Resources: []models.Resource{ FakeResourceState, }, @@ -105,12 +109,12 @@ func TestOperation_Preview(t *testing.T) { }, }, wantRsp: &PreviewResponse{ - Order: &ChangeOrder{ + Order: &opsmodels.ChangeOrder{ StepKeys: []string{"fake-id"}, - ChangeSteps: map[string]*ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ "fake-id": { ID: "fake-id", - Action: Create, + Action: types.Create, Original: (*models.Resource)(nil), Modified: &FakeResourceState, Current: (*models.Resource)(nil), @@ -123,19 +127,19 @@ func TestOperation_Preview(t *testing.T) { { name: "success-when-destroy", fields: fields{ - OperationType: DestroyPreview, + OperationType: types.DestroyPreview, Runtime: &fakePreviewRuntime{}, StateStorage: &states.FileSystemState{Path: states.KusionState}, - Order: &ChangeOrder{}, + Order: &opsmodels.ChangeOrder{}, }, args: args{ request: &PreviewRequest{ - Request: Request{ + Request: opsmodels.Request{ Tenant: "fake-tenant", Stack: "fake-stack", Project: "fake-project", Operator: "fake-operator", - Manifest: &models.Spec{ + Spec: &models.Spec{ Resources: []models.Resource{ FakeResourceState2, }, @@ -144,12 +148,12 @@ func TestOperation_Preview(t *testing.T) { }, }, wantRsp: &PreviewResponse{ - Order: &ChangeOrder{ + Order: &opsmodels.ChangeOrder{ StepKeys: []string{"fake-id-2"}, - ChangeSteps: map[string]*ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ "fake-id-2": { ID: "fake-id-2", - Action: Delete, + Action: types.Delete, Original: &FakeResourceState2, Modified: &FakeResourceState2, Current: &FakeResourceState2, @@ -162,15 +166,15 @@ func TestOperation_Preview(t *testing.T) { { name: "fail-because-empty-models", fields: fields{ - OperationType: ApplyPreview, + OperationType: types.ApplyPreview, Runtime: &fakePreviewRuntime{}, StateStorage: &states.FileSystemState{Path: states.KusionState}, - Order: &ChangeOrder{}, + Order: &opsmodels.ChangeOrder{}, }, args: args{ request: &PreviewRequest{ - Request: Request{ - Manifest: nil, + Request: opsmodels.Request{ + Spec: nil, }, }, }, @@ -180,19 +184,19 @@ func TestOperation_Preview(t *testing.T) { { name: "fail-because-nonexistent-id", fields: fields{ - OperationType: ApplyPreview, + OperationType: types.ApplyPreview, Runtime: &fakePreviewRuntime{}, StateStorage: &states.FileSystemState{Path: states.KusionState}, - Order: &ChangeOrder{}, + Order: &opsmodels.ChangeOrder{}, }, args: args{ request: &PreviewRequest{ - Request: Request{ + Request: opsmodels.Request{ Tenant: "fake-tennat", Stack: "fake-stack", Project: "fake-project", Operator: "fake-operator", - Manifest: &models.Spec{ + Spec: &models.Spec{ Resources: []models.Resource{ { ID: "fake-id", @@ -211,17 +215,19 @@ func TestOperation_Preview(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - o := &Operation{ - OperationType: tt.fields.OperationType, - StateStorage: tt.fields.StateStorage, - CtxResourceIndex: tt.fields.CtxResourceIndex, - PriorStateResourceIndex: tt.fields.PriorStateResourceIndex, - StateResourceIndex: tt.fields.StateResourceIndex, - Order: tt.fields.Order, - Runtime: tt.fields.Runtime, - MsgCh: tt.fields.MsgCh, - resultState: tt.fields.resultState, - lock: tt.fields.lock, + o := &PreviewOperation{ + Operation: opsmodels.Operation{ + OperationType: tt.fields.OperationType, + StateStorage: tt.fields.StateStorage, + CtxResourceIndex: tt.fields.CtxResourceIndex, + PriorStateResourceIndex: tt.fields.PriorStateResourceIndex, + StateResourceIndex: tt.fields.StateResourceIndex, + ChangeOrder: tt.fields.Order, + Runtime: tt.fields.Runtime, + MsgCh: tt.fields.MsgCh, + ResultState: tt.fields.resultState, + Lock: tt.fields.lock, + }, } gotRsp, gotS := o.Preview(tt.args.request) if !reflect.DeepEqual(gotRsp, tt.wantRsp) { diff --git a/pkg/engine/operation/types.go b/pkg/engine/operation/types.go deleted file mode 100644 index 8fe68291..00000000 --- a/pkg/engine/operation/types.go +++ /dev/null @@ -1,12 +0,0 @@ -package operation - -type Type int64 - -// Operation type -const ( - UndefinedOperation Type = iota // invalidate value - Apply - ApplyPreview - Destroy - DestroyPreview -) diff --git a/pkg/engine/operation/action.go b/pkg/engine/operation/types/action.go similarity index 98% rename from pkg/engine/operation/action.go rename to pkg/engine/operation/types/action.go index 09321695..e7d77427 100644 --- a/pkg/engine/operation/action.go +++ b/pkg/engine/operation/types/action.go @@ -1,4 +1,4 @@ -package operation +package types import "kusionstack.io/kusion/pkg/util/pretty" diff --git a/pkg/engine/operation/types/operation_type.go b/pkg/engine/operation/types/operation_type.go new file mode 100644 index 00000000..ac7291d2 --- /dev/null +++ b/pkg/engine/operation/types/operation_type.go @@ -0,0 +1,12 @@ +package types + +type OperationType int64 + +// Operation type +const ( + UndefinedOperation OperationType = iota // invalidate value + Apply + ApplyPreview + Destroy + DestroyPreview +) diff --git a/pkg/engine/operation/utils/file.go b/pkg/engine/operation/utils/file.go new file mode 100644 index 00000000..d4253279 --- /dev/null +++ b/pkg/engine/operation/utils/file.go @@ -0,0 +1,27 @@ +package utils + +import ( + "github.com/gonvenience/wrap" + "github.com/gonvenience/ytbx" + yamlv3 "gopkg.in/yaml.v3" +) + +// LoadFile processes the provided input location to load it as one of the +// supported document formats, or plain text if nothing else works. +func LoadFile(yaml, location string) (ytbx.InputFile, error) { + var ( + documents []*yamlv3.Node + data []byte + err error + ) + + data = []byte(yaml) + if documents, err = ytbx.LoadDocuments(data); err != nil { + return ytbx.InputFile{}, wrap.Errorf(err, "unable to parse data %v", data) + } + + return ytbx.InputFile{ + Location: location, + Documents: documents, + }, nil +} diff --git a/pkg/engine/runtime/doc.go b/pkg/engine/runtime/doc.go new file mode 100644 index 00000000..d521f708 --- /dev/null +++ b/pkg/engine/runtime/doc.go @@ -0,0 +1,2 @@ +// Package runtime contains code of all infrastructure runtimes supported by Kusion. +package runtime diff --git a/pkg/engine/runtime/runtime.go b/pkg/engine/runtime/runtime.go index e840477e..84dd8506 100644 --- a/pkg/engine/runtime/runtime.go +++ b/pkg/engine/runtime/runtime.go @@ -8,16 +8,26 @@ import ( "kusionstack.io/kusion/pkg/status" ) +// Runtime represents an actual infrastructure runtime managed by Kusion and every runtime implements this interface can be orchestrated +// by Kusion like normal K8s resources. All methods in this interface are designed for manipulating one resource at a time and will be +// invoked in operations like Apply, Preview, Destroy, etc. type Runtime interface { - // Apply resource with planState. priorState is given to Runtime for three-way-merge if it needs + // Apply means modify this resource to the desired state described in the request, + // and it will turn into creating or updating a resource in most scenarios. + // If the infrastructure runtime already provides an Apply method that conform to this method's semantics meaning, + // like the Kubernetes Runtime, you can directly invoke this method without any conversion. + // PlanState and priorState are given in this method for the runtime which would make a + // three-way-merge (planState,priorState and live state) when implementing this interface Apply(ctx context.Context, priorState, planState *models.Resource) (*models.Resource, status.Status) // Read the latest state of this resource Read(ctx context.Context, resourceState *models.Resource) (*models.Resource, status.Status) - // Delete resource + // Delete this resource in the actual infrastructure and return success if this resource is not exist Delete(ctx context.Context, resourceState *models.Resource) status.Status - // Watch the latest state or event of this resource. This is very helpful for us to know what is happening when apply resources + // Watch the latest state or event of this resource. + // This is an optional method for the Runtime to implement, + // but it will be very helpful for us to know what is happening when applying this resource Watch(ctx context.Context, resourceState *models.Resource) (*models.Resource, status.Status) } diff --git a/pkg/engine/states/doc.go b/pkg/engine/states/doc.go new file mode 100644 index 00000000..f7a785a0 --- /dev/null +++ b/pkg/engine/states/doc.go @@ -0,0 +1,2 @@ +// Package states contains code for all storage medium supported by Kusion. +package states diff --git a/pkg/engine/states/state.go b/pkg/engine/states/state.go index b821cbb5..8aa5fac4 100644 --- a/pkg/engine/states/state.go +++ b/pkg/engine/states/state.go @@ -32,19 +32,31 @@ type StateQuery struct { Project string `json:"project"` } -// State represent all resources state in one apply operation. +// State is a record of an operation's result. It is a mapping between resources in KCL and the actual infra resource and often used as a +// datasource for 3-way merge/diff in operations like Apply or Preview. type State struct { - ID int64 `json:"id"` - Tenant string `json:"tenant"` - Stack string `json:"stack"` - Project string `json:"project"` - Version int `json:"version"` - KusionVersion string `json:"kusionVersion"` - Serial uint64 `json:"serial"` - Operator string `json:"operator"` - Resources models.Resources `json:"resources"` - CreatTime time.Time `json:"creatTime"` - ModifiedTime time.Time `json:"modifiedTime,omitempty"` + // State ID + ID int64 `json:"id"` + // Tenant is designed for multi-tenant scenario + Tenant string `json:"tenant,omitempty"` + // Stack name + Stack string `json:"stack"` + // Project name + Project string `json:"project"` + // State version + Version int `json:"version"` + // KusionVersion represents the Kusion's version when this State is created + KusionVersion string `json:"kusionVersion"` + // Serial is an auto-increase number that represents how many times this State is modified + Serial uint64 `json:"serial"` + // Operator represents the person who triggered this operation + Operator string `json:"operator,omitempty"` + // Resources records all resources in this operation + Resources models.Resources `json:"resources"` + // CreatTime is the time State is created + CreatTime time.Time `json:"creatTime"` + // ModifiedTime is the time State is modified each time + ModifiedTime time.Time `json:"modifiedTime,omitempty"` } func NewState() *State { diff --git a/pkg/kusionctl/cmd/apply/options.go b/pkg/kusionctl/cmd/apply/options.go index c0f5f9fb..cdc0f909 100644 --- a/pkg/kusionctl/cmd/apply/options.go +++ b/pkg/kusionctl/cmd/apply/options.go @@ -8,12 +8,16 @@ import ( "strings" "sync" + "kusionstack.io/kusion/pkg/engine/operation" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "github.com/AlecAivazis/survey/v2" "github.com/pterm/pterm" "kusionstack.io/kusion/pkg/compile" "kusionstack.io/kusion/pkg/engine/models" - "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/states" compilecmd "kusionstack.io/kusion/pkg/kusionctl/cmd/compile" @@ -158,6 +162,7 @@ func (o *ApplyOptions) Run() error { // if err != nil { // return err // } +// todo @elliotxx io.Writer is not used now func Preview( o *ApplyOptions, runtime runtime.Runtime, @@ -166,35 +171,35 @@ func Preview( project *projectstack.Project, stack *projectstack.Stack, out io.Writer, -) (*operation.Changes, error) { +) (*opsmodels.Changes, error) { log.Info("Start compute preview changes ...") // Construct the preview operation pc := &operation.PreviewOperation{ - Operation: operation.Operation{ - OperationType: operation.ApplyPreview, + Operation: opsmodels.Operation{ + OperationType: types.ApplyPreview, Runtime: runtime, StateStorage: storage, - Order: &operation.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*operation.ChangeStep{}}, + ChangeOrder: &opsmodels.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*opsmodels.ChangeStep{}}, }, } log.Info("Start call pc.Preview() ...") rsp, s := pc.Preview(&operation.PreviewRequest{ - Request: operation.Request{ + Request: opsmodels.Request{ Tenant: project.Tenant, Project: project.Name, Operator: o.Operator, Stack: stack.Name, - Manifest: planResources, + Spec: planResources, }, }) if status.IsErr(s) { return nil, fmt.Errorf("preview failed.\n%s", s.String()) } - return operation.NewChanges(project, stack, rsp.Order), nil + return opsmodels.NewChanges(project, stack, rsp.Order), nil } // The Apply function will apply the resources changes @@ -223,15 +228,15 @@ func Apply( runtime runtime.Runtime, storage states.StateStorage, planResources *models.Spec, - changes *operation.Changes, + changes *opsmodels.Changes, out io.Writer, ) error { // Construct the apply operation ac := &operation.ApplyOperation{ - Operation: operation.Operation{ + Operation: opsmodels.Operation{ Runtime: runtime, StateStorage: storage, - MsgCh: make(chan operation.Message), + MsgCh: make(chan opsmodels.Message), }, } @@ -264,13 +269,13 @@ func Apply( changeStep := changes.Get(msg.ResourceID) switch msg.OpResult { - case operation.Success, operation.Skip: + case opsmodels.Success, opsmodels.Skip: var title string - if changeStep.Action == operation.UnChange { + if changeStep.Action == types.UnChange { title = fmt.Sprintf("%s %s, %s", changeStep.Action.String(), pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(operation.Skip)), + strings.ToLower(string(opsmodels.Skip)), ) } else { title = fmt.Sprintf("%s %s %s", @@ -283,7 +288,7 @@ func Apply( progressbar.UpdateTitle(title) progressbar.Increment() ls.Count(changeStep.Action) - case operation.Failed: + case opsmodels.Failed: title := fmt.Sprintf("%s %s %s", changeStep.Action.String(), pterm.Bold.Sprint(changeStep.ID), @@ -304,21 +309,21 @@ func Apply( if o.DryRun { for _, r := range planResources.Resources { - ac.MsgCh <- operation.Message{ + ac.MsgCh <- opsmodels.Message{ ResourceID: r.ResourceKey(), - OpResult: operation.Success, + OpResult: opsmodels.Success, OpErr: nil, } } close(ac.MsgCh) } else { _, st := ac.Apply(&operation.ApplyRequest{ - Request: operation.Request{ + Request: opsmodels.Request{ Tenant: changes.Project().Tenant, Project: changes.Project().Name, Operator: o.Operator, Stack: changes.Stack().Name, - Manifest: planResources, + Spec: planResources, }, }) if status.IsErr(st) { @@ -338,20 +343,20 @@ type lineSummary struct { created, updated, deleted int } -func (ls *lineSummary) Count(op operation.ActionType) { +func (ls *lineSummary) Count(op types.ActionType) { switch op { - case operation.Create: + case types.Create: ls.created++ - case operation.Update: + case types.Update: ls.updated++ - case operation.Delete: + case types.Delete: ls.deleted++ } } -func allUnChange(changes *operation.Changes) bool { +func allUnChange(changes *opsmodels.Changes) bool { for _, v := range changes.ChangeSteps { - if v.Action != operation.UnChange { + if v.Action != types.UnChange { return false } } diff --git a/pkg/kusionctl/cmd/apply/options_test.go b/pkg/kusionctl/cmd/apply/options_test.go index 1f3478a1..214e38c8 100644 --- a/pkg/kusionctl/cmd/apply/options_test.go +++ b/pkg/kusionctl/cmd/apply/options_test.go @@ -13,6 +13,11 @@ import ( "testing" "time" + "kusionstack.io/kusion/pkg/engine/operation" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "bou.ke/monkey" "github.com/AlecAivazis/survey/v2" "github.com/pterm/pterm" @@ -21,7 +26,6 @@ import ( "kusionstack.io/kusion/pkg/compile" "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/models" - "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/states" "kusionstack.io/kusion/pkg/projectstack" @@ -68,7 +72,7 @@ func TestApplyOptions_Run(t *testing.T) { mockCompileWithSpinner() mockNewKubernetesRuntime() mockOperationPreview() - mockOperationApply(operation.Success) + mockOperationApply(opsmodels.Success) o := NewApplyOptions() o.DryRun = true @@ -153,27 +157,27 @@ func (f *fakerRuntime) Watch(ctx context.Context, resourceState *models.Resource } func mockOperationPreview() { - monkey.Patch((*operation.Operation).Preview, - func(*operation.Operation, *operation.PreviewRequest) (rsp *operation.PreviewResponse, s status.Status) { + monkey.Patch((*operation.PreviewOperation).Preview, + func(*operation.PreviewOperation, *operation.PreviewRequest) (rsp *operation.PreviewResponse, s status.Status) { return &operation.PreviewResponse{ - Order: &operation.ChangeOrder{ + Order: &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID, sa2.ID, sa3.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Create, + Action: types.Create, Original: nil, Modified: &sa1, }, sa2.ID: { ID: sa2.ID, - Action: operation.UnChange, + Action: types.UnChange, Original: &sa2, Modified: &sa2, }, sa3.ID: { ID: sa3.ID, - Action: operation.Undefined, + Action: types.Undefined, Original: &sa3, Modified: &sa1, }, @@ -217,18 +221,18 @@ func Test_apply(t *testing.T) { defer monkey.UnpatchAll() planResources := &models.Spec{Resources: []models.Resource{sa1}} - order := &operation.ChangeOrder{ + order := &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Create, + Action: types.Create, Original: nil, Modified: sa1, }, }, } - changes := operation.NewChanges(project, stack, order) + changes := opsmodels.NewChanges(project, stack, order) o := NewApplyOptions() o.DryRun = true err := Apply(o, &fakerRuntime{}, stateStorage, planResources, changes, os.Stdout) @@ -236,78 +240,78 @@ func Test_apply(t *testing.T) { }) t.Run("apply success", func(t *testing.T) { defer monkey.UnpatchAll() - mockOperationApply(operation.Success) + mockOperationApply(opsmodels.Success) o := NewApplyOptions() planResources := &models.Spec{Resources: []models.Resource{sa1, sa2}} - order := &operation.ChangeOrder{ + order := &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID, sa2.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Create, + Action: types.Create, Original: nil, Modified: &sa1, }, sa2.ID: { ID: sa2.ID, - Action: operation.UnChange, + Action: types.UnChange, Original: &sa2, Modified: &sa2, }, }, } - changes := operation.NewChanges(project, stack, order) + changes := opsmodels.NewChanges(project, stack, order) err := Apply(o, &fakerRuntime{}, stateStorage, planResources, changes, os.Stdout) assert.Nil(t, err) }) t.Run("apply failed", func(t *testing.T) { defer monkey.UnpatchAll() - mockOperationApply(operation.Failed) + mockOperationApply(opsmodels.Failed) o := NewApplyOptions() planResources := &models.Spec{Resources: []models.Resource{sa1}} - order := &operation.ChangeOrder{ + order := &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Create, + Action: types.Create, Original: nil, Modified: &sa1, }, }, } - changes := operation.NewChanges(project, stack, order) + changes := opsmodels.NewChanges(project, stack, order) err := Apply(o, &fakerRuntime{}, stateStorage, planResources, changes, os.Stdout) assert.NotNil(t, err) }) } -func mockOperationApply(res operation.OpResult) { - monkey.Patch((*operation.Operation).Apply, - func(o *operation.Operation, request *operation.ApplyRequest) (*operation.ApplyResponse, status.Status) { +func mockOperationApply(res opsmodels.OpResult) { + monkey.Patch((*operation.ApplyOperation).Apply, + func(o *operation.ApplyOperation, request *operation.ApplyRequest) (*operation.ApplyResponse, status.Status) { var err error - if res == operation.Failed { + if res == opsmodels.Failed { err = errors.New("mock error") } - for _, r := range request.Manifest.Resources { + for _, r := range request.Spec.Resources { // ing -> $res - o.MsgCh <- operation.Message{ + o.MsgCh <- opsmodels.Message{ ResourceID: r.ResourceKey(), OpResult: "", OpErr: nil, } - o.MsgCh <- operation.Message{ + o.MsgCh <- opsmodels.Message{ ResourceID: r.ResourceKey(), OpResult: res, OpErr: err, } } close(o.MsgCh) - if res == operation.Failed { + if res == opsmodels.Failed { return nil, status.NewErrorStatus(err) } return &operation.ApplyResponse{}, nil diff --git a/pkg/kusionctl/cmd/destroy/options.go b/pkg/kusionctl/cmd/destroy/options.go index 6ec010ae..964f1a8c 100644 --- a/pkg/kusionctl/cmd/destroy/options.go +++ b/pkg/kusionctl/cmd/destroy/options.go @@ -6,12 +6,16 @@ import ( "strings" "sync" + "kusionstack.io/kusion/pkg/engine/operation" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "github.com/AlecAivazis/survey/v2" "github.com/pterm/pterm" "kusionstack.io/kusion/pkg/compile" "kusionstack.io/kusion/pkg/engine/models" - "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/states" compilecmd "kusionstack.io/kusion/pkg/kusionctl/cmd/compile" @@ -117,7 +121,7 @@ func (o *DestroyOptions) Run() error { func (o *DestroyOptions) preview(planResources *models.Spec, project *projectstack.Project, stack *projectstack.Stack, -) (*operation.Changes, error) { +) (*opsmodels.Changes, error) { log.Info("Start compute preview changes ...") kubernetesRuntime, err := runtime.NewKubernetesRuntime() @@ -126,33 +130,33 @@ func (o *DestroyOptions) preview(planResources *models.Spec, } pc := &operation.PreviewOperation{ - Operation: operation.Operation{ - OperationType: operation.DestroyPreview, + Operation: opsmodels.Operation{ + OperationType: types.DestroyPreview, Runtime: kubernetesRuntime, StateStorage: &states.FileSystemState{Path: filepath.Join(o.WorkDir, states.KusionState)}, - Order: &operation.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*operation.ChangeStep{}}, + ChangeOrder: &opsmodels.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*opsmodels.ChangeStep{}}, }, } log.Info("Start call pc.Preview() ...") rsp, s := pc.Preview(&operation.PreviewRequest{ - Request: operation.Request{ + Request: opsmodels.Request{ Tenant: project.Tenant, Project: project.Name, Operator: o.Operator, Stack: stack.Name, - Manifest: planResources, + Spec: planResources, }, }) if status.IsErr(s) { return nil, fmt.Errorf("preview failed, status: %v", s) } - return operation.NewChanges(project, stack, rsp.Order), nil + return opsmodels.NewChanges(project, stack, rsp.Order), nil } -func (o *DestroyOptions) destroy(planResources *models.Spec, changes *operation.Changes) error { +func (o *DestroyOptions) destroy(planResources *models.Spec, changes *opsmodels.Changes) error { // Build apply operation kubernetesRuntime, err := runtime.NewKubernetesRuntime() if err != nil { @@ -160,10 +164,10 @@ func (o *DestroyOptions) destroy(planResources *models.Spec, changes *operation. } do := &operation.DestroyOperation{ - Operation: operation.Operation{ + Operation: opsmodels.Operation{ Runtime: kubernetesRuntime, StateStorage: &states.FileSystemState{Path: filepath.Join(o.WorkDir, states.KusionState)}, - MsgCh: make(chan operation.Message), + MsgCh: make(chan opsmodels.Message), }, } @@ -196,13 +200,13 @@ func (o *DestroyOptions) destroy(planResources *models.Spec, changes *operation. changeStep := changes.Get(msg.ResourceID) switch msg.OpResult { - case operation.Success, operation.Skip: + case opsmodels.Success, opsmodels.Skip: var title string - if changeStep.Action == operation.UnChange { + if changeStep.Action == types.UnChange { title = fmt.Sprintf("%s %s, %s", changeStep.Action.String(), pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(operation.Skip)), + strings.ToLower(string(opsmodels.Skip)), ) } else { title = fmt.Sprintf("%s %s %s", @@ -215,7 +219,7 @@ func (o *DestroyOptions) destroy(planResources *models.Spec, changes *operation. progressbar.UpdateTitle(title) progressbar.Increment() deleted++ - case operation.Failed: + case opsmodels.Failed: title := fmt.Sprintf("%s %s %s", changeStep.Action.String(), pterm.Bold.Sprint(changeStep.ID), @@ -235,12 +239,12 @@ func (o *DestroyOptions) destroy(planResources *models.Spec, changes *operation. }() st := do.Destroy(&operation.DestroyRequest{ - Request: operation.Request{ + Request: opsmodels.Request{ Tenant: changes.Project().Tenant, Project: changes.Project().Name, Operator: o.Operator, Stack: changes.Stack().Name, - Manifest: planResources, + Spec: planResources, }, }) if status.IsErr(st) { diff --git a/pkg/kusionctl/cmd/destroy/options_test.go b/pkg/kusionctl/cmd/destroy/options_test.go index 59c32027..4ad1202f 100644 --- a/pkg/kusionctl/cmd/destroy/options_test.go +++ b/pkg/kusionctl/cmd/destroy/options_test.go @@ -11,6 +11,11 @@ import ( "testing" "time" + "kusionstack.io/kusion/pkg/engine/operation" + opsmodels "kusionstack.io/kusion/pkg/engine/operation/models" + + "kusionstack.io/kusion/pkg/engine/operation/types" + "bou.ke/monkey" "github.com/AlecAivazis/survey/v2" "github.com/pterm/pterm" @@ -19,7 +24,6 @@ import ( "kusionstack.io/kusion/pkg/compile" "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/models" - "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/projectstack" "kusionstack.io/kusion/pkg/status" @@ -58,7 +62,7 @@ func TestDestroyOptions_Run(t *testing.T) { mockCompileWithSpinner() mockNewKubernetesRuntime() mockOperationPreview() - mockOperationDestroy(operation.Success) + mockOperationDestroy(opsmodels.Success) o := NewDestroyOptions() mockPromptOutput("yes") @@ -142,15 +146,15 @@ func (f *fakerRuntime) Watch(ctx context.Context, resourceState *models.Resource } func mockOperationPreview() { - monkey.Patch((*operation.Operation).Preview, - func(*operation.Operation, *operation.PreviewRequest) (rsp *operation.PreviewResponse, s status.Status) { + monkey.Patch((*operation.PreviewOperation).Preview, + func(*operation.PreviewOperation, *operation.PreviewRequest) (rsp *operation.PreviewResponse, s status.Status) { return &operation.PreviewResponse{ - Order: &operation.ChangeOrder{ + Order: &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Delete, + Action: types.Delete, Original: &sa1, Modified: nil, }, @@ -191,28 +195,28 @@ func Test_destroy(t *testing.T) { t.Run("destroy success", func(t *testing.T) { defer monkey.UnpatchAll() mockNewKubernetesRuntime() - mockOperationDestroy(operation.Success) + mockOperationDestroy(opsmodels.Success) o := NewDestroyOptions() planResources := &models.Spec{Resources: []models.Resource{sa2}} - order := &operation.ChangeOrder{ + order := &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID, sa2.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Delete, + Action: types.Delete, Original: &sa1, Modified: nil, }, sa2.ID: { ID: sa2.ID, - Action: operation.UnChange, + Action: types.UnChange, Original: &sa2, Modified: &sa2, }, }, } - changes := operation.NewChanges(project, stack, order) + changes := opsmodels.NewChanges(project, stack, order) err := o.destroy(planResources, changes) assert.Nil(t, err) @@ -220,50 +224,50 @@ func Test_destroy(t *testing.T) { t.Run("destroy failed", func(t *testing.T) { defer monkey.UnpatchAll() mockNewKubernetesRuntime() - mockOperationDestroy(operation.Failed) + mockOperationDestroy(opsmodels.Failed) o := NewDestroyOptions() planResources := &models.Spec{Resources: []models.Resource{sa1}} - order := &operation.ChangeOrder{ + order := &opsmodels.ChangeOrder{ StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*operation.ChangeStep{ + ChangeSteps: map[string]*opsmodels.ChangeStep{ sa1.ID: { ID: sa1.ID, - Action: operation.Delete, + Action: types.Delete, Original: &sa1, Modified: nil, }, }, } - changes := operation.NewChanges(project, stack, order) + changes := opsmodels.NewChanges(project, stack, order) err := o.destroy(planResources, changes) assert.NotNil(t, err) }) } -func mockOperationDestroy(res operation.OpResult) { - monkey.Patch((*operation.Operation).Destroy, - func(o *operation.Operation, request *operation.DestroyRequest) status.Status { +func mockOperationDestroy(res opsmodels.OpResult) { + monkey.Patch((*operation.DestroyOperation).Destroy, + func(o *operation.DestroyOperation, request *operation.DestroyRequest) status.Status { var err error - if res == operation.Failed { + if res == opsmodels.Failed { err = errors.New("mock error") } - for _, r := range request.Manifest.Resources { + for _, r := range request.Spec.Resources { // ing -> $res - o.MsgCh <- operation.Message{ + o.MsgCh <- opsmodels.Message{ ResourceID: r.ResourceKey(), OpResult: "", OpErr: nil, } - o.MsgCh <- operation.Message{ + o.MsgCh <- opsmodels.Message{ ResourceID: r.ResourceKey(), OpResult: res, OpErr: err, } } close(o.MsgCh) - if res == operation.Failed { + if res == opsmodels.Failed { return status.NewErrorStatus(err) } return nil