-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Capture stdout/stderr of spawned components #1702
Changes from 54 commits
837db32
3b9a723
7b2ec07
7bb4acd
2679c82
7f6b42a
8812fc9
5acdc40
e141de5
ff667df
9b68ea4
2cc2338
2705093
d56e3f5
9bba975
43ad01d
dee2403
712b300
5f1e54f
6ff50ac
f95c9ed
5051218
e91e3ce
e6a038e
9d1cea3
90523cc
bfc490a
ec83c2c
0602091
9420497
96e071e
fc3eba3
bd36958
4b17703
2a84a70
3ecddd6
d7e7d16
73b3d2e
2ebabb2
21e5966
5fda6fe
692aaa6
c877db0
99b5427
81b01ee
4903c1b
4d7e0fb
b6e2216
06e4a39
925c5b6
b3f0751
c3f45d4
639144c
79292d5
6d3db5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# Kind can be one of: | ||
# - breaking-change: a change to previously-documented behavior | ||
# - deprecation: functionality that is being removed in a later release | ||
# - bug-fix: fixes a problem in a previous version | ||
# - enhancement: extends functionality but does not break or fix existing behavior | ||
# - feature: new functionality | ||
# - known-issue: problems that we are aware of in a given version | ||
# - security: impacts on the security of a product or a user’s deployment. | ||
# - upgrade: important information for someone upgrading from a prior version | ||
# - other: does not fit into any of the other categories | ||
kind: feature | ||
|
||
# Change summary; a 80ish characters long description of the change. | ||
summary: Capture stdout/stderr of all spawned components to simplify logging | ||
|
||
# Long description; in case the summary is not enough to describe the change | ||
# this field accommodate a description without length limits. | ||
#description: | ||
|
||
# Affected component; a word indicating the component this changeset affects. | ||
component: | ||
|
||
# PR number; optional; the PR number that added the changeset. | ||
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. | ||
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. | ||
# Please provide it if you are adding a fragment for a different PR. | ||
pr: 1702 | ||
|
||
# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). | ||
# If not present is automatically filled by the tooling with the issue linked to the PR number. | ||
issue: 221 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,7 +58,7 @@ var ( | |
supportedBeatsComponents = []string{"filebeat", "metricbeat", "apm-server", "fleet-server", "auditbeat", "cloudbeat", "heartbeat", "osquerybeat", "packetbeat"} | ||
) | ||
|
||
// Beats monitor is providing V1 monitoring support. | ||
// BeatsMonitor is providing V1 monitoring support for metrics and logs for endpoint-security only. | ||
type BeatsMonitor struct { | ||
enabled bool // feature flag disabling whole v1 monitoring story | ||
config *monitoringConfig | ||
|
@@ -178,21 +178,10 @@ func (b *BeatsMonitor) EnrichArgs(unit, binary string, args []string) []string { | |
} | ||
} | ||
|
||
loggingPath := loggingPath(unit, b.operatingSystem) | ||
if loggingPath != "" { | ||
if !b.config.C.LogMetrics { | ||
appendix = append(appendix, | ||
"-E", "logging.files.path="+filepath.Dir(loggingPath), | ||
"-E", "logging.files.name="+filepath.Base(loggingPath), | ||
"-E", "logging.files.keepfiles=7", | ||
"-E", "logging.files.permission=0640", | ||
"-E", "logging.files.interval=1h", | ||
"-E", "logging.metrics.enabled=false", | ||
) | ||
|
||
if !b.config.C.LogMetrics { | ||
appendix = append(appendix, | ||
"-E", "logging.metrics.enabled=false", | ||
) | ||
} | ||
} | ||
|
||
return append(args, appendix...) | ||
|
@@ -291,24 +280,21 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{ | |
|
||
func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutput string) error { | ||
monitoringNamespace := b.monitoringNamespace() | ||
//fixedAgentName := strings.ReplaceAll(agentName, "-", "_") | ||
logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem)) | ||
|
||
streams := []interface{}{ | ||
map[string]interface{}{ | ||
idKey: "filestream-monitoring-agent", | ||
// "data_stream" is not used when creating an Input on Filebeat | ||
"data_stream": map[string]interface{}{ | ||
"type": "filestream", | ||
"dataset": "elastic_agent", | ||
"namespace": monitoringNamespace, | ||
}, | ||
idKey: "filestream-monitoring-agent", | ||
"type": "filestream", | ||
"paths": []interface{}{ | ||
filepath.Join(logsDrop, agentName+"-*.ndjson"), | ||
filepath.Join(logsDrop, agentName+"-watcher-*.ndjson"), | ||
}, | ||
"index": fmt.Sprintf("logs-elastic_agent-%s", monitoringNamespace), | ||
"data_stream": map[string]interface{}{ | ||
"type": "logs", | ||
"dataset": "elastic_agent", | ||
"namespace": monitoringNamespace, | ||
}, | ||
"close": map[string]interface{}{ | ||
"on_state_change": map[string]interface{}{ | ||
"inactive": "5m", | ||
|
@@ -325,133 +311,86 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDTo | |
}, | ||
}, | ||
"processors": []interface{}{ | ||
// copy original dataset so we can drop the dataset field | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "data_stream", | ||
"fields": map[string]interface{}{ | ||
"type": "logs", | ||
"dataset": "elastic_agent", | ||
"namespace": monitoringNamespace, | ||
}, | ||
}, | ||
}, | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "event", | ||
"fields": map[string]interface{}{ | ||
"dataset": "elastic_agent", | ||
}, | ||
}, | ||
}, | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "elastic_agent", | ||
"fields": map[string]interface{}{ | ||
"id": b.agentInfo.AgentID(), | ||
"version": b.agentInfo.Version(), | ||
"snapshot": b.agentInfo.Snapshot(), | ||
}, | ||
}, | ||
}, | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "agent", | ||
"fields": map[string]interface{}{ | ||
"id": b.agentInfo.AgentID(), | ||
"copy_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
map[string]interface{}{ | ||
"from": "data_stream.dataset", | ||
"to": "data_stream.dataset_original", | ||
}, | ||
}, | ||
}, | ||
}, | ||
// drop the dataset field so following copy_field can copy to it | ||
map[string]interface{}{ | ||
"drop_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
"ecs.version", //coming from logger, already added by libbeat | ||
"data_stream.dataset", | ||
}, | ||
"ignore_missing": true, | ||
}, | ||
}}, | ||
}, | ||
} | ||
for unit, binaryName := range componentIDToBinary { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm just wondering, have you checked overall CPU consumption with the move towards stdout/err monitoring? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know about the overall load change, I would say its a net wash. Being that the components no longer need to worry about rotation, syncing files, or worrying about deleting old logs. I think the benefits of components not needing to really have to worry about logging, rotation, etc; is a win! |
||
if !isSupportedBinary(binaryName) { | ||
continue | ||
} | ||
|
||
fixedBinaryName := strings.ReplaceAll(binaryName, "-", "_") | ||
name := strings.ReplaceAll(unit, "-", "_") // conform with index naming policy | ||
logFile := loggingPath(unit, b.operatingSystem) | ||
streams = append(streams, map[string]interface{}{ | ||
idKey: "filestream-monitoring-" + name, | ||
"data_stream": map[string]interface{}{ | ||
// "data_stream" is not used when creating an Input on Filebeat | ||
"type": "filestream", | ||
"dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), | ||
"namespace": monitoringNamespace, | ||
}, | ||
"type": "filestream", | ||
"index": fmt.Sprintf("logs-elastic_agent.%s-%s", fixedBinaryName, monitoringNamespace), | ||
"paths": []interface{}{logFile, logFile + "*"}, | ||
"close": map[string]interface{}{ | ||
"on_state_change": map[string]interface{}{ | ||
"inactive": "5m", | ||
}, | ||
}, | ||
"parsers": []interface{}{ | ||
map[string]interface{}{ | ||
"ndjson": map[string]interface{}{ | ||
"message_key": "message", | ||
"overwrite_keys": true, | ||
"add_error_key": true, | ||
"target": "", | ||
}, | ||
}, | ||
}, | ||
"processors": []interface{}{ | ||
// copy component.dataset as the real dataset | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "data_stream", | ||
"fields": map[string]interface{}{ | ||
"type": "logs", | ||
"dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), | ||
"namespace": monitoringNamespace, | ||
"copy_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
map[string]interface{}{ | ||
"from": "component.dataset", | ||
"to": "data_stream.dataset", | ||
}, | ||
}, | ||
"fail_on_error": false, | ||
"ignore_missing": true, | ||
}, | ||
}, | ||
// possible it's a log message from agent itself (doesn't have component.dataset) | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "event", | ||
"fields": map[string]interface{}{ | ||
"dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), | ||
"copy_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
map[string]interface{}{ | ||
"from": "data_stream.dataset_original", | ||
"to": "data_stream.dataset", | ||
}, | ||
}, | ||
"fail_on_error": false, | ||
}, | ||
}, | ||
// drop the original dataset copied and the event.dataset (as it will be updated) | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "elastic_agent", | ||
"fields": map[string]interface{}{ | ||
"id": b.agentInfo.AgentID(), | ||
"version": b.agentInfo.Version(), | ||
"snapshot": b.agentInfo.Snapshot(), | ||
"drop_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
"data_stream.dataset_original", | ||
"event.dataset", | ||
}, | ||
}, | ||
}, | ||
// update event.dataset with the now used data_stream.dataset | ||
map[string]interface{}{ | ||
"add_fields": map[string]interface{}{ | ||
"target": "agent", | ||
"fields": map[string]interface{}{ | ||
"id": b.agentInfo.AgentID(), | ||
"copy_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
map[string]interface{}{ | ||
"from": "data_stream.dataset", | ||
"to": "event.dataset", | ||
}, | ||
}, | ||
}, | ||
}, | ||
// coming from logger, added by agent (drop) | ||
map[string]interface{}{ | ||
"drop_fields": map[string]interface{}{ | ||
"fields": []interface{}{ | ||
"ecs.version", //coming from logger, already added by libbeat | ||
"ecs.version", | ||
}, | ||
"ignore_missing": true, | ||
}, | ||
}, | ||
}, | ||
}) | ||
// adjust destination data_stream based on the data_stream fields | ||
map[string]interface{}{ | ||
"add_formatted_index": map[string]interface{}{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the destination datastreams all the same as they were before this change? Are there any new datastreams created here? I haven't tested this locally yet, I just want to do an early check that we haven't introduced anything that could be considered a breaking change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The resulting datastreams are different in that now they are grouped by spawned component. This is done on purpose and a design decision of V2. The allows logs from each component that now has its own status to be grouped properly. So you can correlate status of a component to the exact logs of that component. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This also needs to be in the changelog. I agree it makes more sense and better fits how datastreams are supposed to be used under agent. |
||
"index": "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice approach |
||
}, | ||
}}, | ||
}, | ||
} | ||
|
||
inputs := []interface{}{ | ||
|
@@ -460,10 +399,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDTo | |
"name": "filestream-monitoring-agent", | ||
"type": "filestream", | ||
useOutputKey: monitoringOutput, | ||
"data_stream": map[string]interface{}{ | ||
"namespace": monitoringNamespace, | ||
}, | ||
"streams": streams, | ||
"streams": streams, | ||
}, | ||
} | ||
inputsNode, found := cfg[inputsKey] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should mention that the default log level for Beat sub-processes is not longer debug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is not going to fit in the 80ish character limit. Should I add that as another changelog entry? Or just make it really long?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just got it to fit in the same one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use the description field if you need it to be longer than 80 characters.