Skip to content

Commit

Permalink
Support manipulating multi-runtime resources in one Stack
Browse files Browse the repository at this point in the history
upgrade all operations to support manipulating multi-runtime resources in one Stack. This commit consist of two major changes

update the Runtime field in Operation to RuntimeMap to contains multi-runtimes
move Runtime initialization from cmd to engine because we can figure out all Runtime types in one Stack only if merging plan resources and prior resources together
  • Loading branch information
SparkYuan committed Feb 1, 2023
1 parent 9d43e0a commit bb4df7a
Show file tree
Hide file tree
Showing 20 changed files with 165 additions and 143 deletions.
11 changes: 10 additions & 1 deletion pkg/engine/operation/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"kusionstack.io/kusion/pkg/engine/operation/graph"
opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"
"kusionstack.io/kusion/pkg/engine/operation/parser"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/engine/states"
"kusionstack.io/kusion/pkg/log"
"kusionstack.io/kusion/pkg/status"
Expand Down Expand Up @@ -80,6 +81,14 @@ func (ao *ApplyOperation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st s
priorState, resultState := o.InitStates(&request.Request)
priorStateResourceIndex := priorState.Resources.Index()

resources := request.Spec.Resources
resources = append(resources, priorState.Resources...)
runtimesMap, s := runtimeinit.Runtimes(resources)
if status.IsErr(s) {
return nil, s
}
o.RuntimeMap = runtimesMap

// 2. build & walk DAG
applyGraph, s := NewApplyGraph(request.Spec, priorState)
if status.IsErr(s) {
Expand All @@ -94,7 +103,7 @@ func (ao *ApplyOperation) Apply(request *ApplyRequest) (rsp *ApplyResponse, st s
CtxResourceIndex: map[string]*models.Resource{},
PriorStateResourceIndex: priorStateResourceIndex,
StateResourceIndex: priorStateResourceIndex,
Runtime: o.Runtime,
RuntimeMap: o.RuntimeMap,
Stack: o.Stack,
MsgCh: o.MsgCh,
ResultState: resultState,
Expand Down
16 changes: 11 additions & 5 deletions pkg/engine/operation/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"kusionstack.io/kusion/pkg/engine/operation/graph"
opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"
"kusionstack.io/kusion/pkg/engine/runtime"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/engine/states"
"kusionstack.io/kusion/pkg/engine/states/local"
"kusionstack.io/kusion/pkg/projectstack"
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestOperation_Apply(t *testing.T) {
PriorStateResourceIndex map[string]*models.Resource
StateResourceIndex map[string]*models.Resource
Order *opsmodels.ChangeOrder
Runtime runtime.Runtime
RuntimeMap map[models.Type]runtime.Runtime
Stack *projectstack.Stack
MsgCh chan opsmodels.Message
resultState *states.State
Expand All @@ -80,7 +81,8 @@ func TestOperation_Apply(t *testing.T) {
const Jack = "jack"
mf := &models.Spec{Resources: []models.Resource{
{
ID: Jack,
ID: Jack,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": "b",
},
Expand All @@ -99,7 +101,8 @@ func TestOperation_Apply(t *testing.T) {
Operator: "faker",
Resources: []models.Resource{
{
ID: Jack,
ID: Jack,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": "b",
},
Expand Down Expand Up @@ -134,7 +137,7 @@ func TestOperation_Apply(t *testing.T) {
fields: fields{
OperationType: opsmodels.Apply,
StateStorage: &local.FileSystemState{Path: filepath.Join("test_data", local.KusionState)},
Runtime: &runtime.KubernetesRuntime{},
RuntimeMap: map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}},
MsgCh: make(chan opsmodels.Message, 5),
},
args: args{applyRequest: &ApplyRequest{opsmodels.Request{
Expand All @@ -158,7 +161,7 @@ func TestOperation_Apply(t *testing.T) {
PriorStateResourceIndex: tt.fields.PriorStateResourceIndex,
StateResourceIndex: tt.fields.StateResourceIndex,
ChangeOrder: tt.fields.Order,
Runtime: tt.fields.Runtime,
RuntimeMap: tt.fields.RuntimeMap,
Stack: tt.fields.Stack,
MsgCh: tt.fields.MsgCh,
ResultState: tt.fields.resultState,
Expand All @@ -172,6 +175,9 @@ func TestOperation_Apply(t *testing.T) {
o.ResultState = rs
return nil
})
monkey.Patch(runtimeinit.Runtimes, func(resources models.Resources) (map[models.Type]runtime.Runtime, status.Status) {
return map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}}, nil
})

gotRsp, gotSt := ao.Apply(tt.args.applyRequest)
assert.Equalf(t, tt.wantRsp.State.Stack, gotRsp.State.Stack, "Apply(%v)", tt.args.applyRequest)
Expand Down
9 changes: 8 additions & 1 deletion pkg/engine/operation/destory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"kusionstack.io/kusion/pkg/engine/operation/graph"
opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"
"kusionstack.io/kusion/pkg/engine/operation/parser"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/log"
"kusionstack.io/kusion/pkg/status"
"kusionstack.io/kusion/third_party/terraform/dag"
Expand Down Expand Up @@ -66,6 +67,12 @@ func (do *DestroyOperation) Destroy(request *DestroyRequest) (st status.Status)
resources := request.Request.Spec.Resources
priorStateResourceIndex := resources.Index()

runtimesMap, s := runtimeinit.Runtimes(resources)
if status.IsErr(s) {
return s
}
o.RuntimeMap = runtimesMap

// 2. build & walk DAG
destroyGraph, s := NewDestroyGraph(resources)
if status.IsErr(s) {
Expand All @@ -79,7 +86,7 @@ func (do *DestroyOperation) Destroy(request *DestroyRequest) (st status.Status)
CtxResourceIndex: map[string]*models.Resource{},
PriorStateResourceIndex: priorStateResourceIndex,
StateResourceIndex: priorStateResourceIndex,
Runtime: o.Runtime,
RuntimeMap: o.RuntimeMap,
Stack: o.Stack,
MsgCh: o.MsgCh,
ResultState: resultState,
Expand Down
6 changes: 3 additions & 3 deletions pkg/engine/operation/destory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func TestOperation_Destroy(t *testing.T) {
}

resourceState := models.Resource{
ID: "id1",

ID: "id1",
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"foo": "bar",
},
Expand All @@ -54,7 +54,7 @@ func TestOperation_Destroy(t *testing.T) {
opsmodels.Operation{
OperationType: opsmodels.Destroy,
StateStorage: &local.FileSystemState{Path: filepath.Join("test_data", local.KusionState)},
Runtime: &runtime.KubernetesRuntime{},
RuntimeMap: map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}},
},
}
r := &DestroyRequest{
Expand Down
10 changes: 6 additions & 4 deletions pkg/engine/operation/graph/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (rn *ResourceNode) Execute(operation *opsmodels.Operation) status.Status {
// 3. get the latest resource from runtime
readRequest := &runtime.ReadRequest{PlanResource: planedState, PriorResource: priorState, Stack: operation.Stack}

response := operation.Runtime.Read(context.Background(), readRequest)
resourceType := rn.state.Type
response := operation.RuntimeMap[resourceType].Read(context.Background(), readRequest)
liveState := response.Resource
s := response.Status
if status.IsErr(s) {
Expand All @@ -75,7 +76,7 @@ func (rn *ResourceNode) Execute(operation *opsmodels.Operation) status.Status {
rn.Action = opsmodels.Create
} else {
// Dry run to fetch predictable state
dryRunResp := operation.Runtime.Apply(context.Background(), &runtime.ApplyRequest{
dryRunResp := operation.RuntimeMap[resourceType].Apply(context.Background(), &runtime.ApplyRequest{
PriorResource: priorState,
PlanResource: planedState,
Stack: operation.Stack,
Expand Down Expand Up @@ -126,10 +127,11 @@ func (rn *ResourceNode) applyResource(operation *opsmodels.Operation, priorState

var res *models.Resource
var s status.Status
resourceType := rn.state.Type

switch rn.Action {
case opsmodels.Create, opsmodels.Update:
response := operation.Runtime.Apply(context.Background(), &runtime.ApplyRequest{
response := operation.RuntimeMap[resourceType].Apply(context.Background(), &runtime.ApplyRequest{
PriorResource: priorState, PlanResource: planedState, Stack: operation.Stack,
})
res = response.Resource
Expand All @@ -139,7 +141,7 @@ func (rn *ResourceNode) applyResource(operation *opsmodels.Operation, priorState
log.Debugf("apply status: %v", s.String())
}
case opsmodels.Delete:
response := operation.Runtime.Delete(context.Background(), &runtime.DeleteRequest{Resource: priorState, Stack: operation.Stack})
response := operation.RuntimeMap[resourceType].Delete(context.Background(), &runtime.DeleteRequest{Resource: priorState, Stack: operation.Stack})
s = response.Status
if s != nil {
log.Debugf("delete state: %v", s.String())
Expand Down
35 changes: 16 additions & 19 deletions pkg/engine/operation/graph/resource_node_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build !arm64
// +build !arm64

package graph

import (
Expand Down Expand Up @@ -36,24 +33,24 @@ func TestResourceNode_Execute(t *testing.T) {
const Eric = "eric"
mf := &models.Spec{Resources: []models.Resource{
{
ID: Pony,

ID: Pony,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"c": "d",
},
DependsOn: []string{Jack},
},
{
ID: Eric,

ID: Eric,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": ImplicitRefPrefix + "jack.a.b",
},
DependsOn: []string{Pony},
},
{
ID: Jack,

ID: Jack,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": map[string]interface{}{
"b": "c",
Expand All @@ -69,17 +66,17 @@ func TestResourceNode_Execute(t *testing.T) {
}

newResourceState := &models.Resource{
ID: Eric,

ID: Eric,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": ImplicitRefPrefix + "jack.a.b",
},
DependsOn: []string{Pony},
}

illegalResourceState := &models.Resource{
ID: Eric,

ID: Eric,
Type: runtime.Kubernetes,
Attributes: map[string]interface{}{
"a": ImplicitRefPrefix + "jack.notExist",
},
Expand Down Expand Up @@ -112,7 +109,7 @@ func TestResourceNode_Execute(t *testing.T) {
MsgCh: make(chan opsmodels.Message),
ResultState: states.NewState(),
Lock: &sync.Mutex{},
Runtime: &runtime.KubernetesRuntime{},
RuntimeMap: map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}},
}},
want: nil,
},
Expand All @@ -132,7 +129,7 @@ func TestResourceNode_Execute(t *testing.T) {
MsgCh: make(chan opsmodels.Message),
ResultState: states.NewState(),
Lock: &sync.Mutex{},
Runtime: &runtime.KubernetesRuntime{},
RuntimeMap: map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}},
}},
want: nil,
},
Expand All @@ -152,7 +149,7 @@ func TestResourceNode_Execute(t *testing.T) {
MsgCh: make(chan opsmodels.Message),
ResultState: states.NewState(),
Lock: &sync.Mutex{},
Runtime: &runtime.KubernetesRuntime{},
RuntimeMap: map[models.Type]runtime.Runtime{runtime.Kubernetes: &runtime.KubernetesRuntime{}},
}},
want: status.NewErrorStatusWithMsg(status.IllegalManifest, "can't find specified value in resource:jack by ref:jack.notExist"),
},
Expand All @@ -164,19 +161,19 @@ func TestResourceNode_Execute(t *testing.T) {
Action: tt.fields.Action,
state: tt.fields.state,
}
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.Runtime), "Apply",
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.RuntimeMap[runtime.Kubernetes]), "Apply",
func(k *runtime.KubernetesRuntime, ctx context.Context, request *runtime.ApplyRequest) *runtime.ApplyResponse {
mockState := *newResourceState
mockState.Attributes["a"] = "c"
return &runtime.ApplyResponse{
Resource: &mockState,
}
})
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.Runtime), "Delete",
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.RuntimeMap[runtime.Kubernetes]), "Delete",
func(k *runtime.KubernetesRuntime, ctx context.Context, request *runtime.DeleteRequest) *runtime.DeleteResponse {
return &runtime.DeleteResponse{Status: nil}
})
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.Runtime), "Read",
monkey.PatchInstanceMethod(reflect.TypeOf(tt.args.operation.RuntimeMap[runtime.Kubernetes]), "Read",
func(k *runtime.KubernetesRuntime, ctx context.Context, request *runtime.ReadRequest) *runtime.ReadResponse {
return &runtime.ReadResponse{Resource: request.PriorResource}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/operation/models/operation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Operation struct {
// ChangeOrder is resources' change order during this operation
ChangeOrder *ChangeOrder

// Runtime is the resource infrastructure runtime of this operation
Runtime runtime.Runtime
// RuntimeMap contains all infrastructure runtimes involved this operation. The key of this map is the Runtime type
RuntimeMap map[models.Type]runtime.Runtime

// Stack contains info about where this command is invoked
Stack *projectstack.Stack
Expand Down
12 changes: 11 additions & 1 deletion pkg/engine/operation/preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"kusionstack.io/kusion/pkg/engine/models"
"kusionstack.io/kusion/pkg/engine/operation/graph"
opsmodels "kusionstack.io/kusion/pkg/engine/operation/models"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/engine/states"
"kusionstack.io/kusion/pkg/log"
"kusionstack.io/kusion/pkg/status"
Expand Down Expand Up @@ -60,6 +61,15 @@ func (po *PreviewOperation) Preview(request *PreviewRequest) (rsp *PreviewRespon
// 1. init & build Indexes
priorState, resultState = po.InitStates(&request.Request)

// Kusion is a multi-runtime system. We initialize runtimes dynamically by resource types
resources := request.Spec.Resources
resources = append(resources, priorState.Resources...)
runtimesMap, s := runtimeinit.Runtimes(resources)
if status.IsErr(s) {
return nil, s
}
o.RuntimeMap = runtimesMap

switch o.OperationType {
case opsmodels.ApplyPreview:
priorStateResourceIndex = priorState.Resources.Index()
Expand All @@ -85,7 +95,7 @@ func (po *PreviewOperation) Preview(request *PreviewRequest) (rsp *PreviewRespon
StateResourceIndex: priorStateResourceIndex,
IgnoreFields: o.IgnoreFields,
ChangeOrder: o.ChangeOrder,
Runtime: o.Runtime, // preview need get the latest spec from runtime
RuntimeMap: o.RuntimeMap,
Stack: o.Stack,
ResultState: resultState,
Lock: &sync.Mutex{},
Expand Down
Loading

0 comments on commit bb4df7a

Please sign in to comment.