From 349501f82584aeb0128277c5d7b4412a3b595b45 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 22 Sep 2022 10:02:00 -0400 Subject: [PATCH] operator debug: write NDJSON for large collections (#14610) The `operator debug` command writes JSON files from API responses as a single line containing an array of JSON objects. But some of these files can be extremely large (GBs) for large production clusters, which makes them difficult to parse with typical line-oriented Unix command line tools that could otherwise stream their input without consuming a lot of memory. For collections that are typically large, instead emit newline-delimited JSON. This changeset includes some first-pass refactoring of this command. It breaks up monolithic methods that validate a path, create a file, serialize objects, and write them to disk into smaller functions, some of which can now be standalone so that they can take advantage of generics. --- .changelog/14610.txt | 3 + command/operator_debug.go | 215 +++++++++++++++++++++++++++++++------- 2 files changed, 182 insertions(+), 36 deletions(-) create mode 100644 .changelog/14610.txt diff --git a/.changelog/14610.txt b/.changelog/14610.txt new file mode 100644 index 000000000000..6ca389aeabf2 --- /dev/null +++ b/.changelog/14610.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: `operator debug` now writes newline-delimited JSON files for large collections +``` diff --git a/command/operator_debug.go b/command/operator_debug.go index 8916517e6eb4..b1ac4f9eeae8 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -516,7 +516,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { } // Write nodes to file - c.writeJSON(clusterDir, "nodes.json", c.nodes, err) + c.reportErr(writeResponseToFile(c.nodes, c.newFile(clusterDir, "nodes.json"))) // Search all nodes If a node class is specified without a list of node id prefixes if c.nodeClass != "" && nodeIDs == "" { @@ -588,7 +588,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { } // Write complete list of server members to file - c.writeJSON(clusterDir, "members.json", c.members, err) + c.reportErr(writeResponseToFile(c.members, c.newFile(clusterDir, "members.json"))) // Get leader and write to file; there's no option for AllowStale // on this API and a stale result wouldn't even be meaningful, so @@ -599,7 +599,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { c.Ui.Warn(fmt.Sprintf("Failed to retrieve leader; err: %v", err)) } if len(leader) > 0 { - c.writeJSON(clusterDir, "leader.json", leader, err) + c.reportErr(writeResponseToFile(leader, c.newFile(clusterDir, "leader.json"))) } // Filter for servers matching criteria @@ -691,13 +691,16 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error { // Collect cluster data self, err := client.Agent().Self() - c.writeJSON(clusterDir, "agent-self.json", self, err) + c.reportErr(writeResponseOrErrorToFile( + self, err, c.newFile(clusterDir, "agent-self.json"))) namespaces, _, err := client.Namespaces().List(c.queryOpts()) - c.writeJSON(clusterDir, "namespaces.json", namespaces, err) + c.reportErr(writeResponseOrErrorToFile( + namespaces, err, c.newFile(clusterDir, "namespaces.json"))) regions, err := client.Regions().List() - c.writeJSON(clusterDir, "regions.json", regions, err) + c.reportErr(writeResponseOrErrorToFile( + regions, err, c.newFile(clusterDir, "regions.json"))) // Collect data from Consul if c.consul.addrVal == "" { @@ -737,7 +740,7 @@ func (c *OperatorDebugCommand) mkdir(paths ...string) error { return fmt.Errorf("file 
path escapes capture directory") } - return os.MkdirAll(joinedPath, 0755) + return escapingfs.EnsurePath(joinedPath, true) } // startMonitors starts go routines for each node and client @@ -923,7 +926,7 @@ func (c *OperatorDebugCommand) collectAgentHost(path, id string, client *api.Cli } path = filepath.Join(path, id) - c.writeJSON(path, "agent-host.json", host, err) + c.reportErr(writeResponseToFile(host, c.newFile(path, "agent-host.json"))) } func (c *OperatorDebugCommand) collectPeriodicPprofs(client *api.Client) { @@ -1105,60 +1108,62 @@ func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) { // collectOperator captures some cluster meta information func (c *OperatorDebugCommand) collectOperator(dir string, client *api.Client) { rc, err := client.Operator().RaftGetConfiguration(c.queryOpts()) - c.writeJSON(dir, "operator-raft.json", rc, err) + c.reportErr(writeResponseOrErrorToFile(rc, err, c.newFile(dir, "operator-raft.json"))) sc, _, err := client.Operator().SchedulerGetConfiguration(c.queryOpts()) - c.writeJSON(dir, "operator-scheduler.json", sc, err) + c.reportErr(writeResponseOrErrorToFile(sc, err, c.newFile(dir, "operator-scheduler.json"))) ah, _, err := client.Operator().AutopilotServerHealth(c.queryOpts()) - c.writeJSON(dir, "operator-autopilot-health.json", ah, err) + c.reportErr(writeResponseOrErrorToFile( + ah, err, c.newFile(dir, "operator-autopilot-health.json"))) lic, _, err := client.Operator().LicenseGet(c.queryOpts()) - c.writeJSON(dir, "license.json", lic, err) + c.reportErr(writeResponseOrErrorToFile(lic, err, c.newFile(dir, "license.json"))) } // collectNomad captures the nomad cluster state func (c *OperatorDebugCommand) collectNomad(dir string, client *api.Client) error { js, _, err := client.Jobs().List(c.queryOpts()) - c.writeJSON(dir, "jobs.json", js, err) + c.reportErr(writeResponseStreamOrErrorToFile(js, err, c.newFile(dir, "jobs.json"))) ds, _, err := client.Deployments().List(c.queryOpts()) - c.writeJSON(dir, "deployments.json", ds, err) + c.reportErr(writeResponseStreamOrErrorToFile(ds, err, c.newFile(dir, "deployments.json"))) es, _, err := client.Evaluations().List(c.queryOpts()) - c.writeJSON(dir, "evaluations.json", es, err) + c.reportErr(writeResponseStreamOrErrorToFile(es, err, c.newFile(dir, "evaluations.json"))) as, _, err := client.Allocations().List(c.queryOpts()) - c.writeJSON(dir, "allocations.json", as, err) + c.reportErr(writeResponseStreamOrErrorToFile(as, err, c.newFile(dir, "allocations.json"))) ns, _, err := client.Nodes().List(c.queryOpts()) - c.writeJSON(dir, "nodes.json", ns, err) + c.reportErr(writeResponseStreamOrErrorToFile(ns, err, c.newFile(dir, "nodes.json"))) // CSI Plugins - /v1/plugins?type=csi ps, _, err := client.CSIPlugins().List(c.queryOpts()) - c.writeJSON(dir, "csi-plugins.json", ps, err) + c.reportErr(writeResponseStreamOrErrorToFile(ps, err, c.newFile(dir, "csi-plugins.json"))) // CSI Plugin details - /v1/plugin/csi/:plugin_id for _, p := range ps { csiPlugin, _, err := client.CSIPlugins().Info(p.ID, c.queryOpts()) csiPluginFileName := fmt.Sprintf("csi-plugin-id-%s.json", p.ID) - c.writeJSON(dir, csiPluginFileName, csiPlugin, err) + c.reportErr(writeResponseOrErrorToFile(csiPlugin, err, c.newFile(dir, csiPluginFileName))) } // CSI Volumes - /v1/volumes?type=csi csiVolumes, _, err := client.CSIVolumes().List(c.queryOpts()) - c.writeJSON(dir, "csi-volumes.json", csiVolumes, err) + c.reportErr(writeResponseStreamOrErrorToFile( + csiVolumes, err, c.newFile(dir, "csi-volumes.json"))) // CSI Volume details - 
/v1/volumes/csi/:volume-id for _, v := range csiVolumes { csiVolume, _, err := client.CSIVolumes().Info(v.ID, c.queryOpts()) csiFileName := fmt.Sprintf("csi-volume-id-%s.json", v.ID) - c.writeJSON(dir, csiFileName, csiVolume, err) + c.reportErr(writeResponseOrErrorToFile(csiVolume, err, c.newFile(dir, csiFileName))) } metrics, _, err := client.Operator().MetricsSummary(c.queryOpts()) - c.writeJSON(dir, "metrics.json", metrics, err) + c.reportErr(writeResponseOrErrorToFile(metrics, err, c.newFile(dir, "metrics.json"))) return nil } @@ -1270,44 +1275,177 @@ func (c *OperatorDebugCommand) writeBytes(dir, file string, data []byte) error { filePath := filepath.Join(dirPath, filename) // Ensure parent directories exist - err := os.MkdirAll(dirPath, os.ModePerm) + err := escapingfs.EnsurePath(dirPath, true) if err != nil { - return fmt.Errorf("failed to create parent directories of \"%s\": %w", dirPath, err) + return fmt.Errorf("failed to create parent directories of %q: %w", dirPath, err) } // Ensure filename doesn't escape the sandbox of the capture directory escapes := escapingfs.PathEscapesSandbox(c.collectDir, filePath) if escapes { - return fmt.Errorf("file path \"%s\" escapes capture directory \"%s\"", filePath, c.collectDir) + return fmt.Errorf("file path %q escapes capture directory %q", filePath, c.collectDir) } // Create the file fh, err := os.Create(filePath) if err != nil { - return fmt.Errorf("failed to create file \"%s\", err: %w", filePath, err) + return fmt.Errorf("failed to create file %q, err: %w", filePath, err) } defer fh.Close() _, err = fh.Write(data) if err != nil { - return fmt.Errorf("Failed to write data to file \"%s\", err: %w", filePath, err) + return fmt.Errorf("Failed to write data to file %q, err: %w", filePath, err) } return nil } -// writeJSON writes JSON responses from the Nomad API calls to the archive -func (c *OperatorDebugCommand) writeJSON(dir, file string, data interface{}, err error) error { +// newFilePath returns a validated filepath rooted in the provided directory and +// path. It has been checked that it falls inside the sandbox and has been added +// to the manifest tracking. +func (c *OperatorDebugCommand) newFilePath(dir, file string) (string, error) { + + // Replace invalid characters in filename + filename := helper.CleanFilename(file, "_") + + relativePath := filepath.Join(dir, filename) + c.manifest = append(c.manifest, relativePath) + dirPath := filepath.Join(c.collectDir, dir) + filePath := filepath.Join(dirPath, filename) + + // Ensure parent directories exist + err := escapingfs.EnsurePath(dirPath, true) + if err != nil { + return "", fmt.Errorf("failed to create parent directories of %q: %w", dirPath, err) + } + + // Ensure filename doesn't escape the sandbox of the capture directory + escapes := escapingfs.PathEscapesSandbox(c.collectDir, filePath) + if escapes { + return "", fmt.Errorf("file path %q escapes capture directory %q", filePath, c.collectDir) + } + + return filePath, nil +} + +type writerGetter func() (io.WriteCloser, error) + +// newFile returns a func that creates a new file for writing and returns it as +// an io.WriteCloser interface. The caller is responsible for closing the +// io.Writer when it's done. +// +// Note: methods cannot be generic in Go, so this function returns a function +// that closes over our command so that we can still reference the command +// object's fields to validate the file. In future iterations it might be nice +// if we could move most of the command into standalone functions. 
+func (c *OperatorDebugCommand) newFile(dir, file string) writerGetter { + return func() (io.WriteCloser, error) { + filePath, err := c.newFilePath(dir, file) + if err != nil { + return nil, err + } + + writer, err := os.Create(filePath) + if err != nil { + return nil, fmt.Errorf("failed to create file %q: %w", filePath, err) + } + return writer, nil + } +} + +// writeResponseToFile writes a response object to a file. It returns an error +// that the caller should report to the UI. +func writeResponseToFile(obj any, getWriterFn writerGetter) error { + + writer, err := getWriterFn() if err != nil { - return c.writeError(dir, file, err) + return err } - bytes, err := json.Marshal(data) + defer writer.Close() + + err = writeJSON(obj, writer) if err != nil { - return c.writeError(dir, file, err) + return err } - err = c.writeBytes(dir, file, bytes) + return nil +} + +// writeResponseOrErrorToFile writes a response object to a file, or the error +// for that response if one was received. It returns an error that the caller +// should report to the UI. +func writeResponseOrErrorToFile(obj any, apiErr error, getWriterFn writerGetter) error { + + writer, err := getWriterFn() if err != nil { - c.Ui.Error(err.Error()) + return err + } + defer writer.Close() + + if apiErr != nil { + obj = errorWrapper{Error: apiErr.Error()} } + + err = writeJSON(obj, writer) + if err != nil { + return err + } + return nil +} + +// writeResponseStreamOrErrorToFile writes a stream of response objects to a +// file in newline-delimited JSON format, or the error for that response if one +// was received. It returns an error that the caller should report to the UI. +func writeResponseStreamOrErrorToFile[T any](obj []T, apiErr error, getWriterFn writerGetter) error { + + writer, err := getWriterFn() + if err != nil { + return err + } + defer writer.Close() + + if apiErr != nil { + wrapped := errorWrapper{Error: apiErr.Error()} + return writeJSON(wrapped, writer) + } + + err = writeNDJSON(obj, writer) + if err != nil { + return err + } + return nil +} + +// writeJSON writes a single Nomad API object (or response error) to the +// archive file as a JSON object. +func writeJSON(obj any, writer io.Writer) error { + buf, err := json.Marshal(obj) + if err != nil { + buf, err = json.Marshal(errorWrapper{Error: err.Error()}) + if err != nil { + return fmt.Errorf("could not serialize our own error: %v", err) + } + } + n, err := writer.Write(buf) + if err != nil { + return fmt.Errorf("write error, wrote %d bytes of %d: %v", n, len(buf), err) + } + return nil +} + +// writeNDJSON writes a slice of Nomad API objects to the archive file as +// newline-delimited JSON objects. 
+func writeNDJSON[T any](data []T, writer io.Writer) error { + for _, obj := range data { + err := writeJSON(obj, writer) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + _, err = writer.Write([]byte{'\n'}) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + } + return nil } @@ -1361,7 +1499,6 @@ type flagExport struct { // writeFlags exports the CLI flags to JSON file func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) { - // c.writeJSON(clusterDir, "cli-flags-complete.json", flags, nil) var f flagExport f.Name = flags.Name() @@ -1386,7 +1523,13 @@ func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) { f.Actual[flag.Name] = flag }) - c.writeJSON(clusterDir, "cli-flags.json", f, nil) + c.reportErr(writeResponseToFile(f, c.newFile(clusterDir, "cli-flags.json"))) +} + +func (c *OperatorDebugCommand) reportErr(err error) { + if err != nil { + c.Ui.Error(err.Error()) + } } // writeManifest creates the index files
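As an illustrative sketch (not part of the patch) of why the NDJSON format helps here: the collections written by `writeNDJSON` can be decoded record by record with the standard library's streaming `json.Decoder`, so even a multi-gigabyte capture file never has to be held in memory at once. The file path below is hypothetical; adjust it to wherever the capture bundle was extracted.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	// Hypothetical path to one of the NDJSON collections inside a
	// debug capture bundle; the actual layout depends on the capture.
	f, err := os.Open("nomad-debug/nomad/allocations.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// json.Decoder consumes one JSON value at a time, so the file is
	// scanned with roughly constant memory regardless of its size.
	dec := json.NewDecoder(f)
	count := 0
	for {
		var obj map[string]any
		if err := dec.Decode(&obj); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			log.Fatalf("decode error after %d records: %v", count, err)
		}
		count++
	}
	fmt.Printf("read %d records\n", count)
}
```

The decoder does not require a trailing newline, so the same loop also reads the single-object files produced by `writeJSON`; line-oriented tools get the same benefit because each record sits on its own line.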