From ff1094fa0417343a2394a8787eb5dca59c5b1a19 Mon Sep 17 00:00:00 2001 From: Aiden Scandella Date: Wed, 22 Jun 2022 09:29:41 -0700 Subject: [PATCH] feat: stream output for custom workflows (#2261) * Start threading job output to RunStepRunner * Strip ANSI * Fix lint * Use waitgroup to avoid test flakiness * Move waitgroup higher * Add ANSI test and use strings.Builder * Fix lint * Use errors.Wrap per style guide * Create ShellCommandRunner to encapsulate streaming * WIP: shell command runner * Update signatures to propagate error finding version * Fix log output * Fix error checking * Fix accidental whitespace stripping * Remove unused struct field * Fix error checking in terraform client * Add unit tests to verify command output handler was called * Remove err from async interface * Remove duplicative log now that shell command runner does it * Hide output in stream for env/multienv * Add comment explaining goroutines * Use printf for better macOS compatibility --- .../events/events_controller_e2e_test.go | 5 +- server/core/runtime/apply_step_runner_test.go | 10 +- server/core/runtime/env_step_runner.go | 4 +- server/core/runtime/env_step_runner_test.go | 7 +- .../runtime/models/shell_command_runner.go | 161 ++++++++++++++++++ .../models/shell_command_runner_test.go | 75 ++++++++ server/core/runtime/multienv_step_runner.go | 2 +- .../core/runtime/multienv_step_runner_test.go | 7 +- server/core/runtime/plan_step_runner_test.go | 8 +- server/core/runtime/run_step_runner.go | 21 ++- server/core/runtime/run_step_runner_test.go | 19 ++- server/core/runtime/runtime.go | 4 +- .../terraform/mocks/mock_terraform_client.go | 30 +--- server/core/terraform/terraform_client.go | 140 ++++----------- .../terraform_client_internal_test.go | 3 +- .../events/mocks/mock_custom_step_runner.go | 20 ++- server/events/project_command_runner.go | 4 +- server/events/project_command_runner_test.go | 15 +- server/server.go | 7 +- 19 files changed, 360 insertions(+), 182 deletions(-) create mode 100644 server/core/runtime/models/shell_command_runner.go create mode 100644 server/core/runtime/models/shell_command_runner_test.go diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index 563a0921d9..9f78fddc15 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -975,8 +975,9 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl TerraformExecutor: terraformClient, }, RunStepRunner: &runtime.RunStepRunner{ - TerraformExecutor: terraformClient, - DefaultTFVersion: defaultTFVersion, + TerraformExecutor: terraformClient, + DefaultTFVersion: defaultTFVersion, + ProjectCmdOutputHandler: projectCmdOutputHandler, }, WorkingDir: workingDir, Webhooks: &mockWebhookSender{}, diff --git a/server/core/runtime/apply_step_runner_test.go b/server/core/runtime/apply_step_runner_test.go index eebd8ffe3b..23a4bbc6a3 100644 --- a/server/core/runtime/apply_step_runner_test.go +++ b/server/core/runtime/apply_step_runner_test.go @@ -13,7 +13,7 @@ import ( . "github.com/petergtz/pegomock" "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/core/runtime" - "github.com/runatlantis/atlantis/server/core/terraform" + runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/core/terraform/mocks" matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers" "github.com/runatlantis/atlantis/server/events/command" @@ -371,11 +371,11 @@ type remoteApplyMock struct { } // RunCommandAsync fakes out running terraform async. -func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) { +func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) { r.CalledArgs = args in := make(chan string) - out := make(chan terraform.Line) + out := make(chan runtimemodels.Line) // We use a wait group to ensure our sending and receiving routines have // completed. @@ -398,10 +398,10 @@ func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path strin // Asynchronously send the lines we're supposed to. go func() { for _, line := range strings.Split(r.LinesToSend, "\n") { - out <- terraform.Line{Line: line} + out <- runtimemodels.Line{Line: line} } if r.Err != nil { - out <- terraform.Line{Err: r.Err} + out <- runtimemodels.Line{Err: r.Err} } close(out) wg.Done() diff --git a/server/core/runtime/env_step_runner.go b/server/core/runtime/env_step_runner.go index ae7c27dd5d..6eced91ad1 100644 --- a/server/core/runtime/env_step_runner.go +++ b/server/core/runtime/env_step_runner.go @@ -18,7 +18,9 @@ func (r *EnvStepRunner) Run(ctx command.ProjectContext, command string, value st if value != "" { return value, nil } - res, err := r.RunStepRunner.Run(ctx, command, path, envs) + // Pass `false` for streamOutput because this isn't interesting to the user reading the build logs + // in the web UI. + res, err := r.RunStepRunner.Run(ctx, command, path, envs, false) // Trim newline from res to support running `echo env_value` which has // a newline. We don't recommend users run echo -n env_value to remove the // newline because -n doesn't work in the sh shell which is what we use diff --git a/server/core/runtime/env_step_runner_test.go b/server/core/runtime/env_step_runner_test.go index 0084ca97b8..1bf1d872ba 100644 --- a/server/core/runtime/env_step_runner_test.go +++ b/server/core/runtime/env_step_runner_test.go @@ -8,6 +8,7 @@ import ( "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/models" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/petergtz/pegomock" @@ -40,9 +41,11 @@ func TestEnvStepRunner_Run(t *testing.T) { tfClient := mocks.NewMockClient() tfVersion, err := version.NewVersion("0.12.0") Ok(t, err) + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() runStepRunner := runtime.RunStepRunner{ - TerraformExecutor: tfClient, - DefaultTFVersion: tfVersion, + TerraformExecutor: tfClient, + DefaultTFVersion: tfVersion, + ProjectCmdOutputHandler: projectCmdOutputHandler, } envRunner := runtime.EnvStepRunner{ RunStepRunner: &runStepRunner, diff --git a/server/core/runtime/models/shell_command_runner.go b/server/core/runtime/models/shell_command_runner.go new file mode 100644 index 0000000000..c30f97378d --- /dev/null +++ b/server/core/runtime/models/shell_command_runner.go @@ -0,0 +1,161 @@ +package models + +import ( + "bufio" + "io" + "os/exec" + "strings" + "sync" + + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/events/command" + "github.com/runatlantis/atlantis/server/events/terraform/ansi" + "github.com/runatlantis/atlantis/server/jobs" +) + +// Setting the buffer size to 10mb +const BufioScannerBufferSize = 10 * 1024 * 1024 + +// Line represents a line that was output from a shell command. +type Line struct { + // Line is the contents of the line (without the newline). + Line string + // Err is set if there was an error. + Err error +} + +// ShellCommandRunner runs a command via `exec.Command` and streams output to the +// `ProjectCommandOutputHandler`. +type ShellCommandRunner struct { + command string + workingDir string + outputHandler jobs.ProjectCommandOutputHandler + streamOutput bool + cmd *exec.Cmd +} + +func NewShellCommandRunner(command string, environ []string, workingDir string, streamOutput bool, outputHandler jobs.ProjectCommandOutputHandler) *ShellCommandRunner { + cmd := exec.Command("sh", "-c", command) // #nosec + cmd.Env = environ + cmd.Dir = workingDir + + return &ShellCommandRunner{ + command: command, + workingDir: workingDir, + outputHandler: outputHandler, + streamOutput: streamOutput, + cmd: cmd, + } +} + +func (s *ShellCommandRunner) Run(ctx command.ProjectContext) (string, error) { + _, outCh := s.RunCommandAsync(ctx) + + outbuf := new(strings.Builder) + var err error + for line := range outCh { + if line.Err != nil { + err = line.Err + break + } + outbuf.WriteString(line.Line) + outbuf.WriteString("\n") + } + + // sanitize output by stripping out any ansi characters. + output := ansi.Strip(outbuf.String()) + return output, err +} + +// RunCommandAsync runs terraform with args. It immediately returns an +// input and output channel. Callers can use the output channel to +// get the realtime output from the command. +// Callers can use the input channel to pass stdin input to the command. +// If any error is passed on the out channel, there will be no +// further output (so callers are free to exit). +func (s *ShellCommandRunner) RunCommandAsync(ctx command.ProjectContext) (chan<- string, <-chan Line) { + outCh := make(chan Line) + inCh := make(chan string) + + // We start a goroutine to do our work asynchronously and then immediately + // return our channels. + go func() { + // Ensure we close our channels when we exit. + defer func() { + close(outCh) + close(inCh) + }() + + stdout, _ := s.cmd.StdoutPipe() + stderr, _ := s.cmd.StderrPipe() + stdin, _ := s.cmd.StdinPipe() + + ctx.Log.Debug("starting %q in %q", s.command, s.workingDir) + err := s.cmd.Start() + if err != nil { + err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir) + ctx.Log.Err(err.Error()) + outCh <- Line{Err: err} + return + } + + // If we get anything on inCh, write it to stdin. + // This function will exit when inCh is closed which we do in our defer. + go func() { + for line := range inCh { + ctx.Log.Debug("writing %q to remote command's stdin", line) + _, err := io.WriteString(stdin, line) + if err != nil { + ctx.Log.Err(errors.Wrapf(err, "writing %q to process", line).Error()) + } + } + }() + + wg := new(sync.WaitGroup) + wg.Add(2) + // Asynchronously copy from stdout/err to outCh. + go func() { + scanner := bufio.NewScanner(stdout) + buf := []byte{} + scanner.Buffer(buf, BufioScannerBufferSize) + + for scanner.Scan() { + message := scanner.Text() + outCh <- Line{Line: message} + if s.streamOutput { + s.outputHandler.Send(ctx, message, false) + } + } + wg.Done() + }() + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + message := scanner.Text() + outCh <- Line{Line: message} + if s.streamOutput { + s.outputHandler.Send(ctx, message, false) + } + } + wg.Done() + }() + + // Wait for our copying to complete. This *must* be done before + // calling cmd.Wait(). (see https://github.com/golang/go/issues/19685) + wg.Wait() + + // Wait for the command to complete. + err = s.cmd.Wait() + + // We're done now. Send an error if there was one. + if err != nil { + err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir) + ctx.Log.Err(err.Error()) + outCh <- Line{Err: err} + } else { + ctx.Log.Info("successfully ran %q in %q", s.command, s.workingDir) + } + }() + + return inCh, outCh +} diff --git a/server/core/runtime/models/shell_command_runner_test.go b/server/core/runtime/models/shell_command_runner_test.go new file mode 100644 index 0000000000..84054b838a --- /dev/null +++ b/server/core/runtime/models/shell_command_runner_test.go @@ -0,0 +1,75 @@ +package models_test + +import ( + "fmt" + "os" + "strings" + "testing" + + . "github.com/petergtz/pegomock" + "github.com/runatlantis/atlantis/server/core/runtime/mocks/matchers" + "github.com/runatlantis/atlantis/server/core/runtime/models" + "github.com/runatlantis/atlantis/server/events/command" + "github.com/runatlantis/atlantis/server/jobs/mocks" + "github.com/runatlantis/atlantis/server/logging" + . "github.com/runatlantis/atlantis/testing" +) + +func TestShellCommandRunner_Run(t *testing.T) { + cases := []struct { + Command string + ExpLines []string + Environ map[string]string + }{ + { + Command: "echo $HELLO", + Environ: map[string]string{ + "HELLO": "world", + }, + ExpLines: []string{"world"}, + }, + { + Command: ">&2 echo this is an error", + ExpLines: []string{"this is an error"}, + }, + } + + for _, c := range cases { + t.Run(c.Command, func(t *testing.T) { + RegisterMockTestingT(t) + ctx := command.ProjectContext{ + Log: logging.NewNoopLogger(t), + Workspace: "default", + RepoRelDir: ".", + } + projectCmdOutputHandler := mocks.NewMockProjectCommandOutputHandler() + + cwd, err := os.Getwd() + Ok(t, err) + environ := []string{} + for k, v := range c.Environ { + environ = append(environ, fmt.Sprintf("%s=%s", k, v)) + } + expectedOutput := fmt.Sprintf("%s\n", strings.Join(c.ExpLines, "\n")) + + // Run once with streaming enabled + runner := models.NewShellCommandRunner(c.Command, environ, cwd, true, projectCmdOutputHandler) + output, err := runner.Run(ctx) + Ok(t, err) + Equals(t, expectedOutput, output) + for _, line := range c.ExpLines { + projectCmdOutputHandler.VerifyWasCalledOnce().Send(ctx, line, false) + } + + // And again with streaming disabled. Everything should be the same except the + // command output handler should not have received anything + + projectCmdOutputHandler = mocks.NewMockProjectCommandOutputHandler() + runner = models.NewShellCommandRunner(c.Command, environ, cwd, false, projectCmdOutputHandler) + output, err = runner.Run(ctx) + Ok(t, err) + Equals(t, expectedOutput, output) + projectCmdOutputHandler.VerifyWasCalled(Never()).Send(matchers.AnyModelsProjectCommandContext(), AnyString(), EqBool(false)) + }) + } +} diff --git a/server/core/runtime/multienv_step_runner.go b/server/core/runtime/multienv_step_runner.go index 69a9a2028a..fd659a4de4 100644 --- a/server/core/runtime/multienv_step_runner.go +++ b/server/core/runtime/multienv_step_runner.go @@ -15,7 +15,7 @@ type MultiEnvStepRunner struct { // Run runs the multienv step command. // The command must return a json string containing the array of name-value pairs that are being added as extra environment variables func (r *MultiEnvStepRunner) Run(ctx command.ProjectContext, command string, path string, envs map[string]string) (string, error) { - res, err := r.RunStepRunner.Run(ctx, command, path, envs) + res, err := r.RunStepRunner.Run(ctx, command, path, envs, false) if err == nil { envVars := strings.Split(res, ",") if len(envVars) > 0 { diff --git a/server/core/runtime/multienv_step_runner_test.go b/server/core/runtime/multienv_step_runner_test.go index 7628ea95ea..72ce92352c 100644 --- a/server/core/runtime/multienv_step_runner_test.go +++ b/server/core/runtime/multienv_step_runner_test.go @@ -9,6 +9,7 @@ import ( "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/models" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -31,9 +32,11 @@ func TestMultiEnvStepRunner_Run(t *testing.T) { tfClient := mocks.NewMockClient() tfVersion, err := version.NewVersion("0.12.0") Ok(t, err) + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() runStepRunner := runtime.RunStepRunner{ - TerraformExecutor: tfClient, - DefaultTFVersion: tfVersion, + TerraformExecutor: tfClient, + DefaultTFVersion: tfVersion, + ProjectCmdOutputHandler: projectCmdOutputHandler, } multiEnvStepRunner := runtime.MultiEnvStepRunner{ RunStepRunner: &runStepRunner, diff --git a/server/core/runtime/plan_step_runner_test.go b/server/core/runtime/plan_step_runner_test.go index 3db3514499..e8fb35f538 100644 --- a/server/core/runtime/plan_step_runner_test.go +++ b/server/core/runtime/plan_step_runner_test.go @@ -8,13 +8,13 @@ import ( "testing" "github.com/hashicorp/go-version" - "github.com/runatlantis/atlantis/server/core/terraform" "github.com/runatlantis/atlantis/server/events/command" mocks2 "github.com/runatlantis/atlantis/server/events/mocks" . "github.com/petergtz/pegomock" "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/core/runtime" + runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/core/terraform/mocks" matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers" "github.com/runatlantis/atlantis/server/events/mocks/matchers" @@ -885,13 +885,13 @@ type remotePlanMock struct { CalledArgs []string } -func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) { +func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) { r.CalledArgs = args in := make(chan string) - out := make(chan terraform.Line) + out := make(chan runtimemodels.Line) go func() { for _, line := range strings.Split(r.LinesToSend, "\n") { - out <- terraform.Line{Line: line} + out <- runtimemodels.Line{Line: line} } close(out) close(in) diff --git a/server/core/runtime/run_step_runner.go b/server/core/runtime/run_step_runner.go index 6a7673bf3e..c0a4d53efc 100644 --- a/server/core/runtime/run_step_runner.go +++ b/server/core/runtime/run_step_runner.go @@ -3,12 +3,13 @@ package runtime import ( "fmt" "os" - "os/exec" "path/filepath" "strings" "github.com/hashicorp/go-version" + "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/events/command" + "github.com/runatlantis/atlantis/server/jobs" ) // RunStepRunner runs custom commands. @@ -16,10 +17,11 @@ type RunStepRunner struct { TerraformExecutor TerraformExec DefaultTFVersion *version.Version // TerraformBinDir is the directory where Atlantis downloads Terraform binaries. - TerraformBinDir string + TerraformBinDir string + ProjectCmdOutputHandler jobs.ProjectCommandOutputHandler } -func (r *RunStepRunner) Run(ctx command.ProjectContext, command string, path string, envs map[string]string) (string, error) { +func (r *RunStepRunner) Run(ctx command.ProjectContext, command string, path string, envs map[string]string, streamOutput bool) (string, error) { tfVersion := r.DefaultTFVersion if ctx.TerraformVersion != nil { tfVersion = ctx.TerraformVersion @@ -32,9 +34,6 @@ func (r *RunStepRunner) Run(ctx command.ProjectContext, command string, path str return "", err } - cmd := exec.Command("sh", "-c", command) // #nosec - cmd.Dir = path - baseEnvVars := os.Environ() customEnvVars := map[string]string{ "ATLANTIS_TERRAFORM_VERSION": tfVersion.String(), @@ -65,14 +64,14 @@ func (r *RunStepRunner) Run(ctx command.ProjectContext, command string, path str for key, val := range envs { finalEnvVars = append(finalEnvVars, fmt.Sprintf("%s=%s", key, val)) } - cmd.Env = finalEnvVars - out, err := cmd.CombinedOutput() + + runner := models.NewShellCommandRunner(command, finalEnvVars, path, streamOutput, r.ProjectCmdOutputHandler) + output, err := runner.Run(ctx) if err != nil { - err = fmt.Errorf("%s: running %q in %q: \n%s", err, command, path, out) + err = fmt.Errorf("%s: running %q in %q: \n%s", err, command, path, output) ctx.Log.Debug("error: %s", err) return "", err } - ctx.Log.Info("successfully ran %q in %q", command, path) - return string(out), nil + return output, nil } diff --git a/server/core/runtime/run_step_runner_test.go b/server/core/runtime/run_step_runner_test.go index 8536a331ea..d6e235dcfc 100644 --- a/server/core/runtime/run_step_runner_test.go +++ b/server/core/runtime/run_step_runner_test.go @@ -14,6 +14,7 @@ import ( "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/mocks/matchers" "github.com/runatlantis/atlantis/server/events/models" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -38,11 +39,15 @@ func TestRunStepRunner_Run(t *testing.T) { }, { Command: `printf \'your main.tf file does not provide default region.\\ncheck\'`, - ExpOut: `'your`, + ExpOut: "'your\n", }, { Command: `printf 'your main.tf file does not provide default region.\ncheck'`, - ExpOut: "your main.tf file does not provide default region.\ncheck", + ExpOut: "your main.tf file does not provide default region.\ncheck\n", + }, + { + Command: `printf '\e[32mgreen'`, + ExpOut: "green\n", }, { Command: "echo 'a", @@ -104,11 +109,13 @@ func TestRunStepRunner_Run(t *testing.T) { ThenReturn(nil) logger := logging.NewNoopLogger(t) + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() r := runtime.RunStepRunner{ - TerraformExecutor: terraform, - DefaultTFVersion: defaultVersion, - TerraformBinDir: "/bin/dir", + TerraformExecutor: terraform, + DefaultTFVersion: defaultVersion, + TerraformBinDir: "/bin/dir", + ProjectCmdOutputHandler: projectCmdOutputHandler, } t.Run(c.Command, func(t *testing.T) { tmpDir, cleanup := TempDir(t) @@ -139,7 +146,7 @@ func TestRunStepRunner_Run(t *testing.T) { ProjectName: c.ProjectName, EscapedCommentArgs: []string{"-target=resource1", "-target=resource2"}, } - out, err := r.Run(ctx, c.Command, tmpDir, map[string]string{"test": "var"}) + out, err := r.Run(ctx, c.Command, tmpDir, map[string]string{"test": "var"}, true) if c.ExpErr != "" { ErrContains(t, c.ExpErr, err) return diff --git a/server/core/runtime/runtime.go b/server/core/runtime/runtime.go index 7bd575308d..c82b9bc8f9 100644 --- a/server/core/runtime/runtime.go +++ b/server/core/runtime/runtime.go @@ -10,7 +10,7 @@ import ( version "github.com/hashicorp/go-version" "github.com/pkg/errors" - "github.com/runatlantis/atlantis/server/core/terraform" + runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" @@ -41,7 +41,7 @@ type AsyncTFExec interface { // Callers can use the input channel to pass stdin input to the command. // If any error is passed on the out channel, there will be no // further output (so callers are free to exit). - RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) + RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) } // StatusUpdater brings the interface from CommitStatusUpdater into this package diff --git a/server/core/terraform/mocks/mock_terraform_client.go b/server/core/terraform/mocks/mock_terraform_client.go index 76ccc3da48..746f39a0c7 100644 --- a/server/core/terraform/mocks/mock_terraform_client.go +++ b/server/core/terraform/mocks/mock_terraform_client.go @@ -4,14 +4,12 @@ package mocks import ( - "reflect" - "time" - go_version "github.com/hashicorp/go-version" pegomock "github.com/petergtz/pegomock" - "github.com/runatlantis/atlantis/server/core/terraform" - "github.com/runatlantis/atlantis/server/events/command" + command "github.com/runatlantis/atlantis/server/events/command" logging "github.com/runatlantis/atlantis/server/logging" + "reflect" + "time" ) type MockClient struct { @@ -48,16 +46,6 @@ func (mock *MockClient) RunCommandWithVersion(ctx command.ProjectContext, path s return ret0, ret1 } -func (mock *MockClient) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *go_version.Version, workspace string) (chan<- string, <-chan terraform.Line) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockClient().") - } - outCh := make(chan terraform.Line) - inCh := make(chan string) - - return inCh, outCh -} - func (mock *MockClient) EnsureVersion(log logging.SimpleLogging, v *go_version.Version) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockClient().") @@ -121,17 +109,17 @@ type MockClient_RunCommandWithVersion_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockClient_RunCommandWithVersion_OngoingVerification) GetCapturedArguments() (logging.SimpleLogging, string, []string, map[string]string, *go_version.Version, string) { - log, path, args, envs, v, workspace := c.GetAllCapturedArguments() - return log[len(log)-1], path[len(path)-1], args[len(args)-1], envs[len(envs)-1], v[len(v)-1], workspace[len(workspace)-1] +func (c *MockClient_RunCommandWithVersion_OngoingVerification) GetCapturedArguments() (command.ProjectContext, string, []string, map[string]string, *go_version.Version, string) { + ctx, path, args, envs, v, workspace := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], path[len(path)-1], args[len(args)-1], envs[len(envs)-1], v[len(v)-1], workspace[len(workspace)-1] } -func (c *MockClient_RunCommandWithVersion_OngoingVerification) GetAllCapturedArguments() (_param0 []logging.SimpleLogging, _param1 []string, _param2 [][]string, _param3 []map[string]string, _param4 []*go_version.Version, _param5 []string) { +func (c *MockClient_RunCommandWithVersion_OngoingVerification) GetAllCapturedArguments() (_param0 []command.ProjectContext, _param1 []string, _param2 [][]string, _param3 []map[string]string, _param4 []*go_version.Version, _param5 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]logging.SimpleLogging, len(c.methodInvocations)) + _param0 = make([]command.ProjectContext, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(logging.SimpleLogging) + _param0[u] = param.(command.ProjectContext) } _param1 = make([]string, len(c.methodInvocations)) for u, param := range params[1] { diff --git a/server/core/terraform/terraform_client.go b/server/core/terraform/terraform_client.go index f6e5c92a42..b359b841e4 100644 --- a/server/core/terraform/terraform_client.go +++ b/server/core/terraform/terraform_client.go @@ -15,9 +15,7 @@ package terraform import ( - "bufio" "fmt" - "io" "os" "os/exec" "path/filepath" @@ -31,6 +29,7 @@ import ( "github.com/mitchellh/go-homedir" "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/terraform/ansi" "github.com/runatlantis/atlantis/server/jobs" @@ -297,7 +296,7 @@ func (c *DefaultClient) RunCommandWithVersion(ctx command.ProjectContext, path s output = ansi.Strip(output) return fmt.Sprintf("%s\n", output), err } - tfCmd, cmd, err := c.prepCmd(ctx.Log, v, workspace, path, args) + tfCmd, cmd, err := c.prepExecCmd(ctx.Log, v, workspace, path, args) if err != nil { return "", err } @@ -317,10 +316,23 @@ func (c *DefaultClient) RunCommandWithVersion(ctx command.ProjectContext, path s return ansi.Strip(string(out)), nil } -// prepCmd builds a ready to execute command based on the version of terraform +// prepExecCmd builds a ready to execute command based on the version of terraform // v, and args. It returns a printable representation of the command that will // be run and the actual command. -func (c *DefaultClient) prepCmd(log logging.SimpleLogging, v *version.Version, workspace string, path string, args []string) (string, *exec.Cmd, error) { +func (c *DefaultClient) prepExecCmd(log logging.SimpleLogging, v *version.Version, workspace string, path string, args []string) (string, *exec.Cmd, error) { + tfCmd, envVars, err := c.prepCmd(log, v, workspace, path, args) + if err != nil { + return "", nil, err + } + cmd := exec.Command("sh", "-c", tfCmd) + cmd.Dir = path + cmd.Env = envVars + return tfCmd, cmd, nil +} + +// prepCmd prepares a shell command (to be interpreted with `sh -c `) and set of environment +// variables for running terraform. +func (c *DefaultClient) prepCmd(log logging.SimpleLogging, v *version.Version, workspace string, path string, args []string) (string, []string, error) { if v == nil { v = c.defaultVersion } @@ -356,18 +368,7 @@ func (c *DefaultClient) prepCmd(log logging.SimpleLogging, v *version.Version, w // AWS_ACCESS_KEY. envVars = append(envVars, os.Environ()...) tfCmd := fmt.Sprintf("%s %s", binPath, strings.Join(args, " ")) - cmd := exec.Command("sh", "-c", tfCmd) - cmd.Dir = path - cmd.Env = envVars - return tfCmd, cmd, nil -} - -// Line represents a line that was output from a terraform command. -type Line struct { - // Line is the contents of the line (without the newline). - Line string - // Err is set if there was an error. - Err error + return tfCmd, envVars, nil } // RunCommandAsync runs terraform with args. It immediately returns an @@ -376,99 +377,28 @@ type Line struct { // Callers can use the input channel to pass stdin input to the command. // If any error is passed on the out channel, there will be no // further output (so callers are free to exit). -func (c *DefaultClient) RunCommandAsync(ctx command.ProjectContext, path string, args []string, customEnvVars map[string]string, v *version.Version, workspace string) (chan<- string, <-chan Line) { - outCh := make(chan Line) - inCh := make(chan string) - - // We start a goroutine to do our work asynchronously and then immediately - // return our channels. - go func() { - - // Ensure we close our channels when we exit. - defer func() { +func (c *DefaultClient) RunCommandAsync(ctx command.ProjectContext, path string, args []string, customEnvVars map[string]string, v *version.Version, workspace string) (chan<- string, <-chan models.Line) { + cmd, envVars, err := c.prepCmd(ctx.Log, v, workspace, path, args) + if err != nil { + // The signature of `RunCommandAsync` doesn't provide for returning an immediate error, only one + // once reading the output. Since we won't be spawning a process, simulate that by sending the + // errorcustomEnvVars to the output channel. + outCh := make(chan models.Line) + inCh := make(chan string) + go func() { + outCh <- models.Line{Err: err} close(outCh) close(inCh) }() + return inCh, outCh + } - tfCmd, cmd, err := c.prepCmd(ctx.Log, v, workspace, path, args) - if err != nil { - ctx.Log.Err(err.Error()) - outCh <- Line{Err: err} - return - } - stdout, _ := cmd.StdoutPipe() - stderr, _ := cmd.StderrPipe() - stdin, _ := cmd.StdinPipe() - envVars := cmd.Env - for key, val := range customEnvVars { - envVars = append(envVars, fmt.Sprintf("%s=%s", key, val)) - } - cmd.Env = envVars - - ctx.Log.Debug("starting %q in %q", tfCmd, path) - err = cmd.Start() - if err != nil { - err = errors.Wrapf(err, "running %q in %q", tfCmd, path) - ctx.Log.Err(err.Error()) - outCh <- Line{Err: err} - return - } - - // If we get anything on inCh, write it to stdin. - // This function will exit when inCh is closed which we do in our defer. - go func() { - for line := range inCh { - ctx.Log.Debug("writing %q to remote command's stdin", line) - _, err := io.WriteString(stdin, line) - if err != nil { - ctx.Log.Err(errors.Wrapf(err, "writing %q to process", line).Error()) - } - } - }() - - // Use a waitgroup to block until our stdout/err copying is complete. - wg := new(sync.WaitGroup) - wg.Add(2) - // Asynchronously copy from stdout/err to outCh. - go func() { - s := bufio.NewScanner(stdout) - buf := []byte{} - s.Buffer(buf, BufioScannerBufferSize) - - for s.Scan() { - message := s.Text() - outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message, false) - } - wg.Done() - }() - go func() { - s := bufio.NewScanner(stderr) - for s.Scan() { - message := s.Text() - outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message, false) - } - wg.Done() - }() - - // Wait for our copying to complete. This *must* be done before - // calling cmd.Wait(). (see https://github.com/golang/go/issues/19685) - wg.Wait() - - // Wait for the command to complete. - err = cmd.Wait() - - // We're done now. Send an error if there was one. - if err != nil { - err = errors.Wrapf(err, "running %q in %q", tfCmd, path) - ctx.Log.Err(err.Error()) - outCh <- Line{Err: err} - } else { - ctx.Log.Info("successfully ran %q in %q", tfCmd, path) - } - }() + for key, val := range customEnvVars { + envVars = append(envVars, fmt.Sprintf("%s=%s", key, val)) + } + runner := models.NewShellCommandRunner(cmd, envVars, path, true, c.projectCmdOutputHandler) + inCh, outCh := runner.RunCommandAsync(ctx) return inCh, outCh } diff --git a/server/core/terraform/terraform_client_internal_test.go b/server/core/terraform/terraform_client_internal_test.go index a99bc584bb..647ee48e06 100644 --- a/server/core/terraform/terraform_client_internal_test.go +++ b/server/core/terraform/terraform_client_internal_test.go @@ -8,6 +8,7 @@ import ( "testing" version "github.com/hashicorp/go-version" + runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/models" jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" @@ -381,7 +382,7 @@ func TestDefaultClient_RunCommandAsync_Input(t *testing.T) { Equals(t, "echo me", out) } -func waitCh(ch <-chan Line) (string, error) { +func waitCh(ch <-chan runtimemodels.Line) (string, error) { var ls []string for line := range ch { if line.Err != nil { diff --git a/server/events/mocks/mock_custom_step_runner.go b/server/events/mocks/mock_custom_step_runner.go index 6490660663..8554651132 100644 --- a/server/events/mocks/mock_custom_step_runner.go +++ b/server/events/mocks/mock_custom_step_runner.go @@ -26,11 +26,11 @@ func NewMockCustomStepRunner(options ...pegomock.Option) *MockCustomStepRunner { func (mock *MockCustomStepRunner) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockCustomStepRunner) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockCustomStepRunner) Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string) (string, error) { +func (mock *MockCustomStepRunner) Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string, streamOutput bool) (string, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockCustomStepRunner().") } - params := []pegomock.Param{ctx, cmd, path, envs} + params := []pegomock.Param{ctx, cmd, path, envs, streamOutput} result := pegomock.GetGenericMockFrom(mock).Invoke("Run", params, []reflect.Type{reflect.TypeOf((*string)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 string var ret1 error @@ -82,8 +82,8 @@ type VerifierMockCustomStepRunner struct { timeout time.Duration } -func (verifier *VerifierMockCustomStepRunner) Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string) *MockCustomStepRunner_Run_OngoingVerification { - params := []pegomock.Param{ctx, cmd, path, envs} +func (verifier *VerifierMockCustomStepRunner) Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string, streamOutput bool) *MockCustomStepRunner_Run_OngoingVerification { + params := []pegomock.Param{ctx, cmd, path, envs, streamOutput} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Run", params, verifier.timeout) return &MockCustomStepRunner_Run_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -93,12 +93,12 @@ type MockCustomStepRunner_Run_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockCustomStepRunner_Run_OngoingVerification) GetCapturedArguments() (command.ProjectContext, string, string, map[string]string) { - ctx, cmd, path, envs := c.GetAllCapturedArguments() - return ctx[len(ctx)-1], cmd[len(cmd)-1], path[len(path)-1], envs[len(envs)-1] +func (c *MockCustomStepRunner_Run_OngoingVerification) GetCapturedArguments() (command.ProjectContext, string, string, map[string]string, bool) { + ctx, cmd, path, envs, streamOutput := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], cmd[len(cmd)-1], path[len(path)-1], envs[len(envs)-1], streamOutput[len(streamOutput)-1] } -func (c *MockCustomStepRunner_Run_OngoingVerification) GetAllCapturedArguments() (_param0 []command.ProjectContext, _param1 []string, _param2 []string, _param3 []map[string]string) { +func (c *MockCustomStepRunner_Run_OngoingVerification) GetAllCapturedArguments() (_param0 []command.ProjectContext, _param1 []string, _param2 []string, _param3 []map[string]string, _param4 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]command.ProjectContext, len(c.methodInvocations)) @@ -117,6 +117,10 @@ func (c *MockCustomStepRunner_Run_OngoingVerification) GetAllCapturedArguments() for u, param := range params[3] { _param3[u] = param.(map[string]string) } + _param4 = make([]bool, len(c.methodInvocations)) + for u, param := range params[4] { + _param4[u] = param.(bool) + } } return } diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index a52c9e2cdb..5b5c7976f0 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -62,7 +62,7 @@ type StepRunner interface { // CustomStepRunner runs custom run steps. type CustomStepRunner interface { // Run cmd in path. - Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string) (string, error) + Run(ctx command.ProjectContext, cmd string, path string, envs map[string]string, streamOutput bool) (string, error) } //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_env_step_runner.go EnvStepRunner @@ -493,7 +493,7 @@ func (p *DefaultProjectCommandRunner) runSteps(steps []valid.Step, ctx command.P case "version": out, err = p.VersionStepRunner.Run(ctx, step.ExtraArgs, absPath, envs) case "run": - out, err = p.RunStepRunner.Run(ctx, step.RunCommand, absPath, envs) + out, err = p.RunStepRunner.Run(ctx, step.RunCommand, absPath, envs, true) case "env": out, err = p.EnvStepRunner.Run(ctx, step.RunCommand, step.EnvVarValue, absPath, envs) envs[step.EnvVarName] = out diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index d057581847..6c75f3beea 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -30,6 +30,7 @@ import ( eventmocks "github.com/runatlantis/atlantis/server/events/mocks" "github.com/runatlantis/atlantis/server/events/mocks/matchers" "github.com/runatlantis/atlantis/server/events/models" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -111,7 +112,7 @@ func TestDefaultProjectCommandRunner_Plan(t *testing.T) { When(mockInit.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("init", nil) When(mockPlan.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("plan", nil) When(mockApply.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("apply", nil) - When(mockRun.Run(ctx, "", repoDir, expEnvs)).ThenReturn("run", nil) + When(mockRun.Run(ctx, "", repoDir, expEnvs, true)).ThenReturn("run", nil) res := runner.Plan(ctx) Assert(t, res.PlanSuccess != nil, "exp plan success") @@ -127,7 +128,7 @@ func TestDefaultProjectCommandRunner_Plan(t *testing.T) { case "apply": mockApply.VerifyWasCalledOnce().Run(ctx, nil, repoDir, expEnvs) case "run": - mockRun.VerifyWasCalledOnce().Run(ctx, "", repoDir, expEnvs) + mockRun.VerifyWasCalledOnce().Run(ctx, "", repoDir, expEnvs, true) } } } @@ -457,7 +458,7 @@ func TestDefaultProjectCommandRunner_Apply(t *testing.T) { When(mockInit.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("init", nil) When(mockPlan.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("plan", nil) When(mockApply.Run(ctx, nil, repoDir, expEnvs)).ThenReturn("apply", nil) - When(mockRun.Run(ctx, "", repoDir, expEnvs)).ThenReturn("run", nil) + When(mockRun.Run(ctx, "", repoDir, expEnvs, true)).ThenReturn("run", nil) When(mockEnv.Run(ctx, "", "value", repoDir, make(map[string]string))).ThenReturn("value", nil) res := runner.Apply(ctx) @@ -473,7 +474,7 @@ func TestDefaultProjectCommandRunner_Apply(t *testing.T) { case "apply": mockApply.VerifyWasCalledOnce().Run(ctx, nil, repoDir, expEnvs) case "run": - mockRun.VerifyWasCalledOnce().Run(ctx, "", repoDir, expEnvs) + mockRun.VerifyWasCalledOnce().Run(ctx, "", repoDir, expEnvs, true) case "env": mockEnv.VerifyWasCalledOnce().Run(ctx, "", "value", repoDir, expEnvs) } @@ -537,9 +538,11 @@ func TestDefaultProjectCommandRunner_RunEnvSteps(t *testing.T) { tfClient := tmocks.NewMockClient() tfVersion, err := version.NewVersion("0.12.0") Ok(t, err) + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() run := runtime.RunStepRunner{ - TerraformExecutor: tfClient, - DefaultTFVersion: tfVersion, + TerraformExecutor: tfClient, + DefaultTFVersion: tfVersion, + ProjectCmdOutputHandler: projectCmdOutputHandler, } env := runtime.EnvStepRunner{ RunStepRunner: &run, diff --git a/server/server.go b/server/server.go index fa36411611..f9f6f2b951 100644 --- a/server/server.go +++ b/server/server.go @@ -466,9 +466,10 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { defaultTfVersion := terraformClient.DefaultVersion() pendingPlanFinder := &events.DefaultPendingPlanFinder{} runStepRunner := &runtime.RunStepRunner{ - TerraformExecutor: terraformClient, - DefaultTFVersion: defaultTfVersion, - TerraformBinDir: terraformClient.TerraformBinDir(), + TerraformExecutor: terraformClient, + DefaultTFVersion: defaultTfVersion, + TerraformBinDir: terraformClient.TerraformBinDir(), + ProjectCmdOutputHandler: projectCmdOutputHandler, } drainer := &events.Drainer{} statusController := &controllers.StatusController{