Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: Add event stream capture to nomad operator debug #11865

Merged
merged 13 commits into from
Jan 18, 2022
3 changes: 3 additions & 0 deletions .changelog/11865.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: Add event stream capture to `nomad operator debug`
```
231 changes: 224 additions & 7 deletions command/operator_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"html/template"
Expand All @@ -21,6 +22,7 @@ import (
"time"

"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/helper"
Expand All @@ -42,12 +44,15 @@ type OperatorDebugCommand struct {
nodeClass string
nodeIDs []string
serverIDs []string
topics map[api.Topic][]string
index uint64
consul *external
vault *external
manifest []string
ctx context.Context
cancel context.CancelFunc
opts *api.QueryOptions
verbose bool
}

const (
Expand All @@ -73,6 +78,11 @@ Usage: nomad operator debug [options]
token will also require 'agent:write', or enable_debug configuration set to
true.

If event stream capture is enabled, the Job, Allocation, Deployment,
and Evaluation topics require 'namespace:read-job' capabilities, the Node
topic requires 'node:read'. A 'management' token is required to capture
ACLToken, ACLPolicy, or all all events.

General Options:

` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Expand Down Expand Up @@ -137,7 +147,20 @@ Debug Options:

-duration=<duration>
Set the duration of the debug capture. Logs will be captured from specified servers and
nodes at "log-level". Defaults to 2m.
nodes at "log-level". Defaults to 2m.

-event-index=<index>
Specifies the index to start streaming events from. If the requested index is
no longer in the buffer the stream will start at the next available index.
Defaults to 0.

-event-topic=<Allocation,Evaluation,Job,Node,*>:<filter>
Enable event stream capture, filtered by comma delimited list of topic filters.
Examples:
"all" or "*:*" for all events
"Evaluation" or "Evaluation:*" for all evaluation events
"*:example" for all events related to the job "example"
Defaults to "none" (disabled).

-interval=<interval>
The interval between snapshots of the Nomad state. Set interval equal to
Expand Down Expand Up @@ -173,7 +196,10 @@ Debug Options:

-output=<path>
Path to the parent directory of the output directory. If specified, no
archive is built. Defaults to the current directory.
archive is built. Defaults to the current directory.

-verbose
Enable verbose output.
`
return strings.TrimSpace(helpText)
}
Expand All @@ -186,6 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-duration": complete.PredictAnything,
"-event-index": complete.PredictAnything,
"-event-topic": complete.PredictAnything,
"-interval": complete.PredictAnything,
"-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"),
"-max-nodes": complete.PredictAnything,
Expand All @@ -196,6 +224,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
"-pprof-duration": complete.PredictAnything,
"-consul-token": complete.PredictAnything,
"-vault-token": complete.PredictAnything,
"-verbose": complete.PredictAnything,
})
}

Expand Down Expand Up @@ -225,7 +254,7 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor {
}

// NodeClassPredictor returns a client node class predictor
// TODO: Consider API options for node class filtering
// TODO dmay: Consider API options for node class filtering
func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
Expand Down Expand Up @@ -261,7 +290,7 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
}

// ServerPredictor returns a server member predictor
// TODO: Consider API options for server member filtering
// TODO dmay: Consider API options for server member filtering
func ServerPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
Expand Down Expand Up @@ -305,11 +334,14 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }

var duration, interval, output, pprofDuration string
var duration, interval, output, pprofDuration, eventTopic string
var eventIndex int64
var nodeIDs, serverIDs string
var allowStale bool

flags.StringVar(&duration, "duration", "2m", "")
flags.Int64Var(&eventIndex, "event-index", 0, "")
flags.StringVar(&eventTopic, "event-topic", "none", "")
flags.StringVar(&interval, "interval", "30s", "")
flags.StringVar(&c.logLevel, "log-level", "DEBUG", "")
flags.IntVar(&c.maxNodes, "max-nodes", 10, "")
Expand All @@ -319,6 +351,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags.BoolVar(&allowStale, "stale", false, "")
flags.StringVar(&output, "output", "", "")
flags.StringVar(&pprofDuration, "pprof-duration", "1s", "")
flags.BoolVar(&c.verbose, "verbose", false, "")

c.consul = &external{tls: &api.TLSConfig{}}
flags.StringVar(&c.consul.addrVal, "consul-http-addr", os.Getenv("CONSUL_HTTP_ADDR"), "")
Expand Down Expand Up @@ -375,6 +408,21 @@ func (c *OperatorDebugCommand) Run(args []string) int {
}
c.pprofDuration = pd

// Parse event stream topic filter
t, err := topicsFromString(eventTopic)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing event topics: %v", err))
return 1
}
c.topics = t

// Validate and set initial event stream index
if eventIndex < 0 {
c.Ui.Error(fmt.Sprintf("Event stream index must be greater than zero"))
return 1
}
c.index = uint64(eventIndex)

// Verify there are no extra arguments
args = flags.Args()
if l := len(args); l != 0 {
Expand Down Expand Up @@ -550,6 +598,9 @@ func (c *OperatorDebugCommand) Run(args []string) int {
if c.pprofDuration.Seconds() != 1 {
c.Ui.Output(fmt.Sprintf(" pprof Duration: %s", c.pprofDuration))
}
if c.topics != nil {
c.Ui.Output(fmt.Sprintf(" Event topics: %+v", c.topics))
}
c.Ui.Output("")
c.Ui.Output("Capturing cluster data...")

Expand Down Expand Up @@ -584,8 +635,11 @@ func (c *OperatorDebugCommand) Run(args []string) int {

// collect collects data from our endpoints and writes the archive bundle
func (c *OperatorDebugCommand) collect(client *api.Client) error {
// Collect cluster data
// Start background captures
c.startMonitors(client)
c.startEventStream(client)

// Collect cluster data
self, err := client.Agent().Self()
c.writeJSON(clusterDir, "agent-self.json", self, err)

Expand All @@ -611,7 +665,6 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
c.collectAgentHosts(client)
c.collectPprofs(client)

c.startMonitors(client)
c.collectPeriodic(client)

return nil
Expand Down Expand Up @@ -686,6 +739,103 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client *
}
}

// captureEventStream wraps the event stream capture process.
func (c *OperatorDebugCommand) startEventStream(client *api.Client) {
c.verboseOut("Launching eventstream goroutine...")

go func() {
if err := c.captureEventStream(client); err != nil {
var es string
if mErr, ok := err.(*multierror.Error); ok {
es = multierror.ListFormatFunc(mErr.Errors)
} else {
es = err.Error()
}

c.Ui.Error(fmt.Sprintf("Error capturing event stream: %s", es))
}
}()
}

func (c *OperatorDebugCommand) captureEventStream(client *api.Client) error {
// Ensure output directory is present
path := clusterDir
if err := c.mkdir(c.path(path)); err != nil {
return err
}

// Create the output file
fh, err := os.Create(c.path(path, "eventstream.json"))
if err != nil {
return err
}
defer fh.Close()

// Get handle to events endpoint
events := client.EventStream()

// Start streaming events
eventCh, err := events.Stream(c.ctx, c.topics, c.index, c.queryOpts())
if err != nil {
if errors.Is(err, context.Canceled) {
c.verboseOut("Event stream canceled: No events captured")
return nil
}
return fmt.Errorf("failed to stream events: %w", err)
}

eventCount := 0
errCount := 0
davemay99 marked this conversation as resolved.
Show resolved Hide resolved
heartbeatCount := 0
channelEventCount := 0

var mErrs *multierror.Error

for {
select {
case event := <-eventCh:
channelEventCount++
if event.Err != nil {
errCount++
c.verboseOutf("error from event stream: index; %d err: %v", event.Index, event.Err)
mErrs = multierror.Append(mErrs, fmt.Errorf("error at index: %d, Err: %w", event.Index, event.Err))
break
}

if event.IsHeartbeat() {
heartbeatCount++
continue
}

for _, e := range event.Events {
eventCount++
c.verboseOutf("Event: %4d, Index: %d, Topic: %-10s, Type: %s, FilterKeys: %s", eventCount, e.Index, e.Topic, e.Type, e.FilterKeys)
davemay99 marked this conversation as resolved.
Show resolved Hide resolved

bytes, err := json.Marshal(e)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to marshal json from Topic: %s, Type: %s, Err: %w", e.Topic, e.Type, err))
}

n, err := fh.Write(bytes)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write bytes to eventstream.json; bytes written: %d, Err: %w", n, err))
break
}
n, err = fh.WriteString("\n")
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write string to eventstream.json; chars written: %d, Err: %w", n, err))
}
}
case <-c.ctx.Done():
c.verboseOutf("Event stream captured %d events, %d frames, %d heartbeats, %d errors", eventCount, channelEventCount, heartbeatCount, errCount)
return mErrs.ErrorOrNil()
}
}
}

// collectAgentHosts calls collectAgentHost for each selected node
func (c *OperatorDebugCommand) collectAgentHosts(client *api.Client) {
for _, n := range c.nodeIDs {
Expand Down Expand Up @@ -1192,6 +1342,16 @@ func (c *OperatorDebugCommand) trap() {
}()
}

func (c *OperatorDebugCommand) verboseOut(out string) {
if c.verbose {
c.Ui.Output(out)
}
}

func (c *OperatorDebugCommand) verboseOutf(format string, a ...interface{}) {
c.verboseOut(fmt.Sprintf(format, a...))
}

// TarCZF like the tar command, recursively builds a gzip compressed tar
// archive from a directory. If not empty, all files in the bundle are prefixed
// with the target path.
Expand Down Expand Up @@ -1312,6 +1472,63 @@ func stringToSlice(input string) []string {
return out
}

func parseEventTopics(topicList []string) (map[api.Topic][]string, error) {
topics := make(map[api.Topic][]string)

var mErrs *multierror.Error

for _, topic := range topicList {
k, v, err := parseTopic(topic)
if err != nil {
mErrs = multierror.Append(mErrs, err)
}

topics[api.Topic(k)] = append(topics[api.Topic(k)], v)
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
}

return topics, mErrs.ErrorOrNil()
}

func parseTopic(input string) (string, string, error) {
var topic, filter string

parts := strings.Split(input, ":")
switch len(parts) {
case 1:
// infer wildcard if only given a topic
topic = input
filter = "*"
case 2:
topic = parts[0]
filter = parts[1]
default:
return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic)
}

return strings.Title(topic), filter, nil
}

func allTopics() map[api.Topic][]string {
return map[api.Topic][]string{"*": {"*"}}
}

// topicsFromString parses a comma separated list into a topicMap
func topicsFromString(topicList string) (map[api.Topic][]string, error) {
if topicList == "none" {
return nil, nil
}
if topicList == "all" {
return allTopics(), nil
}

topics := stringToSlice(topicList)
topicMap, err := parseEventTopics(topics)
if err != nil {
return nil, err
}
return topicMap, nil
}

// external holds address configuration for Consul and Vault APIs
type external struct {
tls *api.TLSConfig
Expand Down
Loading