cli: stream both stdout and stderr when following an alloc. (#16556)
This update changes the behaviour when following logs from an
allocation, so that both the stdout and stderr files are streamed when
the operator supplies the follow flag. The previous behaviour is
preserved for all other flag combinations and situations.

Co-authored-by: Luiz Aoqui <luiz@hashicorp.com>
jrasell and lgfa29 authored Apr 4, 2023
1 parent 6b64b20 commit 4848bfa
Showing 11 changed files with 505 additions and 60 deletions.
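
As an aside, the sketch below mirrors the flag check this change adds in command/alloc_logs.go: the combined stdout/stderr stream is only used when -f is supplied and no other output-shaping flag is set. The followBoth helper and the small program around it are illustrative assumptions, not code from this commit.

```go
package main

import "fmt"

// followBoth mirrors the condition added in command/alloc_logs.go: the mixed
// stdout/stderr stream is only used when -f is set and no flag that narrows
// or shapes the output (-stdout, -stderr, -tail, -n, -c) is supplied.
func followBoth(follow, stdout, stderr, tail bool, numLines, numBytes int64) bool {
	return follow && !(stderr || stdout || tail || numLines > 0 || numBytes > 0)
}

func main() {
	fmt.Println(followBoth(true, false, false, false, -1, -1)) // true: plain -f follows both streams
	fmt.Println(followBoth(true, false, true, false, -1, -1))  // false: -stderr narrows to a single stream
	fmt.Println(followBoth(true, false, false, false, 10, -1)) // false: -n implies a tailed single stream
}
```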
3 changes: 3 additions & 0 deletions .changelog/16556.txt
@@ -0,0 +1,3 @@
```release-note:improvement
cli: stream both stdout and stderr logs by default when following an allocation
```
8 changes: 8 additions & 0 deletions api/fs.go
@@ -18,6 +18,14 @@ const (
// and end of a file.
OriginStart = "start"
OriginEnd = "end"

// FSLogNameStdout is the name given to the stdout log stream of a task. It
// can be used when calling AllocFS.Logs as the logType parameter.
FSLogNameStdout = "stdout"

// FSLogNameStderr is the name given to the stderr log stream of a task. It
// can be used when calling AllocFS.Logs as the logType parameter.
FSLogNameStderr = "stderr"
)

// AllocFileInfo holds information about a file inside the AllocDir
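
As a usage illustration (not part of this commit), here is a minimal sketch of calling AllocFS.Logs with the new FSLogNameStderr constant to follow a single stream; the streamStderr helper and the argument handling in main are assumptions made for the example.

```go
package main

import (
	"fmt"
	"os"

	"github.com/hashicorp/nomad/api"
)

// streamStderr follows a task's stderr log from the end of the file and copies
// each frame to the local stderr until the stream closes or errors.
func streamStderr(client *api.Client, alloc *api.Allocation, task string) error {
	cancel := make(chan struct{})
	defer close(cancel)

	frames, errCh := client.AllocFS().Logs(
		alloc, true, task, api.FSLogNameStderr, api.OriginEnd, 0, cancel, nil)

	for {
		select {
		case err := <-errCh:
			return fmt.Errorf("stderr stream failed: %w", err)
		case frame, ok := <-frames:
			if !ok {
				return nil
			}
			fmt.Fprint(os.Stderr, string(frame.Data))
		}
	}
}

func main() {
	// Hypothetical usage: go run . <alloc-id> <task-name>
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	alloc, _, err := client.Allocations().Info(os.Args[1], nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if err := streamStderr(client, alloc, os.Args[2]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```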
1 change: 1 addition & 0 deletions ci/test-core.json
@@ -28,6 +28,7 @@
"client/taskenv/...",
"command/agent/...",
"command/raft_tools/...",
"command/ui/...",
"helper/...",
"internal/...",
"jobspec/...",
216 changes: 160 additions & 56 deletions command/alloc_logs.go
@@ -12,11 +12,18 @@ import (

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/command/ui"
"github.com/posener/complete"
)

type AllocLogsCommand struct {
Meta

// The fields below represent the commands flags.
verbose, job, tail, stderr, stdout, follow bool
numLines int64
numBytes int64
task string
}

func (l *AllocLogsCommand) Help() string {
@@ -35,6 +42,11 @@ General Options:
Logs Specific Options:
-stdout
Display stdout logs. This is used as the default value in all commands
except when using the "-f" flag where both stdout and stderr are used as
default.
-stderr
Display stderr logs.
@@ -50,7 +62,9 @@ Logs Specific Options:
-f
Causes the output to not stop when the end of the logs are reached, but
rather to wait for additional output.
rather to wait for additional output. When supplied with no other flags
except optionally "-job" and "-task", both stdout and stderr logs will be
followed.
-tail
Show the logs contents with offsets relative to the end of the logs. If no
@@ -79,6 +93,7 @@ func (l *AllocLogsCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(l.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-stderr": complete.PredictNothing,
"-stdout": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-task": complete.PredictAnything,
"-job": complete.PredictAnything,
@@ -107,28 +122,26 @@ func (l *AllocLogsCommand) AutocompleteArgs() complete.Predictor {
func (l *AllocLogsCommand) Name() string { return "alloc logs" }

func (l *AllocLogsCommand) Run(args []string) int {
var verbose, job, tail, stderr, follow bool
var numLines, numBytes int64
var task string

flags := l.Meta.FlagSet(l.Name(), FlagSetClient)
flags.Usage = func() { l.Ui.Output(l.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&job, "job", false, "")
flags.BoolVar(&tail, "tail", false, "")
flags.BoolVar(&follow, "f", false, "")
flags.BoolVar(&stderr, "stderr", false, "")
flags.Int64Var(&numLines, "n", -1, "")
flags.Int64Var(&numBytes, "c", -1, "")
flags.StringVar(&task, "task", "", "")
flags.BoolVar(&l.verbose, "verbose", false, "")
flags.BoolVar(&l.job, "job", false, "")
flags.BoolVar(&l.tail, "tail", false, "")
flags.BoolVar(&l.follow, "f", false, "")
flags.BoolVar(&l.stderr, "stderr", false, "")
flags.BoolVar(&l.stdout, "stdout", false, "")
flags.Int64Var(&l.numLines, "n", -1, "")
flags.Int64Var(&l.numBytes, "c", -1, "")
flags.StringVar(&l.task, "task", "", "")

if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()

if numArgs := len(args); numArgs < 1 {
if job {
if l.job {
l.Ui.Error("A job ID is required")
} else {
l.Ui.Error("An allocation ID is required")
@@ -150,7 +163,7 @@ func (l *AllocLogsCommand) Run(args []string) int {

// If -job is specified, use random allocation, otherwise use provided allocation
allocID := args[0]
if job {
if l.job {
allocID, err = getRandomJobAllocID(client, args[0])
if err != nil {
l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
@@ -160,7 +173,7 @@ func (l *AllocLogsCommand) Run(args []string) int {

// Truncate the id unless full length is requested
length := shortId
if verbose {
if l.verbose {
length = fullId
}
// Query the allocation info
@@ -181,7 +194,7 @@ func (l *AllocLogsCommand) Run(args []string) int {
}
if len(allocs) > 1 {
// Format the allocs
out := formatAllocListStubs(allocs, verbose, length)
out := formatAllocListStubs(allocs, l.verbose, length)
l.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out))
return 1
}
@@ -195,110 +208,201 @@ func (l *AllocLogsCommand) Run(args []string) int {

// If -task isn't provided fallback to reading the task name
// from args.
if task != "" {
err = validateTaskExistsInAllocation(task, alloc)
if l.task != "" {
err = validateTaskExistsInAllocation(l.task, alloc)
} else {
if len(args) >= 2 {
task = args[1]
if task == "" {
l.task = args[1]
if l.task == "" {
l.Ui.Error("Task name required")
return 1
}
} else {
task, err = lookupAllocTask(alloc)
l.task, err = lookupAllocTask(alloc)
}
}
if err != nil {
l.Ui.Error(fmt.Sprintf("Failed to validate task: %s", err))
return 1
}

logType := "stdout"
if stderr {
logType = "stderr"
// In order to run the mixed log output, we can only follow the files from
// their current positions. There is no way to interleave previous log
// lines as there are no timestamp references.
if l.follow && !(l.stderr || l.stdout || l.tail || l.numLines > 0 || l.numBytes > 0) {
if err := l.tailMultipleFiles(client, alloc); err != nil {
l.Ui.Error(fmt.Sprintf("Failed to tail stdout and stderr files: %v", err))
return 1
}
} else {

// If we are not strictly following the two files, we cannot support
// specifying both as targets.
if l.stderr && l.stdout {
l.Ui.Error("Unable to support both stdout and stderr")
return 1
}

logType := api.FSLogNameStdout
if l.stderr {
logType = api.FSLogNameStderr
}
if err := l.handleSingleFile(client, alloc, logType); err != nil {
l.Ui.Error(fmt.Sprintf("Failed to read %s file: %v", logType, err))
return 1
}
}

return 0
}

func (l *AllocLogsCommand) handleSingleFile(client *api.Client, alloc *api.Allocation, logType string) error {
// We have a file, output it.
var r io.ReadCloser
var readErr error
if !tail {
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0)
if !l.tail {
r, readErr = l.followFile(client, alloc, logType, api.OriginStart, 0)
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
return fmt.Errorf("error reading file: %v", readErr)
}
} else {
// Parse the offset
var offset int64 = defaultTailLines * bytesToLines
var offset = defaultTailLines * bytesToLines

if nLines, nBytes := numLines != -1, numBytes != -1; nLines && nBytes {
l.Ui.Error("Both -n and -c set")
return 1
if nLines, nBytes := l.numLines != -1, l.numBytes != -1; nLines && nBytes {
return errors.New("both -n and -c set")
} else if nLines {
offset = numLines * bytesToLines
offset = l.numLines * bytesToLines
} else if nBytes {
offset = numBytes
offset = l.numBytes
} else {
numLines = defaultTailLines
l.numLines = defaultTailLines
}

r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset)
r, readErr = l.followFile(client, alloc, logType, api.OriginEnd, offset)

// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
if l.numLines != -1 {
r = NewLineLimitReader(r, int(l.numLines), int(l.numLines*bytesToLines), 1*time.Second)
}

if readErr != nil {
readErr = fmt.Errorf("Error tailing file: %v", readErr)
return fmt.Errorf("error tailing file: %v", readErr)
}
}

if readErr != nil {
l.Ui.Error(readErr.Error())
return 1
}

defer r.Close()
_, err = io.Copy(os.Stdout, r)
if err != nil {
l.Ui.Error(fmt.Sprintf("error following logs: %s", err))
return 1
if _, err := io.Copy(os.Stdout, r); err != nil {
return fmt.Errorf("error following logs: %s", err)
}

return 0
return nil
}

// followFile outputs the contents of the file to stdout relative to the end of
// the file.
func (l *AllocLogsCommand) followFile(client *api.Client, alloc *api.Allocation,
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {
logType, origin string, offset int64) (io.ReadCloser, error) {

cancel := make(chan struct{})
frames, errCh := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
frames, errCh := client.AllocFS().Logs(alloc, l.follow, l.task, logType, origin, offset, cancel, nil)

// Setting up the logs stream can fail, therefore we need to check the
// error channel before continuing further.
select {
case err := <-errCh:
return nil, err
default:
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

// Create a reader
// Create a reader but don't initially cast it to an io.ReadCloser so that
// we can set the unblock time.
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

// This go routine blocks until the command receives an interrupt or
// terminate signal, at which point we close the ReadCloser.
go func() {
<-signalCh

// End the streaming
r.Close()
_ = r.Close()
}()

return r, nil
}

// tailMultipleFiles will follow both stdout and stderr log files of the passed
// allocation. Each stream will be output to the user's console via stdout and
// stderr until the user cancels it.
func (l *AllocLogsCommand) tailMultipleFiles(client *api.Client, alloc *api.Allocation) error {

// Use a single cancel channel for both log streams, so we only have to
// close one.
cancel := make(chan struct{})

// Ensure the channel is closed in order to notify listeners whenever we
// exit.
defer close(cancel)

stdoutFrames, stdoutErrCh := client.AllocFS().Logs(
alloc, true, l.task, api.FSLogNameStdout, api.OriginEnd, 1, cancel, nil)

// Setting up the logs stream can fail, therefore we need to check the
// error channel before continuing further.
select {
case err := <-stdoutErrCh:
return fmt.Errorf("failed to setup stdout log tailing: %v", err)
default:
}

stderrFrames, stderrErrCh := client.AllocFS().Logs(
alloc, true, l.task, api.FSLogNameStderr, api.OriginEnd, 1, cancel, nil)

// Setting up the logs stream can fail, therefore we need to check the
// error channel before continuing further.
select {
case err := <-stderrErrCh:
return fmt.Errorf("failed to setup stderr log tailing: %v", err)
default:
}

// Trap user signals, so we know when to exit and cancel the log streams
// running in the background.
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

// Generate our logging UI that doesn't add any additional formatting to
// output strings.
logUI, err := ui.NewLogUI(l.Ui)
if err != nil {
return err
}

// Enter the main loop where we listen for log frames, errors, and a cancel
// signal. Any error at this point will result in the stream being ended,
// therefore should result in this command exiting. Otherwise, we would
// just be printing a single stream, which might be hard to notice for the
// user.
for {
select {
case <-signalCh:
return nil
case stdoutErr := <-stdoutErrCh:
return fmt.Errorf("received an error from stdout log stream: %v", stdoutErr)
case stdoutFrame := <-stdoutFrames:
logUI.Output(string(stdoutFrame.Data))
case stderrErr := <-stderrErrCh:
return fmt.Errorf("received an error from stderr log stream: %v", stderrErr)
case stderrFrame := <-stderrFrames:
logUI.Warn(string(stderrFrame.Data))
}
}
}

func lookupAllocTask(alloc *api.Allocation) (string, error) {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
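
For reference, a simplified standalone sketch of the interleaving pattern tailMultipleFiles uses above: two Logs streams sharing one cancel channel and a single select loop. It writes frames straight to the process's stdout and stderr rather than through the command's LogUI, and the followBothStreams helper is an assumption for illustration only.

```go
package logsketch

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/hashicorp/nomad/api"
)

// followBothStreams tails a task's stdout and stderr together. Both streams
// share one cancel channel, so closing it on return stops both of them, and
// the select loop interleaves frames until a signal or stream error arrives.
func followBothStreams(client *api.Client, alloc *api.Allocation, task string) error {
	cancel := make(chan struct{})
	defer close(cancel)

	stdoutFrames, stdoutErrCh := client.AllocFS().Logs(
		alloc, true, task, api.FSLogNameStdout, api.OriginEnd, 1, cancel, nil)
	stderrFrames, stderrErrCh := client.AllocFS().Logs(
		alloc, true, task, api.FSLogNameStderr, api.OriginEnd, 1, cancel, nil)

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

	for {
		select {
		case <-signalCh:
			return nil
		case err := <-stdoutErrCh:
			return fmt.Errorf("stdout stream failed: %w", err)
		case err := <-stderrErrCh:
			return fmt.Errorf("stderr stream failed: %w", err)
		case frame, ok := <-stdoutFrames:
			if !ok {
				return nil
			}
			fmt.Fprint(os.Stdout, string(frame.Data))
		case frame, ok := <-stderrFrames:
			if !ok {
				return nil
			}
			fmt.Fprint(os.Stderr, string(frame.Data))
		}
	}
}
```

Closing the shared cancel channel on return is what stops both streams when either one reports an error.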
(Diffs for the remaining changed files are not shown.)
