Skip to content

Commit

Permalink
feat: stream output for custom workflows (#2261)
Browse files Browse the repository at this point in the history
* Start threading job output to RunStepRunner

* Strip ANSI

* Fix lint

* Use waitgroup to avoid test flakiness

* Move waitgroup higher

* Add ANSI test and use strings.Builder

* Fix lint

* Use errors.Wrap per style guide

* Create ShellCommandRunner to encapsulate streaming

* WIP: shell command runner

* Update signatures to propagate error finding version

* Fix log output

* Fix error checking

* Fix accidental whitespace stripping

* Remove unused struct field

* Fix error checking in terraform client

* Add unit tests to verify command output handler was called

* Remove err from async interface

* Remove duplicative log now that shell command runner does it

* Hide output in stream for env/multienv

* Add comment explaining goroutines

* Use printf for better macOS compatibility
  • Loading branch information
ascandella authored Jun 22, 2022
1 parent d6aa1e0 commit ff1094f
Show file tree
Hide file tree
Showing 19 changed files with 360 additions and 182 deletions.
5 changes: 3 additions & 2 deletions server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,9 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
TerraformExecutor: terraformClient,
},
RunStepRunner: &runtime.RunStepRunner{
TerraformExecutor: terraformClient,
DefaultTFVersion: defaultTFVersion,
TerraformExecutor: terraformClient,
DefaultTFVersion: defaultTFVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
},
WorkingDir: workingDir,
Webhooks: &mockWebhookSender{},
Expand Down
10 changes: 5 additions & 5 deletions server/core/runtime/apply_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
. "github.com/petergtz/pegomock"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/core/runtime"
"github.com/runatlantis/atlantis/server/core/terraform"
runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/command"
Expand Down Expand Up @@ -371,11 +371,11 @@ type remoteApplyMock struct {
}

// RunCommandAsync fakes out running terraform async.
func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) {
func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) {
r.CalledArgs = args

in := make(chan string)
out := make(chan terraform.Line)
out := make(chan runtimemodels.Line)

// We use a wait group to ensure our sending and receiving routines have
// completed.
Expand All @@ -398,10 +398,10 @@ func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path strin
// Asynchronously send the lines we're supposed to.
go func() {
for _, line := range strings.Split(r.LinesToSend, "\n") {
out <- terraform.Line{Line: line}
out <- runtimemodels.Line{Line: line}
}
if r.Err != nil {
out <- terraform.Line{Err: r.Err}
out <- runtimemodels.Line{Err: r.Err}
}
close(out)
wg.Done()
Expand Down
4 changes: 3 additions & 1 deletion server/core/runtime/env_step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ func (r *EnvStepRunner) Run(ctx command.ProjectContext, command string, value st
if value != "" {
return value, nil
}
res, err := r.RunStepRunner.Run(ctx, command, path, envs)
// Pass `false` for streamOutput because this isn't interesting to the user reading the build logs
// in the web UI.
res, err := r.RunStepRunner.Run(ctx, command, path, envs, false)
// Trim newline from res to support running `echo env_value` which has
// a newline. We don't recommend users run echo -n env_value to remove the
// newline because -n doesn't work in the sh shell which is what we use
Expand Down
7 changes: 5 additions & 2 deletions server/core/runtime/env_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/models"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"

. "github.com/petergtz/pegomock"
Expand Down Expand Up @@ -40,9 +41,11 @@ func TestEnvStepRunner_Run(t *testing.T) {
tfClient := mocks.NewMockClient()
tfVersion, err := version.NewVersion("0.12.0")
Ok(t, err)
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()
runStepRunner := runtime.RunStepRunner{
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
}
envRunner := runtime.EnvStepRunner{
RunStepRunner: &runStepRunner,
Expand Down
161 changes: 161 additions & 0 deletions server/core/runtime/models/shell_command_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package models

import (
"bufio"
"io"
"os/exec"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/jobs"
)

// Setting the buffer size to 10mb
const BufioScannerBufferSize = 10 * 1024 * 1024

// Line represents a line that was output from a shell command.
type Line struct {
// Line is the contents of the line (without the newline).
Line string
// Err is set if there was an error.
Err error
}

// ShellCommandRunner runs a command via `exec.Command` and streams output to the
// `ProjectCommandOutputHandler`.
type ShellCommandRunner struct {
command string
workingDir string
outputHandler jobs.ProjectCommandOutputHandler
streamOutput bool
cmd *exec.Cmd
}

func NewShellCommandRunner(command string, environ []string, workingDir string, streamOutput bool, outputHandler jobs.ProjectCommandOutputHandler) *ShellCommandRunner {
cmd := exec.Command("sh", "-c", command) // #nosec
cmd.Env = environ
cmd.Dir = workingDir

return &ShellCommandRunner{
command: command,
workingDir: workingDir,
outputHandler: outputHandler,
streamOutput: streamOutput,
cmd: cmd,
}
}

func (s *ShellCommandRunner) Run(ctx command.ProjectContext) (string, error) {
_, outCh := s.RunCommandAsync(ctx)

outbuf := new(strings.Builder)
var err error
for line := range outCh {
if line.Err != nil {
err = line.Err
break
}
outbuf.WriteString(line.Line)
outbuf.WriteString("\n")
}

// sanitize output by stripping out any ansi characters.
output := ansi.Strip(outbuf.String())
return output, err
}

// RunCommandAsync runs terraform with args. It immediately returns an
// input and output channel. Callers can use the output channel to
// get the realtime output from the command.
// Callers can use the input channel to pass stdin input to the command.
// If any error is passed on the out channel, there will be no
// further output (so callers are free to exit).
func (s *ShellCommandRunner) RunCommandAsync(ctx command.ProjectContext) (chan<- string, <-chan Line) {
outCh := make(chan Line)
inCh := make(chan string)

// We start a goroutine to do our work asynchronously and then immediately
// return our channels.
go func() {
// Ensure we close our channels when we exit.
defer func() {
close(outCh)
close(inCh)
}()

stdout, _ := s.cmd.StdoutPipe()
stderr, _ := s.cmd.StderrPipe()
stdin, _ := s.cmd.StdinPipe()

ctx.Log.Debug("starting %q in %q", s.command, s.workingDir)
err := s.cmd.Start()
if err != nil {
err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir)
ctx.Log.Err(err.Error())
outCh <- Line{Err: err}
return
}

// If we get anything on inCh, write it to stdin.
// This function will exit when inCh is closed which we do in our defer.
go func() {
for line := range inCh {
ctx.Log.Debug("writing %q to remote command's stdin", line)
_, err := io.WriteString(stdin, line)
if err != nil {
ctx.Log.Err(errors.Wrapf(err, "writing %q to process", line).Error())
}
}
}()

wg := new(sync.WaitGroup)
wg.Add(2)
// Asynchronously copy from stdout/err to outCh.
go func() {
scanner := bufio.NewScanner(stdout)
buf := []byte{}
scanner.Buffer(buf, BufioScannerBufferSize)

for scanner.Scan() {
message := scanner.Text()
outCh <- Line{Line: message}
if s.streamOutput {
s.outputHandler.Send(ctx, message, false)
}
}
wg.Done()
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
message := scanner.Text()
outCh <- Line{Line: message}
if s.streamOutput {
s.outputHandler.Send(ctx, message, false)
}
}
wg.Done()
}()

// Wait for our copying to complete. This *must* be done before
// calling cmd.Wait(). (see https://github.com/golang/go/issues/19685)
wg.Wait()

// Wait for the command to complete.
err = s.cmd.Wait()

// We're done now. Send an error if there was one.
if err != nil {
err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir)
ctx.Log.Err(err.Error())
outCh <- Line{Err: err}
} else {
ctx.Log.Info("successfully ran %q in %q", s.command, s.workingDir)
}
}()

return inCh, outCh
}
75 changes: 75 additions & 0 deletions server/core/runtime/models/shell_command_runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package models_test

import (
"fmt"
"os"
"strings"
"testing"

. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/core/runtime/mocks/matchers"
"github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)

func TestShellCommandRunner_Run(t *testing.T) {
cases := []struct {
Command string
ExpLines []string
Environ map[string]string
}{
{
Command: "echo $HELLO",
Environ: map[string]string{
"HELLO": "world",
},
ExpLines: []string{"world"},
},
{
Command: ">&2 echo this is an error",
ExpLines: []string{"this is an error"},
},
}

for _, c := range cases {
t.Run(c.Command, func(t *testing.T) {
RegisterMockTestingT(t)
ctx := command.ProjectContext{
Log: logging.NewNoopLogger(t),
Workspace: "default",
RepoRelDir: ".",
}
projectCmdOutputHandler := mocks.NewMockProjectCommandOutputHandler()

cwd, err := os.Getwd()
Ok(t, err)
environ := []string{}
for k, v := range c.Environ {
environ = append(environ, fmt.Sprintf("%s=%s", k, v))
}
expectedOutput := fmt.Sprintf("%s\n", strings.Join(c.ExpLines, "\n"))

// Run once with streaming enabled
runner := models.NewShellCommandRunner(c.Command, environ, cwd, true, projectCmdOutputHandler)
output, err := runner.Run(ctx)
Ok(t, err)
Equals(t, expectedOutput, output)
for _, line := range c.ExpLines {
projectCmdOutputHandler.VerifyWasCalledOnce().Send(ctx, line, false)
}

// And again with streaming disabled. Everything should be the same except the
// command output handler should not have received anything

projectCmdOutputHandler = mocks.NewMockProjectCommandOutputHandler()
runner = models.NewShellCommandRunner(c.Command, environ, cwd, false, projectCmdOutputHandler)
output, err = runner.Run(ctx)
Ok(t, err)
Equals(t, expectedOutput, output)
projectCmdOutputHandler.VerifyWasCalled(Never()).Send(matchers.AnyModelsProjectCommandContext(), AnyString(), EqBool(false))
})
}
}
2 changes: 1 addition & 1 deletion server/core/runtime/multienv_step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type MultiEnvStepRunner struct {
// Run runs the multienv step command.
// The command must return a json string containing the array of name-value pairs that are being added as extra environment variables
func (r *MultiEnvStepRunner) Run(ctx command.ProjectContext, command string, path string, envs map[string]string) (string, error) {
res, err := r.RunStepRunner.Run(ctx, command, path, envs)
res, err := r.RunStepRunner.Run(ctx, command, path, envs, false)
if err == nil {
envVars := strings.Split(res, ",")
if len(envVars) > 0 {
Expand Down
7 changes: 5 additions & 2 deletions server/core/runtime/multienv_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/models"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)
Expand All @@ -31,9 +32,11 @@ func TestMultiEnvStepRunner_Run(t *testing.T) {
tfClient := mocks.NewMockClient()
tfVersion, err := version.NewVersion("0.12.0")
Ok(t, err)
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()
runStepRunner := runtime.RunStepRunner{
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
}
multiEnvStepRunner := runtime.MultiEnvStepRunner{
RunStepRunner: &runStepRunner,
Expand Down
8 changes: 4 additions & 4 deletions server/core/runtime/plan_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"testing"

"github.com/hashicorp/go-version"
"github.com/runatlantis/atlantis/server/core/terraform"
"github.com/runatlantis/atlantis/server/events/command"
mocks2 "github.com/runatlantis/atlantis/server/events/mocks"

. "github.com/petergtz/pegomock"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/core/runtime"
runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/mocks/matchers"
Expand Down Expand Up @@ -885,13 +885,13 @@ type remotePlanMock struct {
CalledArgs []string
}

func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) {
func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) {
r.CalledArgs = args
in := make(chan string)
out := make(chan terraform.Line)
out := make(chan runtimemodels.Line)
go func() {
for _, line := range strings.Split(r.LinesToSend, "\n") {
out <- terraform.Line{Line: line}
out <- runtimemodels.Line{Line: line}
}
close(out)
close(in)
Expand Down
Loading

0 comments on commit ff1094f

Please sign in to comment.