Skip to content

Commit

Permalink
Add versioned schemas with a downgrade path (#84)
Browse files Browse the repository at this point in the history
* Add versioned schemas with a downgrade path
* Remove and move unused variables
* Add missing license
* Skip NotField for output_index
* Add strip_additional_properties for kibana import
* Remove stray comment
* Apply suggestions from code review

Co-authored-by: Justin Ibarra <brokensound77@users.noreply.github.com>
  • Loading branch information
rw-access and brokensound77 authored Jul 23, 2020
1 parent 4ba23ad commit d15da0a
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 299 deletions.
4 changes: 2 additions & 2 deletions detection_rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import misc
from . import rule_formatter
from . import rule_loader
from . import schema
from . import schemas
from . import utils

__all__ = (
Expand All @@ -19,6 +19,6 @@
'misc',
'rule_formatter',
'rule_loader',
'schema',
'schemas',
'utils',
)
4 changes: 2 additions & 2 deletions detection_rules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR
from .rule import Rule
from .rule_formatter import toml_write
from .schema import RULE_TYPES
from .schemas import CurrentSchema
from .utils import get_path, clear_caches


Expand All @@ -36,7 +36,7 @@ def root():
@click.argument('path', type=click.Path(dir_okay=False))
@click.option('--config', '-c', type=click.Path(exists=True, dir_okay=False), help='Rule or config file')
@click.option('--required-only', is_flag=True, help='Only prompt for required fields')
@click.option('--rule-type', '-t', type=click.Choice(RULE_TYPES), help='Type of rule to create')
@click.option('--rule-type', '-t', type=click.Choice(CurrentSchema.RULE_TYPES), help='Type of rule to create')
def create_rule(path, config, required_only, rule_type):
"""Create a detection rule."""
config = load_dump(config) if config else {}
Expand Down
2 changes: 1 addition & 1 deletion detection_rules/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
from collections import defaultdict

from .schema import validate_rta_mapping
from .schemas import validate_rta_mapping
from .utils import load_etc_dump, save_etc_dump, get_path


Expand Down
30 changes: 18 additions & 12 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from . import ecs, beats
from .attack import TACTICS, build_threat_map_entry, technique_lookup
from .rule_formatter import nested_normalize, toml_write
from .schema import RULE_TYPES, metadata_schema, schema_validate, get_schema
from .schemas import CurrentSchema, TomlMetadata # RULE_TYPES, metadata_schema, schema_validate, get_schema
from .utils import get_path, clear_caches, cached


Expand Down Expand Up @@ -98,13 +98,13 @@ def to_eql(self):
@cached
def get_meta_schema_required_defaults():
"""Get the default values for required properties in the metadata schema."""
required = [v for v in metadata_schema['required']]
properties = {k: v for k, v in metadata_schema['properties'].items() if k in required}
required = [v for v in TomlMetadata.get_schema()['required']]
properties = {k: v for k, v in TomlMetadata.get_schema()['properties'].items() if k in required}
return {k: v.get('default') or [v['items']['default']] for k, v in properties.items()}

def set_metadata(self, contents):
"""Parse metadata fields and set missing required fields to the default values."""
metadata = {k: v for k, v in contents.items() if k in metadata_schema['properties']}
metadata = {k: v for k, v in contents.items() if k in TomlMetadata.get_schema()['properties']}
defaults = self.get_meta_schema_required_defaults().copy()
defaults.update(metadata)
return defaults
Expand Down Expand Up @@ -141,9 +141,16 @@ def validate(self, as_rule=False, versioned=False, query=True):
self.normalize()

if as_rule:
schema_validate(self.rule_format(), as_rule=True)
schema_cls = CurrentSchema.toml_schema()
contents = self.rule_format()
elif versioned:
schema_cls = CurrentSchema.versioned()
contents = self.contents
else:
schema_validate(self.contents, versioned=versioned)
schema_cls = CurrentSchema
contents = self.contents

schema_cls.validate(contents, role=self.type)

if query and self.query and self.contents['language'] == 'kuery':
ecs_versions = self.metadata.get('ecs_version')
Expand Down Expand Up @@ -204,14 +211,13 @@ def get_hash(self):
def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwargs):
"""Build a rule from data and prompts."""
from .misc import schema_prompt
# from .rule_loader import rta_mappings

kwargs = copy.deepcopy(kwargs)

while rule_type not in RULE_TYPES:
rule_type = click.prompt('Rule type ({})'.format(', '.join(RULE_TYPES)))
rule_type = click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))

schema = get_schema(rule_type)
schema = CurrentSchema.get_schema(role=rule_type)
props = schema['properties']
opt_reqs = schema.get('required', [])
contents = {}
Expand Down Expand Up @@ -269,12 +275,12 @@ def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwarg

metadata = {}
ecs_version = schema_prompt('ecs_version', required=False, value=None,
**metadata_schema['properties']['ecs_version'])
**TomlMetadata.get_schema()['properties']['ecs_version'])
if ecs_version:
metadata['ecs_version'] = ecs_version

# validate before creating
schema_validate(contents)
CurrentSchema.toml_schema().validate(contents)

suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
Expand Down
6 changes: 3 additions & 3 deletions detection_rules/rule_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import toml

from .schema import NONFORMATTED_FIELDS
from .schemas import CurrentSchema

SQ = "'"
DQ = '"'
Expand All @@ -34,7 +34,7 @@ def nested_normalize(d, skip_cleanup=False):
if k == 'query':
# TODO: the linter still needs some work, but once up to par, uncomment to implement - kql.lint(v)
d.update({k: nested_normalize(v)})
elif k in NONFORMATTED_FIELDS:
elif k in CurrentSchema.markdown_fields():
# let these maintain newlines and whitespace for markdown support
d.update({k: nested_normalize(v, skip_cleanup=True)})
else:
Expand Down Expand Up @@ -160,7 +160,7 @@ def _do_write(_data, _contents):
bottom[k] = v
else:
top[k] = v
elif k in NONFORMATTED_FIELDS:
elif k in CurrentSchema.markdown_fields():
top[k] = NonformattedField(v)
else:
top[k] = v
Expand Down
4 changes: 2 additions & 2 deletions detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from .mappings import RtaMappings
from .rule import RULES_DIR, Rule
from .schema import get_schema
from .schemas import CurrentSchema
from .utils import get_path, cached


Expand Down Expand Up @@ -171,7 +171,7 @@ def get_production_rules():

def find_unneeded_defaults(rule):
"""Remove values that are not required in the schema which are set with default values."""
schema = get_schema(rule.contents['type'])
schema = CurrentSchema.get_schema(rule.type)
props = schema['properties']
unrequired_defaults = [p for p in props if p not in schema['required'] and props[p].get('default')]
default_matches = {p: rule.contents[p] for p in unrequired_defaults
Expand Down
Loading

0 comments on commit d15da0a

Please sign in to comment.