Skip to content

Commit

Permalink
Merge branch 'master' into kafka-version
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Sep 3, 2021
2 parents 526175d + 29da912 commit 1589522
Show file tree
Hide file tree
Showing 43 changed files with 698 additions and 242 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add daemonset.name in pods controlled by DaemonSets {pull}26808[26808], {issue}25816[25816]
- Kubernetes autodiscover fails in node scope if node name cannot be discovered {pull}26947[26947]
- Loading Kibana assets (dashboards, index templates) rely on Saved Object API. So to provide a reliable service, Beats can only import and export dasbhboards using at least Kibana 7.15. {issue}20672[20672] {pull}27220[27220]
- Skip add_kubernetes_metadata processor when kubernetes metadata are already present {pull}27689[27689]

*Auditbeat*

Expand Down Expand Up @@ -82,6 +83,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- threatintel module: Changed the type of `threatintel.indicator.first_seen` from `keyword` to `date`. {pull}26765[26765]
- Remove all alias fields pointing to ECS fields from modules. This affects the Suricata and Traefik modules. {issue}10535[10535] {pull}26627[26627]
- Add option for S3 input to work without SQS notification {issue}18205[18205] {pull}27332[27332]
- Fix Crowdstrike ingest pipeline that was creating flattened `process` fields. {issue}27622[27622] {pull}27623[27623]

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
Expand Down Expand Up @@ -204,6 +206,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Do not try to load ILM policy if `check_exists` is `false`. {pull}27508[27508] {issue}26322[26322]
- Fix bug with cgroups hierarchy override path in cgroups {pull}27620[27620]
- Beat `setup kibana` command may use the elasticsearch API key defined in `output.elasticsearch.api_key`. {issue}24015[24015] {pull}27540[27540]
- Seperate namespaces for V1 and V2 controller paths {pull}27676[27676]

*Auditbeat*

Expand Down Expand Up @@ -306,6 +309,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638]
- Store offset in `log.offset` field of events from the filestream input. {pull}27688[27688]
- Fix `httpjson` input rate limit processing and documentation. {pull}[]

*Heartbeat*

Expand Down Expand Up @@ -736,6 +740,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update `tags` and `threatintel.indicator.provider` fields in `threatintel.anomali` ingest pipeline {issue}24746[24746] {pull}27141[27141]
- Move AWS module and filesets to GA. {pull}27428[27428]
- update ecs.version to ECS 1.11.0. {pull}27107[27107]
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]


*Heartbeat*
Expand Down
18 changes: 10 additions & 8 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,16 @@ def generateStages(Map args = [:]) {
}

def cloud(Map args = [:]) {
withNode(labels: args.label, forceWorkspace: true){
startCloudTestEnv(name: args.directory, dirs: args.dirs)
}
withCloudTestEnv() {
try {
target(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id)
} finally {
terraformCleanup(name: args.directory, dir: args.directory)
withGithubNotify(context: args.context) {
withNode(labels: args.label, forceWorkspace: true){
startCloudTestEnv(name: args.directory, dirs: args.dirs)
}
withCloudTestEnv() {
try {
target(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id)
} finally {
terraformCleanup(name: args.directory, dir: args.directory)
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions docs/devguide/newdashboards.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ MODULE=redis ID=AV4REOpp5NkDleZmzKkE mage exportDashboard

[source,shell]
---------------
./filebeat export dashboard -id 7fea2930-478e-11e7-b1f0-cb29bac6bf8b >> Filebeat-redis.ndjson
./filebeat export dashboard -id 7fea2930-478e-11e7-b1f0-cb29bac6bf8b -folder module/redis
---------------

This generates a `AV4REOpp5NkDleZmzKkE.ndjson` file inside dashboard directory in the redis module.
It contains all dependencies like visualizations and searches.
This generates an appropriate folder under module/redis for the dashboard, separating assets into dashboards, searches, vizualizations, etc.
Each exported file is a JSON and their names are the IDs of the assets.

NOTE: The dashboard ID is available in the dashboard URL. For example, in case the dashboard URL is
`app/kibana#/dashboard/AV4REOpp5NkDleZmzKkE?_g=()&_a=(description:'Overview%2...`, the dashboard ID is `AV4REOpp5NkDleZmzKkE`.
Expand Down Expand Up @@ -289,7 +289,7 @@ By passing the yml file to the `export_dashboards.go` script or to the Beat, you

[source,shell]
-------------------
go run dev-tools/cmd/dashboards/export_dashboards.go -yml filebeat/module/system/module.yml
go run dev-tools/cmd/dashboards/export_dashboards.go -yml filebeat/module/system/module.yml -folder dashboards
-------------------

[source,shell]
Expand Down
14 changes: 7 additions & 7 deletions libbeat/metric/system/cgroup/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ func (r *Reader) GetV1StatsForProcess(pid int) (*StatsV1, error) {
if err != nil {
return nil, err
}

stats := StatsV1{}
stats.Path, stats.ID = getCommonCgroupMetadata(paths)
stats.Path, stats.ID = getCommonCgroupMetadata(paths.V1)
stats.Version = CgroupsV1
for conName, cgPath := range paths {
for conName, cgPath := range paths.V1 {
if r.ignoreRootCgroups && (cgPath.ControllerPath == "/" && r.cgroupsHierarchyOverride != cgPath.ControllerPath) {
continue
}
Expand All @@ -209,9 +210,9 @@ func (r *Reader) GetV2StatsForProcess(pid int) (*StatsV2, error) {
return nil, err
}
stats := StatsV2{}
stats.Path, stats.ID = getCommonCgroupMetadata(paths)
stats.Path, stats.ID = getCommonCgroupMetadata(paths.V2)
stats.Version = CgroupsV2
for conName, cgPath := range paths {
for conName, cgPath := range paths.V2 {
if r.ignoreRootCgroups && (cgPath.ControllerPath == "/" && r.cgroupsHierarchyOverride != cgPath.ControllerPath) {
continue
}
Expand All @@ -225,10 +226,10 @@ func (r *Reader) GetV2StatsForProcess(pid int) (*StatsV2, error) {

// ProcessCgroupPaths is a wrapper around Reader.ProcessCgroupPaths for libraries that only need the slimmer functionality from
// the gosigar cgroups code. This does not have the same function signature, and consumers still need to distinguish between v1 and v2 cgroups.
func ProcessCgroupPaths(hostfs string, pid int) (map[string]ControllerPath, error) {
func ProcessCgroupPaths(hostfs string, pid int) (PathList, error) {
reader, err := NewReader(hostfs, false)
if err != nil {
return nil, errors.Wrap(err, "error creating cgroups reader")
return PathList{}, errors.Wrap(err, "error creating cgroups reader")
}
return reader.ProcessCgroupPaths(pid)
}
Expand Down Expand Up @@ -267,7 +268,6 @@ func getStatsV2(path ControllerPath, name string, stats *StatsV2) error {
}

func getStatsV1(path ControllerPath, name string, stats *StatsV1) error {

id := filepath.Base(path.ControllerPath)

switch name {
Expand Down
32 changes: 26 additions & 6 deletions libbeat/metric/system/cgroup/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ type ControllerPath struct {
IsV2 bool
}

// PathList contains the V1 and V2 controller paths in a process
// Separate the V1 and V2 cgroups so we don't have hybrid cgroups fighting for one namespace
type PathList struct {
V1 map[string]ControllerPath
V2 map[string]ControllerPath
}

// Flatten combines the V1 and V2 cgroups in cases where we don't need a map with keys
func (pl PathList) Flatten() []ControllerPath {
list := []ControllerPath{}
for _, v1 := range pl.V1 {
list = append(list, v1)
}
for _, v2 := range pl.V2 {
list = append(list, v2)
}

return list
}

// parseMountinfoLine parses a line from the /proc/[pid]/mountinfo file on
// Linux. The format of the line is specified in section 3.5 of
// https://www.kernel.org/doc/Documentation/filesystems/proc.txt.
Expand Down Expand Up @@ -217,14 +237,14 @@ func SubsystemMountpoints(rootfsMountpoint string, subsystems map[string]struct{

// ProcessCgroupPaths returns the cgroups to which a process belongs and the
// pathname of the cgroup relative to the mountpoint of the subsystem.
func (r Reader) ProcessCgroupPaths(pid int) (map[string]ControllerPath, error) {
func (r Reader) ProcessCgroupPaths(pid int) (PathList, error) {
cgroup, err := os.Open(filepath.Join(r.rootfsMountpoint, "proc", strconv.Itoa(pid), "cgroup"))
if err != nil {
return nil, err //return a blank error so other events can use any file not found errors
return PathList{}, err //return a blank error so other events can use any file not found errors
}
defer cgroup.Close()

cPaths := map[string]ControllerPath{}
cPaths := PathList{V1: map[string]ControllerPath{}, V2: map[string]ControllerPath{}}
sc := bufio.NewScanner(cgroup)
for sc.Scan() {
// http://man7.org/linux/man-pages/man7/cgroups.7.html
Expand Down Expand Up @@ -265,22 +285,22 @@ the container as /sys/fs/cgroup/unified and start metricbeat with --system.hostf

cgpaths, err := ioutil.ReadDir(controllerPath)
if err != nil {
return nil, errors.Wrapf(err, "error fetching cgroupV2 controllers for cgroup location '%s' and path line '%s'", r.cgroupMountpoints.V2Loc, line)
return cPaths, errors.Wrapf(err, "error fetching cgroupV2 controllers for cgroup location '%s' and path line '%s'", r.cgroupMountpoints.V2Loc, line)
}
// In order to produce the same kind of data for cgroups V1 and V2 controllers,
// We iterate over the group, and look for controllers, since the V2 unified system doesn't list them under the PID
for _, singlePath := range cgpaths {
if strings.Contains(singlePath.Name(), "stat") {
controllerName := strings.TrimSuffix(singlePath.Name(), ".stat")
cPaths[controllerName] = ControllerPath{ControllerPath: path, FullPath: controllerPath, IsV2: true}
cPaths.V2[controllerName] = ControllerPath{ControllerPath: path, FullPath: controllerPath, IsV2: true}
}
}
// cgroup v1
} else {
subsystems := strings.Split(fields[1], ",")
for _, subsystem := range subsystems {
fullPath := filepath.Join(r.cgroupMountpoints.V1Mounts[subsystem], path)
cPaths[subsystem] = ControllerPath{ControllerPath: path, FullPath: fullPath, IsV2: false}
cPaths.V1[subsystem] = ControllerPath{ControllerPath: path, FullPath: fullPath, IsV2: false}
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions libbeat/metric/system/cgroup/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,17 @@ func TestProcessCgroupPaths(t *testing.T) {
}

path := "/docker/b29faf21b7eff959f64b4192c34d5d67a707fe8561e9eaa608cb27693fba4242"
assert.Equal(t, path, paths["blkio"].ControllerPath)
assert.Equal(t, path, paths["cpu"].ControllerPath)
assert.Equal(t, path, paths["cpuacct"].ControllerPath)
assert.Equal(t, path, paths["cpuset"].ControllerPath)
assert.Equal(t, path, paths["devices"].ControllerPath)
assert.Equal(t, path, paths["freezer"].ControllerPath)
assert.Equal(t, path, paths["memory"].ControllerPath)
assert.Equal(t, path, paths["net_cls"].ControllerPath)
assert.Equal(t, path, paths["net_prio"].ControllerPath)
assert.Equal(t, path, paths["perf_event"].ControllerPath)
assert.Len(t, paths, 10)
assert.Equal(t, path, paths.V1["blkio"].ControllerPath)
assert.Equal(t, path, paths.V1["cpu"].ControllerPath)
assert.Equal(t, path, paths.V1["cpuacct"].ControllerPath)
assert.Equal(t, path, paths.V1["cpuset"].ControllerPath)
assert.Equal(t, path, paths.V1["devices"].ControllerPath)
assert.Equal(t, path, paths.V1["freezer"].ControllerPath)
assert.Equal(t, path, paths.V1["memory"].ControllerPath)
assert.Equal(t, path, paths.V1["net_cls"].ControllerPath)
assert.Equal(t, path, paths.V1["net_prio"].ControllerPath)
assert.Equal(t, path, paths.V1["perf_event"].ControllerPath)
assert.Len(t, paths.Flatten(), 10)
}

func TestProcessCgroupPathsV2(t *testing.T) {
Expand All @@ -195,10 +195,10 @@ func TestProcessCgroupPathsV2(t *testing.T) {
t.Fatalf("error in ProcessCgroupPaths: %s", err)
}

assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths["cgroup"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths["cpu"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths["io"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths["memory"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths.V2["cgroup"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths.V2["cpu"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths.V2["io"].FullPath)
assert.Equal(t, "testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope", paths.V2["memory"].FullPath)
}

func assertContains(t testing.TB, m map[string]struct{}, key string) {
Expand Down
12 changes: 6 additions & 6 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (d *addDockerMetadata) String() string {
// lookupContainerIDByPID finds the container ID based on PID fields contained
// in the event.
func (d *addDockerMetadata) lookupContainerIDByPID(event *beat.Event) (string, error) {
var cgroups map[string]cgroup.ControllerPath
var cgroups cgroup.PathList
for _, field := range d.pidFields {
v, err := event.GetValue(field)
if err != nil {
Expand Down Expand Up @@ -264,19 +264,19 @@ func (d *addDockerMetadata) lookupContainerIDByPID(event *beat.Event) (string, e

// getProcessCgroups returns a mapping of cgroup subsystem name to path. It
// returns an error if it failed to retrieve the cgroup info.
func (d *addDockerMetadata) getProcessCgroups(pid int) (map[string]cgroup.ControllerPath, error) {
func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error) {
// Initialize at time of first use.
lazyCgroupCacheInit(d)

cgroups, ok := d.cgroups.Get(pid).(map[string]cgroup.ControllerPath)
cgroups, ok := d.cgroups.Get(pid).(cgroup.PathList)
if ok {
d.log.Debugf("Using cached cgroups for pid=%v", pid)
return cgroups, nil
}

cgroups, err := processCgroupPaths(d.hostFS, pid)
if err != nil {
return nil, errors.Wrapf(err, "failed to read cgroups for pid=%v", pid)
return cgroups, errors.Wrapf(err, "failed to read cgroups for pid=%v", pid)
}

d.cgroups.Put(pid, cgroups)
Expand All @@ -287,8 +287,8 @@ func (d *addDockerMetadata) getProcessCgroups(pid int) (map[string]cgroup.Contro
// of them are associated with Docker. For cgroups V1, Docker uses /docker/<CID> when
// naming cgroups and we use this to determine the container ID. For V2,
// it's part of a more complex string.
func getContainerIDFromCgroups(cgroups map[string]cgroup.ControllerPath) (string, error) {
for _, path := range cgroups {
func getContainerIDFromCgroups(cgroups cgroup.PathList) (string, error) {
for _, path := range cgroups.Flatten() {
re := regexp.MustCompile(`[\w]{64}`)
rs := re.FindStringSubmatch(path.ControllerPath)
if rs != nil {
Expand Down
19 changes: 12 additions & 7 deletions libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,26 @@ import (

func init() {
// Stub out the procfs.
processCgroupPaths = func(_ string, pid int) (map[string]cgroup.ControllerPath, error) {
processCgroupPaths = func(_ string, pid int) (cgroup.PathList, error) {

switch pid {
case 1000:
return map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return nil, fmt.Errorf("cgroup parse failure")
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return nil, os.ErrNotExist
return cgroup.PathList{}, os.ErrNotExist
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {
}
}

// kubernetesMetadataExist checks whether an event is already enriched with kubernetes metadata
func kubernetesMetadataExist(event *beat.Event) bool {
if _, err := event.GetValue("kubernetes"); err != nil {
return false
}
return true
}

// New constructs a new add_kubernetes_metadata processor.
func New(cfg *common.Config) (processors.Processor, error) {
config, err := newProcessorConfig(cfg, Indexing)
Expand Down Expand Up @@ -251,6 +259,10 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
return event, nil
}
if kubernetesMetadataExist(event) {
k.log.Debug("Skipping add_kubernetes_metadata processor as kubernetes metadata already exist")
return event, nil
}
index := k.matchers.MetadataIndex(event.Fields)
if index == "" {
k.log.Debug("No container match string, not adding kubernetes data")
Expand Down
11 changes: 3 additions & 8 deletions libbeat/processors/add_kubernetes_metadata/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

// Test metadata updates don't replace existing pod metrics
func TestAnnotatorDeepUpdate(t *testing.T) {
// Test Annotator is skipped if kubernetes metadata already exist
func TestAnnotatorSkipped(t *testing.T) {
cfg := common.MustNewConfigFrom(map[string]interface{}{
"lookup_fields": []string{"kubernetes.pod.name"},
})
Expand All @@ -53,8 +53,7 @@ func TestAnnotatorDeepUpdate(t *testing.T) {
"kubernetes": common.MapStr{
"pod": common.MapStr{
"labels": common.MapStr{
"dont": "replace",
"original": "fields",
"added": "should not",
},
},
},
Expand Down Expand Up @@ -85,10 +84,6 @@ func TestAnnotatorDeepUpdate(t *testing.T) {
"a": 1,
"b": 2,
},
"labels": common.MapStr{
"dont": "replace",
"original": "fields",
},
},
},
}, event.Fields)
Expand Down
Loading

0 comments on commit 1589522

Please sign in to comment.