Skip to content

Commit

Permalink
Osquerybeat: Add action responses data stream (#39143)
Browse files Browse the repository at this point in the history
* Add action responses data stream

* Add missing copyright header

* Make linter happy

* Linting

* Fix the compatibility with older stack
  • Loading branch information
aleksmaus authored May 15, 2024
1 parent d2ebffe commit 86ef638
Show file tree
Hide file tree
Showing 17 changed files with 1,173 additions and 54 deletions.
4 changes: 4 additions & 0 deletions x-pack/osquerybeat/beater/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
ErrNoQueryExecutor = errors.New("no query executor configures")
)

type actionResultPublisher interface {
PublishActionResult(req map[string]interface{}, res map[string]interface{})
}

type publisher interface {
Publish(index, actionID, responseID string, meta map[string]interface{}, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{})
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
}

// Set reseable action handler
rah := newResetableActionHandler(bt.log)
rah := newResetableActionHandler(bt.pub, bt.log)
defer rah.Clear()

g, ctx := errgroup.WithContext(ctx)
Expand Down
8 changes: 7 additions & 1 deletion x-pack/osquerybeat/beater/resetable_action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
//
// The lifetime of this should the a scope of the beat Run
type resetableActionHandler struct {
pub actionResultPublisher

log *logp.Logger

ah client.Action
Expand All @@ -43,8 +45,9 @@ type resetableActionHandler struct {

type optionFunc func(a *resetableActionHandler)

func newResetableActionHandler(log *logp.Logger, opts ...optionFunc) *resetableActionHandler {
func newResetableActionHandler(pub actionResultPublisher, log *logp.Logger, opts ...optionFunc) *resetableActionHandler {
a := &resetableActionHandler{
pub: pub,
log: log,
timeout: defaultTimeout,
}
Expand All @@ -69,6 +72,9 @@ func (a *resetableActionHandler) Execute(ctx context.Context, req map[string]int
res = renderResult(res, err)
err = nil
}
if a.pub != nil {
a.pub.PublishActionResult(req, res)
}
}()

res, err = a.execute(ctx, req)
Expand Down
12 changes: 11 additions & 1 deletion x-pack/osquerybeat/beater/resetable_action_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (a *mockActionHandler) Name() string {
return "osquery"
}

type mockActionResultPublisher struct {
req, res map[string]interface{}
}

func (p *mockActionResultPublisher) PublishActionResult(req map[string]interface{}, res map[string]interface{}) {
p.req = req
p.res = res
}

func TestResetableActionHandler(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
Expand Down Expand Up @@ -78,7 +87,8 @@ func TestResetableActionHandler(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
rah := newResetableActionHandler(log, resetableActionHandlerWithTimeout(testActionHandlerTimeout))
pub := &mockActionResultPublisher{}
rah := newResetableActionHandler(pub, log, resetableActionHandlerWithTimeout(testActionHandlerTimeout))
defer rah.Clear()

if tc.ah != nil {
Expand Down
89 changes: 88 additions & 1 deletion x-pack/osquerybeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -81,6 +82,92 @@ func genVerifyCmd(_ instance.Settings) *cobra.Command {
}

func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
// For the older stack there were no streams, creating one
if len(rawIn.GetStreams()) == 0 {
return osquerybeatCfgNoStreams(rawIn, agentInfo)
}
return osquerybeatCfgFromStreams(rawIn, agentInfo)
}

func osquerybeatCfgFromStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {

streams := make([]*proto.Stream, 0, len(rawIn.Streams))

// Attach osquery configuration to the osquery_manager.result stream and set it as a first stream
for _, stream := range rawIn.Streams {
if stream.DataStream != nil && stream.DataStream.Dataset == config.DefaultDataset {
if stream.Source == nil {
// If for any reason the stream source is missing completely, use datastream source as before
stream.Source = rawIn.Source
} else {
// Set osquery configuration value
fieldsSrc := rawIn.Source.Fields
fieldsDst := stream.Source.Fields
var osqVal *structpb.Value
if fieldsSrc != nil {
osqVal = fieldsSrc["osquery"]
}
if osqVal != nil {
fieldsDst["osquery"] = osqVal
}
// Setting id to the source because it is being picked up from there in shared management.CreateInputsFromStreams
vId, ok := fieldsDst["id"]
shouldSet := false
if !ok || vId == nil {
shouldSet = true
} else {
if _, ok := vId.GetKind().(*structpb.Value_NullValue); ok {
shouldSet = true
}
}
if shouldSet {
fieldsDst["id"] = structpb.NewStringValue(rawIn.Id)
}
}
streams = append([]*proto.Stream{stream}, streams...)
continue
}
streams = append(streams, stream)
}
rawIn.Streams = streams

streamList, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}

var ns string
if rawIn.DataStream != nil {
ns = rawIn.DataStream.Namespace
if ns == "" {
ns = config.DefaultNamespace
}
}

for iter := range streamList {
if _, ok := streamList[iter]["type"]; !ok {
streamList[iter]["type"] = rawIn.Type
}
if v, ok := streamList[iter]["data_stream"]; ok {
if m, ok := v.(map[string]interface{}); ok {
if _, ok := m["namespace"]; !ok {
m["namespace"] = ns
}
}
}
}

// format for the reloadable list needed by the cm.Reload() method
configList, err := management.CreateReloadConfigFromInputs(streamList)
if err != nil {
return nil, fmt.Errorf("error creating config for reloader: %w", err)
}

return configList, nil
}

// This is needed for compatibility with the legacy implementation where kibana set empty streams array [] into the policy
func osquerybeatCfgNoStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
// Convert to streams, osquerybeat doesn't use streams
streams := make([]*proto.Stream, 1)

Expand Down Expand Up @@ -113,7 +200,7 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo
modules[iter]["type"] = "log"
}

// format for the reloadable list needed bythe cm.Reload() method
// format for the reloadable list needed by the cm.Reload() method
configList, err := management.CreateReloadConfigFromInputs(modules)
if err != nil {
return nil, fmt.Errorf("error creating config for reloader: %w", err)
Expand Down
98 changes: 98 additions & 0 deletions x-pack/osquerybeat/cmd/root_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/beats/v7/libbeat/common/reload"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestOsquerybeatCfg(t *testing.T) {
matches, err := filepath.Glob("testdata/osquerycfg/*.in.json")
if err != nil {
t.Fatal(err)
}

for _, match := range matches {
dir := filepath.Dir(match)
key := strings.TrimSuffix(filepath.Base(match), `.in.json`)

out := filepath.Join(dir, key+".out.json")
t.Run(key, func(in, out string) func(t *testing.T) {
return func(t *testing.T) {
var rawIn proto.UnitExpectedConfig
err := readRawIn(in, &rawIn)
if err != nil {
t.Fatal(err)
}

want, err := readOut(out)
if err != nil {
t.Fatal(err)
}

cfg, err := osquerybeatCfg(&rawIn, &client.AgentInfo{ID: "abc7d0a8-ce04-4663-95da-ff6d537c268f", Version: "8.13.1"})
if err != nil {
t.Fatal(err)
}
got, err := cfgToArrMap(cfg)
if err != nil {
t.Fatal(err)
}

diff := cmp.Diff(want, got)
if diff != "" {
t.Fatal(diff)
}
}
}(match, out))
}
}

func readRawIn(filename string, rawIn *proto.UnitExpectedConfig) error {
b, err := os.ReadFile(filename)
if err != nil {
return err
}
err = json.Unmarshal(b, rawIn)
return err
}

func readOut(filename string) (cfg []map[string]interface{}, err error) {
b, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &cfg)
if err != nil {
return nil, err
}
return cfg, err
}

func cfgToArrMap(cfg []*reload.ConfigWithMeta) ([]map[string]interface{}, error) {
res := make([]map[string]interface{}, 0, len(cfg))
for _, c := range cfg {
var m mapstr.M
err := c.Config.Unpack(&m)
if err != nil {
return nil, err
}
res = append(res, map[string]interface{}(m))
}
return res, nil
}
51 changes: 51 additions & 0 deletions x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"source": {
"data_stream": {
"namespace": "default"
},
"id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"meta": {
"package": {
"name": "osquery_manager",
"version": "1.12.1"
}
},
"name": "osquery_manager-1",
"package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"policy": {
"revision": 2
},
"revision": 1,
"streams": [
],
"type": "osquery"
},
"id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"type": "osquery",
"name": "osquery_manager-1",
"revision": 1,
"meta": {
"source": {
"package": {
"name": "osquery_manager",
"version": "1.12.1"
}
},
"package": {
"source": {
"name": "osquery_manager",
"version": "1.12.1"
},
"name": "osquery_manager",
"version": "1.12.1"
}
},
"data_stream": {
"source": {
"namespace": "default"
},
"namespace": "default"
},
"streams": [
]
}
Loading

0 comments on commit 86ef638

Please sign in to comment.