From e5e9cf6a44f3e4ccc554bd2036f8d3d97c476052 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 16 Sep 2022 15:20:56 -0400 Subject: [PATCH] operator debug: write NJSON for large collections The `operator debug` command writes JSON files from API responses as a single line containing an array of JSON objects. But some of these files can be extremely large (GB's) for large production clusters, which makes it difficult to parse them using typical line-oriented Unix command line tools that can stream their inputs without consuming a lot of memory. For collections that are typically large, instead emit newline-delimited JSON. This changeset includes some first-pass refactoring of this command. It breaks up monolithic methods that validate a path, create a file, serialize objects, and write them to disk into smaller functions, some of which can now be standalone to take advantage of generics. --- command/operator_debug.go | 203 ++++++++++++++++++++++++++++++++------ 1 file changed, 173 insertions(+), 30 deletions(-) diff --git a/command/operator_debug.go b/command/operator_debug.go index 7966f75748d3..fb2620ddc705 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -514,7 +514,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { } // Write nodes to file - c.writeJSON(clusterDir, "nodes.json", c.nodes, err) + c.reportErr(writeResponseToFile(c.nodes, c.newFile(clusterDir, "nodes.json"))) // Search all nodes If a node class is specified without a list of node id prefixes if c.nodeClass != "" && nodeIDs == "" { @@ -586,7 +586,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { } // Write complete list of server members to file - c.writeJSON(clusterDir, "members.json", c.members, err) + c.reportErr(writeResponseToFile(c.members, c.newFile(clusterDir, "members.json"))) // Get leader and write to file; there's no option for AllowStale // on this API and a stale result wouldn't even be meaningful, so @@ -597,7 +597,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { c.Ui.Warn(fmt.Sprintf("Failed to retrieve leader; err: %v", err)) } if len(leader) > 0 { - c.writeJSON(clusterDir, "leader.json", leader, err) + c.reportErr(writeResponseToFile(leader, c.newFile(clusterDir, "leader.json"))) } // Filter for servers matching criteria @@ -689,13 +689,16 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error { // Collect cluster data self, err := client.Agent().Self() - c.writeJSON(clusterDir, "agent-self.json", self, err) + c.reportErr(writeResponseOrErrorToFile( + self, err, c.newFile(clusterDir, "agent-self.json"))) namespaces, _, err := client.Namespaces().List(c.queryOpts()) - c.writeJSON(clusterDir, "namespaces.json", namespaces, err) + c.reportErr(writeResponseOrErrorToFile( + namespaces, err, c.newFile(clusterDir, "namespaces.json"))) regions, err := client.Regions().List() - c.writeJSON(clusterDir, "regions.json", regions, err) + c.reportErr(writeResponseOrErrorToFile( + regions, err, c.newFile(clusterDir, "regions.json"))) // Collect data from Consul if c.consul.addrVal == "" { @@ -921,7 +924,7 @@ func (c *OperatorDebugCommand) collectAgentHost(path, id string, client *api.Cli } path = filepath.Join(path, id) - c.writeJSON(path, "agent-host.json", host, err) + c.reportErr(writeResponseToFile(host, c.newFile(path, "agent-host.json"))) } func (c *OperatorDebugCommand) collectPeriodicPprofs(client *api.Client) { @@ -1103,60 +1106,62 @@ func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) { // collectOperator captures some cluster meta information func (c *OperatorDebugCommand) collectOperator(dir string, client *api.Client) { rc, err := client.Operator().RaftGetConfiguration(c.queryOpts()) - c.writeJSON(dir, "operator-raft.json", rc, err) + c.reportErr(writeResponseOrErrorToFile(rc, err, c.newFile(dir, "operator-raft.json"))) sc, _, err := client.Operator().SchedulerGetConfiguration(c.queryOpts()) - c.writeJSON(dir, "operator-scheduler.json", sc, err) + c.reportErr(writeResponseOrErrorToFile(sc, err, c.newFile(dir, "operator-scheduler.json"))) ah, _, err := client.Operator().AutopilotServerHealth(c.queryOpts()) - c.writeJSON(dir, "operator-autopilot-health.json", ah, err) + c.reportErr(writeResponseOrErrorToFile( + ah, err, c.newFile(dir, "operator-autopilot-health.json"))) lic, _, err := client.Operator().LicenseGet(c.queryOpts()) - c.writeJSON(dir, "license.json", lic, err) + c.reportErr(writeResponseOrErrorToFile(lic, err, c.newFile(dir, "license.json"))) } // collectNomad captures the nomad cluster state func (c *OperatorDebugCommand) collectNomad(dir string, client *api.Client) error { js, _, err := client.Jobs().List(c.queryOpts()) - c.writeJSON(dir, "jobs.json", js, err) + c.reportErr(writeResponseStreamOrErrorToFile(js, err, c.newFile(dir, "jobs.json"))) ds, _, err := client.Deployments().List(c.queryOpts()) - c.writeJSON(dir, "deployments.json", ds, err) + c.reportErr(writeResponseStreamOrErrorToFile(ds, err, c.newFile(dir, "deployments.json"))) es, _, err := client.Evaluations().List(c.queryOpts()) - c.writeJSON(dir, "evaluations.json", es, err) + c.reportErr(writeResponseStreamOrErrorToFile(es, err, c.newFile(dir, "evaluations.json"))) as, _, err := client.Allocations().List(c.queryOpts()) - c.writeJSON(dir, "allocations.json", as, err) + c.reportErr(writeResponseStreamOrErrorToFile(as, err, c.newFile(dir, "allocations.json"))) ns, _, err := client.Nodes().List(c.queryOpts()) - c.writeJSON(dir, "nodes.json", ns, err) + c.reportErr(writeResponseStreamOrErrorToFile(ns, err, c.newFile(dir, "nodes.json"))) // CSI Plugins - /v1/plugins?type=csi ps, _, err := client.CSIPlugins().List(c.queryOpts()) - c.writeJSON(dir, "csi-plugins.json", ps, err) + c.reportErr(writeResponseStreamOrErrorToFile(ps, err, c.newFile(dir, "csi-plugins.json"))) // CSI Plugin details - /v1/plugin/csi/:plugin_id for _, p := range ps { csiPlugin, _, err := client.CSIPlugins().Info(p.ID, c.queryOpts()) csiPluginFileName := fmt.Sprintf("csi-plugin-id-%s.json", p.ID) - c.writeJSON(dir, csiPluginFileName, csiPlugin, err) + c.reportErr(writeResponseOrErrorToFile(csiPlugin, err, c.newFile(dir, csiPluginFileName))) } // CSI Volumes - /v1/volumes?type=csi csiVolumes, _, err := client.CSIVolumes().List(c.queryOpts()) - c.writeJSON(dir, "csi-volumes.json", csiVolumes, err) + c.reportErr(writeResponseStreamOrErrorToFile( + csiVolumes, err, c.newFile(dir, "csi-volumes.json"))) // CSI Volume details - /v1/volumes/csi/:volume-id for _, v := range csiVolumes { csiVolume, _, err := client.CSIVolumes().Info(v.ID, c.queryOpts()) csiFileName := fmt.Sprintf("csi-volume-id-%s.json", v.ID) - c.writeJSON(dir, csiFileName, csiVolume, err) + c.reportErr(writeResponseOrErrorToFile(csiVolume, err, c.newFile(dir, csiFileName))) } metrics, _, err := client.Operator().MetricsSummary(c.queryOpts()) - c.writeJSON(dir, "metrics.json", metrics, err) + c.reportErr(writeResponseOrErrorToFile(metrics, err, c.newFile(dir, "metrics.json"))) return nil } @@ -1293,22 +1298,155 @@ func (c *OperatorDebugCommand) writeBytes(dir, file string, data []byte) error { return nil } -// writeJSON writes JSON responses from the Nomad API calls to the archive -func (c *OperatorDebugCommand) writeJSON(dir, file string, data interface{}, err error) error { +// newFilePath returns a validated filepath rooted in the provided directory and +// path. It has been checked that it falls inside the sandbox and has been added +// to the manifest tracking. +func (c *OperatorDebugCommand) newFilePath(dir, file string) (string, error) { + + // Replace invalid characters in filename + filename := helper.CleanFilename(file, "_") + + relativePath := filepath.Join(dir, filename) + c.manifest = append(c.manifest, relativePath) + dirPath := filepath.Join(c.collectDir, dir) + filePath := filepath.Join(dirPath, filename) + + // Ensure parent directories exist + err := os.MkdirAll(dirPath, os.ModePerm) if err != nil { - return c.writeError(dir, file, err) + return "", fmt.Errorf("failed to create parent directories of %q: %w", dirPath, err) } - bytes, err := json.Marshal(data) + + // Ensure filename doesn't escape the sandbox of the capture directory + escapes := escapingfs.PathEscapesSandbox(c.collectDir, filePath) + if escapes { + return "", fmt.Errorf("file path %q escapes capture directory %q", filePath, c.collectDir) + } + + return filePath, nil +} + +type writerGetter func() (io.WriteCloser, error) + +// newFile returns a func that creates a new file for writing and returns it as +// an io.WriterCloser interface. The caller is responsible for closing the +// io.Writer when its done. +// +// Note: methods cannot be generic in go, so this function returns a function +// that closes over our command so that we can still reference the command +// object's fields to validate the file. In future iterations it might be nice +// if we could move most of the command into standalone functions. +func (c *OperatorDebugCommand) newFile(dir, file string) writerGetter { + return func() (io.WriteCloser, error) { + filePath, err := c.newFilePath(dir, file) + if err != nil { + return nil, err + } + + writer, err := os.Create(filePath) + if err != nil { + return nil, fmt.Errorf("failed to create file %q: %w", filePath, err) + } + return writer, nil + } +} + +// writeResponseToFile writes a response object to a file. It returns an error +// that the caller should report to the UI. +func writeResponseToFile(obj any, getWriterFn writerGetter) error { + + writer, err := getWriterFn() if err != nil { - return c.writeError(dir, file, err) + return err } - err = c.writeBytes(dir, file, bytes) + defer writer.Close() + + err = writeJSON(obj, writer) if err != nil { - c.Ui.Error(err.Error()) + return err } return nil } +// writeResponseOrErrorToFile writes a response object to a file, or the error +// for that response if one was received. It returns an error that the caller +// should report to the UI. +func writeResponseOrErrorToFile(obj any, apiErr error, getWriterFn writerGetter) error { + + writer, err := getWriterFn() + if err != nil { + return err + } + defer writer.Close() + + if apiErr != nil { + obj = errorWrapper{Error: apiErr.Error()} + } + + err = writeJSON(obj, writer) + if err != nil { + return err + } + return nil +} + +// writeResponseStreamOrErrorToFile writes a stream of response objects to a +// file in newline-delimited JSON format, or the error for that response if one +// was received. It returns an error that the caller should report to the UI. +func writeResponseStreamOrErrorToFile[T any](obj []T, apiErr error, getWriterFn writerGetter) error { + + writer, err := getWriterFn() + if err != nil { + return err + } + defer writer.Close() + + if apiErr != nil { + wrapped := errorWrapper{Error: err.Error()} + return writeJSON(wrapped, writer) + } + + err = writeNDJSON(obj, writer) + if err != nil { + return err + } + return nil +} + +// writeNDJSON writes a single Nomad API objects (or response error) to the +// archive file as a JSON object. +func writeJSON(obj any, writer io.Writer) error { + buf, err := json.Marshal(obj) + if err != nil { + buf, err = json.Marshal(errorWrapper{Error: err.Error()}) + if err != nil { + return fmt.Errorf("could not serialize our own error: %v", err) + } + } + n, err := writer.Write(buf) + if err != nil { + return fmt.Errorf("write error, wrote %d bytes of %d: %v", n, len(buf), err) + } + return nil +} + +// writeNDJSON writes a slice of Nomad API objects to the archive file as +// newline-delimited JSON objects. +func writeNDJSON[T any](data []T, writer io.Writer) error { + for _, obj := range data { + err := writeJSON(obj, writer) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + _, err = writer.Write([]byte{'\n'}) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + } + + return nil +} + // writeError writes a JSON error object to capture errors in the debug bundle without // reporting func (c *OperatorDebugCommand) writeError(dir, file string, err error) error { @@ -1359,7 +1497,6 @@ type flagExport struct { // writeFlags exports the CLI flags to JSON file func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) { - // c.writeJSON(clusterDir, "cli-flags-complete.json", flags, nil) var f flagExport f.Name = flags.Name() @@ -1384,7 +1521,13 @@ func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) { f.Actual[flag.Name] = flag }) - c.writeJSON(clusterDir, "cli-flags.json", f, nil) + c.reportErr(writeResponseToFile(f, c.newFile(clusterDir, "cli-flags.json"))) +} + +func (c *OperatorDebugCommand) reportErr(err error) { + if err != nil { + c.Ui.Error(err.Error()) + } } // writeManifest creates the index files