Skip to content

Commit

Permalink
cli: ensure -stale flag is respected by nomad operator debug (#11678)
Browse files Browse the repository at this point in the history

When a cluster doesn't have a leader, the `nomad operator debug`
command can safely use stale queries to gracefully degrade the
consistency of almost all its queries. The query parameter for these
API calls was not being set by the command.

Some `api` package queries do not include `QueryOptions` because
they target a specific agent, but they can potentially be forwarded to
other agents. If there is no leader, these forwarded queries will
fail. Provide methods to call these APIs with `QueryOptions`.
  • Loading branch information
tgross authored and lgfa29 committed Jan 17, 2022
1 parent 060a474 commit 04cbf5d
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 36 deletions.
3 changes: 3 additions & 0 deletions .changelog/11678.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
cli: Fixed a bug where the `-stale` flag was not respected by `nomad operator debug`
```
11 changes: 11 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func (a *Agent) Members() (*ServerMembers, error) {
return resp, nil
}

// MembersOpts is used to query all of the known server members,
// with the ability to set QueryOptions.
func (a *Agent) MembersOpts(opts *QueryOptions) (*ServerMembers, error) {
	var resp *ServerMembers
	if _, err := a.client.query("/v1/agent/members", &resp, opts); err != nil {
		return nil, err
	}
	return resp, nil
}

// ForceLeave is used to eject an existing node from the cluster.
func (a *Agent) ForceLeave(node string) error {
_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
Expand Down
9 changes: 9 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (n *Nodes) PrefixList(prefix string) ([]*NodeListStub, *QueryMeta, error) {
return n.List(&QueryOptions{Prefix: prefix})
}

// PrefixListOpts calls List with the Prefix query option set to prefix,
// carrying over every other option supplied in opts. A nil opts is
// treated as an empty set of options. The caller's opts struct is not
// modified; the options are shallow-copied, so reference-typed fields
// such as Params remain shared with the copy.
func (n *Nodes) PrefixListOpts(prefix string, opts *QueryOptions) ([]*NodeListStub, *QueryMeta, error) {
	q := &QueryOptions{}
	if opts != nil {
		// Work on a copy so that setting Prefix does not leak back
		// into options the caller may reuse for other queries.
		*q = *opts
	}
	q.Prefix = prefix
	return n.List(q)
}

// Info is used to query a specific node by its ID.
func (n *Nodes) Info(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error) {
var resp Node
Expand Down
3 changes: 2 additions & 1 deletion api/regions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ func (c *Client) Regions() *Regions {
return &Regions{client: c}
}

// List returns a list of all of the regions.
// List returns a list of all of the regions from the server
// that serves the request. It is never forwarded to a leader.
func (r *Regions) List() ([]string, error) {
var resp []string
if _, err := r.client.query("/v1/regions", &resp, nil); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques
member = srv.LocalMember()
aclObj, err = srv.ResolveToken(secret)
} else {
// Not a Server; use the Client for token resolution
// Not a Server, so use the Client for token resolution. Note
// this gets forwarded to a server with AllowStale = true if
// the local ACL cache TTL has expired (30s by default)
aclObj, err = s.agent.Client().ResolveToken(secret)
}

Expand Down Expand Up @@ -677,7 +679,9 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
aclObj, err = srv.ResolveToken(secret)
enableDebug = srv.GetConfig().EnableDebug
} else {
// Not a Server; use the Client for token resolution
// Not a Server, so use the Client for token resolution. Note
// this gets forwarded to a server with AllowStale = true if
// the local ACL cache TTL has expired (30s by default)
aclObj, err = s.agent.Client().ResolveToken(secret)
enableDebug = s.agent.Client().GetConfig().EnableDebug
}
Expand Down
82 changes: 49 additions & 33 deletions command/operator_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type OperatorDebugCommand struct {
interval time.Duration
pprofDuration time.Duration
logLevel string
stale bool
maxNodes int
nodeClass string
nodeIDs []string
Expand All @@ -45,6 +44,7 @@ type OperatorDebugCommand struct {
manifest []string
ctx context.Context
cancel context.CancelFunc
opts *api.QueryOptions
}

const (
Expand Down Expand Up @@ -132,7 +132,7 @@ Debug Options:
The duration of the log monitor command. Defaults to 2m.
-interval=<interval>
The interval between snapshots of the Nomad state. Set interval equal to
The interval between snapshots of the Nomad state. Set interval equal to
duration to capture a single snapshot. Defaults to 30s.
-log-level=<level>
Expand Down Expand Up @@ -195,6 +195,15 @@ func (c *OperatorDebugCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}

// queryOpts returns a copy of the shared api.QueryOptions so
// that api package methods can safely modify the options. The
// Params map is copied separately so the returned options do not
// alias the shared map. If the shared options have not been set,
// an empty set of options is returned rather than dereferencing
// a nil pointer.
func (c *OperatorDebugCommand) queryOpts() *api.QueryOptions {
	qo := new(api.QueryOptions)
	if c.opts == nil {
		return qo
	}
	*qo = *c.opts
	qo.Params = helper.CopyMapStringString(c.opts.Params)
	return qo
}

func (c *OperatorDebugCommand) Name() string { return "debug" }

func (c *OperatorDebugCommand) Run(args []string) int {
Expand All @@ -203,6 +212,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {

var duration, interval, output, pprofDuration string
var nodeIDs, serverIDs string
var allowStale bool

flags.StringVar(&duration, "duration", "2m", "")
flags.StringVar(&interval, "interval", "30s", "")
Expand All @@ -211,7 +221,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags.StringVar(&c.nodeClass, "node-class", "", "")
flags.StringVar(&nodeIDs, "node-id", "", "")
flags.StringVar(&serverIDs, "server-id", "all", "")
flags.BoolVar(&c.stale, "stale", false, "")
flags.BoolVar(&allowStale, "stale", false, "")
flags.StringVar(&output, "output", "", "")
flags.StringVar(&pprofDuration, "pprof-duration", "1s", "")

Expand Down Expand Up @@ -319,6 +329,12 @@ func (c *OperatorDebugCommand) Run(args []string) int {
return 1
}

c.opts = &api.QueryOptions{
Region: c.Meta.region,
AllowStale: allowStale,
AuthToken: c.Meta.token,
}

// Search all nodes If a node class is specified without a list of node id prefixes
if c.nodeClass != "" && nodeIDs == "" {
nodeIDs = "all"
Expand All @@ -337,7 +353,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
// Capture from nodes starting with prefix id
id = sanitizeUUIDPrefix(id)
}
nodes, _, err := client.Nodes().PrefixList(id)
nodes, _, err := client.Nodes().PrefixListOpts(id, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err))
return 1
Expand Down Expand Up @@ -377,7 +393,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
}

// Resolve servers
members, err := client.Agent().Members()
members, err := client.Agent().MembersOpts(c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to retrieve server list; err: %v", err))
return 1
Expand Down Expand Up @@ -470,8 +486,7 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
self, err := client.Agent().Self()
c.writeJSON(dir, "agent-self.json", self, err)

var qo *api.QueryOptions
namespaces, _, err := client.Namespaces().List(qo)
namespaces, _, err := client.Namespaces().List(c.queryOpts())
c.writeJSON(dir, "namespaces.json", namespaces, err)

regions, err := client.Regions().List()
Expand Down Expand Up @@ -566,6 +581,7 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client *
idKey: nodeID,
"log_level": c.logLevel,
},
AllowStale: c.queryOpts().AllowStale,
}

outCh, errCh := client.Agent().Monitor(c.ctx.Done(), &qo)
Expand Down Expand Up @@ -603,9 +619,9 @@ func (c *OperatorDebugCommand) collectAgentHost(path, id string, client *api.Cli
var host *api.HostDataResponse
var err error
if path == "server" {
host, err = client.Agent().Host(id, "", nil)
host, err = client.Agent().Host(id, "", c.queryOpts())
} else {
host, err = client.Agent().Host("", id, nil)
host, err = client.Agent().Host("", id, c.queryOpts())
}

if err != nil {
Expand Down Expand Up @@ -645,7 +661,7 @@ func (c *OperatorDebugCommand) collectPprof(path, id string, client *api.Client)

path = filepath.Join(path, id)

bs, err := client.Agent().CPUProfile(opts, nil)
bs, err := client.Agent().CPUProfile(opts, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("%s: Failed to retrieve pprof profile.prof, err: %v", path, err))
if structs.IsErrPermissionDenied(err) {
Expand Down Expand Up @@ -694,7 +710,7 @@ func (c *OperatorDebugCommand) savePprofProfile(path string, profile string, opt
fileName = fmt.Sprintf("%s-debug%d.txt", profile, opts.Debug)
}

bs, err := retrievePprofProfile(profile, opts, client)
bs, err := retrievePprofProfile(profile, opts, client, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("%s: Failed to retrieve pprof %s, err: %s", path, fileName, err.Error()))
}
Expand All @@ -705,22 +721,23 @@ func (c *OperatorDebugCommand) savePprofProfile(path string, profile string, opt
}
}

// retrievePprofProfile gets a pprof profile from the node specified in opts using the API client
func retrievePprofProfile(profile string, opts api.PprofOptions, client *api.Client) (bs []byte, err error) {
// retrievePprofProfile gets a pprof profile from the node specified
// in opts using the API client
func retrievePprofProfile(profile string, opts api.PprofOptions, client *api.Client, qopts *api.QueryOptions) (bs []byte, err error) {
switch profile {
case "cpuprofile":
bs, err = client.Agent().CPUProfile(opts, nil)
bs, err = client.Agent().CPUProfile(opts, qopts)
case "trace":
bs, err = client.Agent().Trace(opts, nil)
bs, err = client.Agent().Trace(opts, qopts)
default:
bs, err = client.Agent().Lookup(profile, opts, nil)
bs, err = client.Agent().Lookup(profile, opts, qopts)
}

return bs, err
}

// collectPeriodic runs for duration, capturing the cluster state every interval. It flushes and stops
// the monitor requests
// collectPeriodic runs for duration, capturing the cluster state
// every interval. It flushes and stops the monitor requests
func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) {
duration := time.After(c.duration)
// Set interval to 0 so that we immediately execute, wait the interval next time
Expand Down Expand Up @@ -751,61 +768,60 @@ func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) {

// collectOperator captures some cluster meta information
func (c *OperatorDebugCommand) collectOperator(dir string, client *api.Client) {
rc, err := client.Operator().RaftGetConfiguration(nil)
rc, err := client.Operator().RaftGetConfiguration(c.queryOpts())
c.writeJSON(dir, "operator-raft.json", rc, err)

sc, _, err := client.Operator().SchedulerGetConfiguration(nil)
sc, _, err := client.Operator().SchedulerGetConfiguration(c.queryOpts())
c.writeJSON(dir, "operator-scheduler.json", sc, err)

ah, _, err := client.Operator().AutopilotServerHealth(nil)
ah, _, err := client.Operator().AutopilotServerHealth(c.queryOpts())
c.writeJSON(dir, "operator-autopilot-health.json", ah, err)

lic, _, err := client.Operator().LicenseGet(nil)
lic, _, err := client.Operator().LicenseGet(c.queryOpts())
c.writeJSON(dir, "license.json", lic, err)
}

// collectNomad captures the nomad cluster state
func (c *OperatorDebugCommand) collectNomad(dir string, client *api.Client) error {
var qo *api.QueryOptions

js, _, err := client.Jobs().List(qo)
js, _, err := client.Jobs().List(c.queryOpts())
c.writeJSON(dir, "jobs.json", js, err)

ds, _, err := client.Deployments().List(qo)
ds, _, err := client.Deployments().List(c.queryOpts())
c.writeJSON(dir, "deployments.json", ds, err)

es, _, err := client.Evaluations().List(qo)
es, _, err := client.Evaluations().List(c.queryOpts())
c.writeJSON(dir, "evaluations.json", es, err)

as, _, err := client.Allocations().List(qo)
as, _, err := client.Allocations().List(c.queryOpts())
c.writeJSON(dir, "allocations.json", as, err)

ns, _, err := client.Nodes().List(qo)
ns, _, err := client.Nodes().List(c.queryOpts())
c.writeJSON(dir, "nodes.json", ns, err)

// CSI Plugins - /v1/plugins?type=csi
ps, _, err := client.CSIPlugins().List(qo)
ps, _, err := client.CSIPlugins().List(c.queryOpts())
c.writeJSON(dir, "plugins.json", ps, err)

// CSI Plugin details - /v1/plugin/csi/:plugin_id
for _, p := range ps {
csiPlugin, _, err := client.CSIPlugins().Info(p.ID, qo)
csiPlugin, _, err := client.CSIPlugins().Info(p.ID, c.queryOpts())
csiPluginFileName := fmt.Sprintf("csi-plugin-id-%s.json", p.ID)
c.writeJSON(dir, csiPluginFileName, csiPlugin, err)
}

// CSI Volumes - /v1/volumes?type=csi
csiVolumes, _, err := client.CSIVolumes().List(qo)
csiVolumes, _, err := client.CSIVolumes().List(c.queryOpts())
c.writeJSON(dir, "csi-volumes.json", csiVolumes, err)

// CSI Volume details - /v1/volumes/csi/:volume-id
for _, v := range csiVolumes {
csiVolume, _, err := client.CSIVolumes().Info(v.ID, qo)
csiVolume, _, err := client.CSIVolumes().Info(v.ID, c.queryOpts())
csiFileName := fmt.Sprintf("csi-volume-id-%s.json", v.ID)
c.writeJSON(dir, csiFileName, csiVolume, err)
}

metrics, _, err := client.Operator().MetricsSummary(qo)
metrics, _, err := client.Operator().MetricsSummary(c.queryOpts())
c.writeJSON(dir, "metrics.json", metrics, err)

return nil
Expand Down
52 changes: 52 additions & 0 deletions command/operator_debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/command/agent"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -550,3 +551,54 @@ func TestDebug_WriteBytes_PathEscapesSandbox(t *testing.T) {
err := cmd.writeBytes(testDir, testFile, testBytes)
require.Error(t, err)
}

// TestDebug_StaleLeadership runs the debug command against a server
// started without a leader and expects the run to fail without the
// -stale flag and to produce an archive with it, i.e. the APIs needed
// to complete a debug run must have the flag applied to their query
// options
func TestDebug_StaleLeadership(t *testing.T) {
	agentSrv, _, agentURL := testServerWithoutLeader(t, false, nil)
	serverAddr := agentSrv.HTTPAddr()

	t.Logf("[TEST] testAgent api address: %s", agentURL)
	t.Logf("[TEST] Server api address: %s", serverAddr)

	// Arguments shared by both cases; only -stale differs between them
	baseArgs := []string{
		"-address", serverAddr,
		"-duration", "250ms", "-interval", "250ms",
		"-server-id", "all", "-node-id", "all",
	}
	staleArgs := append(append([]string{}, baseArgs...), "-stale")

	runTestCases(t, testCases{
		{
			name:         "no leader without stale flag",
			args:         baseArgs,
			expectedCode: 1,
		},
		{
			name:            "no leader with stale flag",
			args:            staleArgs,
			expectedCode:    0,
			expectedOutputs: []string{"Created debug archive"},
		},
	})
}

// testServerWithoutLeader starts a test agent configured as a server with
// no schedulers and BootstrapExpect set to 3, optionally running a client
// as well. If cb is non-nil it is invoked last so it can adjust the config
// further. The agent is shut down during test cleanup. It returns the
// agent, the agent's API client, and the agent's HTTP address.
func testServerWithoutLeader(t *testing.T, runClient bool, cb func(*agent.Config)) (*agent.TestAgent, *api.Client, string) {
	configure := func(cfg *agent.Config) {
		cfg.Client.Enabled = runClient
		cfg.Server.Enabled = true
		cfg.Server.NumSchedulers = helper.IntToPtr(0)
		// NOTE(review): presumably expecting 3 servers while only this
		// one is started is what prevents a leader election — confirm
		cfg.Server.BootstrapExpect = 3

		if cb == nil {
			return
		}
		cb(cfg)
	}

	testAgent := agent.NewTestAgent(t, t.Name(), configure)
	t.Cleanup(func() { testAgent.Shutdown() })

	apiClient := testAgent.Client()
	return testAgent, apiClient, testAgent.HTTPAddr()
}

0 comments on commit 04cbf5d

Please sign in to comment.