From fb3c06735406d6106e6ea9151e647f1f99384711 Mon Sep 17 00:00:00 2001 From: janosch Date: Thu, 5 Sep 2024 11:19:45 +0000 Subject: [PATCH 01/14] DFIQ Analyzer implementation * Dynamic import of analyzers * Integration into the analyzer framework * Trigger via DFIQ Approaches being added to a sketch --- timesketch/api/v1/resources/analysis.py | 27 ++- timesketch/api/v1/resources/scenarios.py | 6 +- timesketch/lib/analyzers/__init__.py | 1 + .../lib/analyzers/dfiq_plugins/__init__.py | 51 ++++++ .../lib/analyzers/dfiq_plugins/manager.py | 173 ++++++++++++++++++ timesketch/lib/analyzers/interface.py | 1 + timesketch/lib/analyzers/manager.py | 12 +- timesketch/lib/tasks.py | 6 +- timesketch/models/sketch.py | 16 ++ 9 files changed, 280 insertions(+), 13 deletions(-) create mode 100644 timesketch/lib/analyzers/dfiq_plugins/__init__.py create mode 100644 timesketch/lib/analyzers/dfiq_plugins/manager.py diff --git a/timesketch/api/v1/resources/analysis.py b/timesketch/api/v1/resources/analysis.py index e141f80421..5335c92916 100644 --- a/timesketch/api/v1/resources/analysis.py +++ b/timesketch/api/v1/resources/analysis.py @@ -180,6 +180,7 @@ def get(self, sketch_id): * display_name: Display name of the analyzer for the UI * description: Description of the analyzer provided in the class * is_multi: Boolean indicating if the analyzer is a multi analyzer + * is_dfiq: Boolean indicating if the analyzer is a dfiq analyzer """ sketch = Sketch.get_with_acl(sketch_id) if not sketch: @@ -188,22 +189,29 @@ def get(self, sketch_id): abort( HTTP_STATUS_CODE_FORBIDDEN, "User does not have read access to sketch" ) - analyzers = [x for x, y in analyzer_manager.AnalysisManager.get_analyzers()] - analyzers = analyzer_manager.AnalysisManager.get_analyzers() + include_dfiq = ( + request.args.get("include_dfiq", default="false").lower() == "true" + ) + + # This line feels double the work, do we need this? 
+ # analyzers = [x for x, y in analyzer_manager.AnalysisManager.get_analyzers()] + + analyzers = analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=include_dfiq + ) analyzers_detail = [] for analyzer_name, analyzer_class in analyzers: # TODO: update the multi_analyzer detection logic for edgecases # where analyzers are using custom parameters (e.g. misp) - multi = False - if len(analyzer_class.get_kwargs()) > 0: - multi = True analyzers_detail.append( { "name": analyzer_name, "display_name": analyzer_class.DISPLAY_NAME, "description": analyzer_class.DESCRIPTION, - "is_multi": multi, + "is_multi": len(analyzer_class.get_kwargs()) > 0, + "is_dfiq": hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER, } ) @@ -266,8 +274,12 @@ def post(self, sketch_id): if form.get("analyzer_force_run"): analyzer_force_run = True + include_dfiq = False + if form.get("include_dfiq"): + include_dfiq = True + analyzers = [] - all_analyzers = [x for x, _ in analyzer_manager.AnalysisManager.get_analyzers()] + all_analyzers = [x for x, _ in analyzer_manager.AnalysisManager.get_analyzers(include_dfiq=include_dfiq)] for analyzer in analyzer_names: for correct_name in all_analyzers: if fnmatch.fnmatch(correct_name, analyzer): @@ -301,6 +313,7 @@ def post(self, sketch_id): analyzer_kwargs=analyzer_kwargs, timeline_id=timeline_id, analyzer_force_run=analyzer_force_run, + include_dfiq=include_dfiq, ) except KeyError as e: logger.warning( diff --git a/timesketch/api/v1/resources/scenarios.py b/timesketch/api/v1/resources/scenarios.py index d825c4bc61..064b5a1ce7 100644 --- a/timesketch/api/v1/resources/scenarios.py +++ b/timesketch/api/v1/resources/scenarios.py @@ -229,7 +229,8 @@ def post(self, sketch_id): display_name=approach.name, description=approach.description, spec_json=approach.to_json(), - user=current_user, + user_id=current_user.id, + sketch=sketch, ) for search_template in approach.search_templates: @@ -553,7 +554,8 @@ def post(self, 
sketch_id): display_name=approach.name, description=approach.description, spec_json=approach.to_json(), - user=current_user, + user_id=current_user.id, + sketch=sketch, ) for search_template in approach.search_templates: diff --git a/timesketch/lib/analyzers/__init__.py b/timesketch/lib/analyzers/__init__.py index dd63913dbd..1e0ab72e75 100644 --- a/timesketch/lib/analyzers/__init__.py +++ b/timesketch/lib/analyzers/__init__.py @@ -40,3 +40,4 @@ import timesketch.lib.analyzers.authentication import timesketch.lib.analyzers.contrib +import timesketch.lib.analyzers.dfiq_plugins diff --git a/timesketch/lib/analyzers/dfiq_plugins/__init__.py b/timesketch/lib/analyzers/dfiq_plugins/__init__.py new file mode 100644 index 0000000000..5ec1774a95 --- /dev/null +++ b/timesketch/lib/analyzers/dfiq_plugins/__init__.py @@ -0,0 +1,51 @@ +"""DFIQ Analyzer module.""" + +import os +import importlib +import inspect +import logging +from timesketch.lib.analyzers import interface +from timesketch.lib.analyzers import manager as analyzer_manager + +import timesketch.lib.analyzers.dfiq_plugins.manager + +logger = logging.getLogger("timesketch.lib.analyzers.dfiq_plugins") + +# Dynamically load DFIQ Analyzers +DFIQ_ANALYZER_PATH = os.path.dirname(os.path.abspath(__file__)) + +for filename in os.listdir(DFIQ_ANALYZER_PATH): + if filename.endswith(".py") and not filename.startswith("__"): + module_name = filename[:-3] # Remove .py extension + if module_name == "manager": + continue + module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" + try: + module = importlib.import_module(module_path) + for name, obj in inspect.getmembers(module): + if name not in [ + "interface", + "logger", + "logging", + "manager", + ] and not name.startswith("__"): + if ( + inspect.isclass(obj) + and issubclass(obj, interface.BaseAnalyzer) + and hasattr(obj, "IS_DFIQ_ANALYZER") + and obj.IS_DFIQ_ANALYZER + ): + analyzer_manager.AnalysisManager.register_analyzer(obj) + logger.info("Registered DFIQ 
analyzer: %s", obj.NAME) + else: + logger.error( + 'Skipped loading "%s" as analyzer, since it did ' + "not meet the requirements.", + str(module_path), + ) + except ImportError as error: + logger.error( + "Failed to import dfiq analyzer module: %s, %s", + str(module_path), + str(error), + ) diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py new file mode 100644 index 0000000000..00eac35af9 --- /dev/null +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -0,0 +1,173 @@ +# Copyright 2024 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import json + +from timesketch.models.sketch import Timeline +from timesketch.lib.aggregators import manager as aggregator_manager +from timesketch.models.sketch import approach_created # Import the signal + +logger = logging.getLogger("timesketch.analyzers.dfiq") + +@approach_created.connect +def trigger_dfiq_analyzsis(approach, **extra): + analyzer_manager = DFIQAnalyzerManager(approach=approach) + analyzer_manager.check_for_dfiq_analyzer() +# This approach triggrs the following SQLA messages: +# /usr/local/src/timesketch/timesketch/models/annotations.py:403: SAWarning: Object of type not in session, add operation along 'User.investigative_questions' will not proceed db_session.commit() +# /usr/local/src/timesketch/timesketch/models/annotations.py:403: SAWarning: Object of type not in session, add operation along 'Sketch.questions' will not proceed db_session.commit() +# To fix this, the InvestigativeQuestion needs to be added to the Database before we trigger the analyzers! + +class DFIQAnalyzerManager: + """Manager for executing DFIQ analyzers.""" + + def __init__(self, approach): + """Initializes the manager.""" + self.sketch = approach.sketch + self.user_id = approach.user_id + self.approach_spec = json.loads(approach.spec_json) + + self.aggregator_manager = aggregator_manager + + def check_for_dfiq_analyzer(self): + """Checks if the created approach has analyzer steps to execute.""" + dfiq_analyzers = set() + if self.approach_spec.get("steps"): + for step in self.approach_spec.get("steps"): + if step.get("stage") == "analysis" and step.get("type") == "timesketch-analyzer": + dfiq_analyzers.add(step.get("value")) + + if dfiq_analyzers: + print(f"### Next gonna run {dfiq_analyzers}") + analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers) + return analyzer_sessions + + def _get_analyzers_by_data_type(self, dfiq_analyzers): + """Groups DFIQ analyzers by their required data types. 
+ + Args: + dfiq_analyzers (set): A set of DFIQ analyzer names. + + Returns: + dict: A dictionary mapping data types to lists of analyzer names. + The special key "ALL" will be used for classical analyzers + and DFIQ analyzers that don't have a REQUIRED_DATA_TYPES + attribute (i.e., an empty list). It will trigger the analyzer + to run on all timelines in the sketch. + """ + from timesketch.lib.analyzers import manager as analyzer_manager + analyzer_by_datatypes = {} + for analyzer_name, analyzer_class in analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=True + ): + if analyzer_name not in dfiq_analyzers: + continue + + required_data_types = getattr(analyzer_class, "REQUIRED_DATA_TYPES", []) + if not required_data_types: + # Classical or DFIQ analyzer without REQUIRED_DATA_TYPES + analyzer_by_datatypes.setdefault("ALL", []).append(analyzer_class.NAME) + else: + for data_type in required_data_types: + analyzer_by_datatypes.setdefault(data_type, []).append( + analyzer_class.NAME + ) + return analyzer_by_datatypes + + def _get_data_types_per_timeline(self): + """Retrieves data types present in each eligible timeline. + + Returns: + dict: A dictionary mapping timeline IDs to lists of data types. + """ + datatype_per_timeline = {} + for timeline in self.sketch.timelines: + if timeline.get_status.status.lower() != "ready": + continue + + aggregation = self.aggregator_manager.AggregatorManager.get_aggregator( + "field_bucket" + )(sketch_id=self.sketch.id, timeline_ids=[timeline.id]) + agg_result = aggregation.run(field="data_type", limit="1000") + datatype_per_timeline[timeline.id] = [ + entry["data_type"] for entry in agg_result.values + ] + return datatype_per_timeline + + def _run_dfiq_analyzers(self, dfiq_analyzers): + """Executes DFIQ analyzers on matching timelines. + + Args: + dfiq_analyzers (set): A set of DFIQ analyzer names. + + Returns: + list: A list of analyzer sessions (potentially empty). 
+ """ + analyzer_by_datatypes = self._get_analyzers_by_data_type(dfiq_analyzers) + if not analyzer_by_datatypes: + logger.error( + "None of the requested DFIQ analyzers have required data " + "types defined. Aborting." + ) + return [] + + datatype_per_timeline = self._get_data_types_per_timeline() + + analyzer_by_timeline = {} + for timeline_id, timeline_datatypes in datatype_per_timeline.items(): + analyzer_by_timeline[timeline_id] = [] + for data_type, analyzer_names in analyzer_by_datatypes.items(): + # Handle classical analyzers by always including them. + if data_type == "ALL": + analyzer_by_timeline[timeline_id].extend(analyzer_names) + elif data_type in timeline_datatypes: + analyzer_by_timeline[timeline_id].extend(analyzer_names) + + from timesketch.lib import tasks + + sessions = [] + for timeline_id, analyzer_names in analyzer_by_timeline.items(): + if not analyzer_names: + continue + timeline = Timeline.get_by_id(timeline_id) + if not timeline or timeline.status[0].status != "ready": + continue + try: + analyzer_group, session = tasks.build_sketch_analysis_pipeline( + sketch_id=self.sketch.id, + searchindex_id=timeline.searchindex.id, + user_id=self.user_id, + analyzer_names=analyzer_names, + analyzer_kwargs=None, + timeline_id=timeline_id, + analyzer_force_run=True, + include_dfiq=True, + ) + except KeyError as e: + logger.warning( + f"Unable to build analyzer pipeline, analyzer does not exist: {e}" + ) + continue + if analyzer_group: + pipeline = ( + tasks.run_sketch_init.s([timeline.searchindex.index_name]) + | analyzer_group + ) + pipeline.apply_async() + + if session: + sessions.append(session) + + return sessions diff --git a/timesketch/lib/analyzers/interface.py b/timesketch/lib/analyzers/interface.py index 7da05681d2..54cd90c5c1 100644 --- a/timesketch/lib/analyzers/interface.py +++ b/timesketch/lib/analyzers/interface.py @@ -904,6 +904,7 @@ class BaseAnalyzer: NAME = "name" DISPLAY_NAME = None DESCRIPTION = None + IS_DFIQ_ANALYZER = False # If 
this analyzer depends on another analyzer # it needs to be included in this frozenset by using diff --git a/timesketch/lib/analyzers/manager.py b/timesketch/lib/analyzers/manager.py index 7785d723cd..24df99cacc 100644 --- a/timesketch/lib/analyzers/manager.py +++ b/timesketch/lib/analyzers/manager.py @@ -84,11 +84,13 @@ def clear_registration(cls): cls._class_registry = {} @classmethod - def get_analyzers(cls, analyzer_names=None): + def get_analyzers(cls, analyzer_names=None, include_dfiq=False): """Retrieves the registered analyzers. Args: analyzer_names (list): List of analyzer names. + include_dfiq (bool): Optional. Whether to include DFIQ analyzers + in the results. Defaults to False. Yields: tuple: containing: @@ -105,6 +107,14 @@ def get_analyzers(cls, analyzer_names=None): if analyzer_name in completed_analyzers: continue analyzer_class = cls.get_analyzer(analyzer_name) + # Apply DFIQ filtering + if ( + not include_dfiq + and hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER + ): + continue + yield analyzer_name, analyzer_class completed_analyzers.add(analyzer_name) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 64b23723a5..318e0f11e0 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -332,6 +332,7 @@ def build_sketch_analysis_pipeline( analyzer_kwargs=None, analyzer_force_run=False, timeline_id=None, + include_dfiq=False, ): """Build a pipeline for sketch analysis. @@ -349,13 +350,13 @@ def build_sketch_analysis_pipeline( analyzer_kwargs (dict): Arguments to the analyzers. analyzer_force_run (bool): If true then force the analyzer to run. timeline_id (int): Optional int of the timeline to run the analyzer on. + include_dfiq (bool): If trie then include dfiq analyzers in the task. Returns: A tuple with a Celery group with analysis tasks or None if no analyzers are enabled and an analyzer session ID. 
""" tasks = [] - if not analyzer_names: analyzer_names = current_app.config.get("AUTO_SKETCH_ANALYZERS", []) if not analyzer_kwargs: @@ -377,7 +378,7 @@ def build_sketch_analysis_pipeline( analysis_session = AnalysisSession(user=user, sketch=sketch) db_session.add(analysis_session) - analyzers = manager.AnalysisManager.get_analyzers(analyzer_names) + analyzers = manager.AnalysisManager.get_analyzers(analyzer_names, include_dfiq) for analyzer_name, analyzer_class in analyzers: base_kwargs = analyzer_kwargs.get(analyzer_name, {}) searchindex = SearchIndex.get_by_id(searchindex_id) @@ -452,7 +453,6 @@ def build_sketch_analysis_pipeline( **kwargs, ) ) - # Commit the analysis session to the database. if len(analysis_session.analyses) > 0: db_session.add(analysis_session) diff --git a/timesketch/models/sketch.py b/timesketch/models/sketch.py index 4e1c76d5f7..449531a600 100644 --- a/timesketch/models/sketch.py +++ b/timesketch/models/sketch.py @@ -44,6 +44,11 @@ from timesketch.lib.utils import random_color from timesketch.models import db_session +from blinker import signal + +# Create the signal +approach_created = signal('approach-created') + logger = logging.getLogger("timesketch.sketch") @@ -807,3 +812,14 @@ class InvestigativeQuestionApproach( search_histories = relationship( "SearchHistory", backref="investigativequestionapproach", lazy="select" ) + + def __init__(self, *args, **kwargs): + self.sketch = kwargs.pop('sketch', None) + super().__init__(*args, **kwargs) + # Import here to avoid circular imports. 
+ # pylint: disable=import-outside-toplevel + # from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager + # analyzer_manager = DFIQAnalyzerManager(approach=self) + # analyzer_manager.check_for_dfiq_analyzer() + approach_created.send(self) + print("### Blinker signal sent!") From 581e058161b977d5d7182c38dbae28db1842b6dc Mon Sep 17 00:00:00 2001 From: janosch Date: Fri, 6 Sep 2024 15:42:12 +0000 Subject: [PATCH 02/14] DFIQ Analyzer implementation * Linked Analysis with Approach objects * Trigger chck for analysis from the API Endpoint * Remove Signals (no need anymore) * Add open TODOs --- timesketch/api/v1/resources/analysis.py | 11 +-- timesketch/api/v1/resources/scenarios.py | 67 +++++++++++++++++-- .../lib/analyzers/dfiq_plugins/manager.py | 66 +++++++++--------- timesketch/lib/tasks.py | 3 + timesketch/models/sketch.py | 20 ++---- 5 files changed, 113 insertions(+), 54 deletions(-) diff --git a/timesketch/api/v1/resources/analysis.py b/timesketch/api/v1/resources/analysis.py index 5335c92916..08b0bced6c 100644 --- a/timesketch/api/v1/resources/analysis.py +++ b/timesketch/api/v1/resources/analysis.py @@ -51,6 +51,7 @@ logger = logging.getLogger("timesketch.analysis_api") +# TODO: Filter DFIQ analyzer results from this! class AnalysisResource(resources.ResourceMixin, Resource): """Resource to get analyzer session.""" @@ -194,9 +195,6 @@ def get(self, sketch_id): request.args.get("include_dfiq", default="false").lower() == "true" ) - # This line feels double the work, do we need this? 
- # analyzers = [x for x, y in analyzer_manager.AnalysisManager.get_analyzers()] - analyzers = analyzer_manager.AnalysisManager.get_analyzers( include_dfiq=include_dfiq ) @@ -279,7 +277,12 @@ def post(self, sketch_id): include_dfiq = True analyzers = [] - all_analyzers = [x for x, _ in analyzer_manager.AnalysisManager.get_analyzers(include_dfiq=include_dfiq)] + all_analyzers = [ + x + for x, _ in analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=include_dfiq + ) + ] for analyzer in analyzer_names: for correct_name in all_analyzers: if fnmatch.fnmatch(correct_name, analyzer): diff --git a/timesketch/api/v1/resources/scenarios.py b/timesketch/api/v1/resources/scenarios.py index 064b5a1ce7..0823e587fe 100644 --- a/timesketch/api/v1/resources/scenarios.py +++ b/timesketch/api/v1/resources/scenarios.py @@ -37,6 +37,7 @@ from timesketch.models.sketch import InvestigativeQuestion from timesketch.models.sketch import InvestigativeQuestionApproach from timesketch.models.sketch import InvestigativeQuestionConclusion +from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager logger = logging.getLogger("timesketch.scenario_api") @@ -58,6 +59,50 @@ def load_dfiq_from_config(): return DFIQ(dfiq_path) +def check_and_run_dfiq_analysis_steps(dfiq_obj, sketch): + """Checks if any DFIQ analyzers need to be executed for the given DFIQ object. + + Args: + dfiq_obj: The DFIQ object (Scenario, Question, or Approach). + sketch: The sketch object associated with the DFIQ object. + + Returns: + List of analyzer_session objects (can be empty) or False. 
+ """ + analyzer_sessions = [] + if isinstance(dfiq_obj, InvestigativeQuestionApproach): + analyzer_manager = DFIQAnalyzerManager(approach=dfiq_obj, sketch=sketch) + session = analyzer_manager.check_for_dfiq_analyzer_steps() + if session: + analyzer_sessions.extend(session) + elif isinstance(dfiq_obj, InvestigativeQuestion): + for approach in dfiq_obj.approaches: + analyzer_manager = DFIQAnalyzerManager(approach=approach, sketch=sketch) + session = analyzer_manager.check_for_dfiq_analyzer_steps() + if session: + analyzer_sessions.extend(session) + elif isinstance(dfiq_obj, Facet): + for question in dfiq_obj.questions: + result = check_and_run_dfiq_analysis_steps(question, sketch) + if result: + analyzer_sessions.extend(result) + elif isinstance(dfiq_obj, Scenario): + if dfiq_obj.facets: + for facet in dfiq_obj.facets: + result = check_and_run_dfiq_analysis_steps(facet, sketch) + if result: + analyzer_sessions.extend(result) + if dfiq_obj.questions: + for question in dfiq_obj.questions: + result = check_and_run_dfiq_analysis_steps(question, sketch) + if result: + analyzer_sessions.extend(result) + else: + return False # Invalid DFIQ object type + + return analyzer_sessions if analyzer_sessions else False + + class ScenarioTemplateListResource(resources.ResourceMixin, Resource): """List all scenarios available.""" @@ -229,8 +274,7 @@ def post(self, sketch_id): display_name=approach.name, description=approach.description, spec_json=approach.to_json(), - user_id=current_user.id, - sketch=sketch, + user=current_user, ) for search_template in approach.search_templates: @@ -242,9 +286,23 @@ def post(self, sketch_id): question_sql.approaches.append(approach_sql) + db_session.add(question_sql) + + # TODO: Remove commit and check function here when questions are + # linked to Scenarios again! + # Needs a tmp commit here so we can run the analyzer on the question. 
+ db_session.commit() + # Check if any of the questions contains analyzer approaches + check_and_run_dfiq_analysis_steps(question_sql, sketch) + db_session.add(scenario_sql) db_session.commit() + # This does not work, since we don't have Scnearios linked down to + # Approaches anymore! We intentionally broke the link to facets to show + # Questions in the frontend. + # check_and_run_dfiq_analysis_steps(scenario_sql, sketch) + return self.to_json(scenario_sql) @@ -554,8 +612,7 @@ def post(self, sketch_id): display_name=approach.name, description=approach.description, spec_json=approach.to_json(), - user_id=current_user.id, - sketch=sketch, + user=current_user, ) for search_template in approach.search_templates: @@ -596,6 +653,8 @@ def post(self, sketch_id): db_session.add(new_question) db_session.commit() + check_and_run_dfiq_analysis_steps(new_question, sketch) + return self.to_json(new_question) diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py index 00eac35af9..d4e36f4608 100644 --- a/timesketch/lib/analyzers/dfiq_plugins/manager.py +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -11,48 +11,45 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+"""This file contains a class for managing DFIQ analyzers.""" import logging import json from timesketch.models.sketch import Timeline from timesketch.lib.aggregators import manager as aggregator_manager -from timesketch.models.sketch import approach_created # Import the signal logger = logging.getLogger("timesketch.analyzers.dfiq") -@approach_created.connect -def trigger_dfiq_analyzsis(approach, **extra): - analyzer_manager = DFIQAnalyzerManager(approach=approach) - analyzer_manager.check_for_dfiq_analyzer() -# This approach triggrs the following SQLA messages: -# /usr/local/src/timesketch/timesketch/models/annotations.py:403: SAWarning: Object of type not in session, add operation along 'User.investigative_questions' will not proceed db_session.commit() -# /usr/local/src/timesketch/timesketch/models/annotations.py:403: SAWarning: Object of type not in session, add operation along 'Sketch.questions' will not proceed db_session.commit() -# To fix this, the InvestigativeQuestion needs to be added to the Database before we trigger the analyzers! 
class DFIQAnalyzerManager: """Manager for executing DFIQ analyzers.""" - def __init__(self, approach): + def __init__(self, approach, sketch): """Initializes the manager.""" - self.sketch = approach.sketch - self.user_id = approach.user_id + self.sketch = sketch + self.user_id = approach.user.id self.approach_spec = json.loads(approach.spec_json) + self.approach_id = approach.id self.aggregator_manager = aggregator_manager - def check_for_dfiq_analyzer(self): - """Checks if the created approach has analyzer steps to execute.""" - dfiq_analyzers = set() - if self.approach_spec.get("steps"): - for step in self.approach_spec.get("steps"): - if step.get("stage") == "analysis" and step.get("type") == "timesketch-analyzer": - dfiq_analyzers.add(step.get("value")) + def check_for_dfiq_analyzer_steps(self): + """Checks if the created approach has analyzer steps to execute.""" + dfiq_analyzers = set() + analyzer_sessions = [] + if self.approach_spec.get("steps"): + for step in self.approach_spec.get("steps"): + if ( + step.get("stage") == "analysis" + and step.get("type") == "timesketch-analyzer" + ): + dfiq_analyzers.add(step.get("value")) - if dfiq_analyzers: - print(f"### Next gonna run {dfiq_analyzers}") - analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers) - return analyzer_sessions + if dfiq_analyzers: + analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers) + + return analyzer_sessions def _get_analyzers_by_data_type(self, dfiq_analyzers): """Groups DFIQ analyzers by their required data types. @@ -67,11 +64,15 @@ def _get_analyzers_by_data_type(self, dfiq_analyzers): attribute (i.e., an empty list). It will trigger the analyzer to run on all timelines in the sketch. """ + # Import here to avoid circular imports. 
+ # pylint: disable=import-outside-toplevel from timesketch.lib.analyzers import manager as analyzer_manager + analyzer_by_datatypes = {} - for analyzer_name, analyzer_class in analyzer_manager.AnalysisManager.get_analyzers( - include_dfiq=True - ): + for ( + analyzer_name, + analyzer_class, + ) in analyzer_manager.AnalysisManager.get_analyzers(include_dfiq=True): if analyzer_name not in dfiq_analyzers: continue @@ -118,8 +119,9 @@ def _run_dfiq_analyzers(self, dfiq_analyzers): analyzer_by_datatypes = self._get_analyzers_by_data_type(dfiq_analyzers) if not analyzer_by_datatypes: logger.error( - "None of the requested DFIQ analyzers have required data " - "types defined. Aborting." + "None of the requested DFIQ analyzers exist on this Timesketch " + "instance Requested: %s", + str(dfiq_analyzers), ) return [] @@ -135,6 +137,8 @@ def _run_dfiq_analyzers(self, dfiq_analyzers): elif data_type in timeline_datatypes: analyzer_by_timeline[timeline_id].extend(analyzer_names) + # Import here to avoid circular imports. + # pylint: disable=import-outside-toplevel from timesketch.lib import tasks sessions = [] @@ -152,12 +156,14 @@ def _run_dfiq_analyzers(self, dfiq_analyzers): analyzer_names=analyzer_names, analyzer_kwargs=None, timeline_id=timeline_id, - analyzer_force_run=True, + analyzer_force_run=False, include_dfiq=True, + approach_id=self.approach_id, ) except KeyError as e: logger.warning( - f"Unable to build analyzer pipeline, analyzer does not exist: {e}" + "Unable to build analyzer pipeline, analyzer does not exist: %s", + str(e), ) continue if analyzer_group: diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 318e0f11e0..9f8e4c57ea 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -333,6 +333,7 @@ def build_sketch_analysis_pipeline( analyzer_force_run=False, timeline_id=None, include_dfiq=False, + approach_id=None, ): """Build a pipeline for sketch analysis. 
@@ -351,6 +352,7 @@ def build_sketch_analysis_pipeline( analyzer_force_run (bool): If true then force the analyzer to run. timeline_id (int): Optional int of the timeline to run the analyzer on. include_dfiq (bool): If trie then include dfiq analyzers in the task. + approach_id (int): Optional ID of the approach triggering the analyzer. Returns: A tuple with a Celery group with analysis tasks or None if no analyzers @@ -437,6 +439,7 @@ def build_sketch_analysis_pipeline( user=user, sketch=sketch, timeline=timeline, + approach_id=approach_id, ) analysis.add_attribute(name="kwargs_hash", value=kwargs_list_hash) analysis.set_status("PENDING") diff --git a/timesketch/models/sketch.py b/timesketch/models/sketch.py index 449531a600..db650e4e78 100644 --- a/timesketch/models/sketch.py +++ b/timesketch/models/sketch.py @@ -44,11 +44,6 @@ from timesketch.lib.utils import random_color from timesketch.models import db_session -from blinker import signal - -# Create the signal -approach_created = signal('approach-created') - logger = logging.getLogger("timesketch.sketch") @@ -356,6 +351,7 @@ class Analysis(GenericAttributeMixin, LabelMixin, StatusMixin, CommentMixin, Bas sketch_id = Column(Integer, ForeignKey("sketch.id")) timeline_id = Column(Integer, ForeignKey("timeline.id")) searchindex_id = Column(Integer, ForeignKey("searchindex.id")) + approach_id = Column(Integer, ForeignKey("investigativequestionapproach.id")) class AnalysisSession(LabelMixin, StatusMixin, CommentMixin, BaseModel): @@ -812,14 +808,6 @@ class InvestigativeQuestionApproach( search_histories = relationship( "SearchHistory", backref="investigativequestionapproach", lazy="select" ) - - def __init__(self, *args, **kwargs): - self.sketch = kwargs.pop('sketch', None) - super().__init__(*args, **kwargs) - # Import here to avoid circular imports. 
- # pylint: disable=import-outside-toplevel - # from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager - # analyzer_manager = DFIQAnalyzerManager(approach=self) - # analyzer_manager.check_for_dfiq_analyzer() - approach_created.send(self) - print("### Blinker signal sent!") + analysis = relationship( + "Analysis", backref="investigativequestionapproach", lazy="dynamic" + ) From 9bf4f2ef098b00de2db5e8e98307ebb2fbea3901 Mon Sep 17 00:00:00 2001 From: janosch Date: Mon, 9 Sep 2024 16:26:30 +0000 Subject: [PATCH 03/14] * Added DFIQ analyzer trigger via uploaded timeline * Restructured the dfiq_plugins/manager to be more versatile. * Adding a function to deregister analyzers to the manager * Ensuring the index is ready before analyzers are executed --- timesketch/api/v1/resources/scenarios.py | 25 ++- .../lib/analyzers/dfiq_plugins/__init__.py | 50 +---- .../lib/analyzers/dfiq_plugins/manager.py | 181 +++++++++++++++--- timesketch/lib/analyzers/manager.py | 17 ++ timesketch/lib/tasks.py | 71 ++++--- 5 files changed, 241 insertions(+), 103 deletions(-) diff --git a/timesketch/api/v1/resources/scenarios.py b/timesketch/api/v1/resources/scenarios.py index 0823e587fe..04846a593c 100644 --- a/timesketch/api/v1/resources/scenarios.py +++ b/timesketch/api/v1/resources/scenarios.py @@ -59,42 +59,51 @@ def load_dfiq_from_config(): return DFIQ(dfiq_path) -def check_and_run_dfiq_analysis_steps(dfiq_obj, sketch): +def check_and_run_dfiq_analysis_steps(dfiq_obj, sketch, analyzer_manager=None): """Checks if any DFIQ analyzers need to be executed for the given DFIQ object. Args: dfiq_obj: The DFIQ object (Scenario, Question, or Approach). sketch: The sketch object associated with the DFIQ object. + analyzer_manager: Optional. An existing instance of DFIQAnalyzerManager. Returns: List of analyzer_session objects (can be empty) or False. """ + # Initialize the analyzer manager only once. 
+ if not analyzer_manager: + analyzer_manager = DFIQAnalyzerManager(sketch=sketch) + analyzer_sessions = [] if isinstance(dfiq_obj, InvestigativeQuestionApproach): - analyzer_manager = DFIQAnalyzerManager(approach=dfiq_obj, sketch=sketch) - session = analyzer_manager.check_for_dfiq_analyzer_steps() + session = analyzer_manager.trigger_analyzers_for_approach(approach=dfiq_obj) if session: analyzer_sessions.extend(session) elif isinstance(dfiq_obj, InvestigativeQuestion): for approach in dfiq_obj.approaches: - analyzer_manager = DFIQAnalyzerManager(approach=approach, sketch=sketch) - session = analyzer_manager.check_for_dfiq_analyzer_steps() + session = analyzer_manager.trigger_analyzers_for_approach(approach=approach) if session: analyzer_sessions.extend(session) elif isinstance(dfiq_obj, Facet): for question in dfiq_obj.questions: - result = check_and_run_dfiq_analysis_steps(question, sketch) + result = check_and_run_dfiq_analysis_steps( + question, sketch, analyzer_manager + ) if result: analyzer_sessions.extend(result) elif isinstance(dfiq_obj, Scenario): if dfiq_obj.facets: for facet in dfiq_obj.facets: - result = check_and_run_dfiq_analysis_steps(facet, sketch) + result = check_and_run_dfiq_analysis_steps( + facet, sketch, analyzer_manager + ) if result: analyzer_sessions.extend(result) if dfiq_obj.questions: for question in dfiq_obj.questions: - result = check_and_run_dfiq_analysis_steps(question, sketch) + result = check_and_run_dfiq_analysis_steps( + question, sketch, analyzer_manager + ) if result: analyzer_sessions.extend(result) else: diff --git a/timesketch/lib/analyzers/dfiq_plugins/__init__.py b/timesketch/lib/analyzers/dfiq_plugins/__init__.py index 5ec1774a95..c107e0c6cb 100644 --- a/timesketch/lib/analyzers/dfiq_plugins/__init__.py +++ b/timesketch/lib/analyzers/dfiq_plugins/__init__.py @@ -1,51 +1,5 @@ """DFIQ Analyzer module.""" -import os -import importlib -import inspect -import logging -from timesketch.lib.analyzers import interface -from 
timesketch.lib.analyzers import manager as analyzer_manager +from timesketch.lib.analyzers.dfiq_plugins import manager as dfiq_analyzer_manager -import timesketch.lib.analyzers.dfiq_plugins.manager - -logger = logging.getLogger("timesketch.lib.analyzers.dfiq_plugins") - -# Dynamically load DFIQ Analyzers -DFIQ_ANALYZER_PATH = os.path.dirname(os.path.abspath(__file__)) - -for filename in os.listdir(DFIQ_ANALYZER_PATH): - if filename.endswith(".py") and not filename.startswith("__"): - module_name = filename[:-3] # Remove .py extension - if module_name == "manager": - continue - module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" - try: - module = importlib.import_module(module_path) - for name, obj in inspect.getmembers(module): - if name not in [ - "interface", - "logger", - "logging", - "manager", - ] and not name.startswith("__"): - if ( - inspect.isclass(obj) - and issubclass(obj, interface.BaseAnalyzer) - and hasattr(obj, "IS_DFIQ_ANALYZER") - and obj.IS_DFIQ_ANALYZER - ): - analyzer_manager.AnalysisManager.register_analyzer(obj) - logger.info("Registered DFIQ analyzer: %s", obj.NAME) - else: - logger.error( - 'Skipped loading "%s" as analyzer, since it did ' - "not meet the requirements.", - str(module_path), - ) - except ImportError as error: - logger.error( - "Failed to import dfiq analyzer module: %s, %s", - str(module_path), - str(error), - ) +dfiq_analyzer_manager.load_dfiq_analyzers() diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py index d4e36f4608..76a1ab11e6 100644 --- a/timesketch/lib/analyzers/dfiq_plugins/manager.py +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -13,43 +13,167 @@ # limitations under the License. 
"""This file contains a class for managing DFIQ analyzers.""" -import logging +import importlib +import inspect import json +import logging +import os -from timesketch.models.sketch import Timeline from timesketch.lib.aggregators import manager as aggregator_manager +from timesketch.lib.analyzers import interface +from timesketch.lib.analyzers import manager as analyzer_manager +from timesketch.models.sketch import Timeline + + +logger = logging.getLogger("timesketch.analyzers.dfiq_plugins.manager") + -logger = logging.getLogger("timesketch.analyzers.dfiq") +def load_dfiq_analyzers(): + """Loads DFIQ analyzer classes.""" + + DFIQ_ANALYZER_PATH = os.path.dirname(os.path.abspath(__file__)) + + # Clear existing registrations before reloading + for name, analyzer_class in analyzer_manager.AnalysisManager.get_analyzers( + include_dfiq=True + ): + if ( + hasattr(analyzer_class, "IS_DFIQ_ANALYZER") + and analyzer_class.IS_DFIQ_ANALYZER + ): + try: + analyzer_manager.AnalysisManager.deregister_analyzer(name) + logger.info("Deregistered DFIQ analyzer: %s", name) + except KeyError as e: + logger.error(str(e)) + + # Dynamically load DFIQ Analyzers + for filename in os.listdir(DFIQ_ANALYZER_PATH): + if filename.endswith(".py") and not filename.startswith("__"): + module_name = filename[:-3] # Remove .py extension + if module_name == "manager": + continue + module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" + try: + module = importlib.import_module(module_path) + for name, obj in inspect.getmembers(module): + if name not in [ + "interface", + "logger", + "logging", + "manager", + ] and not name.startswith("__"): + if ( + inspect.isclass(obj) + and issubclass(obj, interface.BaseAnalyzer) + and hasattr(obj, "IS_DFIQ_ANALYZER") + and obj.IS_DFIQ_ANALYZER + ): + analyzer_manager.AnalysisManager.register_analyzer(obj) + logger.info("Registered DFIQ analyzer: %s", obj.NAME) + else: + logger.error( + 'Skipped loading "%s" as analyzer, since it did' + " not meet the 
requirements.",
+                        str(module_path),
+                    )
+        except ImportError as error:
+            logger.error(
+                "Failed to import dfiq analyzer module: %s, %s",
+                str(module_path),
+                str(error),
+            )
 
 
 class DFIQAnalyzerManager:
     """Manager for executing DFIQ analyzers."""
 
-    def __init__(self, approach, sketch):
-        """Initializes the manager."""
-        self.sketch = sketch
-        self.user_id = approach.user.id
-        self.approach_spec = json.loads(approach.spec_json)
-        self.approach_id = approach.id
+    def __init__(self, sketch):
+        """Initializes the manager.
+
+        Args:
+            sketch: The sketch object.
+        """
+        self.sketch = sketch
         self.aggregator_manager = aggregator_manager
+        self.aggregation_max_tries = 3
+
+    def trigger_analyzers_for_approach(self, approach):
+        """Triggers DFIQ analyzers for a newly added approach.
+
+        Args:
+            approach (InvestigativeQuestionApproach): An approach object to link
+                with the analysis.
+
+        Returns:
+            analyzer_sessions or False
+        """
+        dfiq_analyzers = self._get_dfiq_analyzer(approach)
 
-    def check_for_dfiq_analyzer_steps(self):
-        """Checks if the created approach has analyzer steps to execute."""
-        dfiq_analyzers = set()
         analyzer_sessions = []
-        if self.approach_spec.get("steps"):
-            for step in self.approach_spec.get("steps"):
+        if dfiq_analyzers:
+            analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers, approach)
+
+        return analyzer_sessions if analyzer_sessions else False
+
+    def trigger_analyzers_for_timelines(self, timelines):
+        """Triggers DFIQ analyzers for a newly added timeline.
+
+        Args:
+            timelines []: List of timeline
+                objects.
+ + Returns: + analyzer_sessions or None + """ + if isinstance(timelines, Timeline): + timelines = [timelines] + analyzer_sessions = [] + for approach in self._find_analyzer_approaches(): + dfiq_analyzers = self._get_dfiq_analyzer(approach) + if dfiq_analyzers: + session = self._run_dfiq_analyzers( + dfiq_analyzers=dfiq_analyzers, + approach=approach, + timelines=timelines, + ) + if session: + analyzer_sessions.extend(session) + + return analyzer_sessions if analyzer_sessions else False + + def _find_analyzer_approaches(self): + """Finds approaches with a defined analyzer step. + + Returns: + A list of InvestigativeQuestionApproach objects that have a defined + analyzer step in their specification. + """ + approaches = [] + for question in self.sketch.questions: + for approach in question.approaches: + approach_spec = json.loads(approach.spec_json) + if any( + step.get("stage") == "analysis" + and step.get("type") == "timesketch-analyzer" + for step in approach_spec.get("steps", []) + ): + approaches.append(approach) + return approaches + + def _get_dfiq_analyzer(self, approach): + """Checks if the approach has analyzer steps to execute.""" + dfiq_analyzers = set() + approach_spec = json.loads(approach.spec_json) + if approach_spec.get("steps"): + for step in approach_spec.get("steps"): if ( step.get("stage") == "analysis" and step.get("type") == "timesketch-analyzer" ): dfiq_analyzers.add(step.get("value")) - if dfiq_analyzers: - analyzer_sessions = self._run_dfiq_analyzers(dfiq_analyzers) - - return analyzer_sessions + return dfiq_analyzers def _get_analyzers_by_data_type(self, dfiq_analyzers): """Groups DFIQ analyzers by their required data types. @@ -87,14 +211,20 @@ def _get_analyzers_by_data_type(self, dfiq_analyzers): ) return analyzer_by_datatypes - def _get_data_types_per_timeline(self): + def _get_data_types_per_timeline(self, timelines=[]): """Retrieves data types present in each eligible timeline. 
+ Args: + timelines: (optional) A list of timeline objects. + Returns: dict: A dictionary mapping timeline IDs to lists of data types. """ + if not timelines: + timelines = self.sketch.timelines + datatype_per_timeline = {} - for timeline in self.sketch.timelines: + for timeline in timelines: if timeline.get_status.status.lower() != "ready": continue @@ -107,11 +237,13 @@ def _get_data_types_per_timeline(self): ] return datatype_per_timeline - def _run_dfiq_analyzers(self, dfiq_analyzers): + def _run_dfiq_analyzers(self, dfiq_analyzers, approach, timelines=[]): """Executes DFIQ analyzers on matching timelines. Args: dfiq_analyzers (set): A set of DFIQ analyzer names. + approach (InvestigativeQuestionApproach): An approach object to link with the analyssis + timelines ([]): Optional list of timelines to limit the analysis on. Returns: list: A list of analyzer sessions (potentially empty). @@ -125,8 +257,7 @@ def _run_dfiq_analyzers(self, dfiq_analyzers): ) return [] - datatype_per_timeline = self._get_data_types_per_timeline() - + datatype_per_timeline = self._get_data_types_per_timeline(timelines) analyzer_by_timeline = {} for timeline_id, timeline_datatypes in datatype_per_timeline.items(): analyzer_by_timeline[timeline_id] = [] @@ -152,13 +283,13 @@ def _run_dfiq_analyzers(self, dfiq_analyzers): analyzer_group, session = tasks.build_sketch_analysis_pipeline( sketch_id=self.sketch.id, searchindex_id=timeline.searchindex.id, - user_id=self.user_id, + user_id=approach.user.id, analyzer_names=analyzer_names, analyzer_kwargs=None, timeline_id=timeline_id, analyzer_force_run=False, include_dfiq=True, - approach_id=self.approach_id, + approach_id=approach.id, ) except KeyError as e: logger.warning( diff --git a/timesketch/lib/analyzers/manager.py b/timesketch/lib/analyzers/manager.py index 24df99cacc..4f39979253 100644 --- a/timesketch/lib/analyzers/manager.py +++ b/timesketch/lib/analyzers/manager.py @@ -148,3 +148,20 @@ def register_analyzer(cls, analyzer_class): 
"Class already set for name: {0:s}.".format(analyzer_class.NAME) ) cls._class_registry[analyzer_name] = analyzer_class + + @classmethod + def deregister_analyzer(cls, analyzer_name): + """Deregister an analyzer class. + + The analyzer classes are identified by their lower case name. + + Args: + analyzer_name (string): the analyzer name to deregister. + + Raises: + KeyError: If class is not registered for the corresponding name. + """ + if analyzer_name not in cls._class_registry: + # Do we really need a KeyError here? Isn't logging enough? + raise KeyError("Class not set for name: {0:s}.".format(analyzer_name)) + analyzer_class = cls._class_registry.pop(analyzer_name, None) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 9f8e4c57ea..e051ba67f1 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -15,39 +15,29 @@ from __future__ import unicode_literals -import os -import logging -import subprocess -import traceback - import codecs +from hashlib import sha1 import io import json -from hashlib import sha1 -import six -import yaml - -from opensearchpy.exceptions import NotFoundError -from opensearchpy.exceptions import RequestError -from flask import current_app +import logging +import os +import subprocess +import traceback from celery import chain from celery import group from celery import signals +from flask import current_app +from opensearchpy.exceptions import NotFoundError +from opensearchpy.exceptions import RequestError +import six from sqlalchemy import create_engine - -# To be able to determine plaso's version. 
-try: - import plaso - from plaso.cli import pinfo_tool -except ImportError: - plaso = None - from timesketch.app import configure_logger from timesketch.app import create_celery_app from timesketch.lib import datafinder from timesketch.lib import errors from timesketch.lib.analyzers import manager +from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager from timesketch.lib.datastores.opensearch import OpenSearchDataStore from timesketch.lib.utils import read_and_validate_csv from timesketch.lib.utils import read_and_validate_jsonl @@ -59,6 +49,15 @@ from timesketch.models.sketch import Sketch from timesketch.models.sketch import Timeline from timesketch.models.user import User +import yaml + + +# To be able to determine plaso's version. +try: + import plaso + from plaso.cli import pinfo_tool +except ImportError: + plaso = None logger = logging.getLogger("timesketch.tasks") @@ -167,9 +166,11 @@ def _close_index(index_name, data_store, timeline_id): def _set_timeline_status(timeline_id, status, error_msg=None): """Helper function to set status for searchindex and all related timelines. - Args: - timeline_id: Timeline ID. + + Args: + timeline_id: Timeline ID. """ + # TODO: Clean-up function, since neither status nor error_msg are used! timeline = Timeline.get_by_id(timeline_id) if not timeline: logger.warning("Cannot set status: No such timeline") @@ -194,6 +195,32 @@ def _set_timeline_status(timeline_id, status, error_msg=None): db_session.add(timeline) db_session.commit() + # Refresh the index so it is searchable for the analyzers right away. 
+ datastore = OpenSearchDataStore( + host=current_app.config["OPENSEARCH_HOST"], + port=current_app.config["OPENSEARCH_PORT"], + ) + try: + datastore.client.indices.refresh(index=timeline.searchindex.index_name) + except NotFoundError: + logger.error( + "Unable to refresh index: {0:s}, not found, " + "removing from list.".format(timeline.searchindex.index_name) + ) + + # If status is set to ready, check for analyzers to execute. + if timeline.get_status.status == "ready": + analyzer_manager = DFIQAnalyzerManager(sketch=timeline.sketch) + sessions = analyzer_manager.trigger_analyzers_for_timelines( + timelines=[timeline] + ) + if sessions: + logger.info( + "Executed %d analyzers on the new timeline: '%s'", + len(sessions), + timeline.name, + ) + def _set_datasource_status(timeline_id, file_path, status, error_message=None): timeline = Timeline.get_by_id(timeline_id) From adfa5819b838657b8b22ad1024069ea011307539 Mon Sep 17 00:00:00 2001 From: janosch Date: Mon, 9 Sep 2024 16:27:46 +0000 Subject: [PATCH 04/14] linter --- timesketch/lib/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index e051ba67f1..7afb033eff 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -167,8 +167,8 @@ def _close_index(index_name, data_store, timeline_id): def _set_timeline_status(timeline_id, status, error_msg=None): """Helper function to set status for searchindex and all related timelines. - Args: - timeline_id: Timeline ID. + Args: + timeline_id: Timeline ID. """ # TODO: Clean-up function, since neither status nor error_msg are used! 
timeline = Timeline.get_by_id(timeline_id)

From 5759a52fbac489230be75faa1a783921f71af6b6 Mon Sep 17 00:00:00 2001
From: janosch
Date: Mon, 9 Sep 2024 16:33:28 +0000
Subject: [PATCH 05/14] linter

---
 timesketch/lib/analyzers/dfiq_plugins/manager.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py
index 76a1ab11e6..f50658fa6a 100644
--- a/timesketch/lib/analyzers/dfiq_plugins/manager.py
+++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py
@@ -188,10 +188,6 @@ def _get_analyzers_by_data_type(self, dfiq_analyzers):
         attribute (i.e., an empty list). It will trigger the analyzer to run
         on all timelines in the sketch.
         """
-        # Import here to avoid circular imports.
-        # pylint: disable=import-outside-toplevel
-        from timesketch.lib.analyzers import manager as analyzer_manager
-
         analyzer_by_datatypes = {}
         for (
             analyzer_name,
@@ -211,7 +207,7 @@ def _get_analyzers_by_data_type(self, dfiq_analyzers):
             )
         return analyzer_by_datatypes
 
-    def _get_data_types_per_timeline(self, timelines=[]):
+    def _get_data_types_per_timeline(self, timelines=None):
         """Retrieves data types present in each eligible timeline.
 
         Args:
@@ -237,13 +233,15 @@ def _get_data_types_per_timeline(self):
             ]
         return datatype_per_timeline
 
-    def _run_dfiq_analyzers(self, dfiq_analyzers, approach, timelines=[]):
+    def _run_dfiq_analyzers(self, dfiq_analyzers, approach, timelines=None):
         """Executes DFIQ analyzers on matching timelines.
 
         Args:
             dfiq_analyzers (set): A set of DFIQ analyzer names.
-            approach (InvestigativeQuestionApproach): An approach object to link with the analyssis
-            timelines ([]): Optional list of timelines to limit the analysis on.
+            approach (InvestigativeQuestionApproach): An approach object to link
+                with the analysis
+            timelines ([]): Optional list of timelines to limit the
+                analysis on.
 
         Returns:
             list: A list of analyzer sessions (potentially empty).
From 40a435fd43a612cef010b425ebb36bc0837d3ac2 Mon Sep 17 00:00:00 2001 From: janosch Date: Mon, 9 Sep 2024 16:37:06 +0000 Subject: [PATCH 06/14] linter --- timesketch/lib/analyzers/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timesketch/lib/analyzers/manager.py b/timesketch/lib/analyzers/manager.py index 4f39979253..45400f9c09 100644 --- a/timesketch/lib/analyzers/manager.py +++ b/timesketch/lib/analyzers/manager.py @@ -164,4 +164,4 @@ def deregister_analyzer(cls, analyzer_name): if analyzer_name not in cls._class_registry: # Do we really need a KeyError here? Isn't logging enough? raise KeyError("Class not set for name: {0:s}.".format(analyzer_name)) - analyzer_class = cls._class_registry.pop(analyzer_name, None) + _ = cls._class_registry.pop(analyzer_name, None) From c53709e293860bf54db38da32368839e5402531b Mon Sep 17 00:00:00 2001 From: janosch Date: Tue, 10 Sep 2024 08:21:46 +0000 Subject: [PATCH 07/14] another linter --- timesketch/lib/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 7afb033eff..116dae0389 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -23,6 +23,8 @@ import os import subprocess import traceback +import six +import yaml from celery import chain from celery import group @@ -30,7 +32,6 @@ from flask import current_app from opensearchpy.exceptions import NotFoundError from opensearchpy.exceptions import RequestError -import six from sqlalchemy import create_engine from timesketch.app import configure_logger from timesketch.app import create_celery_app @@ -49,7 +50,6 @@ from timesketch.models.sketch import Sketch from timesketch.models.sketch import Timeline from timesketch.models.user import User -import yaml # To be able to determine plaso's version. 
From 4919ce83d2f4d3d5d2aec0da6b72c8dddc98717c Mon Sep 17 00:00:00 2001 From: janosch Date: Tue, 10 Sep 2024 14:53:55 +0000 Subject: [PATCH 08/14] Linking Analysis and InvestigativeQuestionConclusion objects. --- timesketch/lib/tasks.py | 52 +++++++++++++++++++++++++++++++++++++ timesketch/models/sketch.py | 6 +++++ 2 files changed, 58 insertions(+) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 116dae0389..0581a66f4d 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -49,6 +49,8 @@ from timesketch.models.sketch import SearchIndex from timesketch.models.sketch import Sketch from timesketch.models.sketch import Timeline +from timesketch.models.sketch import InvestigativeQuestionApproach +from timesketch.models.sketch import InvestigativeQuestionConclusion from timesketch.models.user import User @@ -351,6 +353,43 @@ def build_index_pipeline( return chain(index_task) +def _create_question_conclusion(user_id, approach_id, analysis_results, analysis): + """Creates a QuestionConclusion for a user and approach. + + Args: + user_id (int): The user ID. + approach_id (int): The approach ID. + conclusion (str): The actual conclusion of the analysis. + + Returns: + InvestigativeQuestionConclusion: A QuestionConclusion object or None. + """ + approach = InvestigativeQuestionApproach.get_by_id(approach_id) + if not approach: + logging.error("No approach with ID '%d' found.", approach_id) + return None + + if not analysis_results: + logging.error( + "Can't create an InvestigativeQuestionConclusion without any " + "conclusion or analysis_results provided." + ) + return None + + # TODO: (jkppr) Parse the analysis_results and extract added stories, + # searches, graphs, aggregations and add to the object! 
+ question_conclusion = InvestigativeQuestionConclusion( + conclusion=analysis_results, + investigativequestion_id=approach.investigativequestion_id, + automated=True, + ) + question_conclusion.analysis.append(analysis) + db_session.add(question_conclusion) + db_session.commit() + + return question_conclusion if question_conclusion else None + + def build_sketch_analysis_pipeline( sketch_id, searchindex_id, @@ -598,6 +637,19 @@ def run_sketch_analyzer( result = analyzer.run_wrapper(analysis_id) logger.info("[{0:s}] result: {1:s}".format(analyzer_name, result)) + if hasattr(analyzer_class, "IS_DFIQ_ANALYZER") and analyzer_class.IS_DFIQ_ANALYZER: + analysis = Analysis.get_by_id(analysis_id) + user_id = analysis.user.id + approach_id = analysis.approach_id + question_conclusion = _create_question_conclusion( + user_id, approach_id, result, analysis + ) + if question_conclusion: + logger.info( + '[{0:s}] added a conclusion to dfiq: "{1:s}"'.format( + analyzer_name, question_conclusion.investigativequestion.name + ) + ) return index_name diff --git a/timesketch/models/sketch.py b/timesketch/models/sketch.py index db650e4e78..cda99d7bf0 100644 --- a/timesketch/models/sketch.py +++ b/timesketch/models/sketch.py @@ -352,6 +352,9 @@ class Analysis(GenericAttributeMixin, LabelMixin, StatusMixin, CommentMixin, Bas timeline_id = Column(Integer, ForeignKey("timeline.id")) searchindex_id = Column(Integer, ForeignKey("searchindex.id")) approach_id = Column(Integer, ForeignKey("investigativequestionapproach.id")) + question_conclusion_id = Column( + Integer, ForeignKey("investigativequestionconclusion.id") + ) class AnalysisSession(LabelMixin, StatusMixin, CommentMixin, BaseModel): @@ -725,6 +728,9 @@ class InvestigativeQuestionConclusion(LabelMixin, StatusMixin, CommentMixin, Bas saved_aggregations = relationship( "Aggregation", secondary=questionconclusion_aggregation_association_table ) + analysis = relationship( + "Analysis", backref="investigativequestionconclusion", 
lazy="select" + ) class InvestigativeQuestion( From 2b396bd4b0c09cfe382f63c6337f206322dc2d5a Mon Sep 17 00:00:00 2001 From: janosch Date: Wed, 11 Sep 2024 18:41:07 +0000 Subject: [PATCH 09/14] review changes pt1 --- timesketch/api/v1/resources/analysis.py | 2 +- timesketch/lib/analyzers/dfiq_plugins/manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/timesketch/api/v1/resources/analysis.py b/timesketch/api/v1/resources/analysis.py index 08b0bced6c..e3e03c51c2 100644 --- a/timesketch/api/v1/resources/analysis.py +++ b/timesketch/api/v1/resources/analysis.py @@ -181,7 +181,7 @@ def get(self, sketch_id): * display_name: Display name of the analyzer for the UI * description: Description of the analyzer provided in the class * is_multi: Boolean indicating if the analyzer is a multi analyzer - * is_dfiq: Boolean indicating if the analyzer is a dfiq analyzer + * is_dfiq: Boolean indicating if the analyzer is a DFIQ analyzer """ sketch = Sketch.get_with_acl(sketch_id) if not sketch: diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py index f50658fa6a..6d68b088ea 100644 --- a/timesketch/lib/analyzers/dfiq_plugins/manager.py +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -50,7 +50,7 @@ def load_dfiq_analyzers(): # Dynamically load DFIQ Analyzers for filename in os.listdir(DFIQ_ANALYZER_PATH): if filename.endswith(".py") and not filename.startswith("__"): - module_name = filename[:-3] # Remove .py extension + module_name = os.path.splitext(filename)[0] # Remove .py extension if module_name == "manager": continue module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" From 3f05c55db43056af70c72b1238c9a65e4cdeae66 Mon Sep 17 00:00:00 2001 From: janosch Date: Wed, 2 Oct 2024 16:26:13 +0000 Subject: [PATCH 10/14] Database migration script --- .../87d24c7252fc_linking_analysis_and_.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 
timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py diff --git a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py new file mode 100644 index 0000000000..1537b47ef5 --- /dev/null +++ b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py @@ -0,0 +1,36 @@ +"""Linking Analysis and InvestigativeQuestion models. + +Revision ID: 87d24c7252fc +Revises: c5560d97a2c8 +Create Date: 2024-10-02 16:17:42.576745 + +""" + +# revision identifiers, used by Alembic. +revision = '87d24c7252fc' +down_revision = 'c5560d97a2c8' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('analysis', schema=None) as batch_op: + batch_op.add_column(sa.Column('approach_id', sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column('question_conclusion_id', sa.Integer(), nullable=True)) + batch_op.create_foreign_key(None, 'investigativequestionconclusion', ['question_conclusion_id'], ['id']) + batch_op.create_foreign_key(None, 'investigativequestionapproach', ['approach_id'], ['id']) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table('analysis', schema=None) as batch_op: + batch_op.drop_constraint(None, type_='foreignkey') + batch_op.drop_constraint(None, type_='foreignkey') + batch_op.drop_column('question_conclusion_id') + batch_op.drop_column('approach_id') + + # ### end Alembic commands ### From 18b51180542369b2887fa07818f6573224f98fcf Mon Sep 17 00:00:00 2001 From: janosch Date: Thu, 3 Oct 2024 08:50:01 +0000 Subject: [PATCH 11/14] linters --- .../87d24c7252fc_linking_analysis_and_.py | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py index 1537b47ef5..d41995818d 100644 --- a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py +++ b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py @@ -6,31 +6,36 @@ """ -# revision identifiers, used by Alembic. -revision = '87d24c7252fc' -down_revision = 'c5560d97a2c8' - from alembic import op import sqlalchemy as sa +# revision identifiers, used by Alembic. +revision = "87d24c7252fc" +down_revision = "c5560d97a2c8" def upgrade(): # ### commands auto generated by Alembic - please adjust! 
### - with op.batch_alter_table('analysis', schema=None) as batch_op: - batch_op.add_column(sa.Column('approach_id', sa.Integer(), nullable=True)) - batch_op.add_column(sa.Column('question_conclusion_id', sa.Integer(), nullable=True)) - batch_op.create_foreign_key(None, 'investigativequestionconclusion', ['question_conclusion_id'], ['id']) - batch_op.create_foreign_key(None, 'investigativequestionapproach', ['approach_id'], ['id']) + with op.batch_alter_table("analysis", schema=None) as batch_op: + batch_op.add_column(sa.Column("approach_id", sa.Integer(), nullable=True)) + batch_op.add_column( + sa.Column("question_conclusion_id", sa.Integer(), nullable=True) + ) + batch_op.create_foreign_key( + None, "investigativequestionconclusion", ["question_conclusion_id"], ["id"] + ) + batch_op.create_foreign_key( + None, "investigativequestionapproach", ["approach_id"], ["id"] + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('analysis', schema=None) as batch_op: - batch_op.drop_constraint(None, type_='foreignkey') - batch_op.drop_constraint(None, type_='foreignkey') - batch_op.drop_column('question_conclusion_id') - batch_op.drop_column('approach_id') + with op.batch_alter_table("analysis", schema=None) as batch_op: + batch_op.drop_constraint(None, type_="foreignkey") + batch_op.drop_constraint(None, type_="foreignkey") + batch_op.drop_column("question_conclusion_id") + batch_op.drop_column("approach_id") # ### end Alembic commands ### From fc348700931069e07f796ed51215a7f8e7f1eb6e Mon Sep 17 00:00:00 2001 From: janosch Date: Thu, 3 Oct 2024 08:54:30 +0000 Subject: [PATCH 12/14] linter --- .../migrations/versions/87d24c7252fc_linking_analysis_and_.py | 1 + 1 file changed, 1 insertion(+) diff --git a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py index d41995818d..1ccff2499b 100644 --- 
a/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py +++ b/timesketch/migrations/versions/87d24c7252fc_linking_analysis_and_.py @@ -13,6 +13,7 @@ revision = "87d24c7252fc" down_revision = "c5560d97a2c8" + def upgrade(): # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table("analysis", schema=None) as batch_op: From d4619ed0b3d2b1dbe90723e0a389f84addce13e0 Mon Sep 17 00:00:00 2001 From: janosch Date: Mon, 7 Oct 2024 12:51:48 +0000 Subject: [PATCH 13/14] Adding unit tests for the scenarios API `check_and_run_dfiq_analysis_steps` function. --- timesketch/api/v1/resources_test.py | 129 ++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/timesketch/api/v1/resources_test.py b/timesketch/api/v1/resources_test.py index 791c4b2f7e..236b8e9bb1 100644 --- a/timesketch/api/v1/resources_test.py +++ b/timesketch/api/v1/resources_test.py @@ -26,6 +26,12 @@ from timesketch.lib.definitions import HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR from timesketch.lib.testlib import BaseTest from timesketch.lib.testlib import MockDataStore +from timesketch.lib.dfiq import DFIQ +from timesketch.api.v1.resources import scenarios +from timesketch.models.sketch import Scenario +from timesketch.models.sketch import InvestigativeQuestion +from timesketch.models.sketch import InvestigativeQuestionApproach +from timesketch.models.sketch import Facet from timesketch.api.v1.resources import ResourceMixin @@ -1403,3 +1409,126 @@ def test_system_settings_resource(self): response = self.client.get(self.resource_url) expected_response = {"DFIQ_ENABLED": False, "LLM_PROVIDER": "test"} self.assertEqual(response.json, expected_response) + + +class ScenariosResourceTest(BaseTest): + """Tests the scenarios resource.""" + + @mock.patch("timesketch.lib.analyzers.dfiq_plugins.manager.DFIQAnalyzerManager") + def test_check_and_run_dfiq_analysis_steps(self, mock_analyzer_manager): + """Test triggering analyzers for different DFIQ objects.""" + 
test_sketch = self.sketch1 + test_user = self.user1 + self.sketch1.set_status("ready") + self._commit_to_database(test_sketch) + + # Load DFIQ objects + dfiq_obj = DFIQ("./test_data/dfiq/") + + scenario = dfiq_obj.scenarios[0] + scenario_sql = Scenario( + dfiq_identifier=scenario.id, + uuid=scenario.uuid, + name=scenario.name, + display_name=scenario.name, + description=scenario.description, + spec_json=scenario.to_json(), + sketch=test_sketch, + user=test_user, + ) + + facet = dfiq_obj.facets[0] + facet_sql = Facet( + dfiq_identifier=facet.id, + uuid=facet.uuid, + name=facet.name, + display_name=facet.name, + description=facet.description, + spec_json=facet.to_json(), + sketch=test_sketch, + user=test_user, + ) + scenario_sql.facets = [facet_sql] + + question = dfiq_obj.questions[0] + question_sql = InvestigativeQuestion( + dfiq_identifier=question.id, + uuid=question.uuid, + name=question.name, + display_name=question.name, + description=question.description, + spec_json=question.to_json(), + sketch=test_sketch, + scenario=scenario_sql, + user=test_user, + ) + facet_sql.questions = [question_sql] + + approach = question.approaches[0] + approach_sql = InvestigativeQuestionApproach( + name=approach.name, + display_name=approach.name, + description=approach.description, + spec_json=approach.to_json(), + user=test_user, + ) + question_sql.approaches = [approach_sql] + + self._commit_to_database(approach_sql) + self._commit_to_database(question_sql) + self._commit_to_database(facet_sql) + self._commit_to_database(scenario_sql) + + # Test without analysis step + result = scenarios.check_and_run_dfiq_analysis_steps(scenario_sql, test_sketch) + self.assertFalse(result) + + result = scenarios.check_and_run_dfiq_analysis_steps(facet_sql, test_sketch) + self.assertFalse(result) + + result = scenarios.check_and_run_dfiq_analysis_steps(approach_sql, test_sketch) + self.assertFalse(result) + + # Add analysis step to approach + approach.steps.append( + { + "stage": "analysis", 
+ "type": "timesketch-analyzer", + "value": "test_analyzer", + } + ) + approach_sql.spec_json = approach.to_json() + + # Mocking analyzer manager response. + mock_analyzer_manager.trigger_analyzers_for_approach.return_value = [ + mock.MagicMock() + ] + + # Test with analysis step + result = scenarios.check_and_run_dfiq_analysis_steps( + scenario_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY, mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + result = scenarios.check_and_run_dfiq_analysis_steps( + facet_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + result = scenarios.check_and_run_dfiq_analysis_steps( + question_sql, test_sketch, mock_analyzer_manager + ) + self.assertEqual(result, [mock.ANY]) + mock_analyzer_manager.trigger_analyzers_for_approach.assert_called_with( + approach=approach_sql + ) + + # Test with invalid object + result = scenarios.check_and_run_dfiq_analysis_steps("invalid", test_sketch) + self.assertFalse(result) From 9a4b5c0a1afbd855e53c05c38538a132a43af6e6 Mon Sep 17 00:00:00 2001 From: janosch Date: Mon, 7 Oct 2024 15:17:01 +0000 Subject: [PATCH 14/14] Ignore test files during import of analyzers. 
--- timesketch/lib/analyzers/dfiq_plugins/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timesketch/lib/analyzers/dfiq_plugins/manager.py b/timesketch/lib/analyzers/dfiq_plugins/manager.py index 6d68b088ea..de8fabbb76 100644 --- a/timesketch/lib/analyzers/dfiq_plugins/manager.py +++ b/timesketch/lib/analyzers/dfiq_plugins/manager.py @@ -51,7 +51,7 @@ def load_dfiq_analyzers(): for filename in os.listdir(DFIQ_ANALYZER_PATH): if filename.endswith(".py") and not filename.startswith("__"): module_name = os.path.splitext(filename)[0] # Remove .py extension - if module_name == "manager": + if module_name == "manager" or module_name.endswith("_test"): continue module_path = f"timesketch.lib.analyzers.dfiq_plugins.{module_name}" try: