Skip to content

Commit

Permalink
[FR] Refactor to more seamlessly support multiple DAC approaches (#3407)
Browse files Browse the repository at this point in the history
* [FR] Add custom rule directory support
* [FR] Add support for configurable tests and validation
* [FR] Add support to decouple actions and exceptions
* update actions schema
* add custom-rules init-config command
* update docs
---------

Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com>
  • Loading branch information
brokensound77 and brokensound77 authored Apr 26, 2024
1 parent c567d37 commit 303a64e
Show file tree
Hide file tree
Showing 35 changed files with 1,158 additions and 154 deletions.
15 changes: 4 additions & 11 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ the [README](README.md). Basic use of the CLI such as [creating a rule](CONTRIBU
[testing](CONTRIBUTING.md#testing-a-rule-with-the-cli) are referenced in the [contribution guide](CONTRIBUTING.md).


## Using a config file or environment variables
## Using a user config file or environment variables

CLI commands which are tied to Kibana and Elasticsearch are capable of parsing auth-related keyword args from a config
file or environment variables.
Expand All @@ -17,9 +17,9 @@ follows:
* config values
* prompt (this only applies to certain values)

#### Setup a config file
#### Setup a user config file

In the root directory of this repo, create the file `.detection-rules-cfg.json` and add relevant values
In the root directory of this repo, create the file `.detection-rules-cfg.json` (or `.yaml`) and add relevant values

Currently supported arguments:
* elasticsearch_url
Expand All @@ -34,13 +34,6 @@ Environment variables using the argument format: `DR_<UPPERCASED_ARG_NAME>` will
EX: `DR_USER=joe`


Using the environment variable `DR_BYPASS_NOTE_VALIDATION_AND_PARSE` will bypass the Detection Rules validation on the `note` field in toml files.

Using the environment variable `DR_BYPASS_BBR_LOOKBACK_VALIDATION` will bypass the Detection Rules lookback and interval validation
on the building block rules.

Using the environment variable `DR_BYPASS_TAGS_VALIDATION` will bypass the Detection Rules Unit Tests on the `tags` field in toml files.

## Importing rules into the repo

You can import rules into the repo using the `create-rule` or `import-rules` commands. Both of these commands will
Expand Down Expand Up @@ -646,4 +639,4 @@ value = "fast"
```

The easiest way to _update_ a rule with existing transform entries is to use `guide-plugin-convert` and manually add it
to the rule.
to the rule.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ This repository was first announced on Elastic's blog post, [Elastic Security op

Detection Rules contains more than just static rule files. This repository also contains code for unit testing in Python and integrating with the Detection Engine in Kibana.

| folder | description |
|------------------------------------------------ |------------------------------------------------------------------------------------ |
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`etc/`](detection_rules/etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](lib/kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](lib/kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`rules_building_block/`](rules_building_block) | Root directory where building block rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |
| folder | description |
|------------------------------------------------ |--------------------------------------------------------------------------------------------|
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`docs`](docs) | Additional, more verbose documentation for the repository |
| [`etc/`](detection_rules/etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](lib/kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](lib/kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`rules_building_block/`](rules_building_block) | Root directory where building block rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |


## Getting started
Expand Down
2 changes: 2 additions & 0 deletions detection_rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported"

from . import ( # noqa: E402
custom_rules,
devtools,
docs,
eswrap,
Expand All @@ -28,6 +29,7 @@
)

__all__ = (
'custom_rules',
'devtools',
'docs',
'eswrap',
Expand Down
64 changes: 64 additions & 0 deletions detection_rules/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Dataclasses for Action."""
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

from .mixins import MarshmallowDataclassMixin
from .schemas import definitions


@dataclass(frozen=True)
class ActionMeta(MarshmallowDataclassMixin):
"""Data stored in an exception's [metadata] section of TOML."""
creation_date: definitions.Date
rule_id: List[definitions.UUIDString]
rule_name: str
updated_date: definitions.Date

# Optional fields
deprecation_date: Optional[definitions.Date]
comments: Optional[str]
maturity: Optional[definitions.Maturity]


@dataclass
class Action(MarshmallowDataclassMixin):
"""Data object for rule Action."""
@dataclass
class ActionParams:
"""Data object for rule Action params."""
body: str

action_type_id: definitions.ActionTypeId
group: str
params: ActionParams
id: Optional[str]
frequency: Optional[dict]
alerts_filter: Optional[dict]


@dataclass(frozen=True)
class TOMLActionContents(MarshmallowDataclassMixin):
"""Object for action from TOML file."""
metadata: ActionMeta
actions: List[Action]


@dataclass(frozen=True)
class TOMLAction:
"""Object for action from TOML file."""
contents: TOMLActionContents
path: Path

@property
def name(self):
return self.contents.metadata.rule_name

@property
def id(self):
return self.contents.metadata.rule_id
8 changes: 4 additions & 4 deletions detection_rules/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from .attack import matrix, tactics, build_threat_map_entry
from .rule import TOMLRule, TOMLRuleContents
from .rule_loader import (RuleCollection,
DEFAULT_RULES_DIR,
DEFAULT_BBR_DIR,
DEFAULT_PREBUILT_RULES_DIR,
DEFAULT_PREBUILT_BBR_DIR,
dict_filter)
from .schemas import definitions
from .utils import clear_caches, get_path
Expand Down Expand Up @@ -49,7 +49,7 @@ def get_collection(*args, **kwargs):
rules.load_directories(Path(d) for d in directories)

if rule_id:
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
rules.load_directories((DEFAULT_PREBUILT_RULES_DIR, DEFAULT_PREBUILT_BBR_DIR),
obj_filter=dict_filter(rule__rule_id=rule_id))
if len(rules) != 1:
client_error(f"Could not find rule with ID {rule_id}")
Expand Down Expand Up @@ -83,7 +83,7 @@ def get_collection(*args, **kwargs):
rules.load_directories(Path(d) for d in directories)

if rule_id:
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
rules.load_directories((DEFAULT_PREBUILT_RULES_DIR, DEFAULT_PREBUILT_BBR_DIR),
obj_filter=dict_filter(rule__rule_id=rule_id))
found_ids = {rule.id for rule in rules}
missing = set(rule_id).difference(found_ids)
Expand Down
217 changes: 217 additions & 0 deletions detection_rules/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Configuration support for custom components."""
import fnmatch
import os
from dataclasses import dataclass
from pathlib import Path
from functools import cached_property
from typing import Dict, List, Optional

import yaml
from eql.utils import load_dump

from .misc import discover_tests
from .utils import cached, load_etc_dump, get_etc_path

ROOT_DIR = Path(__file__).parent.parent
CUSTOM_RULES_DIR = os.getenv('CUSTOM_RULES_DIR', None)


@dataclass
class UnitTest:
"""Base object for unit tests configuration."""
bypass: Optional[List[str]] = None
test_only: Optional[List[str]] = None

def __post_init__(self):
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass'


@dataclass
class RuleValidation:
"""Base object for rule validation configuration."""
bypass: Optional[List[str]] = None
test_only: Optional[List[str]] = None

def __post_init__(self):
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass'


@dataclass
class TestConfig:
"""Detection rules test config file"""
test_file: Optional[Path] = None
unit_tests: Optional[UnitTest] = None
rule_validation: Optional[RuleValidation] = None

@classmethod
def from_dict(cls, test_file: Optional[Path] = None, unit_tests: Optional[dict] = None,
rule_validation: Optional[dict] = None):
return cls(test_file=test_file or None, unit_tests=UnitTest(**unit_tests or {}),
rule_validation=RuleValidation(**rule_validation or {}))

@cached_property
def all_tests(self):
"""Get the list of all test names."""
return discover_tests()

def tests_by_patterns(self, *patterns: str) -> List[str]:
"""Get the list of test names by patterns."""
tests = set()
for pattern in patterns:
tests.update(list(fnmatch.filter(self.all_tests, pattern)))
return sorted(tests)

@staticmethod
def parse_out_patterns(names: List[str]) -> (List[str], List[str]):
"""Parse out test patterns from a list of test names."""
patterns = []
tests = []
for name in names:
if name.startswith('pattern:') and '*' in name:
patterns.append(name[len('pattern:'):])
else:
tests.append(name)
return patterns, tests

@staticmethod
def format_tests(tests: List[str]) -> List[str]:
"""Format unit test names into expected format for direct calling."""
raw = [t.rsplit('.', maxsplit=2) for t in tests]
formatted = []
for test in raw:
path, clazz, method = test
path = f'{path.replace(".", os.path.sep)}.py'
formatted.append('::'.join([path, clazz, method]))
return formatted

def get_test_names(self, formatted: bool = False) -> (List[str], List[str]):
"""Get the list of test names to run."""
patterns_t, tests_t = self.parse_out_patterns(self.unit_tests.test_only or [])
patterns_b, tests_b = self.parse_out_patterns(self.unit_tests.bypass or [])
defined_tests = tests_t + tests_b
patterns = patterns_t + patterns_b
unknowns = sorted(set(defined_tests) - set(self.all_tests))
assert not unknowns, f'Unrecognized test names in config ({self.test_file}): {unknowns}'

combined_tests = sorted(set(defined_tests + self.tests_by_patterns(*patterns)))

if self.unit_tests.test_only is not None:
tests = combined_tests
skipped = [t for t in self.all_tests if t not in tests]
elif self.unit_tests.bypass:
tests = []
skipped = []
for test in self.all_tests:
if test not in combined_tests:
tests.append(test)
else:
skipped.append(test)
else:
tests = self.all_tests
skipped = []

if formatted:
return self.format_tests(tests), self.format_tests(skipped)
else:
return tests, skipped

def check_skip_by_rule_id(self, rule_id: str) -> bool:
"""Check if a rule_id should be skipped."""
bypass = self.rule_validation.bypass
test_only = self.rule_validation.test_only

# neither bypass nor test_only are defined, so no rules are skipped
if not (bypass or test_only):
return False
# if defined in bypass or not defined in test_only, then skip
return (bypass and rule_id in bypass) or (test_only and rule_id not in test_only)


@dataclass
class RulesConfig:
"""Detection rules config file."""
deprecated_rules_file: Path
deprecated_rules: Dict[str, dict]
packages_file: Path
packages: Dict[str, dict]
rule_dirs: List[Path]
stack_schema_map_file: Path
stack_schema_map: Dict[str, dict]
test_config: TestConfig
version_lock_file: Path
version_lock: Dict[str, dict]

action_dir: Optional[Path] = None
exception_dir: Optional[Path] = None


@cached
def parse_rules_config(path: Optional[Path] = None) -> RulesConfig:
"""Parse the _config.yaml file for default or custom rules."""
if path:
assert path.exists(), f'rules config file does not exist: {path}'
loaded = yaml.safe_load(path.read_text())
elif CUSTOM_RULES_DIR:
path = Path(CUSTOM_RULES_DIR) / '_config.yaml'
assert path.exists(), f'_config.yaml file missing in {CUSTOM_RULES_DIR}'
loaded = yaml.safe_load(path.read_text())
else:
path = Path(get_etc_path('_config.yaml'))
loaded = load_etc_dump('_config.yaml')

assert loaded, f'No data loaded from {path}'

base_dir = path.resolve().parent

# testing
# precedence to the environment variable
# environment variable is absolute path and config file is relative to the _config.yaml file
test_config_ev = os.getenv('DETECTION_RULES_TEST_CONFIG', None)
if test_config_ev:
test_config_path = Path(test_config_ev)
else:
test_config_file = loaded.get('testing', {}).get('config')
if test_config_file:
test_config_path = base_dir.joinpath(test_config_file)
else:
test_config_path = None

if test_config_path:
test_config_data = yaml.safe_load(test_config_path.read_text())

# overwrite None with empty list to allow implicit exemption of all tests with `test_only` defined to None in
# test config
if 'unit_tests' in test_config_data and test_config_data['unit_tests'] is not None:
test_config_data['unit_tests'] = {k: v or [] for k, v in test_config_data['unit_tests'].items()}
test_config = TestConfig.from_dict(test_file=test_config_path, **test_config_data)
else:
test_config = TestConfig.from_dict()

# files
# paths are relative
files = {f'{k}_file': base_dir.joinpath(v) for k, v in loaded['files'].items()}
contents = {k: load_dump(str(base_dir.joinpath(v))) for k, v in loaded['files'].items()}
contents.update(**files)

# directories
# paths are relative
if loaded.get('directories'):
contents.update({k: base_dir.joinpath(v) for k, v in loaded['directories'].items()})

# rule_dirs
# paths are relative
contents['rule_dirs'] = [base_dir.joinpath(d) for d in loaded.get('rule_dirs', [])]

rules_config = RulesConfig(test_config=test_config, **contents)
return rules_config


@cached
def load_current_package_version() -> str:
"""Load the current package version from config file."""
return parse_rules_config().packages['package']['name']
Loading

0 comments on commit 303a64e

Please sign in to comment.