Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Osquerybeat: Add action responses data stream #39143

Merged
merged 9 commits into from
May 15, 2024
4 changes: 4 additions & 0 deletions x-pack/osquerybeat/beater/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
ErrNoQueryExecutor = errors.New("no query executor configures")
)

type actionResultPublisher interface {
PublishActionResult(req map[string]interface{}, res map[string]interface{})
}

type publisher interface {
Publish(index, actionID, responseID string, meta map[string]interface{}, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{})
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
}

// Set reseable action handler
rah := newResetableActionHandler(bt.log)
rah := newResetableActionHandler(bt.pub, bt.log)
defer rah.Clear()

g, ctx := errgroup.WithContext(ctx)
Expand Down
8 changes: 7 additions & 1 deletion x-pack/osquerybeat/beater/resetable_action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
//
// The lifetime of this should the a scope of the beat Run
type resetableActionHandler struct {
pub actionResultPublisher

log *logp.Logger

ah client.Action
Expand All @@ -43,8 +45,9 @@ type resetableActionHandler struct {

type optionFunc func(a *resetableActionHandler)

func newResetableActionHandler(log *logp.Logger, opts ...optionFunc) *resetableActionHandler {
func newResetableActionHandler(pub actionResultPublisher, log *logp.Logger, opts ...optionFunc) *resetableActionHandler {
a := &resetableActionHandler{
pub: pub,
log: log,
timeout: defaultTimeout,
}
Expand All @@ -69,6 +72,9 @@ func (a *resetableActionHandler) Execute(ctx context.Context, req map[string]int
res = renderResult(res, err)
err = nil
}
if a.pub != nil {
a.pub.PublishActionResult(req, res)
}
}()

res, err = a.execute(ctx, req)
Expand Down
12 changes: 11 additions & 1 deletion x-pack/osquerybeat/beater/resetable_action_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (a *mockActionHandler) Name() string {
return "osquery"
}

type mockActionResultPublisher struct {
req, res map[string]interface{}
}

func (p *mockActionResultPublisher) PublishActionResult(req map[string]interface{}, res map[string]interface{}) {
p.req = req
p.res = res
}

func TestResetableActionHandler(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
Expand Down Expand Up @@ -78,7 +87,8 @@ func TestResetableActionHandler(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
rah := newResetableActionHandler(log, resetableActionHandlerWithTimeout(testActionHandlerTimeout))
pub := &mockActionResultPublisher{}
rah := newResetableActionHandler(pub, log, resetableActionHandlerWithTimeout(testActionHandlerTimeout))
defer rah.Clear()

if tc.ah != nil {
Expand Down
89 changes: 88 additions & 1 deletion x-pack/osquerybeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -81,6 +82,92 @@ func genVerifyCmd(_ instance.Settings) *cobra.Command {
}

func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
// For the older stack there were no streams, creating one
if len(rawIn.GetStreams()) == 0 {
return osquerybeatCfgNoStreams(rawIn, agentInfo)
}
return osquerybeatCfgFromStreams(rawIn, agentInfo)
}

func osquerybeatCfgFromStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {

streams := make([]*proto.Stream, 0, len(rawIn.Streams))

// Attach osquery configuration to the osquery_manager.result stream and set it as a first stream
for _, stream := range rawIn.Streams {
if stream.DataStream != nil && stream.DataStream.Dataset == config.DefaultDataset {
if stream.Source == nil {
// If for any reason the stream source is missing completely, use datastream source as before
stream.Source = rawIn.Source
} else {
// Set osquery configuration value
fieldsSrc := rawIn.Source.Fields
fieldsDst := stream.Source.Fields
var osqVal *structpb.Value
if fieldsSrc != nil {
osqVal = fieldsSrc["osquery"]
}
if osqVal != nil {
fieldsDst["osquery"] = osqVal
}
// Setting id to the source because it is being picked up from there in shared management.CreateInputsFromStreams
vId, ok := fieldsDst["id"]
shouldSet := false
if !ok || vId == nil {
shouldSet = true
} else {
if _, ok := vId.GetKind().(*structpb.Value_NullValue); ok {
shouldSet = true
}
}
if shouldSet {
fieldsDst["id"] = structpb.NewStringValue(rawIn.Id)
}
}
streams = append([]*proto.Stream{stream}, streams...)
continue
}
streams = append(streams, stream)
}
rawIn.Streams = streams

streamList, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}

var ns string
if rawIn.DataStream != nil {
ns = rawIn.DataStream.Namespace
if ns == "" {
ns = config.DefaultNamespace
}
}

for iter := range streamList {
if _, ok := streamList[iter]["type"]; !ok {
streamList[iter]["type"] = rawIn.Type
}
if v, ok := streamList[iter]["data_stream"]; ok {
if m, ok := v.(map[string]interface{}); ok {
if _, ok := m["namespace"]; !ok {
m["namespace"] = ns
}
}
}
}

// format for the reloadable list needed by the cm.Reload() method
configList, err := management.CreateReloadConfigFromInputs(streamList)
if err != nil {
return nil, fmt.Errorf("error creating config for reloader: %w", err)
}

return configList, nil
}

// This is needed for compatibility with the legacy implementation where kibana set empty streams array [] into the policy
func osquerybeatCfgNoStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
// Convert to streams, osquerybeat doesn't use streams
streams := make([]*proto.Stream, 1)

Expand Down Expand Up @@ -113,7 +200,7 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo
modules[iter]["type"] = "log"
}

// format for the reloadable list needed bythe cm.Reload() method
// format for the reloadable list needed by the cm.Reload() method
configList, err := management.CreateReloadConfigFromInputs(modules)
if err != nil {
return nil, fmt.Errorf("error creating config for reloader: %w", err)
Expand Down
98 changes: 98 additions & 0 deletions x-pack/osquerybeat/cmd/root_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/beats/v7/libbeat/common/reload"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestOsquerybeatCfg(t *testing.T) {
matches, err := filepath.Glob("testdata/osquerycfg/*.in.json")
if err != nil {
t.Fatal(err)
}

for _, match := range matches {
dir := filepath.Dir(match)
key := strings.TrimSuffix(filepath.Base(match), `.in.json`)

out := filepath.Join(dir, key+".out.json")
t.Run(key, func(in, out string) func(t *testing.T) {
return func(t *testing.T) {
var rawIn proto.UnitExpectedConfig
err := readRawIn(in, &rawIn)
if err != nil {
t.Fatal(err)
}

want, err := readOut(out)
if err != nil {
t.Fatal(err)
}

cfg, err := osquerybeatCfg(&rawIn, &client.AgentInfo{ID: "abc7d0a8-ce04-4663-95da-ff6d537c268f", Version: "8.13.1"})
if err != nil {
t.Fatal(err)
}
got, err := cfgToArrMap(cfg)
if err != nil {
t.Fatal(err)
}

diff := cmp.Diff(want, got)
if diff != "" {
t.Fatal(diff)
}
}
}(match, out))
}
}

func readRawIn(filename string, rawIn *proto.UnitExpectedConfig) error {
b, err := os.ReadFile(filename)
if err != nil {
return err
}
err = json.Unmarshal(b, rawIn)
return err
}

func readOut(filename string) (cfg []map[string]interface{}, err error) {
b, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &cfg)
if err != nil {
return nil, err
}
return cfg, err
}

func cfgToArrMap(cfg []*reload.ConfigWithMeta) ([]map[string]interface{}, error) {
res := make([]map[string]interface{}, 0, len(cfg))
for _, c := range cfg {
var m mapstr.M
err := c.Config.Unpack(&m)
if err != nil {
return nil, err
}
res = append(res, map[string]interface{}(m))
}
return res, nil
}
51 changes: 51 additions & 0 deletions x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"source": {
"data_stream": {
"namespace": "default"
},
"id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"meta": {
"package": {
"name": "osquery_manager",
"version": "1.12.1"
}
},
"name": "osquery_manager-1",
"package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"policy": {
"revision": 2
},
"revision": 1,
"streams": [
],
"type": "osquery"
},
"id": "74c7d0a8-ce04-4663-95da-ff6d537c268c",
"type": "osquery",
"name": "osquery_manager-1",
"revision": 1,
"meta": {
"source": {
"package": {
"name": "osquery_manager",
"version": "1.12.1"
}
},
"package": {
"source": {
"name": "osquery_manager",
"version": "1.12.1"
},
"name": "osquery_manager",
"version": "1.12.1"
}
},
"data_stream": {
"source": {
"namespace": "default"
},
"namespace": "default"
},
"streams": [
]
}
Loading
Loading