Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.5](backport #1034) Fix: Endpoint collision between monitoring and regular beats #1270

Merged
merged 2 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ spec:
# - -c
# - >-
# mkdir -p /etc/elastic-agent/inputs.d &&
# wget -O - https://github.com/elastic/elastic-agent/archive/8.3.0.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-main/deploy/kubernetes/elastic-agent-standalone/templates.d"
# wget -O - https://github.com/elastic/elastic-agent/archive/8.5.0.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-main/deploy/kubernetes/elastic-agent-standalone/templates.d"
# volumeMounts:
# - name: external-inputs
# mountPath: /etc/elastic-agent/inputs.d
Expand Down
25 changes: 15 additions & 10 deletions internal/pkg/agent/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/control/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoring "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/internal/pkg/core/socket"
Expand All @@ -37,6 +36,10 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
agentName = "elastic-agent"
)

// Server is the daemon side of the control protocol.
type Server struct {
logger *logger.Logger
Expand Down Expand Up @@ -225,7 +228,8 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR
// gather spec data for all rk/apps running
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

procMeta := client.procMeta(ctx)
Expand Down Expand Up @@ -258,9 +262,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr
ch := make(chan *proto.PprofResult, 1)

// retrieve elastic-agent pprof data if requested or application is unspecified.
if req.AppName == "" || req.AppName == "elastic-agent" {
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
if req.AppName == "" || req.AppName == agentName {
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
for _, opt := range req.PprofType {
wg.Add(1)
go func(opt proto.PprofOption) {
Expand All @@ -273,11 +277,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr

// get requested rk/appname spec or all specs
var specs []specInfo
if req.AppName != "elastic-agent" {
if req.AppName != agentName {
specs = s.getSpecInfo(req.RouteKey, req.AppName)
}
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, false)
c := newSocketRequester(si.app, si.rk, endpoint)
// Launch a concurrent goroutine to gather all pprof endpoints from a socket.
for _, opt := range req.PprofType {
Expand Down Expand Up @@ -315,8 +319,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
}

// gather metrics buffer data from the elastic-agent
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester("elastic-agent", "", endpoint)
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
c := newSocketRequester(agentName, "", endpoint)
metrics := c.procMetrics(ctx)

resp := &proto.ProcMetricsResponse{
Expand All @@ -326,7 +330,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
// gather metrics buffer data from all other processes
specs := s.getSpecInfo("", "")
for _, si := range specs {
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
isSidecar := strings.HasSuffix(si.app, "_monitoring")
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
client := newSocketRequester(si.app, si.rk, endpoint)

s.logger.Infof("gather metrics from %s", endpoint)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type testMonitor struct {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string) []string {
return args
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/process"
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/service"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
Expand Down Expand Up @@ -387,7 +387,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
appName := p.BinaryName()
if app.IsSidecar(p) {
// make watchers unmonitorable
monitor = noop.NewMonitor()
monitor = beats.NewSidecarMonitor(o.config.DownloadConfig, o.config.MonitoringConfig)
appName += "_monitoring"
}

Expand Down
41 changes: 16 additions & 25 deletions internal/pkg/core/monitoring/beats/beats_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package beats

import (
"fmt"
"net/url"
"os"
"path/filepath"
Expand All @@ -20,8 +19,13 @@ import (
monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
)

const httpPlusPrefix = "http+"
const defaultMonitoringNamespace = "default"
const (
httpPlusPrefix = "http+"
defaultMonitoringNamespace = "default"
fileSchemePrefix = "file"
unixSchemePrefix = "unix"
windowsOS = "windows"
)

// Monitor implements the monitoring.Monitor interface providing information
// about beats.
Expand Down Expand Up @@ -99,15 +103,11 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor
func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics }

func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string {
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID)
}

func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string {
return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID)
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID, false)
}

func (b *Monitor) generateLoggingPath(spec program.Spec, pipelineID string) string {
return filepath.Dir(b.generateLoggingFile(spec, pipelineID))
return filepath.Dir(getLoggingFile(spec, b.operatingSystem, pipelineID))
}

func (b *Monitor) ownLoggingPath(spec program.Spec) bool {
Expand All @@ -118,15 +118,10 @@ func (b *Monitor) ownLoggingPath(spec program.Spec) bool {

// EnrichArgs enriches arguments provided to application, in order to enable
// monitoring
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string {
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string {
appendix := make([]string, 0, 7)

monitoringEndpoint := b.generateMonitoringEndpoint(spec, pipelineID)
if monitoringEndpoint != "" {
endpoint := monitoringEndpoint
if isSidecar {
endpoint += "_monitor"
}
if endpoint := b.generateMonitoringEndpoint(spec, pipelineID); endpoint != "" {
appendix = append(appendix,
"-E", "http.enabled=true",
"-E", "http.host="+endpoint,
Expand All @@ -146,10 +141,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string
loggingPath := b.generateLoggingPath(spec, pipelineID)
if loggingPath != "" {
logFile := spec.Cmd
if isSidecar {
logFile += "_monitor"
}
logFile = fmt.Sprintf("%s", logFile)
appendix = append(appendix,
"-E", "logging.files.path="+loggingPath,
"-E", "logging.files.name="+logFile,
Expand Down Expand Up @@ -224,7 +215,7 @@ func (b *Monitor) LogPath(spec program.Spec, pipelineID string) string {
return ""
}

return b.generateLoggingFile(spec, pipelineID)
return getLoggingFile(spec, b.operatingSystem, pipelineID)
}

// MetricsPath describes a location where application exposes metrics
Expand Down Expand Up @@ -272,15 +263,15 @@ func monitoringDrop(path string) (drop string) {
}

u, _ := url.Parse(path)
if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") {
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
return ""
}

if u.Scheme == "file" {
if u.Scheme == fileSchemePrefix {
return strings.TrimPrefix(path, "file://")
}

if u.Scheme == "unix" {
if u.Scheme == unixSchemePrefix {
return strings.TrimPrefix(path, "unix://")
}

Expand All @@ -299,7 +290,7 @@ func isWindowsPath(path string) bool {
}

func changeOwner(path string, uid, gid int) error {
if runtime.GOOS == "windows" {
if runtime.GOOS == windowsOS {
// on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError
return nil
}
Expand Down
20 changes: 14 additions & 6 deletions internal/pkg/core/monitoring/beats/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,39 @@ const (
agentMbEndpointFileFormatWin = `npipe:///elastic-agent`
// agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint
agentMbEndpointHTTP = "http://%s:%d"

monitorSuffix = "_monitor"
)

// MonitoringEndpoint is an endpoint where process is exposing its metrics.
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string {
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string, isSidecar bool) (endpointPath string) {
defer func() {
if isSidecar && endpointPath != "" {
endpointPath += monitorSuffix
}
}()

if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok {
return endpoint
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd)
}
// unix socket path must be less than 104 characters
path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd))
if len(path) < 104 {
if (isSidecar && len(path) < 104-len(monitorSuffix)) || (!isSidecar && len(path) < 104) {
return path
}
// place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long
// for it to be used, but needs to be unique per Agent (in the case that multiple are running)
return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path)))
}

func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string {
func getLoggingFile(spec program.Spec, operatingSystem, pipelineID string) string {
if path, ok := spec.LogPaths[operatingSystem]; ok {
return path
}
if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return fmt.Sprintf(logFileFormatWin, paths.Home(), pipelineID, spec.Cmd)
}
return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd)
Expand All @@ -63,7 +71,7 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit
return fmt.Sprintf(agentMbEndpointHTTP, cfg.Host, cfg.Port)
}

if operatingSystem == "windows" {
if operatingSystem == windowsOS {
return agentMbEndpointFileFormatWin
}
// unix socket path must be less than 104 characters
Expand Down
Loading