
[KATC] Update config schema, including overlays #1772

Merged · 21 commits · Jul 15, 2024

Commits
88097df
Rename source column to path
RebeccaMahany Jul 3, 2024
93aa764
Rename Source to SourcePaths and support multiple paths
RebeccaMahany Jul 3, 2024
77a31a6
Rename query to source_query
RebeccaMahany Jul 3, 2024
b3f251c
Fix typos in test
RebeccaMahany Jul 3, 2024
5fa2de9
Rename Platform to Filter
RebeccaMahany Jul 3, 2024
03777f0
Move name inside config and make config a list of tables instead of a…
RebeccaMahany Jul 3, 2024
4d68485
Merge remote-tracking branch 'upstream/main' into becca/katc-cfg-update
RebeccaMahany Jul 3, 2024
2367637
Rename sourceConstraints to pathConstraints in function signature
RebeccaMahany Jul 3, 2024
479571c
Add comment
RebeccaMahany Jul 8, 2024
e2c93da
Implement overlays
RebeccaMahany Jul 8, 2024
b698ac7
Cleanup
RebeccaMahany Jul 8, 2024
cef7a88
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 8, 2024
2d276fd
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 10, 2024
2efd17e
dataFunc should take queryContext instead of constraint list for grea…
RebeccaMahany Jul 11, 2024
50b478e
Standardize log messages a little
RebeccaMahany Jul 12, 2024
8894bc2
Add config consumer that can handle non-string values
RebeccaMahany Jul 12, 2024
70ec1c9
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 15, 2024
4b9b56b
Update comments for clarity
RebeccaMahany Jul 15, 2024
428a872
Remove source paths from logs to avoid overly verbose logs
RebeccaMahany Jul 15, 2024
8c46927
Fix comments
RebeccaMahany Jul 15, 2024
606ade6
Combine table definition structs
RebeccaMahany Jul 15, 2024
2 changes: 1 addition & 1 deletion cmd/launcher/launcher.go
@@ -416,7 +416,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
 	// agentFlagConsumer handles agent flags pushed from the control server
 	controlService.RegisterConsumer(agentFlagsSubsystemName, keyvalueconsumer.New(flagController))
 	// katcConfigConsumer handles updates to Kolide's custom ATC tables
-	controlService.RegisterConsumer(katcSubsystemName, keyvalueconsumer.New(k.KatcConfigStore()))
+	controlService.RegisterConsumer(katcSubsystemName, keyvalueconsumer.NewConfigConsumer(k.KatcConfigStore()))
 	controlService.RegisterSubscriber(katcSubsystemName, osqueryRunner)
 	controlService.RegisterSubscriber(katcSubsystemName, startupSettingsWriter)
47 changes: 47 additions & 0 deletions ee/control/consumers/keyvalueconsumer/config_consumer.go
@@ -0,0 +1,47 @@
package keyvalueconsumer

import (
"encoding/json"
"errors"
"fmt"
"io"

"github.com/kolide/launcher/ee/agent/types"
)

type ConfigConsumer struct {
updater types.Updater
}

func NewConfigConsumer(updater types.Updater) *ConfigConsumer {
c := &ConfigConsumer{
updater: updater,
}

return c
}

func (c *ConfigConsumer) Update(data io.Reader) error {
if c == nil {
return errors.New("key value consumer is nil")
}

var kvPairs map[string]any
[Inline review thread on `var kvPairs map[string]any`]

RebeccaMahany (author):

Discovered when testing with @jbeker that the existing KV consumer in this package does not work for this config because Golang won't unmarshal the config as a map[string]string since it's a more complex object, which makes sense.

This is a quick fix, but maybe we want something cleverer?

Reviewer:

Thinking out loud....

Hrm. To be honest, I had to pull them down and diff them to understand the difference.

If I read this right, the difference is that in the new case, k2 is sending map[string]any, while the old case is map[string]string.

The thing that seems weird is that this is then json.Marshal'ed back down to map[string]string, I assume because the underlying ~everything expects that.

So I wonder... if we're going to json.Marshal down to strings anyhow, is it cleaner if K2 does that for us? It might not be; it's sort of nice if the API is just the JSON, and not marshalled JSON. But I want to ask...

RebeccaMahany (author):

Correct, this handles the error that Jeremy and I were seeing: failed to decode key-value json: json: cannot unmarshal object into Go value of type string.

I wasn't sure if it would be cleaner for K2 to update or not.

Reviewer:

It's going to be pretty trivial on the k2 side. And I think it's pretty trivial here.

So I think we get to decide what feels like a cleaner protocol.

Having everything as a string feels clean, but having the KATC definitions marshalled into strings feels kinda weird. 🤷

RebeccaMahany (author):

I guess I like it this way a little better, then -- it feels tidy that k2 doesn't need to know about the marshalling and unmarshalling to a string; launcher handles that itself for ease of storage in bucket/startup_settings.

Reviewer:

It's a step towards a more structured interface and API between k2 and launcher (vs. what we have now, which feels like it requires a very complete knowledge of all the parts).

if err := json.NewDecoder(data).Decode(&kvPairs); err != nil {
return fmt.Errorf("failed to decode key-value json: %w", err)
}

kvStringPairs := make(map[string]string)
for k, v := range kvPairs {
b, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("unable to marshal value for `%s`: %w", k, err)
}
kvStringPairs[k] = string(b)
}

// Turn the map into a slice of key, value, ... and send it to the thing storing this data
_, err := c.updater.Update(kvStringPairs)

return err
}
44 changes: 26 additions & 18 deletions ee/katc/config.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"log/slog"
-	"runtime"

 	"github.com/kolide/launcher/ee/indexeddb"
 	"github.com/osquery/osquery-go"
@@ -16,8 +15,9 @@ import (
 // identifier parsed from the JSON KATC config, and the `dataFunc` is the function
 // that performs the query against the source.
 type katcSourceType struct {
-	name     string
-	dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error)
+	name string
+	// queryContext contains the constraints from the WHERE clause of the query against the KATC table.
+	dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, queryContext table.QueryContext) ([]sourceData, error)
 }

// sourceData holds the result of calling `katcSourceType.dataFunc`. It maps the
@@ -98,36 +98,44 @@ func (r *rowTransformStep) UnmarshalJSON(data []byte) error {
 	}
 }

-// katcTableConfig is the configuration for a specific KATC table. The control server
-// sends down these configurations.
-type katcTableConfig struct {
-	SourceType        katcSourceType     `json:"source_type"`
-	Source            string             `json:"source"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
-	Platform          string             `json:"platform"`
-	Columns           []string           `json:"columns"`
-	Query             string             `json:"query"` // Query to run against `path`
-	RowTransformSteps []rowTransformStep `json:"row_transform_steps"`
-}
+type (
+	// katcTableConfig is the configuration for a specific KATC table. The control server
+	// sends down these configurations.
+	katcTableConfig struct {
+		Columns []string `json:"columns"`
+		katcTableDefinition
+		Overlays []katcTableConfigOverlay `json:"overlays"`
+	}
+
+	katcTableConfigOverlay struct {
+		Filters map[string]string `json:"filters"` // determines if this overlay is applicable to this launcher installation
+		katcTableDefinition
+	}
+
+	katcTableDefinition struct {
+		SourceType        *katcSourceType     `json:"source_type,omitempty"`
+		SourcePaths       *[]string           `json:"source_paths,omitempty"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
+		SourceQuery       *string             `json:"source_query,omitempty"` // Query to run against each source path
+		RowTransformSteps *[]rowTransformStep `json:"row_transform_steps,omitempty"`
+	}
+)

 // ConstructKATCTables takes stored configuration of KATC tables, parses the configuration,
 // and returns the constructed tables.
 func ConstructKATCTables(config map[string]string, slogger *slog.Logger) []osquery.OsqueryPlugin {
 	plugins := make([]osquery.OsqueryPlugin, 0)

 	for tableName, tableConfigStr := range config {
 		var cfg katcTableConfig
 		if err := json.Unmarshal([]byte(tableConfigStr), &cfg); err != nil {
 			slogger.Log(context.TODO(), slog.LevelWarn,
-				"unable to unmarshal config for Kolide ATC table, skipping",
+				"unable to unmarshal config for KATC table, skipping",
 				"table_name", tableName,
 				"err", err,
 			)
 			continue
 		}

-		if cfg.Platform != runtime.GOOS {
-			continue
-		}
-
 		t, columns := newKatcTable(tableName, cfg, slogger)
 		plugins = append(plugins, table.NewPlugin(tableName, columns, t.generate))
 	}
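The schema change above makes every field of `katcTableDefinition` a pointer, which lets an overlay distinguish "not provided" from "provided but empty". The actual merge code is not part of this diff, so the sketch below is an assumed illustration of how a matching overlay could override the base definition; the struct and function names are simplified stand-ins, not the PR's code.

```go
package main

import "fmt"

// tableDefinition is a simplified stand-in for katcTableDefinition.
type tableDefinition struct {
	SourceType  *string
	SourcePaths *[]string
	SourceQuery *string
}

// applyOverlay overwrites only the fields the overlay actually sets; the
// pointer fields (omitempty in the real schema) make an unset field nil.
func applyOverlay(base, overlay tableDefinition) tableDefinition {
	if overlay.SourceType != nil {
		base.SourceType = overlay.SourceType
	}
	if overlay.SourcePaths != nil {
		base.SourcePaths = overlay.SourcePaths
	}
	if overlay.SourceQuery != nil {
		base.SourceQuery = overlay.SourceQuery
	}
	return base
}

func main() {
	sourceType, sourceQuery := "indexeddb_leveldb", "db.store"
	basePaths := []string{"/some/path/to/db.indexeddb.leveldb"}
	overlayPaths := []string{"/some/different/path/to/db.indexeddb.leveldb"}

	base := tableDefinition{SourceType: &sourceType, SourcePaths: &basePaths, SourceQuery: &sourceQuery}
	overlay := tableDefinition{SourcePaths: &overlayPaths}

	// The overlay replaces only SourcePaths; SourceType and SourceQuery
	// fall through from the base definition.
	merged := applyOverlay(base, overlay)
	fmt.Println(*merged.SourceType, (*merged.SourcePaths)[0])
}
```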
108 changes: 75 additions & 33 deletions ee/katc/config_test.go
@@ -21,53 +21,96 @@ func TestConstructKATCTables(t *testing.T) {
 		{
 			testCaseName: "snappy_sqlite",
 			katcConfig: map[string]string{
-				"kolide_snappy_sqlite_test": fmt.Sprintf(`{
+				"kolide_snappy_sqlite_test": `{
 					"source_type": "sqlite",
-					"platform": "%s",
 					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
-					"row_transform_steps": ["snappy"]
-				}`, runtime.GOOS),
+					"source_paths": ["/some/path/to/db.sqlite"],
+					"source_query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
+					"row_transform_steps": ["snappy"],
+					"overlays": []
+				}`,
 			},
 			expectedPluginCount: 1,
 		},
 		{
 			testCaseName: "indexeddb_leveldb",
 			katcConfig: map[string]string{
-				"kolide_indexeddb_leveldb_test": fmt.Sprintf(`{
+				"kolide_indexeddb_leveldb_test": `{
 					"source_type": "indexeddb_leveldb",
-					"platform": "%s",
 					"columns": ["data"],
-					"source": "/some/path/to/db.indexeddb.leveldb",
-					"query": "db.store",
-					"row_transform_steps": ["deserialize_chrome"]
-				}`, runtime.GOOS),
+					"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
+					"source_query": "db.store",
+					"row_transform_steps": ["deserialize_chrome"],
+					"overlays": []
+				}`,
 			},
 			expectedPluginCount: 1,
 		},
+		{
+			testCaseName: "overlay",
+			katcConfig: map[string]string{
+				"kolide_overlay_test": fmt.Sprintf(`{
+					"source_type": "indexeddb_leveldb",
+					"columns": ["data"],
+					"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
+					"source_query": "db.store",
+					"row_transform_steps": ["deserialize_chrome"],
+					"overlays": [
+						{
+							"filters": {
+								"goos": "%s"
+							},
+							"source_paths": ["/some/different/path/to/db.indexeddb.leveldb"]
+						}
+					]
+				}`, runtime.GOOS),
+			},
+			expectedPluginCount: 1,
+		},
 		{
 			testCaseName: "multiple plugins",
 			katcConfig: map[string]string{
-				"test_1": fmt.Sprintf(`{
+				"test_1": `{
 					"source_type": "sqlite",
-					"platform": "%s",
 					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;",
-					"row_transform_steps": ["snappy"]
-				}`, runtime.GOOS),
-				"test_2": fmt.Sprintf(`{
+					"source_paths": ["/some/path/to/db.sqlite"],
+					"source_query": "SELECT data FROM object_data;",
+					"row_transform_steps": ["snappy"],
+					"overlays": []
+				}`,
+				"test_2": `{
 					"source_type": "sqlite",
-					"platform": "%s",
 					"columns": ["col1", "col2"],
-					"source": "/some/path/to/a/different/db.sqlite",
-					"query": "SELECT col1, col2 FROM some_table;",
-					"row_transform_steps": ["camel_to_snake"]
-				}`, runtime.GOOS),
+					"source_paths": ["/some/path/to/a/different/db.sqlite"],
+					"source_query": "SELECT col1, col2 FROM some_table;",
+					"row_transform_steps": ["camel_to_snake"],
+					"overlays": []
+				}`,
 			},
 			expectedPluginCount: 2,
 		},
+		{
+			testCaseName: "skips invalid tables and returns valid tables",
+			katcConfig: map[string]string{
+				"not_a_valid_table": `{
+					"source_type": "not a real type",
+					"columns": ["col1", "col2"],
+					"source_paths": ["/some/path/to/a/different/db.sqlite"],
+					"source_query": "SELECT col1, col2 FROM some_table;",
+					"row_transform_steps": ["not a real row transform step"],
+					"overlays": []
+				}`,
+				"valid_table": `{
+					"source_type": "sqlite",
+					"columns": ["data"],
+					"source_paths": ["/some/path/to/db.sqlite"],
+					"source_query": "SELECT data FROM object_data;",
+					"row_transform_steps": ["snappy"],
+					"overlays": []
+				}`,
+			},
+			expectedPluginCount: 1,
+		},
 		{
 			testCaseName: "malformed config",
 			katcConfig: map[string]string{
@@ -78,27 +121,26 @@
 		{
 			testCaseName: "invalid table source",
 			katcConfig: map[string]string{
-				"kolide_snappy_test": fmt.Sprintf(`{
+				"kolide_snappy_test": `{
 					"source_type": "unknown_source",
-					"platform": "%s",
 					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;"
-				}`, runtime.GOOS),
+					"source_paths": ["/some/path/to/db.sqlite"],
+					"source_query": "SELECT data FROM object_data;",
+					"overlays": []
+				}`,
 			},
 			expectedPluginCount: 0,
 		},
 		{
 			testCaseName: "invalid data processing step type",
 			katcConfig: map[string]string{
-				"kolide_snappy_test": fmt.Sprintf(`{
+				"kolide_snappy_test": `{
 					"source_type": "sqlite",
-					"platform": "%s",
 					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;",
+					"source_paths": ["/some/path/to/db.sqlite"],
+					"source_query": "SELECT data FROM object_data;",
 					"row_transform_steps": ["unknown_step"]
-				}`, runtime.GOOS),
+				}`,
 			},
 			expectedPluginCount: 0,
 		},
58 changes: 32 additions & 26 deletions ee/katc/indexeddb_leveldb.go
@@ -15,39 +15,45 @@ import (
 // found at the filepath in `sourcePattern`. It retrieves all rows from the database
 // and object store specified in `query`, which it expects to be in the format
 // `<db name>.<object store name>`.
-func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
-	pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
-	leveldbs, err := filepath.Glob(pathPattern)
-	if err != nil {
-		return nil, fmt.Errorf("globbing for leveldb files: %w", err)
-	}
-
-	// Extract database and table from query
-	dbName, objectStoreName, err := extractQueryTargets(query)
-	if err != nil {
-		return nil, fmt.Errorf("getting db and object store names: %w", err)
-	}
+func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, queryContext table.QueryContext) ([]sourceData, error) {
+	// Pull out path constraints from the query against the KATC table, to avoid querying more leveldb files than we need to.
+	pathConstraintsFromQuery := getPathConstraint(queryContext)

-	// Query databases
 	results := make([]sourceData, 0)
-	for _, db := range leveldbs {
-		// Check to make sure `db` adheres to sourceConstraints
-		valid, err := checkSourceConstraints(db, sourceConstraints)
+	for _, sourcePath := range sourcePaths {
+		pathPattern := sourcePatternToGlobbablePattern(sourcePath)
+		leveldbs, err := filepath.Glob(pathPattern)
 		if err != nil {
-			return nil, fmt.Errorf("checking source path constraints: %w", err)
-		}
-		if !valid {
-			continue
+			return nil, fmt.Errorf("globbing for leveldb files: %w", err)
 		}

-		rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
+		// Extract database and table from query
+		dbName, objectStoreName, err := extractQueryTargets(query)
 		if err != nil {
-			return nil, fmt.Errorf("querying %s: %w", db, err)
+			return nil, fmt.Errorf("getting db and object store names: %w", err)
 		}
-		results = append(results, sourceData{
-			path: db,
-			rows: rowsFromDb,
-		})
+
+		// Query databases
+		for _, db := range leveldbs {
+			// Check to make sure `db` adheres to pathConstraintsFromQuery. This is an
+			// optimization to avoid work, if osquery sqlite filtering is going to exclude it.
+			valid, err := checkPathConstraints(db, pathConstraintsFromQuery)
+			if err != nil {
+				return nil, fmt.Errorf("checking source path constraints: %w", err)
+			}
+			if !valid {
+				continue
+			}
+
+			rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
+			if err != nil {
+				return nil, fmt.Errorf("querying %s: %w", db, err)
+			}
+			results = append(results, sourceData{
+				path: db,
+				rows: rowsFromDb,
+			})
+		}
 	}

 	return results, nil
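Two helpers referenced above sit outside this diff. The schema comment says source paths support `%` and `_` wildcards (SQL LIKE syntax), and the doc comment says the query must be `<db name>.<object store name>`. The sketches below are plausible implementations inferred from those contracts alone, not the PR's actual code; in particular, the real `sourcePatternToGlobbablePattern` may additionally escape pre-existing glob metacharacters.

```go
package main

import (
	"fmt"
	"strings"
)

// sourcePatternToGlobbablePattern translates SQL-LIKE wildcards into
// filepath.Glob syntax: % (any run of characters) becomes *, and
// _ (exactly one character) becomes ?.
func sourcePatternToGlobbablePattern(pattern string) string {
	return strings.NewReplacer("%", "*", "_", "?").Replace(pattern)
}

// extractQueryTargets splits a query of the documented form
// `<db name>.<object store name>` into its two parts.
func extractQueryTargets(query string) (string, string, error) {
	parts := strings.SplitN(query, ".", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("expected <db name>.<object store name>, got %s", query)
	}
	return parts[0], parts[1], nil
}

func main() {
	fmt.Println(sourcePatternToGlobbablePattern("/Users/%/Library/db.indexeddb.leveldb"))
	// /Users/*/Library/db.indexeddb.leveldb

	db, store, _ := extractQueryTargets("db.store")
	fmt.Println(db, store) // db store
}
```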