diff --git a/internal/pkg/agent/control/server/server.go b/internal/pkg/agent/control/server/server.go index 366676540d9..643656dcda5 100644 --- a/internal/pkg/agent/control/server/server.go +++ b/internal/pkg/agent/control/server/server.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/control/proto" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/program" - "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats" monitoring "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats" monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/core/socket" @@ -37,6 +36,10 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +const ( + agentName = "elastic-agent" +) + // Server is the daemon side of the control protocol. type Server struct { logger *logger.Logger @@ -178,6 +181,7 @@ func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*p } cb, err := u.Upgrade(ctx, &upgradeRequest{request}, false) if err != nil { + //nolint:nilerr // error is exposed in a response return &proto.UpgradeResponse{ Status: proto.ActionStatus_FAILURE, Error: err.Error(), @@ -225,7 +229,8 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR // gather spec data for all rk/apps running specs := s.getSpecInfo("", "") for _, si := range specs { - endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + isSidecar := strings.HasSuffix(si.app, "_monitoring") + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar) client := newSocketRequester(si.app, si.rk, endpoint) procMeta := client.procMeta(ctx) @@ -258,9 +263,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr ch := make(chan *proto.PprofResult, 1) // retrieve elastic-agent pprof data if requested or application is unspecified. - if req.AppName == "" || req.AppName == "elastic-agent" { - endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) - c := newSocketRequester("elastic-agent", "", endpoint) + if req.AppName == "" || req.AppName == agentName { + endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) + c := newSocketRequester(agentName, "", endpoint) for _, opt := range req.PprofType { wg.Add(1) go func(opt proto.PprofOption) { @@ -273,11 +278,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr // get requested rk/appname spec or all specs var specs []specInfo - if req.AppName != "elastic-agent" { + if req.AppName != agentName { specs = s.getSpecInfo(req.RouteKey, req.AppName) } for _, si := range specs { - endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, false) c := newSocketRequester(si.app, si.rk, endpoint) // Launch a concurrent goroutine to gather all pprof endpoints from a socket. for _, opt := range req.PprofType { @@ -315,8 +320,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe } // gather metrics buffer data from the elastic-agent - endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) - c := newSocketRequester("elastic-agent", "", endpoint) + endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP) + c := newSocketRequester(agentName, "", endpoint) metrics := c.procMetrics(ctx) resp := &proto.ProcMetricsResponse{ @@ -326,7 +331,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe // gather metrics buffer data from all other processes specs := s.getSpecInfo("", "") for _, si := range specs { - endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk) + isSidecar := strings.HasSuffix(si.app, "_monitoring") + endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar) client := newSocketRequester(si.app, si.rk, endpoint) s.logger.Infof("gather metrics from %s", endpoint) diff --git a/internal/pkg/agent/operation/monitoring_test.go b/internal/pkg/agent/operation/monitoring_test.go index cc365cae540..06a9cfbe23b 100644 --- a/internal/pkg/agent/operation/monitoring_test.go +++ b/internal/pkg/agent/operation/monitoring_test.go @@ -212,7 +212,7 @@ type testMonitor struct { // EnrichArgs enriches arguments provided to application, in order to enable // monitoring -func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string { +func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string) []string { return args } diff --git a/internal/pkg/agent/operation/operator.go b/internal/pkg/agent/operation/operator.go index ed28b7cb633..afe19bf702b 100644 --- a/internal/pkg/agent/operation/operator.go +++ b/internal/pkg/agent/operation/operator.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/core/app" "github.com/elastic/elastic-agent/internal/pkg/core/monitoring" - "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop" + "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats" "github.com/elastic/elastic-agent/internal/pkg/core/plugin/process" "github.com/elastic/elastic-agent/internal/pkg/core/plugin/service" "github.com/elastic/elastic-agent/internal/pkg/core/state" @@ -387,7 +387,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { appName := p.BinaryName() if app.IsSidecar(p) { // make watchers unmonitorable - monitor = noop.NewMonitor() + monitor = beats.NewSidecarMonitor(o.config.DownloadConfig, o.config.MonitoringConfig) appName += "_monitoring" } diff --git a/internal/pkg/core/monitoring/beats/beats_monitor.go b/internal/pkg/core/monitoring/beats/beats_monitor.go index a513729497b..3ea17ae1384 100644 --- a/internal/pkg/core/monitoring/beats/beats_monitor.go +++ b/internal/pkg/core/monitoring/beats/beats_monitor.go @@ -5,7 +5,6 @@ package beats import ( - "fmt" "net/url" "os" "path/filepath" @@ -20,8 +19,13 @@ import ( monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" ) -const httpPlusPrefix = "http+" -const defaultMonitoringNamespace = "default" +const ( + httpPlusPrefix = "http+" + defaultMonitoringNamespace = "default" + fileSchemePrefix = "file" + unixSchemePrefix = "unix" + windowsOS = "windows" +) // Monitor implements the monitoring.Monitor interface providing information // about beats. @@ -99,15 +103,11 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics } func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string { - return MonitoringEndpoint(spec, b.operatingSystem, pipelineID) -} - -func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string { - return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID) + return MonitoringEndpoint(spec, b.operatingSystem, pipelineID, false) } func (b *Monitor) generateLoggingPath(spec program.Spec, pipelineID string) string { - return filepath.Dir(b.generateLoggingFile(spec, pipelineID)) + return filepath.Dir(getLoggingFile(spec, b.operatingSystem, pipelineID)) } func (b *Monitor) ownLoggingPath(spec program.Spec) bool { @@ -118,15 +118,10 @@ func (b *Monitor) ownLoggingPath(spec program.Spec) bool { // EnrichArgs enriches arguments provided to application, in order to enable // monitoring -func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string { +func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string { appendix := make([]string, 0, 7) - monitoringEndpoint := b.generateMonitoringEndpoint(spec, pipelineID) - if monitoringEndpoint != "" { - endpoint := monitoringEndpoint - if isSidecar { - endpoint += "_monitor" - } + if endpoint := b.generateMonitoringEndpoint(spec, pipelineID); endpoint != "" { appendix = append(appendix, "-E", "http.enabled=true", "-E", "http.host="+endpoint, @@ -146,10 +141,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string loggingPath := b.generateLoggingPath(spec, pipelineID) if loggingPath != "" { logFile := spec.Cmd - if isSidecar { - logFile += "_monitor" - } - logFile = fmt.Sprintf("%s", logFile) appendix = append(appendix, "-E", "logging.files.path="+loggingPath, "-E", "logging.files.name="+logFile, @@ -224,7 +215,7 @@ func (b *Monitor) LogPath(spec program.Spec, pipelineID string) string { return "" } - return b.generateLoggingFile(spec, pipelineID) + return getLoggingFile(spec, b.operatingSystem, pipelineID) } // MetricsPath describes a location where application exposes metrics @@ -272,15 +263,15 @@ func monitoringDrop(path string) (drop string) { } u, _ := url.Parse(path) - if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") { + if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) { return "" } - if u.Scheme == "file" { + if u.Scheme == fileSchemePrefix { return strings.TrimPrefix(path, "file://") } - if u.Scheme == "unix" { + if u.Scheme == unixSchemePrefix { return strings.TrimPrefix(path, "unix://") } @@ -299,7 +290,7 @@ func isWindowsPath(path string) bool { } func changeOwner(path string, uid, gid int) error { - if runtime.GOOS == "windows" { + if runtime.GOOS == windowsOS { // on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError return nil } diff --git a/internal/pkg/core/monitoring/beats/monitoring.go b/internal/pkg/core/monitoring/beats/monitoring.go index a724e6f4246..94f3078ddee 100644 --- a/internal/pkg/core/monitoring/beats/monitoring.go +++ b/internal/pkg/core/monitoring/beats/monitoring.go @@ -27,19 +27,27 @@ const ( agentMbEndpointFileFormatWin = `npipe:///elastic-agent` // agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint agentMbEndpointHTTP = "http://%s:%d" + + monitorSuffix = "_monitor" ) // MonitoringEndpoint is an endpoint where process is exposing its metrics. -func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { +func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string, isSidecar bool) (endpointPath string) { + defer func() { + if isSidecar && endpointPath != "" { + endpointPath += monitorSuffix + } + }() + if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok { return endpoint } - if operatingSystem == "windows" { + if operatingSystem == windowsOS { return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd) } // unix socket path must be less than 104 characters path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd)) - if len(path) < 104 { + if (isSidecar && len(path) < 104-len(monitorSuffix)) || (!isSidecar && len(path) < 104) { return path } // place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long @@ -47,11 +55,11 @@ func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) s return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } -func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string { +func getLoggingFile(spec program.Spec, operatingSystem, pipelineID string) string { if path, ok := spec.LogPaths[operatingSystem]; ok { return path } - if operatingSystem == "windows" { + if operatingSystem == windowsOS { return fmt.Sprintf(logFileFormatWin, paths.Home(), pipelineID, spec.Cmd) } return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd) @@ -63,7 +71,7 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit return fmt.Sprintf(agentMbEndpointHTTP, cfg.Host, cfg.Port) } - if operatingSystem == "windows" { + if operatingSystem == windowsOS { return agentMbEndpointFileFormatWin } // unix socket path must be less than 104 characters diff --git a/internal/pkg/core/monitoring/beats/sidecar_monitor.go b/internal/pkg/core/monitoring/beats/sidecar_monitor.go new file mode 100644 index 00000000000..aa249bafa0f --- /dev/null +++ b/internal/pkg/core/monitoring/beats/sidecar_monitor.go @@ -0,0 +1,145 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beats + +import ( + "fmt" + "os" + + "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/internal/pkg/agent/program" + "github.com/elastic/elastic-agent/internal/pkg/artifact" + "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +// SidecarMonitor provides information about the way how beat is monitored +type SidecarMonitor struct { + operatingSystem string + config *monitoringConfig.MonitoringConfig +} + +// NewSidecarMonitor creates a beats sidecar monitor, functionality is restricted purely on exposing +// http endpoint for diagnostics. +func NewSidecarMonitor(downloadConfig *artifact.Config, monitoringCfg *monitoringConfig.MonitoringConfig) *SidecarMonitor { + if monitoringCfg == nil { + monitoringCfg = monitoringConfig.DefaultConfig() + monitoringCfg.Pprof = &monitoringConfig.PprofConfig{Enabled: false} + monitoringCfg.HTTP.Buffer = &monitoringConfig.BufferConfig{Enabled: false} + } + + return &SidecarMonitor{ + operatingSystem: downloadConfig.OS(), + config: monitoringCfg, + } +} + +// Reload reloads state of the monitoring based on config. +func (b *SidecarMonitor) Reload(rawConfig *config.Config) error { + cfg := configuration.DefaultConfiguration() + if err := rawConfig.Unpack(&cfg); err != nil { + return err + } + + if cfg == nil || cfg.Settings == nil || cfg.Settings.MonitoringConfig == nil { + b.config = monitoringConfig.DefaultConfig() + } else { + if cfg.Settings.MonitoringConfig.Pprof == nil { + cfg.Settings.MonitoringConfig.Pprof = b.config.Pprof + } + if cfg.Settings.MonitoringConfig.HTTP.Buffer == nil { + cfg.Settings.MonitoringConfig.HTTP.Buffer = b.config.HTTP.Buffer + } + b.config = cfg.Settings.MonitoringConfig + } + + return nil +} + +// EnrichArgs enriches arguments provided to application, in order to enable +// monitoring +func (b *SidecarMonitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string { + appendix := make([]string, 0, 7) + + if endpoint := MonitoringEndpoint(spec, b.operatingSystem, pipelineID, true); endpoint != "" { + appendix = append(appendix, + "-E", "http.enabled=true", + "-E", "http.host="+endpoint, + ) + if b.config.Pprof != nil && b.config.Pprof.Enabled { + appendix = append(appendix, + "-E", "http.pprof.enabled=true", + ) + } + if b.config.HTTP.Buffer != nil && b.config.HTTP.Buffer.Enabled { + appendix = append(appendix, + "-E", "http.buffer.enabled=true", + ) + } + } + + return append(args, appendix...) +} + +// Cleanup cleans up all drops. +func (b *SidecarMonitor) Cleanup(spec program.Spec, pipelineID string) error { + endpoint := MonitoringEndpoint(spec, b.operatingSystem, pipelineID, true) + drop := monitoringDrop(endpoint) + + return os.RemoveAll(drop) +} + +// Close disables monitoring +func (b *SidecarMonitor) Close() { + b.config.Enabled = false + b.config.MonitorMetrics = false + b.config.MonitorLogs = false +} + +// Prepare executes steps in order for monitoring to work correctly +func (b *SidecarMonitor) Prepare(spec program.Spec, pipelineID string, uid, gid int) error { + endpoint := MonitoringEndpoint(spec, b.operatingSystem, pipelineID, true) + drop := monitoringDrop(endpoint) + + if err := os.MkdirAll(drop, 0775); err != nil { + return errors.New(err, fmt.Sprintf("failed to create a directory %q", drop)) + } + + if err := changeOwner(drop, uid, gid); err != nil { + return errors.New(err, fmt.Sprintf("failed to change owner of a directory %q", drop)) + } + + return nil +} + +// LogPath describes a path where application stores logs. Empty if +// application is not monitorable +func (b *SidecarMonitor) LogPath(program.Spec, string) string { + return "" +} + +// MetricsPath describes a location where application exposes metrics +// collectable by metricbeat. +func (b *SidecarMonitor) MetricsPath(program.Spec, string) string { + return "" +} + +// MetricsPathPrefixed return metrics path prefixed with http+ prefix. +func (b *SidecarMonitor) MetricsPathPrefixed(program.Spec, string) string { + return "" +} + +// IsMonitoringEnabled returns true if monitoring is configured. +func (b *SidecarMonitor) IsMonitoringEnabled() bool { return false } + +// WatchLogs return true if monitoring is configured and monitoring logs is enabled. +func (b *SidecarMonitor) WatchLogs() bool { return false } + +// WatchMetrics return true if monitoring is configured and monitoring metrics is enabled. +func (b *SidecarMonitor) WatchMetrics() bool { return false } + +// MonitoringNamespace returns monitoring namespace configured. +func (b *SidecarMonitor) MonitoringNamespace() string { return "default" } diff --git a/internal/pkg/core/monitoring/monitor.go b/internal/pkg/core/monitoring/monitor.go index 2c87e384976..2c6ff4bd5bc 100644 --- a/internal/pkg/core/monitoring/monitor.go +++ b/internal/pkg/core/monitoring/monitor.go @@ -19,7 +19,7 @@ type Monitor interface { MetricsPathPrefixed(spec program.Spec, pipelineID string) string Prepare(spec program.Spec, pipelineID string, uid, gid int) error - EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string + EnrichArgs(spec program.Spec, pipelineID string, args []string) []string Cleanup(spec program.Spec, pipelineID string) error Reload(cfg *config.Config) error IsMonitoringEnabled() bool diff --git a/internal/pkg/core/monitoring/noop/noop_monitor.go b/internal/pkg/core/monitoring/noop/noop_monitor.go index 44e47982455..d04eb08feec 100644 --- a/internal/pkg/core/monitoring/noop/noop_monitor.go +++ b/internal/pkg/core/monitoring/noop/noop_monitor.go @@ -11,8 +11,7 @@ import ( // Monitor is a monitoring interface providing information about the way // how beat is monitored -type Monitor struct { -} +type Monitor struct{} // NewMonitor creates a beats monitor. func NewMonitor() *Monitor { @@ -21,7 +20,7 @@ func NewMonitor() *Monitor { // EnrichArgs enriches arguments provided to application, in order to enable // monitoring -func (b *Monitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string { +func (b *Monitor) EnrichArgs(_ program.Spec, _ string, args []string) []string { return args } diff --git a/internal/pkg/core/monitoring/server/process.go b/internal/pkg/core/monitoring/server/process.go index 56f7d26eb78..2c1fbf04bcf 100644 --- a/internal/pkg/core/monitoring/server/process.go +++ b/internal/pkg/core/monitoring/server/process.go @@ -150,15 +150,12 @@ func generateEndpoint(id string) (string, error) { return "", err } - endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output) + endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output, detail.isMonitoring) if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") { // add prefix for npipe and unix endpoint = httpPlusPrefix + endpoint } - if detail.isMonitoring { - endpoint += "_monitor" - } return endpoint, nil } diff --git a/internal/pkg/core/plugin/process/start.go b/internal/pkg/core/plugin/process/start.go index 29770ae714b..1f193a21690 100644 --- a/internal/pkg/core/plugin/process/start.go +++ b/internal/pkg/core/plugin/process/start.go @@ -23,6 +23,13 @@ import ( "github.com/elastic/elastic-agent/pkg/core/server" ) +const ( + levelInfo = "info" + levelDebug = "debug" + levelWarning = "warning" + levelError = "error" +) + // Start starts the application with a specified config. func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) error { a.appLock.Lock() @@ -74,8 +81,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] // Failed applications can be started again. if srvState != nil { a.setState(state.Starting, "Starting", nil) - srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload) - srvState.UpdateConfig(srvState.Config()) + _ = srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload) + _ = srvState.UpdateConfig(srvState.Config()) } else { a.srvState, err = a.srv.Register(a, string(cfgStr)) if err != nil { @@ -119,8 +126,7 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] spec.Args = injectLogLevel(a.logLevel, spec.Args) // use separate file - isSidecar := app.IsSidecar(t) - spec.Args = a.monitor.EnrichArgs(a.desc.Spec(), a.pipelineID, spec.Args, isSidecar) + spec.Args = a.monitor.EnrichArgs(a.desc.Spec(), a.pipelineID, spec.Args) // specify beat name to avoid data lock conflicts // as for https://github.com/elastic/beats/v7/pull/14030 more than one instance @@ -165,24 +171,18 @@ func (a *Application) writeToStdin(as *server.ApplicationState, wc io.WriteClose } func injectLogLevel(logLevel string, args []string) []string { - var level string - // Translate to level beat understands - switch logLevel { - case "info": - level = "info" - case "debug": - level = "debug" - case "warning": - level = "warning" - case "error": - level = "error" + if args == nil || logLevel == "" { + return args } - if args == nil || level == "" { - return args + if logLevel == levelDebug || + logLevel == levelInfo || + logLevel == levelWarning || + logLevel == levelError { + return append(args, "-E", "logging.level="+logLevel) } - return append(args, "-E", "logging.level="+level) + return args } func injectDataPath(args []string, pipelineID, id string) []string {