Skip to content

Commit

Permalink
chore: parallelize tests (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 authored May 5, 2024
1 parent 7bb8c2b commit 5ca1dd3
Show file tree
Hide file tree
Showing 25 changed files with 491 additions and 182 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/otiai10/copy v1.14.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.6 // indirect
github.com/zclconf/go-cty v1.13.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU=
github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w=
github.com/packer-community/winrmcp v0.0.0-20180921211025-c76d91c1e7db/go.mod h1:f6Izs6JvFTdnRbziASagjZ2vmf55NSIkC/weStxCHqk=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0=
Expand Down
192 changes: 106 additions & 86 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"os"
"sync"

tea "github.com/charmbracelet/bubbletea"
"github.com/leg100/pug/internal"
Expand All @@ -21,25 +22,8 @@ import (
"github.com/leg100/pug/internal/workspace"
)

type app struct {
modules *module.Service
workspaces *workspace.Service
states *state.Service
runs *run.Service
tasks *task.Service
logger *logging.Logger
cfg config
}

type sender interface {
Send(tea.Msg)
}

// Start the app.
func Start(stdout, stderr io.Writer, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Parse configuration from env vars and flags
cfg, err := parse(stderr, args)
if err != nil {
Expand All @@ -51,10 +35,12 @@ func Start(stdout, stderr io.Writer, args []string) error {
return nil
}

app, model, err := newApp(ctx, cfg)
// Start daemons and create event subscriptions.
app, err := startApp(cfg, stdout)
if err != nil {
return err
}
defer app.cleanup()

// Log some info useful to the user
app.logger.Info("loaded config",
Expand All @@ -66,7 +52,7 @@ func Start(stdout, stderr io.Writer, args []string) error {
)

p := tea.NewProgram(
model,
app.model,
// Use the full size of the terminal with its "alternate screen buffer"
tea.WithAltScreen(),
// Enabling mouse cell motion removes the ability to "blackboard" text
Expand All @@ -77,35 +63,44 @@ func Start(stdout, stderr io.Writer, args []string) error {
// tea.WithMouseCellMotion(),
)

// Start daemons and relay events. When the app exits, use the returned func
// to wait for any running tasks to complete.
cleanup := app.start(ctx, stdout, p)
// Relay events to TUI
app.relay(p)

// Blocks until user quits
if _, err := p.Run(); err != nil {
return err
}

return cleanup()
return nil
}

type app struct {
model tea.Model
ch chan tea.Msg
logger *logging.Logger
cleanup func()
}

// newApp constructs an instance of the app and the top-level TUI model.
func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) {
// startApp starts the application, constructing services, starting daemons and
// subscribing to events. The returned app is used for constructing the TUI and
// relaying events. The app's cleanup function should be called when finished.
func startApp(cfg config, stdout io.Writer) (*app, error) {
// Setup logging
logger := logging.NewLogger(cfg.loggingOptions)

// Perform any conversions from the flag parsed primitive types to pug
// defined types.
workdir, err := internal.NewWorkdir(cfg.WorkDir)
if err != nil {
return nil, nil, err
return nil, err
}

// Instantiate services
tasks := task.NewService(task.ServiceOptions{
Program: cfg.Program,
Logger: logger,
Workdir: workdir,
Program: cfg.Program,
Logger: logger,
Workdir: workdir,
UserEnvs: cfg.Envs,
})
modules := module.NewService(module.ServiceOptions{
TaskService: tasks,
Expand All @@ -118,8 +113,7 @@ func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) {
ModuleService: modules,
Logger: logger,
})
// TODO: separate auto-state pull code
states := state.NewService(ctx, state.ServiceOptions{
states := state.NewService(state.ServiceOptions{
ModuleService: modules,
WorkspaceService: workspaces,
TaskService: tasks,
Expand All @@ -135,17 +129,6 @@ func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) {
Logger: logger,
})

// Shutdown services once context is done.
go func() {
<-ctx.Done()
logger.Shutdown()
tasks.Shutdown()
modules.Shutdown()
workspaces.Shutdown()
states.Shutdown()
runs.Shutdown()
}()

// Construct top-level TUI model.
model, err := top.New(top.Options{
TaskService: tasks,
Expand All @@ -160,86 +143,123 @@ func newApp(ctx context.Context, cfg config) (*app, tea.Model, error) {
Debug: cfg.Debug,
})
if err != nil {
return nil, nil, err
return nil, err
}

app := &app{
modules: modules,
workspaces: workspaces,
runs: runs,
states: states,
tasks: tasks,
cfg: cfg,
logger: logger,
}
return app, model, nil
}
ctx, cancel := context.WithCancel(context.Background())

// start starts the app daemons and relays events to the TUI, returning a func
// to wait for running tasks to complete.
func (a *app) start(ctx context.Context, stdout io.Writer, s sender) func() error {
// Start daemons
task.StartEnqueuer(a.tasks)
run.StartScheduler(a.runs, a.workspaces)
waitfn := task.StartRunner(ctx, a.logger, a.tasks, a.cfg.MaxTasks)
task.StartEnqueuer(tasks)
run.StartScheduler(runs, workspaces)
waitTasks := task.StartRunner(ctx, logger, tasks, cfg.MaxTasks)

// Automatically load workspaces whenever modules are loaded.
a.workspaces.LoadWorkspacesUponModuleLoad(a.modules)
workspaces.LoadWorkspacesUponModuleLoad(modules)

// Relay resource events to TUI. Deliberately set up subscriptions *before*
// any events are triggered, to ensure the TUI receives all messages.
logEvents := a.logger.Subscribe()
ch := make(chan tea.Msg)
wg := sync.WaitGroup{} // sync closure of subscriptions

logEvents := logger.Subscribe()
wg.Add(1)
go func() {
for ev := range logEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()
modEvents := a.modules.Subscribe()

modEvents := modules.Subscribe()
wg.Add(1)
go func() {
for ev := range modEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()
wsEvents := a.workspaces.Subscribe()

wsEvents := workspaces.Subscribe()
wg.Add(1)
go func() {
for ev := range wsEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()
stateEvents := a.states.Subscribe()

stateEvents := states.Subscribe()
wg.Add(1)
go func() {
for ev := range stateEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()
runEvents := a.runs.Subscribe()

runEvents := runs.Subscribe()
wg.Add(1)
go func() {
for ev := range runEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()
taskEvents := a.tasks.Subscribe()

taskEvents := tasks.Subscribe()
wg.Add(1)
go func() {
for ev := range taskEvents {
s.Send(ev)
ch <- ev
}
wg.Done()
}()

// Return cleanup function to be invoked when app is terminated.
return func() error {
// Cancel any running processes
running := a.tasks.List(task.ListOptions{Status: []task.Status{task.Running}})
if len(running) > 0 {
fmt.Fprintf(stdout, "terminating %d terraform processes\n", len(running))
}
for _, task := range running {
a.tasks.Cancel(task.ID)
}
// cleanup function to be invoked when app is terminated.
cleanup := func() {
// Cancel context
cancel()

// Close subscriptions
logger.Shutdown()
tasks.Shutdown()
modules.Shutdown()
workspaces.Shutdown()
states.Shutdown()
runs.Shutdown()

// Wait for relays to finish before closing channel, to avoid sends
// to a closed channel, which would result in a panic.
wg.Wait()
close(ch)

// Remove all run artefacts (plan files etc,...)
for _, run := range a.runs.List(run.ListOptions{}) {
for _, run := range runs.List(run.ListOptions{}) {
_ = os.RemoveAll(run.ArtefactsPath())
}
// Wait for canceled tasks to terminate.
return waitfn()

// Wait for running tasks to terminate. Canceling the context (above)
// sends each task a termination signal so each task's process should
// shut itself down.
waitTasks()
}
return &app{
model: model,
ch: ch,
cleanup: cleanup,
logger: logger,
}, nil
}

type tui interface {
Send(tea.Msg)
}

// relay events to TUI.
func (a *app) relay(s tui) {
go func() {
for msg := range a.ch {
s.Send(msg)
}
}()
}
2 changes: 2 additions & 0 deletions internal/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type config struct {
DisableReloadAfterApply bool
WorkDir string
DataDir string
Envs []string

loggingOptions logging.Options
version bool
Expand All @@ -44,6 +45,7 @@ func parse(stderr io.Writer, args []string) (config, error) {
fs.StringVar(&cfg.WorkDir, 'w', "workdir", ".", "The working directory containing modules.")
fs.IntVar(&cfg.MaxTasks, 't', "max-tasks", 2*runtime.NumCPU(), "The maximum number of parallel tasks.")
fs.StringVar(&cfg.DataDir, 0, "data-dir", defaultDataDir, "Directory in which to store plan files.")
fs.StringListVar(&cfg.Envs, 'e', "env", "Environment variable to pass to terraform process. Can set more than once.")
fs.StringEnumVar(&cfg.FirstPage, 'f', "first-page", "The first page to open on startup.", "modules", "workspaces", "runs", "tasks", "logs")
fs.BoolVar(&cfg.Debug, 'd', "debug", "Log bubbletea messages to messages.log")
fs.BoolVar(&cfg.version, 'v', "version", "Print version.")
Expand Down
21 changes: 21 additions & 0 deletions internal/app/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ func TestConfig(t *testing.T) {
assert.Equal(t, got.FirstPage, "runs")
},
},
{
"set terraform process environment variable",
"",
[]string{"-e", "TF_LOG=DEBUG"},
nil,
func(t *testing.T, got config) {
assert.Equal(t, got.Envs, []string{"TF_LOG=DEBUG"})
},
},
{
"set multiple terraform process environment variables",
"",
[]string{"-e", "TF_LOG=DEBUG", "-e", "TF_IGNORE=TRACE", "-e", "TF_PLUGIN_CACHE_DIR=/tmp"},
nil,
func(t *testing.T, got config) {
assert.Len(t, got.Envs, 3)
assert.Contains(t, got.Envs, "TF_LOG=DEBUG")
assert.Contains(t, got.Envs, "TF_IGNORE=TRACE")
assert.Contains(t, got.Envs, "TF_PLUGIN_CACHE_DIR=/tmp")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit 5ca1dd3

Please sign in to comment.