Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add versioned schemas with a downgrade path #84

Merged
merged 8 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions detection_rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import misc
from . import rule_formatter
from . import rule_loader
from . import schema
from . import schemas
from . import utils

__all__ = (
Expand All @@ -19,6 +19,6 @@
'misc',
'rule_formatter',
'rule_loader',
'schema',
'schemas',
'utils',
)
4 changes: 2 additions & 2 deletions detection_rules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR
from .rule import Rule
from .rule_formatter import toml_write
from .schema import RULE_TYPES
from .schemas import CurrentSchema
from .utils import get_path, clear_caches


Expand All @@ -36,7 +36,7 @@ def root():
@click.argument('path', type=click.Path(dir_okay=False))
@click.option('--config', '-c', type=click.Path(exists=True, dir_okay=False), help='Rule or config file')
@click.option('--required-only', is_flag=True, help='Only prompt for required fields')
@click.option('--rule-type', '-t', type=click.Choice(RULE_TYPES), help='Type of rule to create')
@click.option('--rule-type', '-t', type=click.Choice(CurrentSchema.RULE_TYPES), help='Type of rule to create')
def create_rule(path, config, required_only, rule_type):
"""Create a detection rule."""
config = load_dump(config) if config else {}
Expand Down
2 changes: 1 addition & 1 deletion detection_rules/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
from collections import defaultdict

from .schema import validate_rta_mapping
from .schemas import validate_rta_mapping
from .utils import load_etc_dump, save_etc_dump, get_path


Expand Down
30 changes: 18 additions & 12 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from . import ecs, beats
from .attack import TACTICS, build_threat_map_entry, technique_lookup
from .rule_formatter import nested_normalize, toml_write
from .schema import RULE_TYPES, metadata_schema, schema_validate, get_schema
from .schemas import CurrentSchema, TomlMetadata # RULE_TYPES, metadata_schema, schema_validate, get_schema
from .utils import get_path, clear_caches, cached


Expand Down Expand Up @@ -98,13 +98,13 @@ def to_eql(self):
@cached
def get_meta_schema_required_defaults():
"""Get the default values for required properties in the metadata schema."""
required = [v for v in metadata_schema['required']]
properties = {k: v for k, v in metadata_schema['properties'].items() if k in required}
required = [v for v in TomlMetadata.get_schema()['required']]
properties = {k: v for k, v in TomlMetadata.get_schema()['properties'].items() if k in required}
return {k: v.get('default') or [v['items']['default']] for k, v in properties.items()}

def set_metadata(self, contents):
"""Parse metadata fields and set missing required fields to the default values."""
metadata = {k: v for k, v in contents.items() if k in metadata_schema['properties']}
metadata = {k: v for k, v in contents.items() if k in TomlMetadata.get_schema()['properties']}
defaults = self.get_meta_schema_required_defaults().copy()
defaults.update(metadata)
return defaults
Expand Down Expand Up @@ -141,9 +141,16 @@ def validate(self, as_rule=False, versioned=False, query=True):
self.normalize()

if as_rule:
schema_validate(self.rule_format(), as_rule=True)
schema_cls = CurrentSchema.toml_schema()
contents = self.rule_format()
elif versioned:
schema_cls = CurrentSchema.versioned()
contents = self.contents
else:
schema_validate(self.contents, versioned=versioned)
schema_cls = CurrentSchema
contents = self.contents

schema_cls.validate(contents, role=self.type)

if query and self.query and self.contents['language'] == 'kuery':
ecs_versions = self.metadata.get('ecs_version')
Expand Down Expand Up @@ -204,14 +211,13 @@ def get_hash(self):
def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwargs):
"""Build a rule from data and prompts."""
from .misc import schema_prompt
# from .rule_loader import rta_mappings

kwargs = copy.deepcopy(kwargs)

while rule_type not in RULE_TYPES:
rule_type = click.prompt('Rule type ({})'.format(', '.join(RULE_TYPES)))
rule_type = click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))

schema = get_schema(rule_type)
schema = CurrentSchema.get_schema(role=rule_type)
props = schema['properties']
opt_reqs = schema.get('required', [])
contents = {}
Expand Down Expand Up @@ -269,12 +275,12 @@ def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwarg

metadata = {}
ecs_version = schema_prompt('ecs_version', required=False, value=None,
**metadata_schema['properties']['ecs_version'])
**TomlMetadata.get_schema()['properties']['ecs_version'])
if ecs_version:
metadata['ecs_version'] = ecs_version

# validate before creating
schema_validate(contents)
CurrentSchema.toml_schema().validate(contents)

suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
Expand Down
6 changes: 3 additions & 3 deletions detection_rules/rule_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import toml

from .schema import NONFORMATTED_FIELDS
from .schemas import CurrentSchema

SQ = "'"
DQ = '"'
Expand All @@ -34,7 +34,7 @@ def nested_normalize(d, skip_cleanup=False):
if k == 'query':
# TODO: the linter still needs some work, but once up to par, uncomment to implement - kql.lint(v)
d.update({k: nested_normalize(v)})
elif k in NONFORMATTED_FIELDS:
elif k in CurrentSchema.markdown_fields():
# let these maintain newlines and whitespace for markdown support
d.update({k: nested_normalize(v, skip_cleanup=True)})
else:
Expand Down Expand Up @@ -160,7 +160,7 @@ def _do_write(_data, _contents):
bottom[k] = v
else:
top[k] = v
elif k in NONFORMATTED_FIELDS:
elif k in CurrentSchema.markdown_fields():
top[k] = NonformattedField(v)
else:
top[k] = v
Expand Down
4 changes: 2 additions & 2 deletions detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from .mappings import RtaMappings
from .rule import RULES_DIR, Rule
from .schema import get_schema
from .schemas import CurrentSchema
from .utils import get_path, cached


Expand Down Expand Up @@ -171,7 +171,7 @@ def get_production_rules():

def find_unneeded_defaults(rule):
"""Remove values that are not required in the schema which are set with default values."""
schema = get_schema(rule.contents['type'])
schema = CurrentSchema.get_schema(rule.type)
props = schema['properties']
unrequired_defaults = [p for p in props if p not in schema['required'] and props[p].get('default')]
default_matches = {p: rule.contents[p] for p in unrequired_defaults
Expand Down
Loading