Skip to content

Commit

Permalink
cli: ensure -stale flag is respected by nomad operator debug
Browse files Browse the repository at this point in the history
When a cluster doesn't have a leader, the `nomad operator debug`
command can safely use stale queries to gracefully degrade the
consistency of almost all its queries. The query parameter for these
API calls was not being set by the command.

Some `api` package queries do not include `QueryOptions` because
they target a specific agent, but they can potentially be forwarded to
other agents. If there is no leader, these forwarded queries will
fail. Provide methods to call these APIs with `QueryOptions`.
  • Loading branch information
tgross committed Dec 14, 2021
1 parent ed91a53 commit 77dd8ed
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 40 deletions.
11 changes: 11 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func (a *Agent) Members() (*ServerMembers, error) {
return resp, nil
}

// Members is used to query all of the known server members
// with the ability to set QueryOptions
func (a *Agent) MembersOpts(opts *QueryOptions) (*ServerMembers, error) {
var resp *ServerMembers
_, err := a.client.query("/v1/agent/members", &resp, opts)
if err != nil {
return nil, err
}
return resp, nil
}

// ForceLeave is used to eject an existing node from the cluster.
func (a *Agent) ForceLeave(node string) error {
_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
Expand Down
9 changes: 9 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (n *Nodes) PrefixList(prefix string) ([]*NodeListStub, *QueryMeta, error) {
return n.List(&QueryOptions{Prefix: prefix})
}

func (n *Nodes) PrefixListOpts(prefix string, opts *QueryOptions) ([]*NodeListStub, *QueryMeta, error) {
if opts == nil {
opts = &QueryOptions{Prefix: prefix}
} else {
opts.Prefix = prefix
}
return n.List(opts)
}

// Info is used to query a specific node by its ID.
func (n *Nodes) Info(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error) {
var resp Node
Expand Down
3 changes: 2 additions & 1 deletion api/regions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ func (c *Client) Regions() *Regions {
return &Regions{client: c}
}

// List returns a list of all of the regions.
// List returns a list of all of the regions from the server
// that serves the request. It is never forwarded to a leader.
func (r *Regions) List() ([]string, error) {
var resp []string
if _, err := r.client.query("/v1/regions", &resp, nil); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques
member = srv.LocalMember()
aclObj, err = srv.ResolveToken(secret)
} else {
// Not a Server; use the Client for token resolution
// Not a Server, so use the Client for token resolution. Note
// this gets forwarded to a server with AllowStale = true if
// the local ACL cache TTL has expired (30s by default)
aclObj, err = s.agent.Client().ResolveToken(secret)
}

Expand Down Expand Up @@ -677,7 +679,9 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
aclObj, err = srv.ResolveToken(secret)
enableDebug = srv.GetConfig().EnableDebug
} else {
// Not a Server; use the Client for token resolution
// Not a Server, so use the Client for token resolution. Note
// this gets forwarded to a server with AllowStale = true if
// the local ACL cache TTL has expired (30s by default)
aclObj, err = s.agent.Client().ResolveToken(secret)
enableDebug = s.agent.Client().GetConfig().EnableDebug
}
Expand Down
104 changes: 67 additions & 37 deletions command/operator_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type OperatorDebugCommand struct {
interval time.Duration
pprofDuration time.Duration
logLevel string
stale bool
maxNodes int
nodeClass string
nodeIDs []string
Expand All @@ -48,6 +47,8 @@ type OperatorDebugCommand struct {
manifest []string
ctx context.Context
cancel context.CancelFunc

opts *api.QueryOptions
}

const (
Expand Down Expand Up @@ -140,7 +141,7 @@ Debug Options:
nodes at "log-level". Defaults to 2m.
-interval=<interval>
The interval between snapshots of the Nomad state. Set interval equal to
The interval between snapshots of the Nomad state. Set interval equal to
duration to capture a single snapshot. Defaults to 30s.
-log-level=<level>
Expand Down Expand Up @@ -172,7 +173,7 @@ Debug Options:
necessary to get the configuration from a non-leader server.
-output=<path>
Path to the parent directory of the output directory. If specified, no
Path to the parent directory of the output directory. If specified, no
archive is built. Defaults to the current directory.
`
return strings.TrimSpace(helpText)
Expand Down Expand Up @@ -211,7 +212,12 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Nodes, nil)
// note we can't use the -stale flag here because we're in the
// predictor, but a stale query should be safe for prediction;
// we also can't use region forwarding because we can't rely
// on the server being up
resp, _, err := client.Search().PrefixSearch(
a.Last, contexts.Nodes, &api.QueryOptions{AllowStale: true})
if err != nil {
return []string{}
}
Expand All @@ -228,7 +234,11 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
return nil
}

nodes, _, err := client.Nodes().List(nil) // TODO: should be *api.QueryOptions that matches region
// note we can't use the -stale flag here because we're in the
// predictor, but a stale query should be safe for prediction;
// we also can't use region forwarding because we can't rely
// on the server being up
nodes, _, err := client.Nodes().List(&api.QueryOptions{AllowStale: true})
if err != nil {
return []string{}
}
Expand Down Expand Up @@ -259,7 +269,12 @@ func ServerPredictor(factory ApiClientFactory) complete.Predictor {
if err != nil {
return nil
}
members, err := client.Agent().Members()

// note we can't use the -stale flag here because we're in the
// predictor, but a stale query should be safe for prediction;
// we also can't use region forwarding because we can't rely
// on the server being up
members, err := client.Agent().MembersOpts(&api.QueryOptions{AllowStale: true})
if err != nil {
return []string{}
}
Expand All @@ -276,6 +291,14 @@ func ServerPredictor(factory ApiClientFactory) complete.Predictor {
})
}

// queryOpts returns a copy of the shared api.QueryOptions so
// that api package methods can safely modify the options
func (c *OperatorDebugCommand) queryOpts() *api.QueryOptions {
qo := new(api.QueryOptions)
*qo = *c.opts
return qo
}

func (c *OperatorDebugCommand) Name() string { return "debug" }

func (c *OperatorDebugCommand) Run(args []string) int {
Expand All @@ -284,6 +307,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {

var duration, interval, output, pprofDuration string
var nodeIDs, serverIDs string
var allowStale bool

flags.StringVar(&duration, "duration", "2m", "")
flags.StringVar(&interval, "interval", "30s", "")
Expand All @@ -292,7 +316,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags.StringVar(&c.nodeClass, "node-class", "", "")
flags.StringVar(&nodeIDs, "node-id", "all", "")
flags.StringVar(&serverIDs, "server-id", "all", "")
flags.BoolVar(&c.stale, "stale", false, "")
flags.BoolVar(&allowStale, "stale", false, "")
flags.StringVar(&output, "output", "", "")
flags.StringVar(&pprofDuration, "pprof-duration", "1s", "")

Expand Down Expand Up @@ -403,6 +427,12 @@ func (c *OperatorDebugCommand) Run(args []string) int {
return 1
}

c.opts = &api.QueryOptions{
Region: c.Meta.region,
AllowStale: allowStale,
AuthToken: c.Meta.token,
}

// Search all nodes If a node class is specified without a list of node id prefixes
if c.nodeClass != "" && nodeIDs == "" {
nodeIDs = "all"
Expand All @@ -421,7 +451,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
// Capture from nodes starting with prefix id
id = sanitizeUUIDPrefix(id)
}
nodes, _, err := client.Nodes().PrefixList(id)
nodes, _, err := client.Nodes().PrefixListOpts(id, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err))
return 1
Expand Down Expand Up @@ -466,7 +496,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
}

// Resolve servers
members, err := client.Agent().Members()
members, err := client.Agent().MembersOpts(c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to retrieve server list; err: %v", err))
return 1
Expand Down Expand Up @@ -559,8 +589,7 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
self, err := client.Agent().Self()
c.writeJSON(clusterDir, "agent-self.json", self, err)

var qo *api.QueryOptions
namespaces, _, err := client.Namespaces().List(qo)
namespaces, _, err := client.Namespaces().List(c.queryOpts())
c.writeJSON(clusterDir, "namespaces.json", namespaces, err)

regions, err := client.Regions().List()
Expand Down Expand Up @@ -635,6 +664,7 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client *
idKey: nodeID,
"log_level": c.logLevel,
},
AllowStale: c.queryOpts().AllowStale,
}

outCh, errCh := client.Agent().Monitor(c.ctx.Done(), &qo)
Expand Down Expand Up @@ -672,9 +702,9 @@ func (c *OperatorDebugCommand) collectAgentHost(path, id string, client *api.Cli
var host *api.HostDataResponse
var err error
if path == serverDir {
host, err = client.Agent().Host(id, "", nil)
host, err = client.Agent().Host(id, "", c.queryOpts())
} else {
host, err = client.Agent().Host("", id, nil)
host, err = client.Agent().Host("", id, c.queryOpts())
}

if err != nil {
Expand Down Expand Up @@ -714,7 +744,7 @@ func (c *OperatorDebugCommand) collectPprof(path, id string, client *api.Client)

path = filepath.Join(path, id)

bs, err := client.Agent().CPUProfile(opts, nil)
bs, err := client.Agent().CPUProfile(opts, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("%s: Failed to retrieve pprof profile.prof, err: %v", path, err))
if structs.IsErrPermissionDenied(err) {
Expand Down Expand Up @@ -763,7 +793,7 @@ func (c *OperatorDebugCommand) savePprofProfile(path string, profile string, opt
fileName = fmt.Sprintf("%s-debug%d.txt", profile, opts.Debug)
}

bs, err := retrievePprofProfile(profile, opts, client)
bs, err := retrievePprofProfile(profile, opts, client, c.queryOpts())
if err != nil {
c.Ui.Error(fmt.Sprintf("%s: Failed to retrieve pprof %s, err: %s", path, fileName, err.Error()))
}
Expand All @@ -774,22 +804,23 @@ func (c *OperatorDebugCommand) savePprofProfile(path string, profile string, opt
}
}

// retrievePprofProfile gets a pprof profile from the node specified in opts using the API client
func retrievePprofProfile(profile string, opts api.PprofOptions, client *api.Client) (bs []byte, err error) {
// retrievePprofProfile gets a pprof profile from the node specified
// in opts using the API client
func retrievePprofProfile(profile string, opts api.PprofOptions, client *api.Client, qopts *api.QueryOptions) (bs []byte, err error) {
switch profile {
case "cpuprofile":
bs, err = client.Agent().CPUProfile(opts, nil)
bs, err = client.Agent().CPUProfile(opts, qopts)
case "trace":
bs, err = client.Agent().Trace(opts, nil)
bs, err = client.Agent().Trace(opts, qopts)
default:
bs, err = client.Agent().Lookup(profile, opts, nil)
bs, err = client.Agent().Lookup(profile, opts, qopts)
}

return bs, err
}

// collectPeriodic runs for duration, capturing the cluster state every interval. It flushes and stops
// the monitor requests
// collectPeriodic runs for duration, capturing the cluster state
// every interval. It flushes and stops the monitor requests
func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) {
duration := time.After(c.duration)
// Set interval to 0 so that we immediately execute, wait the interval next time
Expand Down Expand Up @@ -820,61 +851,60 @@ func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) {

// collectOperator captures some cluster meta information
func (c *OperatorDebugCommand) collectOperator(dir string, client *api.Client) {
rc, err := client.Operator().RaftGetConfiguration(nil)
rc, err := client.Operator().RaftGetConfiguration(c.queryOpts())
c.writeJSON(dir, "operator-raft.json", rc, err)

sc, _, err := client.Operator().SchedulerGetConfiguration(nil)
sc, _, err := client.Operator().SchedulerGetConfiguration(c.queryOpts())
c.writeJSON(dir, "operator-scheduler.json", sc, err)

ah, _, err := client.Operator().AutopilotServerHealth(nil)
ah, _, err := client.Operator().AutopilotServerHealth(c.queryOpts())
c.writeJSON(dir, "operator-autopilot-health.json", ah, err)

lic, _, err := client.Operator().LicenseGet(nil)
lic, _, err := client.Operator().LicenseGet(c.queryOpts())
c.writeJSON(dir, "license.json", lic, err)
}

// collectNomad captures the nomad cluster state
func (c *OperatorDebugCommand) collectNomad(dir string, client *api.Client) error {
var qo *api.QueryOptions

js, _, err := client.Jobs().List(qo)
js, _, err := client.Jobs().List(c.queryOpts())
c.writeJSON(dir, "jobs.json", js, err)

ds, _, err := client.Deployments().List(qo)
ds, _, err := client.Deployments().List(c.queryOpts())
c.writeJSON(dir, "deployments.json", ds, err)

es, _, err := client.Evaluations().List(qo)
es, _, err := client.Evaluations().List(c.queryOpts())
c.writeJSON(dir, "evaluations.json", es, err)

as, _, err := client.Allocations().List(qo)
as, _, err := client.Allocations().List(c.queryOpts())
c.writeJSON(dir, "allocations.json", as, err)

ns, _, err := client.Nodes().List(qo)
ns, _, err := client.Nodes().List(c.queryOpts())
c.writeJSON(dir, "nodes.json", ns, err)

// CSI Plugins - /v1/plugins?type=csi
ps, _, err := client.CSIPlugins().List(qo)
ps, _, err := client.CSIPlugins().List(c.queryOpts())
c.writeJSON(dir, "csi-plugins.json", ps, err)

// CSI Plugin details - /v1/plugin/csi/:plugin_id
for _, p := range ps {
csiPlugin, _, err := client.CSIPlugins().Info(p.ID, qo)
csiPlugin, _, err := client.CSIPlugins().Info(p.ID, c.queryOpts())
csiPluginFileName := fmt.Sprintf("csi-plugin-id-%s.json", p.ID)
c.writeJSON(dir, csiPluginFileName, csiPlugin, err)
}

// CSI Volumes - /v1/volumes?type=csi
csiVolumes, _, err := client.CSIVolumes().List(qo)
csiVolumes, _, err := client.CSIVolumes().List(c.queryOpts())
c.writeJSON(dir, "csi-volumes.json", csiVolumes, err)

// CSI Volume details - /v1/volumes/csi/:volume-id
for _, v := range csiVolumes {
csiVolume, _, err := client.CSIVolumes().Info(v.ID, qo)
csiVolume, _, err := client.CSIVolumes().Info(v.ID, c.queryOpts())
csiFileName := fmt.Sprintf("csi-volume-id-%s.json", v.ID)
c.writeJSON(dir, csiFileName, csiVolume, err)
}

metrics, _, err := client.Operator().MetricsSummary(qo)
metrics, _, err := client.Operator().MetricsSummary(c.queryOpts())
c.writeJSON(dir, "metrics.json", metrics, err)

return nil
Expand Down
Loading

0 comments on commit 77dd8ed

Please sign in to comment.