Skip to content

Commit

Permalink
feat: enable concurrent manipulation of TF resources since they are s…
Browse files Browse the repository at this point in the history
…tored in separate directories
  • Loading branch information
SparkYuan committed Jul 2, 2024
1 parent 884ff58 commit 07d943a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 97 deletions.
2 changes: 1 addition & 1 deletion docs/design/kusion_module/kusion_modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Kusion module is a reusable building block of KusionStack designed by platform e
3. Modules should not have dependencies or be nested within each other.
4. AppConfig is not a Module.

For more details, please visit our [official website](https://www.kusionstack.io/docs/concepts/kusion-module/overview).
For more details, please visit our [official website](https://www.kusionstack.io/docs/concepts/module/overview).

![module](../collaboration/kusion-module.png)

Expand Down
83 changes: 29 additions & 54 deletions pkg/engine/runtime/terraform/terraform_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/spf13/afero"

"github.com/patrickmn/go-cache"

apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
Expand All @@ -21,48 +18,35 @@ import (
"kusionstack.io/kusion/pkg/log"
)

var _ runtime.Runtime = &TerraformRuntime{}
var _ runtime.Runtime = &Runtime{}

// tfEvents is used to record the operation events of the Terraform
// resources into the related channels for watching.
var tfEvents = cache.New(cache.NoExpiration, cache.NoExpiration)

type TerraformRuntime struct {
tfops.WorkSpace
mu *sync.Mutex
}
type Runtime struct{}

func NewTerraformRuntime(_ *apiv1.Resource) (runtime.Runtime, error) {
fs := afero.Afero{Fs: afero.NewOsFs()}
ws := tfops.NewWorkSpace(fs)
TFRuntime := &TerraformRuntime{
WorkSpace: *ws,
mu: &sync.Mutex{},
}
TFRuntime := &Runtime{}
return TFRuntime, nil
}

// Apply Terraform resource
func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequest) *runtime.ApplyResponse {
t.mu.Lock()
defer t.mu.Unlock()

func (t *Runtime) Apply(ctx context.Context, request *runtime.ApplyRequest) *runtime.ApplyResponse {
plan := request.PlanResource
stackPath := request.Stack.Path
key := plan.ResourceKey()
tfCacheDir := buildTFCacheDir(stackPath, key)
t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(plan)
ws := tfops.NewWorkSpace(plan, stackPath, tfCacheDir)

if err := t.WorkSpace.WriteHCL(); err != nil {
if err := ws.WriteHCL(); err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

_, err := os.Stat(filepath.Join(tfCacheDir, tfops.LockHCLFile))
if err != nil {
if os.IsNotExist(err) {
if err := t.WorkSpace.InitWorkSpace(ctx); err != nil {
if err := ws.InitWorkSpace(ctx); err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
Expand All @@ -72,7 +56,7 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ

// dry run by terraform plan
if request.DryRun {
pr, err := t.WorkSpace.Plan(ctx)
pr, err := ws.Plan(ctx)
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
Expand Down Expand Up @@ -105,12 +89,12 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ

// Start applying the resource.
go func() {
tfstate, err = t.WorkSpace.Apply(ctx)
tfstate, err = ws.Apply(ctx)
if err != nil {
errCh <- err
}

providerAddr, err = t.WorkSpace.GetProvider()
providerAddr, err = ws.GetProvider()
errCh <- err
}()

Expand Down Expand Up @@ -145,13 +129,13 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ
}
} else {
// Apply without watching.
tfstate, err = t.WorkSpace.Apply(ctx)
tfstate, err = ws.Apply(ctx)
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

// get terraform provider version
providerAddr, err = t.WorkSpace.GetProvider()
providerAddr, err = ws.GetProvider()
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
Expand All @@ -177,7 +161,7 @@ func buildTFCacheDir(stackPath string, key string) string {
}

// Read terraform show state
func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadRequest) *runtime.ReadResponse {
func (t *Runtime) Read(ctx context.Context, request *runtime.ReadRequest) *runtime.ReadResponse {
priorResource := request.PriorResource
planResource := request.PlanResource

Expand Down Expand Up @@ -206,23 +190,18 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
}
}

var tfstate *tfops.StateRepresentation

t.mu.Lock()
defer t.mu.Unlock()
var tfState *tfops.StateRepresentation
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, planResource.ResourceKey())
t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(planResource)

if err := t.WorkSpace.WriteHCL(); err != nil {
ws := tfops.NewWorkSpace(planResource, stackPath, tfCacheDir)
if err := ws.WriteHCL(); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
_, err := os.Stat(filepath.Join(tfCacheDir, tfops.LockHCLFile))
if err != nil {
if os.IsNotExist(err) {
if err := t.WorkSpace.InitWorkSpace(ctx); err != nil {
if err := ws.InitWorkSpace(ctx); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
Expand All @@ -235,33 +214,33 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
// use 'terraform import' to import the latest state.
importID, ok := planResource.Extensions[tfops.ImportIDKey].(string)
if ok && importID != "" {
if err = t.WorkSpace.ImportResource(ctx, importID); err != nil {
if err = ws.ImportResource(ctx, importID); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
return &runtime.ReadResponse{Resource: nil, Status: nil}
}
} else if err = t.WorkSpace.WriteTFState(priorResource); err != nil {
// priorResource overwrite tfstate in workspace
} else if err = ws.WriteTFState(priorResource); err != nil {
// priorResource overwrite tfState in workspace
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

tfstate, err = t.WorkSpace.RefreshOnly(ctx)
tfState, err = ws.RefreshOnly(ctx)
if err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

if tfstate == nil || tfstate.Values == nil {
if tfState == nil || tfState.Values == nil {
return &runtime.ReadResponse{Resource: nil, Status: nil}
}

// get terraform provider addr
providerAddr, err := t.WorkSpace.GetProvider()
providerAddr, err := ws.GetProvider()
if err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

r := tfops.ConvertTFState(tfstate, providerAddr)
r := tfops.ConvertTFState(tfState, providerAddr)
return &runtime.ReadResponse{
Resource: &apiv1.Resource{
ID: planResource.ID,
Expand All @@ -274,7 +253,7 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
}
}

func (t *TerraformRuntime) Import(ctx context.Context, request *runtime.ImportRequest) *runtime.ImportResponse {
func (t *Runtime) Import(ctx context.Context, request *runtime.ImportRequest) *runtime.ImportResponse {
response := t.Read(ctx, &runtime.ReadRequest{
PlanResource: request.PlanResource,
Stack: request.Stack,
Expand All @@ -294,16 +273,12 @@ func (t *TerraformRuntime) Import(ctx context.Context, request *runtime.ImportRe
}

// Delete terraform resource and remove workspace
func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRequest) (res *runtime.DeleteResponse) {
func (t *Runtime) Delete(ctx context.Context, request *runtime.DeleteRequest) (res *runtime.DeleteResponse) {
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, request.Resource.ResourceKey())
t.mu.Lock()
defer t.mu.Unlock()

t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(request.Resource)
if err := t.WorkSpace.Destroy(ctx); err != nil {
ws := tfops.NewWorkSpace(request.Resource, stackPath, tfCacheDir)
if err := ws.Destroy(ctx); err != nil {
return &runtime.DeleteResponse{Status: v1.NewErrorStatus(err)}
}

Expand All @@ -316,7 +291,7 @@ func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRe
}

// Watch terraform resource
func (t *TerraformRuntime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse {
func (t *Runtime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse {
// Get the event channel.
id := request.Resource.ResourceKey()
eventCh, ok := tfEvents.Get(id)
Expand Down
7 changes: 1 addition & 6 deletions pkg/engine/runtime/terraform/terraform_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"

"github.com/bytedance/mockey"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"

"kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
Expand Down Expand Up @@ -44,10 +42,7 @@ func TestTerraformRuntime(t *testing.T) {
Path: filepath.Join(cwd, "fakePath"),
}
defer os.RemoveAll(stack.Path)
tfRuntime := TerraformRuntime{
WorkSpace: *tfops.NewWorkSpace(afero.Afero{Fs: afero.NewOsFs()}),
mu: &sync.Mutex{},
}
tfRuntime := Runtime{}

mockey.PatchConvey("ApplyDryRun", t, func() {
mockApplySetup()
Expand Down
37 changes: 14 additions & 23 deletions pkg/engine/runtime/terraform/tfops/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
"github.com/hashicorp/hcl/v2/hclparse"
"github.com/spf13/afero"

v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
"kusionstack.io/kusion/pkg/log"
Expand Down Expand Up @@ -45,21 +44,19 @@ var envTFLog = fmt.Sprintf("%s=%s", envLog, tfDebugLOG)

type WorkSpace struct {
resource *v1.Resource
fs afero.Afero
stackDir string
tfCacheDir string
}

func NewWorkSpace(resource *v1.Resource, stackDir string, tfCacheDir string) *WorkSpace {
return &WorkSpace{resource: resource, stackDir: stackDir, tfCacheDir: tfCacheDir}
}

// SetResource set workspace resource
func (w *WorkSpace) SetResource(resource *v1.Resource) {
w.resource = resource
}

// SetFS set filesystem
func (w *WorkSpace) SetFS(fs afero.Afero) {
w.fs = fs
}

// SetStackDir set workspace work directory.
func (w *WorkSpace) SetStackDir(stackDir string) {
w.stackDir = stackDir
Expand All @@ -70,12 +67,6 @@ func (w *WorkSpace) SetCacheDir(cacheDir string) {
w.tfCacheDir = cacheDir
}

func NewWorkSpace(fs afero.Afero) *WorkSpace {
return &WorkSpace{
fs: fs,
}
}

// WriteHCL convert kusion Resource to HCL json
// and write hcl json to main.tf.json
func (w *WorkSpace) WriteHCL() error {
Expand Down Expand Up @@ -115,17 +106,17 @@ func (w *WorkSpace) WriteHCL() error {

hclMain := jsonutil.Marshal2PrettyString(m)

_, err := w.fs.Stat(w.tfCacheDir)
_, err := os.Stat(w.tfCacheDir)
if err != nil {
if os.IsNotExist(err) {
if err := w.fs.MkdirAll(w.tfCacheDir, os.ModePerm); err != nil {
if err := os.MkdirAll(w.tfCacheDir, os.ModePerm); err != nil {
return fmt.Errorf("create workspace error: %v", err)
}
} else {
return err
}
}
err = w.fs.WriteFile(filepath.Join(w.tfCacheDir, mainTFFile), []byte(hclMain), 0o600)
err = os.WriteFile(filepath.Join(w.tfCacheDir, mainTFFile), []byte(hclMain), 0o600)
if err != nil {
return fmt.Errorf("write hcl main.tf.json error: %v", err)
}
Expand Down Expand Up @@ -178,7 +169,7 @@ func (w *WorkSpace) WriteTFState(priorState *v1.Resource) error {
}
hclState := jsonutil.Marshal2PrettyString(m)

err := w.fs.WriteFile(filepath.Join(w.tfCacheDir, tfStateFile), []byte(hclState), os.ModePerm)
err := os.WriteFile(filepath.Join(w.tfCacheDir, tfStateFile), []byte(hclState), os.ModePerm)
if err != nil {
return fmt.Errorf("write hcl error: %v", err)
}
Expand All @@ -193,7 +184,7 @@ func (w *WorkSpace) ClearTFState() error {
}
hclState := jsonutil.Marshal2PrettyString(m)

err := w.fs.WriteFile(filepath.Join(w.tfCacheDir, tfStateFile), []byte(hclState), os.ModePerm)
err := os.WriteFile(filepath.Join(w.tfCacheDir, tfStateFile), []byte(hclState), os.ModePerm)
if err != nil {
return fmt.Errorf("write hcl error: %v", err)
}
Expand Down Expand Up @@ -239,7 +230,7 @@ func (w *WorkSpace) Apply(ctx context.Context) (*StateRepresentation, error) {
return nil, err
}

cmd := exec.CommandContext(ctx, "terraform", chdir, "apply", "-auto-approve", "-json", "-lock=false")
cmd := exec.CommandContext(ctx, "terraform", chdir, "apply", "-auto-approve", "-json")
cmd.Dir = w.stackDir
envs, err := w.initEnvs()
if err != nil {
Expand Down Expand Up @@ -296,7 +287,7 @@ func (w *WorkSpace) Import(ctx context.Context, to, id string) error {
return err
}

cmd := exec.CommandContext(ctx, "terraform", chdir, "import", "-lock=false", to, id)
cmd := exec.CommandContext(ctx, "terraform", chdir, "import", to, id)
cmd.Dir = w.stackDir
envs, err := w.initEnvs()
if err != nil {
Expand All @@ -314,7 +305,7 @@ func (w *WorkSpace) Import(ctx context.Context, to, id string) error {

// ShowState shows local tfstate with the terraform cli show command
func (w *WorkSpace) ShowState(ctx context.Context) (*StateRepresentation, error) {
fi, err := w.fs.Stat(filepath.Join(w.tfCacheDir, tfStateFile))
fi, err := os.Stat(filepath.Join(w.tfCacheDir, tfStateFile))
if os.IsNotExist(err) {
return nil, nil
}
Expand All @@ -335,7 +326,7 @@ func (w *WorkSpace) ShowState(ctx context.Context) (*StateRepresentation, error)

// ShowPlan shows local plan file with the terraform cli show command
func (w *WorkSpace) ShowPlan(ctx context.Context) (*PlanRepresentation, error) {
fi, err := w.fs.Stat(filepath.Join(w.tfCacheDir, tfPlanFile))
fi, err := os.Stat(filepath.Join(w.tfCacheDir, tfPlanFile))
if os.IsNotExist(err) {
return nil, nil
}
Expand Down Expand Up @@ -372,7 +363,7 @@ func (w *WorkSpace) RefreshOnly(ctx context.Context) (*StateRepresentation, erro
if err != nil {
return nil, err
}
cmd := exec.CommandContext(ctx, "terraform", chdir, "apply", "-auto-approve", "-json", "--refresh-only", "-lock=false")
cmd := exec.CommandContext(ctx, "terraform", chdir, "apply", "-auto-approve", "-json", "--refresh-only")
cmd.Dir = w.stackDir

envs, err := w.initEnvs()
Expand Down
Loading

0 comments on commit 07d943a

Please sign in to comment.