From 0d96ce9ed1ce74d3420f04b4d1b0fc0870df0afb Mon Sep 17 00:00:00 2001 From: Louis Carpentier <54282224+LouisCarpentier42@users.noreply.github.com> Date: Fri, 11 Oct 2024 12:30:45 +0200 Subject: [PATCH] Added error logging in workflow and caching data sets (#3) * Added error logging in workflow and caching data sets * Update GitHub action to test notebooks * Update GitHub action to test notebooks * Minor update to test notebook workflow --- .github/workflows/python-package.yml | 9 +- .github/workflows/test-notebooks.yml | 41 ++++ CHANGELOG.md | 12 ++ README.md | 1 + docs/getting_started/custom_models.rst | 13 +- .../quantitative_evaluation.rst | 3 +- dtaianomaly/data/UCRLoader.py | 8 +- dtaianomaly/data/data.py | 27 ++- dtaianomaly/workflow/Workflow.py | 34 ++-- dtaianomaly/workflow/error_logging.py | 76 ++++++++ notebooks/Custom-models.ipynb | 4 +- notebooks/Quantitative-evaluation.ipynb | 3 +- tests/anomaly_detection/test_detectors.py | 3 +- tests/data/test_data.py | 44 ++++- tests/workflow/test_WorkFlow.py | 39 ++-- tests/workflow/test_error_logging.py | 183 ++++++++++++++++++ 16 files changed, 448 insertions(+), 52 deletions(-) create mode 100644 .github/workflows/test-notebooks.yml create mode 100644 dtaianomaly/workflow/error_logging.py create mode 100644 tests/workflow/test_error_logging.py diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d26294c..0a1a4d8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -20,21 +20,26 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v3 with: python-version: ${{ matrix.python-version }} + - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -r requirements.txt + pip install -r requirements-dev.txt + pip install flake8 + - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest run: | pytest diff --git a/.github/workflows/test-notebooks.yml b/.github/workflows/test-notebooks.yml new file mode 100644 index 0000000..28276d2 --- /dev/null +++ b/.github/workflows/test-notebooks.yml @@ -0,0 +1,41 @@ +name: Test Notebooks + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + test-notebooks: + runs-on: ubuntu-latest + + steps: + # Check out the repository code + - name: Checkout repository + uses: actions/checkout@v4 + + # Set up Python + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.10' # Only for one python version to save on resources + + # Install dependencies + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + pip install nbconvert jupyter + + # Install dtaianomaly locally + - name: Install dtaianomaly + run: pip install . 
+ + # Test the notebooks + - name: Execute Notebooks + run: | + for notebook in $(find ./notebooks -name "*.ipynb"); do + jupyter nbconvert --to notebook --execute $notebook + done diff --git a/CHANGELOG.md b/CHANGELOG.md index cf59798..c602b13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,20 @@ All notable changes to this project will be documented in this file. ### Added +- When executing a workflow, and an error occurs. The errors will be written to + an error file. This file contains in which phase the error occurred and the + entire traceback of the error. Additionally, the error file contains the code + to reproduce the specific error. In fact, the error file can be run as any + python script. + ### Changed +- Added the option to cache data in ``LazyDataLoader`` via parameter ``do_caching``. + The ``load`` function in ``LazyDataLoader`` is adjusted to either load the data and + potentially cache the data, or return a cached version of the data. As a consequence, + the children of ``LazyDataLoader`` must implement the ``_load`` method (instead of + the ``load()`` method), which will effectively load the data, independent of any cache. + ### Fixed diff --git a/README.md b/README.md index 730fe17..e842386 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Dtaianomaly for Time Series Anomaly Detection +[![Documentation Status](https://readthedocs.org/projects/dtaianomaly/badge/?version=stable)](https://dtaianomaly.readthedocs.io/en/stable/?badge=stable) [![PyPi Version](https://img.shields.io/pypi/v/dtaianomaly.svg)](https://pypi.org/project/dtaianomaly/) [![Downloads](https://static.pepy.tech/badge/dtaianomaly)](https://pepy.tech/project/dtaianomaly) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/dtaianomaly)](https://pypi.python.org/pypi/dtaianomaly/) diff --git a/docs/getting_started/custom_models.rst b/docs/getting_started/custom_models.rst index e32a378..8f41239 100644 --- a/docs/getting_started/custom_models.rst +++ b/docs/getting_started/custom_models.rst @@ -66,11 +66,14 @@ Custom data loader Some dataloaders are provided within ``dtaianomaly``, but often we want to detect anomalies in our own data. Typically, for such custom data, there is no dataloader available within ``dtaianomaly``. To address this, you can implement a new dataloader by extending the -:py:class:`~dtaianomaly.data.LazyDataLoader`, along with the :py:func:`~dtaianomaly.data.LazyDataLoader.load` +:py:class:`~dtaianomaly.data.LazyDataLoader`, along with the :py:func:`~dtaianomaly.data.LazyDataLoader._load` method. Upon initialization of the custom data loader, a ``path`` parameter is required, -which points to the location of the data. The :py:func:`~dtaianomaly.data.LazyDataLoader.load` -function will then effectively load this dataset and return a :py:class:`~dtaianomaly.data.DataSet` -object, which combines the data ``X`` and ground truth labels ``y``. +which points to the location of the data. Optionally, you can pass a ``do_caching`` parameter +to prevent reading big files multiple times. The :py:func:`~dtaianomaly.data.LazyDataLoader._load` +function will effectively load this dataset and return a :py:class:`~dtaianomaly.data.DataSet` +object, which combines the data ``X`` and ground truth labels ``y``. The :py:func:`~dtaianomaly.data.LazyDataLoader.load` +function will either load the data or return a cached version of the data, depending on the +``do_caching`` property. 
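+
+As a small illustration of the caching behaviour (the loader below is the built-in
+:py:class:`~dtaianomaly.data.UCRLoader`; the path is only a placeholder), enabling
+``do_caching`` means that repeated calls to :py:func:`~dtaianomaly.data.LazyDataLoader.load`
+read the file from disk only once:
+
+.. code-block:: python
+
+    from dtaianomaly.data import UCRLoader
+
+    # Placeholder path to a UCR time series file
+    loader = UCRLoader(path='path/to/001_UCR_Anomaly_2500_2600_2700.txt', do_caching=True)
+
+    data = loader.load()   # first call: reads the file and stores it in cache_
+    data = loader.load()   # second call: returns the cached DataSet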
Implementing a custom dataloader is especially useful for quantitatively evaluating the anomaly detectors on your own data, as you can pass the loader to a :py:class:`~dtaianomaly.workflow.Workflow` @@ -82,7 +85,7 @@ and easily analyze multiple detectors simultaneously. class SimpleDataLoader(LazyDataLoader): - def load(self)-> DataSet: + def _load(self)-> DataSet: """ Read a data frame with the data in column 'X' and the labels in column 'y'. """ df = pd.read_clipboard(self.path) return DataSet(x=df['X'].values, y=df['y'].values) diff --git a/docs/getting_started/quantitative_evaluation.rst b/docs/getting_started/quantitative_evaluation.rst index 8895e51..01014f6 100644 --- a/docs/getting_started/quantitative_evaluation.rst +++ b/docs/getting_started/quantitative_evaluation.rst @@ -1,7 +1,7 @@ Quantitative evaluation with a workflow ======================================= -It is crucial to qualitatively the performance of anomaly detectors +It is crucial to qualitatively evaluate the performance of anomaly detectors to know their capabilities. For this, ``dtaianomaly`` offers the :py:class:`~dtaianomaly.workflow.Workflow`: detect anomalies in a large set of time series using various detectors, and to measure their performance using multiple evaluation criteria. The :py:class:`~dtaianomaly.workflow.Workflow` @@ -94,6 +94,7 @@ method, which returns a dataframe with the results. results = workflow.run() + .. _with-config: Run a workflow from a configuration file diff --git a/dtaianomaly/data/UCRLoader.py b/dtaianomaly/data/UCRLoader.py index b00958c..ded680e 100644 --- a/dtaianomaly/data/UCRLoader.py +++ b/dtaianomaly/data/UCRLoader.py @@ -10,13 +10,9 @@ class UCRLoader(LazyDataLoader): This implementation expects the file names to contain the start and stop time stamps of the single anomaly in the time series as: '\*_start_stop.txt'. - - Parameters - ---------- - path: str - Path to a single UCR data set. """ - def load(self) -> DataSet: + + def _load(self) -> DataSet: # Load time series X = np.loadtxt(self.path) diff --git a/dtaianomaly/data/data.py b/dtaianomaly/data/data.py index 2e1097a..aedb6cd 100644 --- a/dtaianomaly/data/data.py +++ b/dtaianomaly/data/data.py @@ -38,6 +38,14 @@ class LazyDataLoader(PrettyPrintable): ---------- path: str Path to the relevant data set. + do_caching: bool, default=False + Whether to cache the loaded data or not + + Attributes + ---------- + cache_ : DataSet + Cached version of the loaded data set. Only available if ``do_caching==True`` + and the data has been loaded before. Raises ------ @@ -45,22 +53,35 @@ class LazyDataLoader(PrettyPrintable): If the given path does not point to an existing file or directory. """ path: str + do_caching: bool + cache_: DataSet - def __init__(self, path: Union[str, Path]) -> None: + def __init__(self, path: Union[str, Path], do_caching: bool = False): if not (Path(path).is_file() or Path(path).is_dir()): raise FileNotFoundError(f'No such file or directory: {path}') self.path = str(path) + self.do_caching = do_caching - @abc.abstractmethod def load(self) -> DataSet: """ - Load the dataset. + Load the dataset. If ``do_caching==True``, the loaded will be saved in the + cache if no cache is available yet, and the cached data will be returned. Returns ------- data_set: DataSet The loaded dataset. 
""" + if self.do_caching: + if not hasattr(self, 'cache_'): + self.cache_ = self._load() + return self.cache_ + else: + return self._load() + + @abc.abstractmethod + def _load(self) -> DataSet: + """ Abstract method to effectively load the data. """ def from_directory(directory: Union[str, Path], dataloader: Type[LazyDataLoader]) -> List[LazyDataLoader]: diff --git a/dtaianomaly/workflow/Workflow.py b/dtaianomaly/workflow/Workflow.py index 72e799e..01b5e38 100644 --- a/dtaianomaly/workflow/Workflow.py +++ b/dtaianomaly/workflow/Workflow.py @@ -1,10 +1,10 @@ -from functools import partial + import multiprocessing import time import tracemalloc -from typing import Dict, List, Union - import pandas as pd +from typing import Dict, List, Union +from functools import partial from dtaianomaly.data import LazyDataLoader from dtaianomaly.evaluation import Metric, BinaryMetric @@ -14,6 +14,7 @@ from dtaianomaly.pipeline import EvaluationPipeline from dtaianomaly.workflow.utils import build_pipelines, convert_to_proba_metrics, convert_to_list +from dtaianomaly.workflow.error_logging import log_error class Workflow: @@ -22,7 +23,10 @@ class Workflow: Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``, and ``metrics``. The metrics requiring a thresholding operation are - combined with every element of ``thresholds``. + combined with every element of ``thresholds``. If an error occurs in any + execution of an anomaly detector or loading of data, then the error will + be written to an error file, which is an executable Python file to reproduce + the error. Parameters ---------- @@ -60,12 +64,16 @@ class Workflow: Whether or not memory usage of each run is reported. While this might give additional insights into the models, their runtime will be higher due to additional internal bookkeeping. + + error_log_path: str, default='./error_logs' + The path in which the error logs should be saved. """ dataloaders: List[LazyDataLoader] pipelines: List[EvaluationPipeline] provided_preprocessors: bool n_jobs: int trace_memory: bool + error_log_path: str def __init__(self, dataloaders: Union[LazyDataLoader, List[LazyDataLoader]], @@ -74,7 +82,8 @@ def __init__(self, preprocessors: Union[Preprocessor, List[Preprocessor]] = None, thresholds: Union[Thresholding, List[Thresholding]] = None, n_jobs: int = 1, - trace_memory: bool = False): + trace_memory: bool = False, + error_log_path: str = './error_logs'): # Make sure the inputs are lists. 
dataloaders = convert_to_list(dataloaders) @@ -113,6 +122,7 @@ def __init__(self, self.dataloaders = dataloaders self.n_jobs = n_jobs self.trace_memory = trace_memory + self.error_log_path = error_log_path def run(self) -> pd.DataFrame: """ @@ -137,9 +147,9 @@ def run(self) -> pd.DataFrame: # Execute the jobs if self.n_jobs == 1: - result = [_single_job(*job, trace_memory=self.trace_memory) for job in unit_jobs] + result = [_single_job(*job, trace_memory=self.trace_memory, error_log_path=self.error_log_path) for job in unit_jobs] else: - single_run_function = partial(_single_job, trace_memory=self.trace_memory) + single_run_function = partial(_single_job, trace_memory=self.trace_memory, error_log_path=self.error_log_path) with multiprocessing.Pool(processes=self.n_jobs) as pool: result = pool.starmap(single_run_function, unit_jobs) @@ -160,7 +170,7 @@ def run(self) -> pd.DataFrame: return results_df -def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_memory: bool) -> Dict[str, Union[str, float]]: +def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_memory: bool, error_log_path: str) -> Dict[str, Union[str, float]]: # Initialize the results, and by default everything went wrong ('Error') results = {'Dataset': str(dataloader)} @@ -172,8 +182,8 @@ def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_ # Try to load the data set, if this fails, return the results try: dataset = dataloader.load() - except Exception as e: - print(e) + except Exception as exception: + results['Error file'] = log_error(error_log_path, exception, dataloader) return results # We can already save the used preprocessor and detector @@ -188,8 +198,8 @@ def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_ start = time.time() try: results.update(pipeline.run(X=dataset.x, y=dataset.y)) - except Exception as e: - print(e) + except Exception as exception: + results['Error file'] = log_error(error_log_path, exception, dataloader, pipeline.pipeline) stop = time.time() # Save the runtime diff --git a/dtaianomaly/workflow/error_logging.py b/dtaianomaly/workflow/error_logging.py new file mode 100644 index 0000000..0dc7fc2 --- /dev/null +++ b/dtaianomaly/workflow/error_logging.py @@ -0,0 +1,76 @@ + +import os +import datetime +import traceback + +from dtaianomaly.data import LazyDataLoader +from dtaianomaly.pipeline import Pipeline + + +def log_error(error_log_path: str, exception: Exception, data_loader: LazyDataLoader, pipeline: Pipeline = None) -> str: + + # Ensure the directory exists + os.makedirs(error_log_path, exist_ok=True) + + # Set an intuitive name for the error file based on the given data loader and pipeline + base_file_name = data_loader.__class__.__name__ + if pipeline is not None: + base_file_name += f'-{pipeline.detector.__class__.__name__}' + + # Ensure that the file name is unique + while True: + now = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + if not os.path.exists(f'{error_log_path}/{base_file_name}-{now}.err'): + break + file_path = f'{error_log_path}/{base_file_name}-{now}.err' + + # Write away the logging + with open(file_path, 'w') as error_file: + + # Create an error message as a string + if pipeline is None: # Didn't reach anomaly detection when error occurred. + error_message = 'An error occurred while loading data!' + else: + error_message = 'An error occurred while detecting anomalies!' 
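+        # The rest of this function assembles the error file: the message and full
+        # traceback are written as Python comments, followed by the imports and the
+        # code needed to re-create the data loader (and pipeline, if any), so that
+        # running the file reproduces the error.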
+ error_message += '\nCode to reproduce the error is at the bottom of this error-log.\n\n' + + # Add the error message + error_message += 'Traceback (most recent call last):\n\n' + error_message += '\n'.join(traceback.format_tb(exception.__traceback__)) + error_message += f'\n{exception.__class__.__name__}: {exception}' + + # Make sure the message is in python comments + error_message = '# ' + error_message.replace('\n', '\n# ') + + # Write the error message + error_file.write(error_message) + + # Add an empty line below the error message + error_file.write('\n\n') + + # Add the imports to the file + error_file.write('import numpy as np\n') + error_file.write('from dtaianomaly.data import *\n') + if pipeline is not None: + error_file.write('from dtaianomaly.preprocessing import *\n') + error_file.write('from dtaianomaly.anomaly_detection import *\n') + error_file.write('from dtaianomaly.pipeline import Pipeline\n') + + # Ad an empty line + error_file.write('\n') + + # Add code for loading the data + error_file.write(f'data_loader = {data_loader}\n') + error_file.write(f'data = data_loader.load()\n\n') + + # Add code for detecting anomalies + if pipeline is not None: + error_file.write(f'preprocessor = {pipeline.preprocessor}\n') + error_file.write(f'detector = {pipeline.detector}\n') + error_file.write('pipeline = Pipeline(\n' + ' preprocessor=preprocessor,\n' + ' detector=detector\n' + ')\n') + error_file.write('y_pred = pipeline.fit(data.x, data.y).predict_proba(data.x)\n\n') + + return os.path.abspath(file_path) diff --git a/notebooks/Custom-models.ipynb b/notebooks/Custom-models.ipynb index 9591890..e9df273 100644 --- a/notebooks/Custom-models.ipynb +++ b/notebooks/Custom-models.ipynb @@ -86,7 +86,7 @@ "source": [ "## Custom data loader\n", "\n", - "Some dataloaders are provided within ``dtaianomaly``, but often we want to detect anomalies in our own data. Typically, for such custom data, there is no dataloader available within ``dtaianomaly``. To address this, you can implement a new dataloader by extending the ``LazyDataLoader`` object, along with the ``load()`` method. Upon initialization of the custom data loader, a ``path`` parameter is required, which locates the data. The ``load()`` function will then effectively load this dataset and return a ``DataSet`` object, which combines the data ``X`` and ground truth labels ``y``. \n", + "Some dataloaders are provided within ``dtaianomaly``, but often we want to detect anomalies in our own data. Typically, for such custom data, there is no dataloader available within ``dtaianomaly``. To address this, you can implement a new dataloader by extending the ``LazyDataLoader`` object, along with the ``_load`` method. Upon initialization of the custom data loader, a ``path`` parameter is required, which points to the location of the data. Optionally, you can pass a ``do_caching`` parameter to prevent reading big files multiple times. The ``_load`` function will effectively load this dataset and return a ``DataSet`` object, which combines the data ``X`` and ground truth labels ``y``. The ``load`` function will either load the data or return a cached version of the data, depending on the ``do_caching`` property.\n", "\n", "Implementing a custom dataloader is especially useful for quantitatively evaluating the anomaly detectors on your own data, as you can pass the loader to a ``Workflow`` and easily analyze multiple detectors simultaneously. 
" ], @@ -105,7 +105,7 @@ "\n", "class SimpleDataLoader(LazyDataLoader):\n", " \n", - " def load(self)-> DataSet:\n", + " def _load(self)-> DataSet:\n", " \"\"\" Read a data frame with the data in column 'X' and the labels in column 'y'. \"\"\"\n", " df = pd.read_clipboard(self.path)\n", " return DataSet(x=df['X'].values, y=df['y'].values)" diff --git a/notebooks/Quantitative-evaluation.ipynb b/notebooks/Quantitative-evaluation.ipynb index f1f9673..7eb1b5a 100644 --- a/notebooks/Quantitative-evaluation.ipynb +++ b/notebooks/Quantitative-evaluation.ipynb @@ -6,8 +6,7 @@ "source": [ "# Quantitative evaluation with a workflow\n", "\n", - "It is crucial to qualitatively the performance of anomaly detectors to know their capabilities. For this, ``dtaianomaly`` offers the ``Workflow``: detect anomalies in a large set of time series using various detectors, and to measure their performance using multiple evaluation criteria. The ``Workflow`` facilitates the validation of the anomaly detectors, because you only need to define\n", - "the different components.\n", + "It is crucial to qualitatively evaluate the performance of anomaly detectors to know their capabilities. For this, ``dtaianomaly`` offers the ``Workflow``: detect anomalies in a large set of time series using various detectors, and to measure their performance using multiple evaluation criteria. The ``Workflow`` facilitates the validation of the anomaly detectors, because you only need to define the different components.\n", "\n", "There are two ways to run a ``Workflow`` from Python or from a configuration file.\n", "\n", diff --git a/tests/anomaly_detection/test_detectors.py b/tests/anomaly_detection/test_detectors.py index 0c32bc0..c19247a 100644 --- a/tests/anomaly_detection/test_detectors.py +++ b/tests/anomaly_detection/test_detectors.py @@ -11,7 +11,8 @@ @pytest.fixture(params=[ anomaly_detection.IsolationForest(15), - anomaly_detection.LocalOutlierFactor(15), + anomaly_detection.LocalOutlierFactor(15, novelty=False), + anomaly_detection.LocalOutlierFactor(15, novelty=True), anomaly_detection.MatrixProfileDetector(15), pipeline.Pipeline(preprocessing.Identity(), anomaly_detection.IsolationForest(15)) ]) diff --git a/tests/data/test_data.py b/tests/data/test_data.py index 4a89071..0a05e6b 100644 --- a/tests/data/test_data.py +++ b/tests/data/test_data.py @@ -1,12 +1,13 @@ import numpy as np import pytest +import time from dtaianomaly.data import LazyDataLoader, DataSet, from_directory class DummyLoader(LazyDataLoader): - def load(self) -> DataSet: + def _load(self) -> DataSet: return DataSet(x=np.array([]), y=np.array([])) @@ -29,6 +30,47 @@ def test_str(self, tmp_path): assert str(DummyLoader(tmp_path)) == f"DummyLoader(path='{tmp_path}')" +class CostlyDummyLoader(LazyDataLoader): + NB_SECONDS_SLEEP = 1.5 + + def _load(self) -> DataSet: + time.sleep(self.NB_SECONDS_SLEEP) + return DataSet(x=np.array([]), y=np.array([])) + + +class TestCaching: + + def test_caching(self): + loader = CostlyDummyLoader(path='.', do_caching=True) + assert not hasattr(loader, 'cache_') + + # First load takes a long time + start = time.time() + loader.load() + assert time.time() - start >= loader.NB_SECONDS_SLEEP + assert hasattr(loader, 'cache_') + + # Second load is fast + start = time.time() + loader.load() + assert time.time() - start < loader.NB_SECONDS_SLEEP + + def test_no_caching(self): + loader = CostlyDummyLoader(path='.', do_caching=False) + assert not hasattr(loader, 'cache_') + + # First load takes a long time + start = time.time() + 
loader.load() + assert time.time() - start >= loader.NB_SECONDS_SLEEP + assert not hasattr(loader, 'cache_') + + # Second load is also slow + start = time.time() + loader.load() + assert time.time() - start >= loader.NB_SECONDS_SLEEP + + class TestFromDirectory: def test_no_directory(self): diff --git a/tests/workflow/test_WorkFlow.py b/tests/workflow/test_WorkFlow.py index 70c8f53..ce12554 100644 --- a/tests/workflow/test_WorkFlow.py +++ b/tests/workflow/test_WorkFlow.py @@ -145,10 +145,7 @@ def test_invalid_nb_jobs(self, tmp_path_factory): class DummyDataLoader(LazyDataLoader): - def __init__(self, path: str): - super().__init__(path) - - def load(self) -> DataSet: + def _load(self) -> DataSet: X, y = demonstration_time_series() return DataSet(X, y) @@ -273,7 +270,7 @@ def test_no_preprocessors(self, tmp_path_factory, univariate_time_series): class DummyDataLoaderError(LazyDataLoader): - def load(self) -> DataSet: + def _load(self) -> DataSet: raise Exception('Dummy exception') @@ -309,10 +306,11 @@ def test_failed_to_read_data(self, tmp_path_factory): preprocessors=[Identity(), ZNormalizer()], detectors=[LocalOutlierFactor(15), IsolationForest(15)], n_jobs=1, - trace_memory=True + trace_memory=True, + error_log_path=str(tmp_path_factory.mktemp('error-log')) ) results = workflow.run() - assert results.shape == (8, 10) + assert results.shape == (8, 11) assert results['Dataset'].value_counts()[f"DummyDataLoader(path='{path}')"] == 4 assert results['Dataset'].value_counts()[f"DummyDataLoaderError(path='{path}')"] == 4 assert results['Preprocessor'].value_counts()['Identity()'] == 2 @@ -322,7 +320,8 @@ def test_failed_to_read_data(self, tmp_path_factory): assert 'Peak Memory [MB]' in results.columns assert (results == 'Error').any().sum() == 9 assert (results == 'Error').any(axis=1).sum() == 4 - assert not results.isna().any().any() + assert 'Error file' in results.columns + assert results['Error file'].isna().sum() == 4 def test_failed_to_preprocess(self, tmp_path_factory): path = str(tmp_path_factory.mktemp('some-path-1')) @@ -335,10 +334,11 @@ def test_failed_to_preprocess(self, tmp_path_factory): preprocessors=[PreprocessorError(), ZNormalizer()], detectors=[LocalOutlierFactor(15), IsolationForest(15)], n_jobs=1, - trace_memory=True + trace_memory=True, + error_log_path=str(tmp_path_factory.mktemp('error-log')) ) results = workflow.run() - assert results.shape == (4, 10) + assert results.shape == (4, 11) assert results['Dataset'].value_counts()[f"DummyDataLoader(path='{path}')"] == 4 assert results['Preprocessor'].value_counts()['PreprocessorError()'] == 2 assert results['Preprocessor'].value_counts()['ZNormalizer()'] == 2 @@ -347,7 +347,8 @@ def test_failed_to_preprocess(self, tmp_path_factory): assert 'Peak Memory [MB]' in results.columns assert (results == 'Error').any().sum() == 5 assert (results == 'Error').any(axis=1).sum() == 2 - assert not results.isna().any().any() + assert 'Error file' in results.columns + assert results['Error file'].isna().sum() == 2 def test_failed_to_fit_model(self, tmp_path_factory): path = str(tmp_path_factory.mktemp('some-path-1')) @@ -360,10 +361,11 @@ def test_failed_to_fit_model(self, tmp_path_factory): preprocessors=[Identity(), ZNormalizer()], detectors=[DetectorError(), IsolationForest(15)], n_jobs=1, - trace_memory=True + trace_memory=True, + error_log_path=str(tmp_path_factory.mktemp('error-log')) ) results = workflow.run() - assert results.shape == (4, 10) + assert results.shape == (4, 11) assert 
results['Dataset'].value_counts()[f"DummyDataLoader(path='{path}')"] == 4 assert results['Preprocessor'].value_counts()['Identity()'] == 2 assert results['Preprocessor'].value_counts()['ZNormalizer()'] == 2 @@ -372,7 +374,8 @@ def test_failed_to_fit_model(self, tmp_path_factory): assert 'Peak Memory [MB]' in results.columns assert (results == 'Error').any().sum() == 5 assert (results == 'Error').any(axis=1).sum() == 2 - assert not results.isna().any().any() + assert 'Error file' in results.columns + assert results['Error file'].isna().sum() == 2 def test_failed_to_preprocess_and_to_fit_model(self, tmp_path_factory): path = str(tmp_path_factory.mktemp('some-path-1')) @@ -385,10 +388,11 @@ def test_failed_to_preprocess_and_to_fit_model(self, tmp_path_factory): preprocessors=[PreprocessorError(), ZNormalizer()], detectors=[DetectorError(), IsolationForest(15)], n_jobs=1, - trace_memory=True + trace_memory=True, + error_log_path=str(tmp_path_factory.mktemp('error-log')) ) results = workflow.run() - assert results.shape == (4, 10) + assert results.shape == (4, 11) assert results['Dataset'].value_counts()[f"DummyDataLoader(path='{path}')"] == 4 assert results['Preprocessor'].value_counts()['PreprocessorError()'] == 2 assert results['Preprocessor'].value_counts()['ZNormalizer()'] == 2 @@ -397,4 +401,5 @@ def test_failed_to_preprocess_and_to_fit_model(self, tmp_path_factory): assert 'Peak Memory [MB]' in results.columns assert (results == 'Error').any().sum() == 5 assert (results == 'Error').any(axis=1).sum() == 3 - assert not results.isna().any().any() + assert 'Error file' in results.columns + assert results['Error file'].isna().sum() == 1 diff --git a/tests/workflow/test_error_logging.py b/tests/workflow/test_error_logging.py new file mode 100644 index 0000000..2d9429f --- /dev/null +++ b/tests/workflow/test_error_logging.py @@ -0,0 +1,183 @@ + +import os +import sys +import py_compile +import subprocess +import pathlib + +from dtaianomaly.data import LazyDataLoader, DataSet, demonstration_time_series +from dtaianomaly.anomaly_detection import BaseDetector, IsolationForest +from dtaianomaly.preprocessing import Preprocessor, Identity, ChainedPreprocessor +from dtaianomaly.pipeline import Pipeline +from dtaianomaly.evaluation import AreaUnderROC +from dtaianomaly.workflow import Workflow +from dtaianomaly.workflow.error_logging import log_error + + +class DemonstrationDataLoader(LazyDataLoader): + + def __init__(self): + super().__init__('.') + + def _load(self) -> DataSet: + X, y = demonstration_time_series() + return DataSet(X, y) + + +class ErrorDataLoader(LazyDataLoader): + + def _load(self): + raise Exception('An error occurred when loading data!') + + +class ErrorPreprocessor(Preprocessor): + + def _fit(self, X, y=None): + return self + + def _transform(self, X, y=None): + raise Exception('An error occurred preprocessing data!') + + +class ErrorAnomalyDetector(BaseDetector): + + def fit(self, X, y=None): + return self + + def decision_function(self, X): + raise Exception('An error occurred when detecting anomalies!') + + +class TestErrorLogging: + + def test_error_loading(self, tmp_path_factory): + workflow = Workflow( + dataloaders=ErrorDataLoader('.'), + metrics=AreaUnderROC(), + preprocessors=ChainedPreprocessor(Identity(), ErrorPreprocessor()), + detectors=IsolationForest(15), + error_log_path=str(tmp_path_factory.mktemp('error-log')) + ) + results = workflow.run() + + assert results.shape == (1, 6) + assert 'Error file' in results.columns + + error_file = results.loc[0, 'Error file'] 
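+        # The logged file must be valid Python, mention the raised exception in its
+        # commented traceback, and raise the same exception again when executed.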
+ error = Exception('An error occurred when loading data!') + assert error_file_has_correct_syntax(error_file) + assert error_file_contains_error(error_file, error) + assert error_file_results_in_error(error_file, error) + + def test_error_preprocessing(self, tmp_path_factory): + workflow = Workflow( + dataloaders=DemonstrationDataLoader(), + metrics=AreaUnderROC(), + preprocessors=ErrorPreprocessor(), + detectors=IsolationForest(15), + error_log_path=str(tmp_path_factory.mktemp('error-log')) + ) + results = workflow.run() + + assert results.shape == (1, 6) + assert 'Error file' in results.columns + + error_file = results.loc[0, 'Error file'] + error = Exception('An error occurred preprocessing data!') + assert error_file_has_correct_syntax(error_file) + assert error_file_contains_error(error_file, error) + assert error_file_results_in_error(error_file, error) + + def test_error_chained_preprocessing(self, tmp_path_factory): + workflow = Workflow( + dataloaders=DemonstrationDataLoader(), + metrics=AreaUnderROC(), + preprocessors=ErrorPreprocessor(), + detectors=IsolationForest(15), + error_log_path=str(tmp_path_factory.mktemp('error-log')) + ) + results = workflow.run() + + assert results.shape == (1, 6) + assert 'Error file' in results.columns + + error_file = results.loc[0, 'Error file'] + error = Exception('An error occurred preprocessing data!') + assert error_file_has_correct_syntax(error_file) + assert error_file_contains_error(error_file, error) + assert error_file_results_in_error(error_file, error) + + def test_error_detecting_anomalies(self, tmp_path_factory): + workflow = Workflow( + dataloaders=DemonstrationDataLoader(), + metrics=AreaUnderROC(), + preprocessors=Identity(), + detectors=ErrorAnomalyDetector(), + error_log_path=str(tmp_path_factory.mktemp('error-log')) + ) + results = workflow.run() + + assert results.shape == (1, 6) + assert 'Error file' in results.columns + + error_file = results.loc[0, 'Error file'] + error = Exception('An error occurred when detecting anomalies!') + assert error_file_has_correct_syntax(error_file) + assert error_file_contains_error(error_file, error) + assert error_file_results_in_error(error_file, error) + + def test_log_no_exception(self, tmp_path_factory): + error = Exception('Dummy') + error_file = log_error( + error_log_path=str(tmp_path_factory.mktemp('error-log')), + exception=Exception('Dummy'), + data_loader=DemonstrationDataLoader(), + pipeline=Pipeline( + preprocessor=Identity(), + detector=IsolationForest(15) + ) + ) + assert error_file_has_correct_syntax(error_file) + assert error_file_contains_error(error_file, error) + assert error_file_runs_successfully(error_file) + + +def error_file_has_correct_syntax(error_file): + try: + py_compile.compile(error_file, doraise=True) + return True + except py_compile.PyCompileError: + return False + + +def error_file_contains_error(error_file, error): + with open(error_file, 'r') as file: + for line in file: + if line.startswith('#') and str(error) in line: + return True + return False + + +def error_file_results_in_error(error_file, error): + output = _run_error_file(error_file) + return output.returncode == 1 and str(error) in output.stderr + + +def error_file_runs_successfully(error_file): + output = _run_error_file(error_file) + return output.returncode == 0 + + +def _run_error_file(error_file): + # Include this file to the python path to find the classes + current_dir = os.path.dirname(os.path.abspath(__file__)) + env = os.environ.copy() + env['PYTHONPATH'] = current_dir + os.pathsep + 
env.get('PYTHONPATH', '') + + # Add this file as import + with open(error_file, 'r+') as file: + content = file.read() + file.seek(0, 0) + file.write(f'from {pathlib.Path(__file__).stem} import *\n' + content) + + return subprocess.run([sys.executable, error_file], capture_output=True, text=True, env=env)
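
As an end-to-end sketch of the two features added in this patch (the class names are taken
from the patch itself; the data path and log directory are placeholders), a ``Workflow`` can
now combine cached data loading with reproducible error logs:

    from dtaianomaly.data import UCRLoader
    from dtaianomaly.anomaly_detection import IsolationForest
    from dtaianomaly.evaluation import AreaUnderROC
    from dtaianomaly.workflow import Workflow

    # Placeholder path; do_caching=True avoids re-reading the file for every
    # preprocessor/detector combination evaluated by the workflow.
    loader = UCRLoader(path='path/to/001_UCR_Anomaly_2500_2600_2700.txt', do_caching=True)

    workflow = Workflow(
        dataloaders=loader,
        metrics=AreaUnderROC(),
        detectors=IsolationForest(15),
        error_log_path='./error_logs',  # failed runs write an executable .err file here
    )
    results = workflow.run()
    # For failed runs, the 'Error file' column of the results points to the generated log.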