diff --git a/internal/agent/agent.go b/internal/agent/agent.go index a639ce240..53c6d3338 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -36,9 +36,10 @@ type agent struct { client.Client logr.Logger - spooler // spools new run events - *terminator // terminates runs - Downloader // terraform cli downloader + spooler // spools new run events + *terminator // terminates runs + Downloader // terraform cli downloader + terraformPathFinder // determines destination dir for terraform bins envs []string // terraform environment variables } @@ -61,14 +62,17 @@ func NewAgent(logger logr.Logger, app client.Client, cfg Config) (*agent, error) logger.V(0).Info("enabled debug mode") } + pathFinder := newTerraformPathFinder(cfg.TerraformBinDir) + agent := &agent{ - Client: app, - Config: cfg, - Logger: logger, - envs: DefaultEnvs, - spooler: newSpooler(app, logger, cfg), - terminator: newTerminator(), - Downloader: newTerraformDownloader(), + Client: app, + Config: cfg, + Logger: logger, + envs: DefaultEnvs, + spooler: newSpooler(app, logger, cfg), + terminator: newTerminator(), + Downloader: newTerraformDownloader(pathFinder), + terraformPathFinder: pathFinder, } if cfg.PluginCache { diff --git a/internal/agent/config.go b/internal/agent/config.go index b07cda086..4a54fb267 100644 --- a/internal/agent/config.go +++ b/internal/agent/config.go @@ -8,12 +8,13 @@ import ( type ( // Config is configuration for an agent. Config struct { - Organization *string // only process runs belonging to org - External bool // dedicated agent (true) or integrated into otfd (false) - Concurrency int // number of workers - Sandbox bool // isolate privileged ops within sandbox - Debug bool // toggle debug mode - PluginCache bool // toggle use of terraform's shared plugin cache + Organization *string // only process runs belonging to org + External bool // dedicated agent (true) or integrated into otfd (false) + Concurrency int // number of workers + Sandbox bool // isolate privileged ops within sandbox + Debug bool // toggle debug mode + PluginCache bool // toggle use of terraform's shared plugin cache + TerraformBinDir string // destination directory for terraform binaries } // ExternalConfig is configuration for an external agent ExternalConfig struct { diff --git a/internal/agent/downloader.go b/internal/agent/downloader.go index 7c5976bda..bab7445bb 100644 --- a/internal/agent/downloader.go +++ b/internal/agent/downloader.go @@ -17,14 +17,10 @@ const HashicorpReleasesHost = "releases.hashicorp.com" type ( // terraformDownloader downloads terraform binaries terraformDownloader struct { - // server hosting binaries - host string - // used to lookup destination path for saving download - terraform - // client for downloading from server via http - client *http.Client - // mutex channel - mu chan struct{} + host string // server hosting binaries + terraformPathFinder // used to lookup destination path for saving download + client *http.Client // client for downloading from server via http + mu chan struct{} // ensures only one download at a time } // Downloader downloads a specific version of a binary and returns its path @@ -33,15 +29,15 @@ type ( } ) -func newTerraformDownloader() *terraformDownloader { +func newTerraformDownloader(pathFinder terraformPathFinder) *terraformDownloader { mu := make(chan struct{}, 1) mu <- struct{}{} return &terraformDownloader{ - host: HashicorpReleasesHost, - terraform: &terraformPathFinder{}, - client: &http.Client{}, - mu: mu, + host: HashicorpReleasesHost, + terraformPathFinder: pathFinder, + client: &http.Client{}, + mu: mu, } } diff --git a/internal/agent/downloader_test.go b/internal/agent/downloader_test.go index 65591ca94..55d879a28 100644 --- a/internal/agent/downloader_test.go +++ b/internal/agent/downloader_test.go @@ -8,7 +8,6 @@ import ( "net/http/httptest" "net/url" "os" - "path" "testing" "github.com/stretchr/testify/assert" @@ -25,9 +24,9 @@ func TestDownloader(t *testing.T) { u, err := url.Parse(srv.URL) require.NoError(t, err) - dl := newTerraformDownloader() + pathFinder := newTerraformPathFinder(t.TempDir()) + dl := newTerraformDownloader(pathFinder) dl.host = u.Host - dl.terraform = &fakeTerraform{t.TempDir()} dl.client = &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -43,11 +42,3 @@ func TestDownloader(t *testing.T) { assert.Equal(t, "I am a fake terraform binary\n", string(tfbin)) assert.Equal(t, "downloading terraform, version 1.2.3\n", buf.String()) } - -type fakeTerraform struct { - dir string -} - -func (f *fakeTerraform) TerraformPath(version string) string { - return path.Join(f.dir, version, "terraform") -} diff --git a/internal/agent/environment.go b/internal/agent/environment.go index 1c394137f..ba3d017ab 100644 --- a/internal/agent/environment.go +++ b/internal/agent/environment.go @@ -41,13 +41,10 @@ type environment struct { func newEnvironment( ctx context.Context, logger logr.Logger, - svc client.Client, + agent *agent, run *run.Run, - envs []string, - downloader Downloader, - cfg Config, ) (*environment, error) { - ws, err := svc.GetWorkspace(ctx, run.WorkspaceID) + ws, err := agent.GetWorkspace(ctx, run.WorkspaceID) if err != nil { return nil, errors.Wrap(err, "retrieving workspace") } @@ -62,17 +59,17 @@ func newEnvironment( // via an environment variable. // // NOTE: environment variable support is only available in terraform >= 1.2.0 - token, err := svc.CreateRunToken(ctx, tokens.CreateRunTokenOptions{ + token, err := agent.CreateRunToken(ctx, tokens.CreateRunTokenOptions{ Organization: &ws.Organization, RunID: &run.ID, }) if err != nil { return nil, errors.Wrap(err, "creating registry session") } - envs = append(envs, internal.CredentialEnv(svc.Hostname(), token)) + envs := internal.SafeAppend(agent.envs, internal.CredentialEnv(agent.Hostname(), token)) // retrieve workspace variables and add them to the environment - variables, err := svc.ListVariables(ctx, run.WorkspaceID) + variables, err := agent.ListVariables(ctx, run.WorkspaceID) if err != nil { return nil, errors.Wrap(err, "retrieving workspace variables") } @@ -86,25 +83,25 @@ func newEnvironment( writer := logs.NewPhaseWriter(ctx, logs.PhaseWriterOptions{ RunID: run.ID, Phase: run.Phase(), - Writer: svc, + Writer: agent, }) env := &environment{ Logger: logger, - Client: svc, - Downloader: downloader, + Client: agent, + Downloader: agent, out: writer, workdir: wd, variables: variables, ctx: ctx, runner: &runner{out: writer}, executor: &executor{ - Config: cfg, - terraform: &terraformPathFinder{}, - version: ws.TerraformVersion, - out: writer, - envs: envs, - workdir: wd, + Config: agent.Config, + terraformPathFinder: agent.terraformPathFinder, + version: ws.TerraformVersion, + out: writer, + envs: envs, + workdir: wd, }, } diff --git a/internal/agent/executor.go b/internal/agent/executor.go index 9879bd5cf..f729a7d51 100644 --- a/internal/agent/executor.go +++ b/internal/agent/executor.go @@ -19,7 +19,7 @@ type ( // executor executes processes. executor struct { Config - terraform + terraformPathFinder version string // terraform cli version out io.Writer diff --git a/internal/agent/spooler.go b/internal/agent/spooler.go index f4fbebf62..1c44b11a7 100644 --- a/internal/agent/spooler.go +++ b/internal/agent/spooler.go @@ -105,7 +105,6 @@ func (s *spoolerDaemon) reinitialize(ctx context.Context) error { // whereas we want oldest first. for i := len(existing) - 1; i >= 0; i-- { s.handleEvent(pubsub.Event{ - Type: pubsub.EventRunStatusUpdate, Payload: existing[i], }) } @@ -139,9 +138,9 @@ func (s *spoolerDaemon) handleRun(event pubsub.EventType, run *run.Run) { if run.Queued() { s.queue <- run - } else if event == pubsub.EventRunCancel { + } else if run.Status == internal.RunCanceled { s.cancelations <- cancelation{Run: run} - } else if event == pubsub.EventRunForceCancel { + } else if run.Status == internal.RunForceCanceled { s.cancelations <- cancelation{Run: run, Forceful: true} } } diff --git a/internal/agent/spooler_test.go b/internal/agent/spooler_test.go index d0727de72..6d22feb3e 100644 --- a/internal/agent/spooler_test.go +++ b/internal/agent/spooler_test.go @@ -24,8 +24,8 @@ func TestSpooler(t *testing.T) { db := []*run.Run{run1, run2} events := make(chan pubsub.Event, 3) events <- pubsub.Event{Payload: run3} - events <- pubsub.Event{Type: pubsub.EventRunCancel, Payload: run4} - events <- pubsub.Event{Type: pubsub.EventRunForceCancel, Payload: run5} + events <- pubsub.Event{Payload: run4} + events <- pubsub.Event{Payload: run5} spooler := newSpooler( &fakeSpoolerApp{runs: db, events: events}, @@ -100,10 +100,9 @@ func TestSpooler_handleEvent(t *testing.T) { { name: "handle cancelation", event: pubsub.Event{ - Type: pubsub.EventRunCancel, Payload: &run.Run{ ExecutionMode: workspace.RemoteExecutionMode, - Status: internal.RunPlanning, + Status: internal.RunCanceled, }, }, wantCancelation: true, @@ -111,10 +110,9 @@ func TestSpooler_handleEvent(t *testing.T) { { name: "handle forceful cancelation", event: pubsub.Event{ - Type: pubsub.EventRunForceCancel, Payload: &run.Run{ ExecutionMode: workspace.RemoteExecutionMode, - Status: internal.RunPlanning, + Status: internal.RunForceCanceled, }, }, wantForceCancelation: true, diff --git a/internal/agent/terraform.go b/internal/agent/terraform.go index b9ba1f864..ff7fc1f82 100644 --- a/internal/agent/terraform.go +++ b/internal/agent/terraform.go @@ -5,13 +5,23 @@ import ( "path" ) -type terraform interface { - TerraformPath(version string) string -} +var defaultTerraformBinDir = path.Join(os.TempDir(), "otf-terraform-bins") + +type ( + terraformPathFinder struct { + dest string + } +) -type terraformPathFinder struct{} +func newTerraformPathFinder(dest string) terraformPathFinder { + if dest == "" { + dest = defaultTerraformBinDir + } + return terraformPathFinder{ + dest: dest, + } +} -// TerraformPath returns the path to a given version of the terraform binary -func (*terraformPathFinder) TerraformPath(version string) string { - return path.Join(os.TempDir(), "otf-terraform-bins", version, "terraform") +func (t terraformPathFinder) TerraformPath(version string) string { + return path.Join(t.dest, version, "terraform") } diff --git a/internal/agent/worker.go b/internal/agent/worker.go index cfdf9962e..2fcbdd713 100644 --- a/internal/agent/worker.go +++ b/internal/agent/worker.go @@ -37,11 +37,8 @@ func (w *worker) handle(ctx context.Context, r *run.Run) { env, err := newEnvironment( ctx, log, - w.Client, + w.agent, r, - w.envs, - w.Downloader, - w.Config, ) if err != nil { log.Error(err, "creating execution environment") diff --git a/internal/api/run_test.go b/internal/api/run_test.go index f46b5f3cf..42e50edcd 100644 --- a/internal/api/run_test.go +++ b/internal/api/run_test.go @@ -30,7 +30,7 @@ func TestAPI_Watch(t *testing.T) { // send one event and then close in <- pubsub.Event{ Payload: &run.Run{ID: "run-123"}, - Type: pubsub.EventRunCreated, + Type: pubsub.CreatedEvent, } close(in) @@ -43,7 +43,7 @@ func TestAPI_Watch(t *testing.T) { got = strings.TrimSpace(got) parts := strings.Split(got, "\n") if assert.Equal(t, 2, len(parts)) { - assert.Equal(t, "event: run_created", parts[1]) + assert.Equal(t, "event: created", parts[1]) if assert.Regexp(t, `data: .*`, parts[0]) { data := strings.TrimPrefix(parts[0], "data: ") // base64 decode diff --git a/internal/integration/daemon_helpers_test.go b/internal/integration/daemon_helpers_test.go index 582085d3c..448b086d3 100644 --- a/internal/integration/daemon_helpers_test.go +++ b/internal/integration/daemon_helpers_test.go @@ -450,7 +450,7 @@ func (s *testDaemon) tfcliWithError(t *testing.T, ctx context.Context, command, cmd := exec.Command("terraform", cmdargs...) cmd.Dir = configPath - cmd.Env = appendSharedEnvs(internal.CredentialEnv(s.Hostname(), token)) + cmd.Env = internal.SafeAppend(sharedEnvs, internal.CredentialEnv(s.Hostname(), token)) out, err := cmd.CombinedOutput() return string(out), err diff --git a/internal/integration/main_test.go b/internal/integration/main_test.go index 035c232e5..1b3148787 100644 --- a/internal/integration/main_test.go +++ b/internal/integration/main_test.go @@ -162,11 +162,3 @@ func setenv(name, value string) (func(), error) { os.Unsetenv(name) }, nil } - -// appendSharedEnvs appends environment variables to the shared environment -// variables in a thread-safe manner. -func appendSharedEnvs(envs ...string) []string { - dst := make([]string, len(sharedEnvs)+len(envs)) - copy(dst, sharedEnvs) - return append(dst, envs...) -} diff --git a/internal/integration/run_cancel_test.go b/internal/integration/run_cancel_test.go new file mode 100644 index 000000000..27539b4f9 --- /dev/null +++ b/internal/integration/run_cancel_test.go @@ -0,0 +1,82 @@ +package integration + +import ( + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/leg100/otf/internal" + "github.com/leg100/otf/internal/agent" + "github.com/leg100/otf/internal/variable" + "github.com/leg100/otf/internal/workspace" + "github.com/stretchr/testify/require" +) + +// TestIntegration_RunCancel demonstrates a run being canceled mid-flow. +func TestIntegration_RunCancel(t *testing.T) { + integrationTest(t) + + daemon, org, ctx := setup(t, nil) + + // stage a fake terraform bin that sleeps until it receives an interrupt + // signal + bins := filepath.Join(t.TempDir(), "bins") + dst := filepath.Join(bins, workspace.DefaultTerraformVersion, "terraform") + err := os.MkdirAll(filepath.Dir(dst), 0o755) + require.NoError(t, err) + err = os.Link("testdata/cancelme", dst) + require.NoError(t, err) + + // run a temporary http server as a means of communicating with the fake + // bin + got := make(chan string) + mux := http.NewServeMux() + mux.HandleFunc("/started", func(w http.ResponseWriter, r *http.Request) { + // fake bin has started + got <- "started" + }) + mux.HandleFunc("/canceled", func(w http.ResponseWriter, r *http.Request) { + // fake bin has received interrupt signal + got <- "canceled" + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + // start an external agent (it's the only way to specify a separate bin + // directory currently). + daemon.startAgent(t, ctx, org.Name, agent.ExternalConfig{ + Config: agent.Config{TerraformBinDir: bins}, + }) + + // create workspace specifying that it use an external agent. + ws, err := daemon.CreateWorkspace(ctx, workspace.CreateOptions{ + Name: internal.String("ws-1"), + Organization: internal.String(org.Name), + ExecutionMode: workspace.ExecutionModePtr(workspace.AgentExecutionMode), + }) + require.NoError(t, err) + + // create a variable so that the fake bin knows the url of the temp http + // server + _, err = daemon.CreateVariable(ctx, ws.ID, variable.CreateVariableOptions{ + Key: internal.String("URL"), + Value: internal.String(srv.URL), + Category: variable.VariableCategoryPtr(variable.CategoryEnv), + }) + require.NoError(t, err) + + cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil) + r := daemon.createRun(t, ctx, ws, cv) + + // fake bin process has started + require.Equal(t, "started", <-got) + + // we can now send interrupt + _, err = daemon.Cancel(ctx, r.ID) + require.NoError(t, err) + + // fake bin has received interrupt + require.Equal(t, "canceled", <-got) +} diff --git a/internal/integration/tag_e2e_test.go b/internal/integration/tag_e2e_test.go index 7c58a4ea0..0e066c568 100644 --- a/internal/integration/tag_e2e_test.go +++ b/internal/integration/tag_e2e_test.go @@ -42,7 +42,7 @@ resource "null_resource" "tags_e2e" {} []string{"terraform", "-chdir=" + root, "init", "-no-color"}, time.Minute, expect.PartialMatch(true), - expect.SetEnv(appendSharedEnvs(internal.CredentialEnv(daemon.Hostname(), token))), + expect.SetEnv(internal.SafeAppend(sharedEnvs, internal.CredentialEnv(daemon.Hostname(), token))), ) require.NoError(t, err) defer e.Close() diff --git a/internal/integration/terraform_cli_discard_test.go b/internal/integration/terraform_cli_discard_test.go index 27055b068..7fde20e42 100644 --- a/internal/integration/terraform_cli_discard_test.go +++ b/internal/integration/terraform_cli_discard_test.go @@ -32,7 +32,7 @@ func TestIntegration_TerraformCLIDiscard(t *testing.T) { []string{"terraform", "-chdir=" + configPath, "apply", "-no-color"}, time.Minute, expect.PartialMatch(true), - expect.SetEnv(appendSharedEnvs(internal.CredentialEnv(svc.Hostname(), token))), + expect.SetEnv(internal.SafeAppend(sharedEnvs, internal.CredentialEnv(svc.Hostname(), token))), ) require.NoError(t, err) defer e.Close() diff --git a/internal/integration/testdata/cancelme b/internal/integration/testdata/cancelme new file mode 100755 index 000000000..6f0776909 --- /dev/null +++ b/internal/integration/testdata/cancelme @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +function cancelme() { + curl ${URL}/canceled + exit 1 +} + +trap cancelme INT + +echo "started" + +curl ${URL}/started + +while true; do sleep 1; done diff --git a/internal/logs/proxy.go b/internal/logs/proxy.go index da425f89d..700518c94 100644 --- a/internal/logs/proxy.go +++ b/internal/logs/proxy.go @@ -104,21 +104,9 @@ func (p *proxy) get(ctx context.Context, opts internal.GetChunkOptions) (interna // put writes a chunk of data to the db func (p *proxy) put(ctx context.Context, opts internal.PutChunkOptions) error { - id, err := p.db.put(ctx, opts) - if err != nil { - return err - } - // make a chunk from the options and the id - chunk := internal.Chunk{ - ID: id, - RunID: opts.RunID, - Phase: opts.Phase, - Data: opts.Data, - Offset: opts.Offset, - } - // publish chunk for caching - p.Publish(pubsub.Event{Type: pubsub.EventLogChunk, Payload: chunk}) - return nil + // db triggers an event, which proxy listens for to populate its cache + _, err := p.db.put(ctx, opts) + return err } // cacheKey generates a key for caching log chunks. diff --git a/internal/notifications/service.go b/internal/notifications/service.go index bd2abda2b..116cc3fbb 100644 --- a/internal/notifications/service.go +++ b/internal/notifications/service.go @@ -71,7 +71,6 @@ func (s *service) CreateNotificationConfiguration(ctx context.Context, workspace return nil, err } s.Info("creating notification config", "config", nc, "subject", subject) - s.Publish(pubsub.NewCreatedEvent(nc)) return nc, nil } @@ -135,6 +134,5 @@ func (s *service) DeleteNotificationConfiguration(ctx context.Context, id string return err } s.Info("deleted notification config", "config", nc, "subject", subject) - s.Publish(pubsub.NewDeletedEvent(nc)) return nil } diff --git a/internal/organization/service.go b/internal/organization/service.go index fa707e140..c08bf0dd2 100644 --- a/internal/organization/service.go +++ b/internal/organization/service.go @@ -88,7 +88,6 @@ func (s *service) UpdateOrganization(ctx context.Context, name string, opts Orga s.V(2).Info("updated organization", "name", name, "id", org.ID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(org)) return org, nil } @@ -133,12 +132,6 @@ func (s *service) DeleteOrganization(ctx context.Context, name string) error { return err } - org, err := s.db.get(ctx, name) - if err != nil { - s.Error(err, "retrieving organization", "name", name, "subject", subject) - return err - } - err = s.db.delete(ctx, name) if err != nil { s.Error(err, "deleting organization", "name", name, "subject", subject) @@ -146,8 +139,6 @@ func (s *service) DeleteOrganization(ctx context.Context, name string) error { } s.V(0).Info("deleted organization", "name", name, "subject", subject) - s.Publish(pubsub.NewDeletedEvent(org)) - return nil } diff --git a/internal/orgcreator/service.go b/internal/orgcreator/service.go index 959df415b..8db43a141 100644 --- a/internal/orgcreator/service.go +++ b/internal/orgcreator/service.go @@ -124,8 +124,6 @@ func (s *service) CreateOrganization(ctx context.Context, opts OrganizationCreat return nil, err } - s.Publish(pubsub.NewCreatedEvent(org)) - s.V(0).Info("created organization", "id", org.ID, "name", org.Name, "subject", creator) return org, nil diff --git a/internal/pubsub/events.go b/internal/pubsub/events.go index 8d2ba0dca..449b09fa5 100644 --- a/internal/pubsub/events.go +++ b/internal/pubsub/events.go @@ -1,21 +1,11 @@ package pubsub const ( - EventOrganizationCreated EventType = "organization_created" - EventOrganizationDeleted EventType = "organization_deleted" - EventWorkspaceCreated EventType = "workspace_created" - EventWorkspaceRenamed EventType = "workspace_renamed" - EventWorkspaceDeleted EventType = "workspace_deleted" - EventRunCreated EventType = "run_created" - EventRunStatusUpdate EventType = "run_status_update" - EventRunDeleted EventType = "run_deleted" - EventRunCancel EventType = "run_cancel" - EventRunForceCancel EventType = "run_force_cancel" - EventError EventType = "error" - EventInfo EventType = "info" - EventLogChunk EventType = "log_update" - EventLogFinished EventType = "log_finished" - EventVCS EventType = "vcs_event" + EventError EventType = "error" + EventInfo EventType = "info" + EventLogChunk EventType = "log_update" + EventLogFinished EventType = "log_finished" + EventVCS EventType = "vcs_event" CreatedEvent EventType = "created" UpdatedEvent EventType = "updated" diff --git a/internal/run/client_test.go b/internal/run/client_test.go index eb800bcfd..d13e4724f 100644 --- a/internal/run/client_test.go +++ b/internal/run/client_test.go @@ -29,7 +29,7 @@ func TestWatchClient(t *testing.T) { ConfigurationVersion: &types.ConfigurationVersion{ID: "cv-123"}, }) require.NoError(t, err) - pubsub.WriteSSEEvent(w, b, pubsub.EventRunStatusUpdate, true) + pubsub.WriteSSEEvent(w, b, pubsub.UpdatedEvent, true) }) webserver := httptest.NewTLSServer(mux) @@ -50,5 +50,5 @@ func TestWatchClient(t *testing.T) { WorkspaceID: "ws-123", ConfigurationVersionID: "cv-123", } - assert.Equal(t, pubsub.Event{Type: pubsub.EventRunStatusUpdate, Payload: want}, <-got) + assert.Equal(t, pubsub.Event{Type: pubsub.UpdatedEvent, Payload: want}, <-got) } diff --git a/internal/run/run.go b/internal/run/run.go index 1219f6019..92309089f 100644 --- a/internal/run/run.go +++ b/internal/run/run.go @@ -215,9 +215,9 @@ func (r *Run) Discard() error { // Cancel run. Returns a boolean indicating whether a cancel request should be // enqueued (for an agent to kill an in progress process) -func (r *Run) Cancel() (enqueue bool, err error) { +func (r *Run) Cancel() error { if !r.Cancelable() { - return false, internal.ErrRunCancelNotAllowed + return internal.ErrRunCancelNotAllowed } // permit run to be force canceled after a cool off period of 10 seconds has // elapsed. @@ -235,13 +235,9 @@ func (r *Run) Cancel() (enqueue bool, err error) { r.Apply.UpdateStatus(PhaseCanceled) } - if r.Status == internal.RunPlanning || r.Status == internal.RunApplying { - enqueue = true - } - r.updateStatus(internal.RunCanceled) - return enqueue, nil + return nil } // ForceCancel force cancels a run. A cool-off period of 10 seconds must have diff --git a/internal/run/run_test.go b/internal/run/run_test.go index cd196b6a0..19b59769a 100644 --- a/internal/run/run_test.go +++ b/internal/run/run_test.go @@ -144,19 +144,9 @@ func TestRun_States(t *testing.T) { }) } -func TestRun_Cancel_Pending(t *testing.T) { +func TestRun_Cancel(t *testing.T) { run := newRun(&configversion.ConfigurationVersion{}, &workspace.Workspace{}, RunCreateOptions{}) - enqueue, err := run.Cancel() + err := run.Cancel() require.NoError(t, err) - assert.False(t, enqueue) - assert.NotZero(t, run.ForceCancelAvailableAt) -} - -func TestRun_Cancel_Planning(t *testing.T) { - run := newRun(&configversion.ConfigurationVersion{}, &workspace.Workspace{}, RunCreateOptions{}) - run.Status = internal.RunPlanning - enqueue, err := run.Cancel() - require.NoError(t, err) - assert.True(t, enqueue) assert.NotZero(t, run.ForceCancelAvailableAt) } diff --git a/internal/run/service.go b/internal/run/service.go index f5bf92b3b..cd6984eae 100644 --- a/internal/run/service.go +++ b/internal/run/service.go @@ -160,8 +160,6 @@ func (s *service) CreateRun(ctx context.Context, workspaceID string, opts RunCre } s.V(1).Info("created run", "id", run.ID, "workspace_id", run.WorkspaceID, "subject", subject) - s.Publish(pubsub.NewCreatedEvent(run)) - return run, nil } @@ -247,8 +245,6 @@ func (s *service) EnqueuePlan(ctx context.Context, runID string) (*Run, error) { } s.V(0).Info("enqueued plan", "id", runID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(run)) - return run, nil } @@ -268,7 +264,6 @@ func (s *service) Delete(ctx context.Context, runID string) error { return err } s.V(0).Info("deleted run", "id", runID, "subject", subject) - s.Publish(pubsub.Event{Type: pubsub.EventRunDeleted, Payload: run}) return nil } @@ -298,7 +293,6 @@ func (s *service) StartPhase(ctx context.Context, runID string, phase internal.P return nil, err } s.V(0).Info("started "+string(phase), "id", runID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(run)) return run, nil } @@ -327,7 +321,6 @@ func (s *service) FinishPhase(ctx context.Context, runID string, phase internal. return nil, err } s.V(0).Info("finished "+string(phase), "id", runID, "resource_changes", resourceReport, "output_changes", outputReport, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(run)) return run, nil } @@ -389,7 +382,7 @@ func (s *service) Apply(ctx context.Context, runID string) error { if err != nil { return err } - run, err := s.db.UpdateStatus(ctx, runID, func(run *Run) error { + _, err = s.db.UpdateStatus(ctx, runID, func(run *Run) error { return run.EnqueueApply() }) if err != nil { @@ -399,7 +392,6 @@ func (s *service) Apply(ctx context.Context, runID string) error { s.V(0).Info("enqueued apply", "id", runID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(run)) return err } @@ -410,7 +402,7 @@ func (s *service) DiscardRun(ctx context.Context, runID string) error { return err } - run, err := s.db.UpdateStatus(ctx, runID, func(run *Run) error { + _, err = s.db.UpdateStatus(ctx, runID, func(run *Run) error { return run.Discard() }) if err != nil { @@ -420,7 +412,6 @@ func (s *service) DiscardRun(ctx context.Context, runID string) error { s.V(0).Info("discarded run", "id", runID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(run)) return err } @@ -432,21 +423,14 @@ func (s *service) Cancel(ctx context.Context, runID string) (*Run, error) { return nil, err } - var enqueue bool run, err := s.db.UpdateStatus(ctx, runID, func(run *Run) (err error) { - enqueue, err = run.Cancel() - return err + return run.Cancel() }) if err != nil { s.Error(err, "canceling run", "id", runID, "subject", subject) return nil, err } s.V(0).Info("canceled run", "id", runID, "subject", subject) - if enqueue { - // notify agent which'll send a SIGINT to terraform - s.Publish(pubsub.Event{Type: pubsub.EventRunCancel, Payload: run}) - } - s.Publish(pubsub.NewUpdatedEvent(run)) return run, nil } @@ -456,7 +440,7 @@ func (s *service) ForceCancelRun(ctx context.Context, runID string) error { if err != nil { return err } - run, err := s.db.UpdateStatus(ctx, runID, func(run *Run) error { + _, err = s.db.UpdateStatus(ctx, runID, func(run *Run) error { return run.ForceCancel() }) if err != nil { @@ -465,9 +449,6 @@ func (s *service) ForceCancelRun(ctx context.Context, runID string) error { } s.V(0).Info("force canceled run", "id", runID, "subject", subject) - // notify agent which'll send a SIGKILL to terraform - s.Publish(pubsub.Event{Type: pubsub.EventRunForceCancel, Payload: run}) - return err } diff --git a/internal/run/service_test.go b/internal/run/service_test.go index 06af8e172..f8a04a81d 100644 --- a/internal/run/service_test.go +++ b/internal/run/service_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestService(t *testing.T) { +func TestService_Watch(t *testing.T) { // input event channel in := make(chan pubsub.Event, 1) @@ -24,7 +24,6 @@ func TestService(t *testing.T) { // inject input event want := pubsub.Event{ Payload: &Run{}, - Type: pubsub.EventRunCreated, } in <- want diff --git a/internal/scheduler/queue.go b/internal/scheduler/queue.go index a6970c123..6a1bb3400 100644 --- a/internal/scheduler/queue.go +++ b/internal/scheduler/queue.go @@ -50,11 +50,11 @@ func (q *queue) handleEvent(ctx context.Context, event pubsub.Event) error { switch payload := event.Payload.(type) { case *workspace.Workspace: q.ws = payload - if event.Type == workspace.EventUnlocked { - if q.current != nil { - if err := q.scheduleRun(ctx, q.current); err != nil { - return err - } + // workspace state has changed; pessimistically schedule the current run + // in case the workspace has been unlocked. + if q.current != nil { + if err := q.scheduleRun(ctx, q.current); err != nil { + return err } } case *run.Run: diff --git a/internal/scheduler/queue_test.go b/internal/scheduler/queue_test.go index db53788ff..2edf09177 100644 --- a/internal/scheduler/queue_test.go +++ b/internal/scheduler/queue_test.go @@ -49,7 +49,7 @@ func TestQueue(t *testing.T) { assert.True(t, q.ws.Locked()) // cancel run2, check it is removed from queue and run3 is shuffled forward - _, err = run2.Cancel() + err = run2.Cancel() require.NoError(t, err) err = q.handleEvent(ctx, pubsub.Event{Payload: run2}) require.NoError(t, err) @@ -59,7 +59,7 @@ func TestQueue(t *testing.T) { assert.True(t, q.ws.Locked()) // cancel run1; check run3 takes its place as current run - _, err = run1.Cancel() + err = run1.Cancel() require.NoError(t, err) err = q.handleEvent(ctx, pubsub.Event{Payload: run1}) require.NoError(t, err) @@ -68,7 +68,7 @@ func TestQueue(t *testing.T) { assert.True(t, q.ws.Locked()) // cancel run3; check everything is empty and workspace is unlocked - _, err = run3.Cancel() + err = run3.Cancel() require.NoError(t, err) err = q.handleEvent(ctx, pubsub.Event{Payload: run3}) require.NoError(t, err) @@ -109,7 +109,7 @@ func TestQueue(t *testing.T) { // user unlocks workspace; run should be scheduled, locking the workspace err = ws.Unlock("bobby", workspace.UserLock, false) require.NoError(t, err) - err = q.handleEvent(ctx, pubsub.Event{Type: workspace.EventUnlocked, Payload: ws}) + err = q.handleEvent(ctx, pubsub.Event{Payload: ws}) require.NoError(t, err) assert.Equal(t, run.ID, q.current.ID) assert.Equal(t, workspace.RunLock, q.ws.Lock.LockKind) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index e474cc6b9..eeb2b6f9d 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -96,7 +96,6 @@ func (s *scheduler) Start(ctx context.Context) error { go func() { for _, ws := range workspaces { queue <- pubsub.Event{ - Type: pubsub.EventWorkspaceCreated, Payload: ws, } } @@ -104,7 +103,6 @@ func (s *scheduler) Start(ctx context.Context) error { // whereas we want oldest first. for i := len(runs) - 1; i >= 0; i-- { queue <- pubsub.Event{ - Type: pubsub.EventRunStatusUpdate, Payload: runs[i], } } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 85dfdc676..3d58af7b3 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -21,7 +21,7 @@ func TestScheduler(t *testing.T) { scheduler, got := newTestScheduler([]*workspace.Workspace{ws}, nil) go scheduler.Start(ctx) - assert.Equal(t, pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws}, <-got) + assert.Equal(t, pubsub.Event{Payload: ws}, <-got) assert.Equal(t, 1, len(scheduler.queues)) }) @@ -30,7 +30,7 @@ func TestScheduler(t *testing.T) { defer cancel() ws := &workspace.Workspace{ID: "ws-123"} - event := pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws} + event := pubsub.Event{Payload: ws} scheduler, got := newTestScheduler(nil, nil, event) go scheduler.Start(ctx) assert.Equal(t, event, <-got) @@ -49,7 +49,7 @@ func TestScheduler(t *testing.T) { scheduler, got := newTestScheduler([]*workspace.Workspace{ws}, nil, del, sync) go scheduler.Start(ctx) - assert.Equal(t, pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws}, <-got) + assert.Equal(t, pubsub.Event{Payload: ws}, <-got) assert.Equal(t, sync, <-got) assert.NotContains(t, scheduler.queues, ws) }) @@ -63,8 +63,8 @@ func TestScheduler(t *testing.T) { scheduler, got := newTestScheduler([]*workspace.Workspace{ws}, []*run.Run{r}) go scheduler.Start(ctx) - assert.Equal(t, pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws}, <-got) - assert.Equal(t, pubsub.Event{Type: pubsub.EventRunStatusUpdate, Payload: r}, <-got) + assert.Equal(t, pubsub.Event{Payload: ws}, <-got) + assert.Equal(t, pubsub.Event{Payload: r}, <-got) }) t.Run("relay run from event", func(t *testing.T) { @@ -76,7 +76,7 @@ func TestScheduler(t *testing.T) { scheduler, got := newTestScheduler([]*workspace.Workspace{ws}, nil, event) go scheduler.Start(ctx) - assert.Equal(t, pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws}, <-got) + assert.Equal(t, pubsub.Event{Payload: ws}, <-got) assert.Equal(t, event, <-got) }) @@ -90,8 +90,8 @@ func TestScheduler(t *testing.T) { scheduler, got := newTestScheduler([]*workspace.Workspace{ws}, []*run.Run{run1, run2}) go scheduler.Start(ctx) - assert.Equal(t, pubsub.Event{Type: pubsub.EventWorkspaceCreated, Payload: ws}, <-got) - assert.Equal(t, pubsub.Event{Type: pubsub.EventRunStatusUpdate, Payload: run2}, <-got) - assert.Equal(t, pubsub.Event{Type: pubsub.EventRunStatusUpdate, Payload: run1}, <-got) + assert.Equal(t, pubsub.Event{Payload: ws}, <-got) + assert.Equal(t, pubsub.Event{Payload: run2}, <-got) + assert.Equal(t, pubsub.Event{Payload: run1}, <-got) }) } diff --git a/internal/slices.go b/internal/slices.go new file mode 100644 index 000000000..f591497f1 --- /dev/null +++ b/internal/slices.go @@ -0,0 +1,11 @@ +package internal + +// SafeAppend appends strings to a slice whilst ensuring the slice is +// not modified. +// +// see: https://yourbasic.org/golang/gotcha-append/ +func SafeAppend(a []string, b ...string) []string { + dst := make([]string, len(a)+len(b)) + copy(dst, a) + return append(dst, b...) +} diff --git a/internal/workspace/lock.go b/internal/workspace/lock.go index 4c46c1a08..7016ff82f 100644 --- a/internal/workspace/lock.go +++ b/internal/workspace/lock.go @@ -3,7 +3,6 @@ package workspace import ( "github.com/leg100/otf/internal" "github.com/leg100/otf/internal/http/html/paths" - "github.com/leg100/otf/internal/pubsub" "github.com/leg100/otf/internal/rbac" ) @@ -12,11 +11,6 @@ const ( RunLock ) -var ( - EventLocked pubsub.EventType = "workspace_locked" - EventUnlocked pubsub.EventType = "workspace_unlocked" -) - type ( // Lock is a workspace Lock, which blocks runs from running and prevents state from being // uploaded. diff --git a/internal/workspace/service.go b/internal/workspace/service.go index 0ec1b5d60..f8cf75230 100644 --- a/internal/workspace/service.go +++ b/internal/workspace/service.go @@ -148,8 +148,6 @@ func (s *service) CreateWorkspace(ctx context.Context, opts CreateOptions) (*Wor s.V(0).Info("created workspace", "id", ws.ID, "name", ws.Name, "organization", ws.Organization, "subject", subject) - s.Publish(pubsub.NewCreatedEvent(ws)) - return ws, nil } @@ -243,8 +241,6 @@ func (s *service) UpdateWorkspace(ctx context.Context, workspaceID string, opts s.V(0).Info("updated workspace", "workspace", workspaceID, "subject", subject) - s.Publish(pubsub.NewUpdatedEvent(updated)) - return updated, nil } @@ -273,8 +269,6 @@ func (s *service) DeleteWorkspace(ctx context.Context, workspaceID string) (*Wor return nil, err } - s.Publish(pubsub.NewDeletedEvent(ws)) - s.V(0).Info("deleted workspace", "id", ws.ID, "name", ws.Name, "subject", subject) return ws, nil