Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor state #67

Merged
merged 3 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 21 additions & 28 deletions internal/state/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"encoding/json"
)

const (
StateFileResourceInstanceTainted StateFileResourceInstanceStatus = "tainted"
)

type (
// StateFile is the terraform state file contents
StateFile struct {
FormatVersion string `json:"format_version"`
Version int
TerraformVersion string `json:"terraform_version"`
Values StateFileValues
}

StateFileValues struct {
Outputs map[string]StateFileOutput
RootModule StateFileModule `json:"root_module"`
Serial int64
Lineage string
Outputs map[string]StateFileOutput
Resources []StateFileResource
}

// StateFileOutput is an output in the terraform state file
Expand All @@ -29,27 +31,18 @@ type (
}

StateFileResource struct {
Address ResourceAddress
Tainted bool
Name string
ProviderURI string `json:"provider"`
Type string
Module string
Instances []StateFileResourceInstance
}
)

func getResourcesFromFile(f StateFile) map[ResourceAddress]*Resource {
m := make(map[ResourceAddress]*Resource)
return getResourcesFromStateFileModule(f.Values.RootModule, m)
}

func getResourcesFromStateFileModule(mod StateFileModule, m map[ResourceAddress]*Resource) map[ResourceAddress]*Resource {
for _, res := range mod.Resources {
m[res.Address] = &Resource{
Address: res.Address,
}
if res.Tainted {
m[res.Address].Status = Tainted
}
}
for _, child := range mod.ChildModules {
m = getResourcesFromStateFileModule(child, m)
StateFileResourceInstance struct {
IndexKey *int `json:"index_key"`
Status StateFileResourceInstanceStatus
Attributes json.RawMessage
}
return m
}

StateFileResourceInstanceStatus string
)
45 changes: 0 additions & 45 deletions internal/state/file_test.go

This file was deleted.

30 changes: 11 additions & 19 deletions internal/state/resource.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
package state

import "github.com/leg100/pug/internal/resource"

// Resource is a pug state resource.
type Resource struct {
resource.Common

Address ResourceAddress
Status ResourceStatus
Tainted bool
}

type ResourceStatus string
func newResource(ws resource.Resource, addr ResourceAddress) *Resource {
return &Resource{
Common: resource.New(resource.StateResource, ws),
Address: addr,
}
}

type ResourceAddress string

const (
// Idle means the resource is idle (no tasks are currently operating on
// it).
Idle ResourceStatus = "idle"
// Removing means the resource is in the process of being removed.
Removing ResourceStatus = "removing"
// Tainting means the resource is in the process of being tainted.
Tainting ResourceStatus = "tainting"
// Tainted means the resource is currently tainted
Tainted ResourceStatus = "tainted"
// Untainting means the resource is in the process of being untainted.
Untainting ResourceStatus = "untainting"
// Moving means the resource is in the process of being moved to a different
// address.
Moving ResourceStatus = "moving"
)
134 changes: 13 additions & 121 deletions internal/state/service.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package state

import (
"encoding/json"
"errors"
"slices"

"github.com/leg100/pug/internal/logging"
"github.com/leg100/pug/internal/module"
"github.com/leg100/pug/internal/pubsub"
Expand Down Expand Up @@ -57,72 +53,29 @@ func NewService(opts ServiceOptions) *Service {

// Get retrieves the state for a workspace.
func (s *Service) Get(workspaceID resource.ID) (*State, error) {
state, err := s.cache.Get(workspaceID)
if errors.Is(err, resource.ErrNotFound) {
return EmptyState(workspaceID), nil
}
return state, err
return s.cache.Get(workspaceID)
}

// Reload creates a task to repopulate the local cache of the state of the given
// workspace.
func (s *Service) Reload(workspaceID resource.ID) (*task.Task, error) {
err := s.updateStateStatus(workspaceID, func(existing *State) error {
return existing.startReload()
})
if err != nil {
return nil, err
}

revertIdle := func() {
s.updateStateStatus(workspaceID, func(existing *State) error {
existing.State = IdleState
return nil
})
}

ws, err := s.workspaces.Get(workspaceID)
if err != nil {
return nil, err
}

task, err := s.createTask(workspaceID, task.CreateOptions{
Command: []string{"show"},
Args: []string{"-json"},
return s.createTask(workspaceID, task.CreateOptions{
Command: []string{"state", "pull"},
JSON: true,
AfterError: func(t *task.Task) {
revertIdle()
s.logger.Error("reloading state", "error", t.Err, "workspace", ws)
},
AfterCanceled: func(t *task.Task) {
revertIdle()
s.logger.Error("reloading state", "error", t.Err, "workspace", t.Workspace())
},
AfterExited: func(t *task.Task) {
var file StateFile
if err := json.NewDecoder(t.NewReader()).Decode(&file); err != nil {
s.logger.Error("reloading state", "error", err, "workspace", ws)
state, err := newState(t.Workspace(), t.NewReader())
if err != nil {
s.logger.Error("reloading state", "error", err, "workspace", t.Workspace())
return
}
current := newState(workspaceID, file)
// For each current resource, check if it previously existed in the
// cache, and if so, copy across its status.
if previous, err := s.cache.Get(workspaceID); err == nil {
for currentAddress := range current.Resources {
if previousResource, ok := previous.Resources[currentAddress]; ok {
current.Resources[currentAddress].Status = previousResource.Status
}
}
}
// Add/replace state in cache.
s.cache.Add(workspaceID, current)
s.logger.Info("reloaded state", "workspace", ws, "resources", len(current.Resources))
s.cache.Add(workspaceID, state)
s.logger.Info("reloaded state", "workspace", t.Workspace(), "resources", len(state.Resources))
},
})
if err != nil {
revertIdle()
return nil, err
}
return task, nil
}

func (s *Service) Delete(workspaceID resource.ID, addrs ...ResourceAddress) (*task.Task, error) {
Expand All @@ -134,24 +87,11 @@ func (s *Service) Delete(workspaceID resource.ID, addrs ...ResourceAddress) (*ta
Blocking: true,
Command: []string{"state", "rm"},
Args: addrStrings,
AfterCreate: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Removing, addrs...)
},
AfterError: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, addrs...)
s.logger.Error("deleting resources", "error", t.Err, "resources", addrs)
},
AfterCanceled: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, addrs...)
},
AfterExited: func(t *task.Task) {
s.cache.Update(workspaceID, func(existing *State) error {
// Remove resources from cache
for _, addr := range addrs {
delete(existing.Resources, addr)
}
return nil
})
s.Reload(workspaceID)
},
})
}
Expand All @@ -161,18 +101,11 @@ func (s *Service) Taint(workspaceID resource.ID, addr ResourceAddress) (*task.Ta
Blocking: true,
Command: []string{"taint"},
Args: []string{string(addr)},
AfterCreate: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Tainting, addr)
},
AfterError: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, addr)
s.logger.Error("tainting resource", "error", t.Err, "resource", addr)
},
AfterCanceled: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, addr)
},
AfterExited: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Tainted, addr)
s.Reload(workspaceID)
},
})
}
Expand All @@ -182,18 +115,11 @@ func (s *Service) Untaint(workspaceID resource.ID, addr ResourceAddress) (*task.
Blocking: true,
Command: []string{"untaint"},
Args: []string{string(addr)},
AfterCreate: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Untainting, addr)
},
AfterError: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, addr)
s.logger.Error("untainting resource", "error", t.Err, "resource", addr)
},
AfterCanceled: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Tainted, addr)
},
AfterExited: func(t *task.Task) {
s.updateResourceStatus(workspaceID, "", addr)
s.Reload(workspaceID)
},
})
}
Expand All @@ -203,23 +129,11 @@ func (s *Service) Move(workspaceID resource.ID, src, dest ResourceAddress) (*tas
Blocking: true,
Command: []string{"state", "mv"},
Args: []string{string(src), string(dest)},
AfterCreate: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Moving, src)
},
AfterError: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, src)
s.logger.Error("moving resource", "error", t.Err, "resources", src)
},
AfterCanceled: func(t *task.Task) {
s.updateResourceStatus(workspaceID, Idle, src)
},
AfterExited: func(t *task.Task) {
// Upon success, move the resource in the cache itself.
s.cache.Update(workspaceID, func(state *State) error {
delete(state.Resources, src)
state.Resources[dest] = &Resource{Address: dest}
return nil
})
s.Reload(workspaceID)
},
})
}
Expand All @@ -236,25 +150,3 @@ func (s *Service) createTask(workspaceID resource.ID, opts task.CreateOptions) (

return s.tasks.Create(opts)
}

func (s *Service) updateStateStatus(workspaceID resource.ID, fn func(*State) error) error {
var err error
s.cache.Update(workspaceID, func(existing *State) error {
if updateErr := fn(existing); updateErr != nil {
err = updateErr
}
return nil
})
return err
}

func (s *Service) updateResourceStatus(workspaceID resource.ID, state ResourceStatus, addrs ...ResourceAddress) {
s.cache.Update(workspaceID, func(existing *State) error {
for _, res := range existing.Resources {
if slices.Contains(addrs, res.Address) {
res.Status = state
}
}
return nil
})
}
Loading
Loading