Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: add some comments #57

Merged
merged 3 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkg/engine/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Package engine holds code that drive the engine of Kusion.
//
// Kusion Engine is a middle layer between Spec and the actual infrastructure.
// The major function of this engine is to parse the Spec and to turn all actual infra resources into the desired state
// described in the Spec and almost all operations from the command line will invoke this engine to finish their jobs.
//
// It consists of 3 parts:
//
// 1. Operation Engine: this part is the entrypoint of the whole Kusion Engine and is responsible for kusion basic operations like Preview,
// Apply, Destroy, etc. The main workflow of this part is to parse resources in the Spec, figure out which resource should be modified
// according to specified operation type, and execute this operation to the real infra resources. During this workflow,
// the following two parts will be involved.
//
// 2. Runtime: it is an interface between the actual infrastructure and Kusion. All operations that trying to manipulate a resource
// should be delegated to one Runtime to make this operation effect in the actual infrastructure
//
// 3. State: state is a record of an operation's result. It is often used as a datasource for 3-way merge/diff in operations like Apply or Preview
//
// Let's get operation Preview as an example to demonstrate how the three parts cooperate in an actual operation.
//
// +-------------+
// | Operation |
// | Preview |
// +-------------+
// |
// |
// +-------------+
// | Operation |
// +---------| Engine |----------+
// | +-------------+ |
// | |
// +-------------+ +-------------+
// | State | | Runtime |
// +-------------+ +-------------+
//
// 1. parse resources in the Spec and convert into a DAG
//
// 2. Walk this DAG:
// a) get the latest state from the actual infra by the Runtime
// b) get last operation state from the State
//
// 3. Diff the two states(live state and prior state) and return the details of these diffs to cmd
package engine
4 changes: 2 additions & 2 deletions pkg/engine/models/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Resource struct {
Attributes map[string]interface{} `json:"attributes"`

// DependsOn contains all resources this resource depends on
DependsOn []string `json:"dependsOn,omitempty" yaml:"dependsOn,omitempty"`
DependsOn []string `json:"dependsOn,omitempty"`

// Extensions specifies arbitrary metadata of this resource
Extensions map[string]interface{} `json:"extensions"`
Extensions map[string]interface{} `json:"extensions,omitempty"`
}

func (r *Resource) ResourceKey() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/models/spec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package models

// Spec represent the KCL compile result
// Spec represents desired state of resources in one stack and will be applied to the actual infrastructure by the Kusion Engine
type Spec struct {
Resources Resources `json:"resources"`
}
69 changes: 41 additions & 28 deletions pkg/engine/operation/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"fmt"
"sync"

opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"

"kusionstack.io/kusion/pkg/engine/operation/graph"
"kusionstack.io/kusion/pkg/engine/operation/parser"
"kusionstack.io/kusion/pkg/engine/operation/types"

"github.com/hashicorp/terraform/dag"
"github.com/hashicorp/terraform/tfdiags"

Expand All @@ -15,37 +21,43 @@ import (
)

type ApplyOperation struct {
Operation
opsmodels.Operation
}

type ApplyRequest struct {
Request `json:",inline" yaml:",inline"`
opsmodels.Request `json:",inline" yaml:",inline"`
}

type ApplyResponse struct {
State *states.State
}

func NewApplyGraph(m *models.Spec, priorState *states.State) (*dag.AcyclicGraph, status.Status) {
manifestParser := NewManifestParser(m)
graph := &dag.AcyclicGraph{}
graph.Add(&RootNode{})
specParser := parser.NewSpecParser(m)
g := &dag.AcyclicGraph{}
g.Add(&graph.RootNode{})

s := manifestParser.Parse(graph)
s := specParser.Parse(g)
if status.IsErr(s) {
return nil, s
}
deleteResourceParser := NewDeleteResourceParser(priorState.Resources)
s = deleteResourceParser.Parse(graph)
deleteResourceParser := parser.NewDeleteResourceParser(priorState.Resources)
s = deleteResourceParser.Parse(g)
if status.IsErr(s) {
return nil, s
}

return graph, s
return g, s
}

func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.Status) {
log.Infof("engine Apply start!")
// Apply means turn all actual infra resources into the desired state described in the request by invoking a specified Runtime.
// Like other operations, Apply has 3 main steps during the whole process.
// 1. parse resources and their relationship to build a DAG and should take care of those resources that will be deleted
// 2. walk this DAG and execute all graph nodes concurrently, besides the entire process should follow dependencies in this DAG
// 3. during the execution of each node, it will invoke different runtime according to the resource type
func (ao *ApplyOperation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.Status) {
log.Infof("engine: Apply start!")
o := ao.Operation

defer func() {
close(o.MsgCh)
Expand All @@ -69,32 +81,32 @@ func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.
}

// 1. init & build Indexes
priorState, resultState := initStates(o.StateStorage, &request.Request)
priorState, resultState := o.InitStates(&request.Request)
priorStateResourceIndex := priorState.Resources.Index()

// 2. build & walk DAG
graph, s := NewApplyGraph(request.Manifest, priorState)
applyGraph, s := NewApplyGraph(request.Spec, priorState)
if status.IsErr(s) {
return nil, s
}
log.Infof("Apply Graph:%s", graph.String())
log.Infof("Apply Graph:%s", applyGraph.String())

applyOperation := &ApplyOperation{
Operation: Operation{
OperationType: Apply,
Operation: opsmodels.Operation{
OperationType: types.Apply,
StateStorage: o.StateStorage,
CtxResourceIndex: map[string]*models.Resource{},
PriorStateResourceIndex: priorStateResourceIndex,
StateResourceIndex: priorStateResourceIndex,
Runtime: o.Runtime,
MsgCh: o.MsgCh,
resultState: resultState,
lock: &sync.Mutex{},
ResultState: resultState,
Lock: &sync.Mutex{},
},
}

w := &dag.Walker{Callback: applyOperation.applyWalkFun}
w.Update(graph)
w.Update(applyGraph)
// Wait
if diags := w.Wait(); diags.HasErrors() {
st = status.NewErrorStatus(diags.Err())
Expand All @@ -104,11 +116,12 @@ func (o *Operation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st status.
return &ApplyResponse{State: resultState}, nil
}

func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) {
func (ao *ApplyOperation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) {
var s status.Status
if v == nil {
return nil
}
o := &ao.Operation

defer func() {
if e := recover(); e != nil {
Expand All @@ -127,15 +140,15 @@ func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) {
}
}()

if node, ok := v.(ExecutableNode); ok {
if rn, ok2 := v.(*ResourceNode); ok2 {
o.MsgCh <- Message{rn.Hashcode().(string), "", nil}
if node, ok := v.(graph.ExecutableNode); ok {
if rn, ok2 := v.(*graph.ResourceNode); ok2 {
o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string)}

s = node.Execute(o)
if status.IsErr(s) {
o.MsgCh <- Message{rn.Hashcode().(string), Failed, fmt.Errorf("node execte failed, status: %v", s)}
o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string), OpResult: opsmodels.Failed, OpErr: fmt.Errorf("node execte failed, status: %v", s)}
} else {
o.MsgCh <- Message{rn.Hashcode().(string), Success, nil}
o.MsgCh <- opsmodels.Message{ResourceID: rn.Hashcode().(string), OpResult: opsmodels.Success}
}
} else {
s = node.Execute(o)
Expand All @@ -147,19 +160,19 @@ func (o *Operation) applyWalkFun(v dag.Vertex) (diags tfdiags.Diagnostics) {
return diags
}

func validateRequest(request *Request) status.Status {
func validateRequest(request *opsmodels.Request) status.Status {
var s status.Status

if request == nil {
return status.NewErrorStatusWithMsg(status.InvalidArgument, "request is nil")
}
if request.Manifest == nil {
if request.Spec == nil {
return status.NewErrorStatusWithMsg(status.InvalidArgument,
"request.Spec is empty. If you want to delete all resources, please use command 'destroy'")
}
resourceKeyMap := make(map[string]bool)

for _, resource := range request.Manifest.Resources {
for _, resource := range request.Spec.Resources {
key := resource.ResourceKey()
if _, ok := resourceKeyMap[key]; ok {
return status.NewErrorStatusWithMsg(status.InvalidArgument, fmt.Sprintf("Duplicate resource:%s in request.", key))
Expand Down
44 changes: 26 additions & 18 deletions pkg/engine/operation/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"sync"
"testing"

opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"

"kusionstack.io/kusion/pkg/engine/operation/graph"
"kusionstack.io/kusion/pkg/engine/operation/types"

"bou.ke/monkey"
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
Expand All @@ -21,7 +26,7 @@ import (

func Test_validateRequest(t *testing.T) {
type args struct {
request *Request
request *opsmodels.Request
}
tests := []struct {
name string
Expand All @@ -31,16 +36,16 @@ func Test_validateRequest(t *testing.T) {
{
name: "t1",
args: args{
request: &Request{},
request: &opsmodels.Request{},
},
want: status.NewErrorStatusWithMsg(status.InvalidArgument,
"request.Spec is empty. If you want to delete all resources, please use command 'destroy'"),
},
{
name: "t2",
args: args{
request: &Request{
Manifest: &models.Spec{Resources: []models.Resource{}},
request: &opsmodels.Request{
Spec: &models.Spec{Resources: []models.Resource{}},
},
},
want: nil,
Expand All @@ -57,14 +62,14 @@ func Test_validateRequest(t *testing.T) {

func TestOperation_Apply(t *testing.T) {
type fields struct {
OperationType Type
OperationType types.OperationType
StateStorage states.StateStorage
CtxResourceIndex map[string]*models.Resource
PriorStateResourceIndex map[string]*models.Resource
StateResourceIndex map[string]*models.Resource
Order *ChangeOrder
Order *opsmodels.ChangeOrder
Runtime runtime.Runtime
MsgCh chan Message
MsgCh chan opsmodels.Message
resultState *states.State
lock *sync.Mutex
}
Expand Down Expand Up @@ -113,17 +118,17 @@ func TestOperation_Apply(t *testing.T) {
{
name: "apply test",
fields: fields{
OperationType: Apply,
OperationType: types.Apply,
StateStorage: &states.FileSystemState{Path: filepath.Join("test_data", states.KusionState)},
Runtime: &runtime.KubernetesRuntime{},
MsgCh: make(chan Message, 5),
MsgCh: make(chan opsmodels.Message, 5),
},
args: args{applyRequest: &ApplyRequest{Request{
args: args{applyRequest: &ApplyRequest{opsmodels.Request{
Tenant: "fakeTenant",
Stack: "fakeStack",
Project: "fakeProject",
Operator: "faker",
Manifest: mf,
Spec: mf,
}}},
wantRsp: &ApplyResponse{rs},
wantSt: nil,
Expand All @@ -132,25 +137,28 @@ func TestOperation_Apply(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
o := &Operation{
o := &opsmodels.Operation{
OperationType: tt.fields.OperationType,
StateStorage: tt.fields.StateStorage,
CtxResourceIndex: tt.fields.CtxResourceIndex,
PriorStateResourceIndex: tt.fields.PriorStateResourceIndex,
StateResourceIndex: tt.fields.StateResourceIndex,
Order: tt.fields.Order,
ChangeOrder: tt.fields.Order,
Runtime: tt.fields.Runtime,
MsgCh: tt.fields.MsgCh,
resultState: tt.fields.resultState,
lock: tt.fields.lock,
ResultState: tt.fields.resultState,
Lock: tt.fields.lock,
}
ao := &ApplyOperation{
Operation: *o,
}

monkey.Patch((*ResourceNode).Execute, func(rn *ResourceNode, operation *Operation) status.Status {
o.resultState = rs
monkey.Patch((*graph.ResourceNode).Execute, func(rn *graph.ResourceNode, operation *opsmodels.Operation) status.Status {
o.ResultState = rs
return nil
})

gotRsp, gotSt := o.Apply(tt.args.applyRequest)
gotRsp, gotSt := ao.Apply(tt.args.applyRequest)
assert.Equalf(t, tt.wantRsp.State.Stack, gotRsp.State.Stack, "Apply(%v)", tt.args.applyRequest)
assert.Equalf(t, tt.wantSt, gotSt, "Apply(%v)", tt.args.applyRequest)
})
Expand Down
Loading