Skip to content

Commit

Permalink
feat: enable concurrent manipulation of TF resources since they are s…
Browse files Browse the repository at this point in the history
…tored in separate directories (#1194)
  • Loading branch information
SparkYuan committed Jul 3, 2024
1 parent e42d77f commit 1ba096b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 94 deletions.
82 changes: 31 additions & 51 deletions pkg/engine/runtime/terraform/terraform_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"sync"
"time"

"github.com/spf13/afero"

"github.com/patrickmn/go-cache"

apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
Expand All @@ -21,48 +19,39 @@ import (
"kusionstack.io/kusion/pkg/log"
)

var _ runtime.Runtime = &TerraformRuntime{}
var _ runtime.Runtime = &Runtime{}

// tfEvents is used to record the operation events of the Terraform
// resources into the related channels for watching.
var tfEvents = cache.New(cache.NoExpiration, cache.NoExpiration)

type TerraformRuntime struct {
tfops.WorkSpace
mu *sync.Mutex
type Runtime struct {
mutex *sync.Mutex
}

func NewTerraformRuntime(_ *apiv1.Resource) (runtime.Runtime, error) {
fs := afero.Afero{Fs: afero.NewOsFs()}
ws := tfops.NewWorkSpace(fs)
TFRuntime := &TerraformRuntime{
WorkSpace: *ws,
mu: &sync.Mutex{},
TFRuntime := &Runtime{
mutex: &sync.Mutex{},
}
return TFRuntime, nil
}

// Apply Terraform resource
func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequest) *runtime.ApplyResponse {
t.mu.Lock()
defer t.mu.Unlock()

func (t *Runtime) Apply(ctx context.Context, request *runtime.ApplyRequest) *runtime.ApplyResponse {
plan := request.PlanResource
stackPath := request.Stack.Path
key := plan.ResourceKey()
tfCacheDir := buildTFCacheDir(stackPath, key)
t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(plan)
ws := tfops.NewWorkSpace(plan, stackPath, tfCacheDir, t.mutex)

if err := t.WorkSpace.WriteHCL(); err != nil {
if err := ws.WriteHCL(); err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

_, err := os.Stat(filepath.Join(tfCacheDir, tfops.LockHCLFile))
if err != nil {
if os.IsNotExist(err) {
if err := t.WorkSpace.InitWorkSpace(ctx); err != nil {
if err := ws.InitWorkSpace(ctx); err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
Expand All @@ -72,7 +61,7 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ

// dry run by terraform plan
if request.DryRun {
pr, err := t.WorkSpace.Plan(ctx)
pr, err := ws.Plan(ctx)
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
Expand Down Expand Up @@ -105,12 +94,12 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ

// Start applying the resource.
go func() {
tfstate, err = t.WorkSpace.Apply(ctx)
tfstate, err = ws.Apply(ctx)
if err != nil {
errCh <- err
}

providerAddr, err = t.WorkSpace.GetProvider()
providerAddr, err = ws.GetProvider()
errCh <- err
}()

Expand Down Expand Up @@ -145,13 +134,13 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ
}
} else {
// Apply without watching.
tfstate, err = t.WorkSpace.Apply(ctx)
tfstate, err = ws.Apply(ctx)
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

// get terraform provider version
providerAddr, err = t.WorkSpace.GetProvider()
providerAddr, err = ws.GetProvider()
if err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
Expand All @@ -177,7 +166,7 @@ func buildTFCacheDir(stackPath string, key string) string {
}

// Read terraform show state
func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadRequest) *runtime.ReadResponse {
func (t *Runtime) Read(ctx context.Context, request *runtime.ReadRequest) *runtime.ReadResponse {
priorResource := request.PriorResource
planResource := request.PlanResource

Expand Down Expand Up @@ -206,23 +195,18 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
}
}

var tfstate *tfops.StateRepresentation

t.mu.Lock()
defer t.mu.Unlock()
var tfState *tfops.StateRepresentation
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, planResource.ResourceKey())
t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(planResource)

if err := t.WorkSpace.WriteHCL(); err != nil {
ws := tfops.NewWorkSpace(planResource, stackPath, tfCacheDir, t.mutex)
if err := ws.WriteHCL(); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
_, err := os.Stat(filepath.Join(tfCacheDir, tfops.LockHCLFile))
if err != nil {
if os.IsNotExist(err) {
if err := t.WorkSpace.InitWorkSpace(ctx); err != nil {
if err := ws.InitWorkSpace(ctx); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
Expand All @@ -235,33 +219,33 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
// use 'terraform import' to import the latest state.
importID, ok := planResource.Extensions[tfops.ImportIDKey].(string)
if ok && importID != "" {
if err = t.WorkSpace.ImportResource(ctx, importID); err != nil {
if err = ws.ImportResource(ctx, importID); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
} else {
return &runtime.ReadResponse{Resource: nil, Status: nil}
}
} else if err = t.WorkSpace.WriteTFState(priorResource); err != nil {
// priorResource overwrite tfstate in workspace
} else if err = ws.WriteTFState(priorResource); err != nil {
// priorResource overwrite tfState in workspace
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

tfstate, err = t.WorkSpace.RefreshOnly(ctx)
tfState, err = ws.RefreshOnly(ctx)
if err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

if tfstate == nil || tfstate.Values == nil {
if tfState == nil || tfState.Values == nil {
return &runtime.ReadResponse{Resource: nil, Status: nil}
}

// get terraform provider addr
providerAddr, err := t.WorkSpace.GetProvider()
providerAddr, err := ws.GetProvider()
if err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}

r := tfops.ConvertTFState(tfstate, providerAddr)
r := tfops.ConvertTFState(tfState, providerAddr)
return &runtime.ReadResponse{
Resource: &apiv1.Resource{
ID: planResource.ID,
Expand All @@ -274,7 +258,7 @@ func (t *TerraformRuntime) Read(ctx context.Context, request *runtime.ReadReques
}
}

func (t *TerraformRuntime) Import(ctx context.Context, request *runtime.ImportRequest) *runtime.ImportResponse {
func (t *Runtime) Import(ctx context.Context, request *runtime.ImportRequest) *runtime.ImportResponse {
response := t.Read(ctx, &runtime.ReadRequest{
PlanResource: request.PlanResource,
Stack: request.Stack,
Expand All @@ -294,16 +278,12 @@ func (t *TerraformRuntime) Import(ctx context.Context, request *runtime.ImportRe
}

// Delete terraform resource and remove workspace
func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRequest) (res *runtime.DeleteResponse) {
func (t *Runtime) Delete(ctx context.Context, request *runtime.DeleteRequest) (res *runtime.DeleteResponse) {
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, request.Resource.ResourceKey())
t.mu.Lock()
defer t.mu.Unlock()

t.WorkSpace.SetStackDir(stackPath)
t.WorkSpace.SetCacheDir(tfCacheDir)
t.WorkSpace.SetResource(request.Resource)
if err := t.WorkSpace.Destroy(ctx); err != nil {
ws := tfops.NewWorkSpace(request.Resource, stackPath, tfCacheDir, t.mutex)
if err := ws.Destroy(ctx); err != nil {
return &runtime.DeleteResponse{Status: v1.NewErrorStatus(err)}
}

Expand All @@ -316,7 +296,7 @@ func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRe
}

// Watch terraform resource
func (t *TerraformRuntime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse {
func (t *Runtime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse {
// Get the event channel.
id := request.Resource.ResourceKey()
eventCh, ok := tfEvents.Get(id)
Expand Down
7 changes: 1 addition & 6 deletions pkg/engine/runtime/terraform/terraform_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"

"github.com/bytedance/mockey"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"

"kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
Expand Down Expand Up @@ -44,10 +42,7 @@ func TestTerraformRuntime(t *testing.T) {
Path: filepath.Join(cwd, "fakePath"),
}
defer os.RemoveAll(stack.Path)
tfRuntime := TerraformRuntime{
WorkSpace: *tfops.NewWorkSpace(afero.Afero{Fs: afero.NewOsFs()}),
mu: &sync.Mutex{},
}
tfRuntime := Runtime{}

mockey.PatchConvey("ApplyDryRun", t, func() {
mockApplySetup()
Expand Down
Loading

0 comments on commit 1ba096b

Please sign in to comment.