diff --git a/internal/pkg/dashboard/components/logviewer.go b/internal/pkg/dashboard/components/logviewer.go index 332952d381..44a918cb42 100644 --- a/internal/pkg/dashboard/components/logviewer.go +++ b/internal/pkg/dashboard/components/logviewer.go @@ -23,6 +23,7 @@ func NewLogViewer() *LogViewer { } widget.logs.ScrollToEnd(). + SetDynamicColors(true). SetMaxLines(maxLogLines). SetText(noData). SetBorderPadding(0, 0, 1, 1). @@ -54,7 +55,12 @@ func NewLogViewer() *LogViewer { } // WriteLog writes the log line to the widget. -func (widget *LogViewer) WriteLog(logLine string) { +func (widget *LogViewer) WriteLog(logLine, logError string) { + if logError != "" { + logLine = "[red]" + tview.Escape(logError) + "[-]\n" + } else { + logLine = tview.Escape(logLine) + "\n" + } + widget.logs.Write([]byte(logLine)) //nolint:errcheck - widget.logs.Write([]byte("\n")) //nolint:errcheck } diff --git a/internal/pkg/dashboard/dashboard.go b/internal/pkg/dashboard/dashboard.go index 13af827cf9..7d7775db76 100644 --- a/internal/pkg/dashboard/dashboard.go +++ b/internal/pkg/dashboard/dashboard.go @@ -70,7 +70,7 @@ type ResourceDataListener interface { // LogDataListener is a listener which is notified when a log line is received. type LogDataListener interface { - OnLogDataChange(node string, logLine string) + OnLogDataChange(node, logLine, logError string) } // NodeSelectListener is a listener which is notified when a node is selected. @@ -376,10 +376,7 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error { defer d.resourceDataSource.Stop() //nolint:errcheck // start logs data source - if err := d.logDataSource.Start(ctx); err != nil { - return err - } - + d.logDataSource.Start(ctx) defer d.logDataSource.Stop() //nolint:errcheck lastLogTime := time.Now() @@ -393,11 +390,11 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error { if time.Since(lastLogTime) < 50*time.Millisecond { d.app.QueueUpdate(func() { - d.processLog(nodeAlias, nodeLog.Log) + d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error) }) } else { d.app.QueueUpdateDraw(func() { - d.processLog(nodeAlias, nodeLog.Log) + d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error) }) } @@ -461,9 +458,9 @@ func (d *Dashboard) processNodeResource(nodeResource resourcedata.Data) { } // processLog re-renders the log components with new log data. -func (d *Dashboard) processLog(node, line string) { +func (d *Dashboard) processLog(node, logLine, logError string) { for _, component := range d.logDataListeners { - component.OnLogDataChange(node, line) + component.OnLogDataChange(node, logLine, logError) } } diff --git a/internal/pkg/dashboard/logdata/logdata.go b/internal/pkg/dashboard/logdata/logdata.go index 99b5cafccd..7bd20f817c 100644 --- a/internal/pkg/dashboard/logdata/logdata.go +++ b/internal/pkg/dashboard/logdata/logdata.go @@ -8,20 +8,26 @@ package logdata import ( "context" "errors" + "fmt" "strings" "sync" + "time" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers" + "github.com/siderolabs/talos/internal/pkg/dashboard/util" "github.com/siderolabs/talos/pkg/machinery/api/common" "github.com/siderolabs/talos/pkg/machinery/client" ) // Data is a log line from a node. type Data struct { - Node string - Log string + Node string + Log string + Error string } // Source is a data source for Kernel (dmesg) logs. @@ -45,14 +51,10 @@ func NewSource(client *client.Client) *Source { } // Start starts the data source. -func (source *Source) Start(ctx context.Context) error { - var err error - +func (source *Source) Start(ctx context.Context) { source.once.Do(func() { - err = source.start(ctx) + source.start(ctx) }) - - return err } // Stop stops the data source. @@ -62,38 +64,70 @@ func (source *Source) Stop() error { return source.eg.Wait() } -func (source *Source) start(ctx context.Context) error { +func (source *Source) start(ctx context.Context) { ctx, source.logCtxCancel = context.WithCancel(ctx) - dmesgStream, err := source.client.Dmesg(ctx, true, false) - if err != nil { - return err + for _, nodeContext := range util.NodeContexts(ctx) { + source.eg.Go(func() error { + return source.tailNodeWithRetries(nodeContext.Ctx, nodeContext.Node) + }) } +} - source.eg.Go(func() error { - return helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, node string, multipleNodes bool) error { - if len(data.Bytes) == 0 { - return nil - } +func (source *Source) tailNodeWithRetries(ctx context.Context, node string) error { + for { + readErr := source.readDmesg(ctx, node) + if errors.Is(readErr, context.Canceled) || status.Code(readErr) == codes.Canceled { + return nil + } - line := strings.TrimSpace(string(data.Bytes)) - if line == "" { - return nil - } + if readErr != nil { + source.LogCh <- Data{Node: node, Error: readErr.Error()} + } - select { - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } + // back off a bit before retrying + sleepWithContext(ctx, 30*time.Second) + } +} - return ctx.Err() - case source.LogCh <- Data{Node: node, Log: line}: - } +func (source *Source) readDmesg(ctx context.Context, node string) error { + dmesgStream, err := source.client.Dmesg(ctx, true, false) + if err != nil { + return fmt.Errorf("dashboard: error opening dmesg stream: %w", err) + } + readErr := helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, _ string, _ bool) error { + if len(data.Bytes) == 0 { return nil - }) + } + + line := strings.TrimSpace(string(data.Bytes)) + if line == "" { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case source.LogCh <- Data{Node: node, Log: line}: + } + + return nil }) + if readErr != nil { + return fmt.Errorf("error reading dmesg stream: %w", readErr) + } return nil } + +func sleepWithContext(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + } +} diff --git a/internal/pkg/dashboard/resourcedata/resourcedata.go b/internal/pkg/dashboard/resourcedata/resourcedata.go index 6075b70489..9786790533 100644 --- a/internal/pkg/dashboard/resourcedata/resourcedata.go +++ b/internal/pkg/dashboard/resourcedata/resourcedata.go @@ -16,9 +16,8 @@ import ( "github.com/cosi-project/runtime/pkg/state" "github.com/siderolabs/gen/channel" "golang.org/x/sync/errgroup" - "google.golang.org/grpc/metadata" - "github.com/siderolabs/talos/pkg/machinery/client" + "github.com/siderolabs/talos/internal/pkg/dashboard/util" "github.com/siderolabs/talos/pkg/machinery/constants" "github.com/siderolabs/talos/pkg/machinery/resources/cluster" "github.com/siderolabs/talos/pkg/machinery/resources/config" @@ -70,10 +69,9 @@ func (source *Source) run(ctx context.Context) { source.NodeResourceCh = source.ch - nodes := source.nodes(ctx) - for _, node := range nodes { + for _, nodeContext := range util.NodeContexts(ctx) { source.eg.Go(func() error { - source.runResourceWatchWithRetries(ctx, node) + source.runResourceWatchWithRetries(nodeContext.Ctx, nodeContext.Node) return nil }) @@ -101,10 +99,6 @@ func (source *Source) runResourceWatchWithRetries(ctx context.Context, node stri //nolint:gocyclo,cyclop func (source *Source) runResourceWatch(ctx context.Context, node string) error { - if node != "" { - ctx = client.WithNode(ctx, node) - } - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -211,22 +205,3 @@ func (source *Source) runResourceWatch(ctx context.Context, node string) error { } } } - -func (source *Source) nodes(ctx context.Context) []string { - md, mdOk := metadata.FromOutgoingContext(ctx) - if !mdOk { - return []string{""} // local node - } - - nodeVal := md.Get("node") - if len(nodeVal) > 0 { - return []string{nodeVal[0]} - } - - nodesVal := md.Get("nodes") - if len(nodesVal) == 0 { - return []string{""} // local node - } - - return nodesVal -} diff --git a/internal/pkg/dashboard/summary.go b/internal/pkg/dashboard/summary.go index 03ffe24011..b8482ea4b2 100644 --- a/internal/pkg/dashboard/summary.go +++ b/internal/pkg/dashboard/summary.go @@ -91,8 +91,8 @@ func (widget *SummaryGrid) OnResourceDataChange(nodeResource resourcedata.Data) } // OnLogDataChange implements the LogDataListener interface. -func (widget *SummaryGrid) OnLogDataChange(node string, logLine string) { - widget.logViewer(node).WriteLog(logLine) +func (widget *SummaryGrid) OnLogDataChange(node, logLine, logError string) { + widget.logViewer(node).WriteLog(logLine, logError) } func (widget *SummaryGrid) updateLogViewer() { diff --git a/internal/pkg/dashboard/util/util.go b/internal/pkg/dashboard/util/util.go new file mode 100644 index 0000000000..844b222739 --- /dev/null +++ b/internal/pkg/dashboard/util/util.go @@ -0,0 +1,49 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package util provides utility functions for the dashboard. +package util + +import ( + "context" + + "google.golang.org/grpc/metadata" + + "github.com/siderolabs/talos/pkg/machinery/client" +) + +// NodeContext contains the context.Context for a single node and the node name. +type NodeContext struct { + Ctx context.Context //nolint:containedctx + Node string +} + +// NodeContexts returns a list of NodeContexts from the given context. +// +// It extracts the node names from the outgoing GRPC context metadata. +// If the node name is not present in the metadata, context will be returned as-is with an empty node name. +func NodeContexts(ctx context.Context) []NodeContext { + md, mdOk := metadata.FromOutgoingContext(ctx) + if !mdOk { + return []NodeContext{{Ctx: ctx}} + } + + nodeVal := md.Get("node") + if len(nodeVal) > 0 { + return []NodeContext{{Ctx: ctx, Node: nodeVal[0]}} + } + + nodesVal := md.Get("nodes") + if len(nodesVal) == 0 { + return []NodeContext{{Ctx: ctx}} + } + + nodeContexts := make([]NodeContext, 0, len(nodesVal)) + + for _, node := range nodesVal { + nodeContexts = append(nodeContexts, NodeContext{Ctx: client.WithNode(ctx, node), Node: node}) + } + + return nodeContexts +}