Added error logging in workflow and caching data sets (#3)
* Added error logging in workflow and caching data sets

* Update GitHub action to test notebooks

* Update GitHub action to test notebooks

* Minor update to test notebook workflow
LouisCarpentier42 authored Oct 11, 2024
1 parent 28b05d4 commit 0d96ce9
Showing 16 changed files with 448 additions and 52 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/python-package.yml
@@ -20,21 +20,26 @@ jobs:

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v3
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        python -m pip install flake8 pytest
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
        pip install -r requirements.txt
        pip install -r requirements-dev.txt
        pip install flake8
    - name: Lint with flake8
      run: |
        # stop the build if there are Python syntax errors or undefined names
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
    - name: Test with pytest
      run: |
        pytest
41 changes: 41 additions & 0 deletions .github/workflows/test-notebooks.yml
@@ -0,0 +1,41 @@
name: Test Notebooks

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  test-notebooks:
    runs-on: ubuntu-latest

    steps:
      # Check out the repository code
      - name: Checkout repository
        uses: actions/checkout@v4

      # Set up Python
      - name: Set up Python
        uses: actions/setup-python@v3
        with:
          python-version: '3.10' # Only for one python version to save on resources

      # Install dependencies
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
          pip install nbconvert jupyter
      # Install dtaianomaly locally
      - name: Install dtaianomaly
        run: pip install .

      # Test the notebooks
      - name: Execute Notebooks
        run: |
          for notebook in $(find ./notebooks -name "*.ipynb"); do
            jupyter nbconvert --to notebook --execute $notebook
          done
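
The same check can be reproduced locally without the GitHub runner. The sketch below uses the nbconvert Python API instead of the CLI call above; the ``notebooks/`` directory comes from the workflow, while the timeout and kernel name are assumptions.

    # Local equivalent of the "Execute Notebooks" step (sketch, not part of the commit).
    from pathlib import Path

    import nbformat
    from nbconvert.preprocessors import ExecutePreprocessor

    for notebook_path in sorted(Path("notebooks").rglob("*.ipynb")):
        nb = nbformat.read(str(notebook_path), as_version=4)
        # Execute all cells in the notebook's own directory so relative paths resolve.
        executor = ExecutePreprocessor(timeout=600, kernel_name="python3")
        executor.preprocess(nb, {"metadata": {"path": str(notebook_path.parent)}})
        print(f"Executed {notebook_path} without errors")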
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -5,8 +5,20 @@ All notable changes to this project will be documented in this file.

### Added

- When an error occurs while executing a workflow, the error is written to an
  error file. This file records the phase in which the error occurred and the
  full traceback. Additionally, the error file contains the code needed to
  reproduce the specific error; in fact, it can be run like any other Python
  script.

### Changed

- Added the option to cache data in ``LazyDataLoader`` via the ``do_caching`` parameter.
  The ``load`` function of ``LazyDataLoader`` now either loads the data (and caches it
  if requested) or returns the cached version. As a consequence, children of
  ``LazyDataLoader`` must implement the ``_load`` method (instead of the ``load()``
  method), which effectively loads the data, independent of any cache.
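
For illustration only, a loader written against the new contract could look like the sketch below; the loader name and CSV column names are hypothetical, while ``LazyDataLoader``, ``DataSet``, and the ``_load`` override follow this change.

    # Hypothetical loader: only _load() is implemented, load() handles caching.
    import pandas as pd

    from dtaianomaly.data import DataSet, LazyDataLoader


    class MyCsvLoader(LazyDataLoader):

        def _load(self) -> DataSet:
            # Effectively read the data, independent of any cache.
            df = pd.read_csv(self.path)
            return DataSet(x=df['value'].values, y=df['is_anomaly'].values)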

### Fixed


1 change: 1 addition & 0 deletions README.md
@@ -1,5 +1,6 @@
# Dtaianomaly for Time Series Anomaly Detection

[![Documentation Status](https://readthedocs.org/projects/dtaianomaly/badge/?version=stable)](https://dtaianomaly.readthedocs.io/en/stable/?badge=stable)
[![PyPi Version](https://img.shields.io/pypi/v/dtaianomaly.svg)](https://pypi.org/project/dtaianomaly/)
[![Downloads](https://static.pepy.tech/badge/dtaianomaly)](https://pepy.tech/project/dtaianomaly)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/dtaianomaly)](https://pypi.python.org/pypi/dtaianomaly/)
13 changes: 8 additions & 5 deletions docs/getting_started/custom_models.rst
@@ -66,11 +66,14 @@ Custom data loader
Some dataloaders are provided within ``dtaianomaly``, but often we want to detect anomalies
in our own data. Typically, for such custom data, there is no dataloader available within
``dtaianomaly``. To address this, you can implement a new dataloader by extending the
:py:class:`~dtaianomaly.data.LazyDataLoader`, along with the :py:func:`~dtaianomaly.data.LazyDataLoader.load`
:py:class:`~dtaianomaly.data.LazyDataLoader`, along with the :py:func:`~dtaianomaly.data.LazyDataLoader._load`
method. Upon initialization of the custom data loader, a ``path`` parameter is required,
which points to the location of the data. The :py:func:`~dtaianomaly.data.LazyDataLoader.load`
function will then effectively load this dataset and return a :py:class:`~dtaianomaly.data.DataSet`
object, which combines the data ``X`` and ground truth labels ``y``.
which points to the location of the data. Optionally, you can pass a ``do_caching`` parameter
to prevent reading big files multiple times. The :py:func:`~dtaianomaly.data.LazyDataLoader._load`
function will effectively load this dataset and return a :py:class:`~dtaianomaly.data.DataSet`
object, which combines the data ``X`` and ground truth labels ``y``. The :py:func:`~dtaianomaly.data.LazyDataLoader.load`
function will either load the data or return a cached version of the data, depending on the
``do_caching`` property.

Implementing a custom dataloader is especially useful for quantitatively evaluating the anomaly
detectors on your own data, as you can pass the loader to a :py:class:`~dtaianomaly.workflow.Workflow`
@@ -82,7 +85,7 @@ and easily analyze multiple detectors simultaneously.
class SimpleDataLoader(LazyDataLoader):
    def load(self) -> DataSet:
    def _load(self) -> DataSet:
        """ Read a data frame with the data in column 'X' and the labels in column 'y'. """
        df = pd.read_csv(self.path)
        return DataSet(x=df['X'].values, y=df['y'].values)
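
A short usage sketch of the caching behaviour described above; the file path is hypothetical.

    # With do_caching=True, the first load() call runs _load() and stores the result
    # in cache_; later calls return the cached DataSet without re-reading the file.
    loader = SimpleDataLoader('data/my-series.csv', do_caching=True)
    first = loader.load()   # reads the file via _load() and caches it
    second = loader.load()  # served from cache_
    assert first is second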
3 changes: 2 additions & 1 deletion docs/getting_started/quantitative_evaluation.rst
@@ -1,7 +1,7 @@
Quantitative evaluation with a workflow
=======================================

It is crucial to qualitatively the performance of anomaly detectors
It is crucial to qualitatively evaluate the performance of anomaly detectors
to know their capabilities. For this, ``dtaianomaly`` offers the :py:class:`~dtaianomaly.workflow.Workflow`:
detect anomalies in a large set of time series using various detectors, and to measure
their performance using multiple evaluation criteria. The :py:class:`~dtaianomaly.workflow.Workflow`
@@ -94,6 +94,7 @@ method, which returns a dataframe with the results.
results = workflow.run()
.. _with-config:

Run a workflow from a configuration file
8 changes: 2 additions & 6 deletions dtaianomaly/data/UCRLoader.py
@@ -10,13 +10,9 @@ class UCRLoader(LazyDataLoader):
    This implementation expects the file names to contain the start and
    stop time stamps of the single anomaly in the time series as:
    '\*_start_stop.txt'.
    Parameters
    ----------
    path: str
        Path to a single UCR data set.
    """
    def load(self) -> DataSet:

    def _load(self) -> DataSet:
        # Load time series
        X = np.loadtxt(self.path)

27 changes: 24 additions & 3 deletions dtaianomaly/data/data.py
@@ -38,29 +38,50 @@ class LazyDataLoader(PrettyPrintable):
    ----------
    path: str
        Path to the relevant data set.
    do_caching: bool, default=False
        Whether to cache the loaded data or not.
    Attributes
    ----------
    cache_ : DataSet
        Cached version of the loaded data set. Only available if ``do_caching==True``
        and the data has been loaded before.
    Raises
    ------
    FileNotFoundError
        If the given path does not point to an existing file or directory.
    """
    path: str
    do_caching: bool
    cache_: DataSet

    def __init__(self, path: Union[str, Path]) -> None:
    def __init__(self, path: Union[str, Path], do_caching: bool = False):
        if not (Path(path).is_file() or Path(path).is_dir()):
            raise FileNotFoundError(f'No such file or directory: {path}')
        self.path = str(path)
        self.do_caching = do_caching

    @abc.abstractmethod
    def load(self) -> DataSet:
        """
        Load the dataset.
        Load the dataset. If ``do_caching==True``, the loaded data will be saved in the
        cache if no cache is available yet, and the cached data will be returned.
        Returns
        -------
        data_set: DataSet
            The loaded dataset.
        """
        if self.do_caching:
            if not hasattr(self, 'cache_'):
                self.cache_ = self._load()
            return self.cache_
        else:
            return self._load()

    @abc.abstractmethod
    def _load(self) -> DataSet:
        """ Abstract method to effectively load the data. """


def from_directory(directory: Union[str, Path], dataloader: Type[LazyDataLoader]) -> List[LazyDataLoader]:
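
For context, the ``from_directory`` helper whose signature appears at the end of this hunk builds one loader per data set in a directory. A usage sketch; the directory path is hypothetical and the import locations are assumptions based on the file layout in this commit.

    from dtaianomaly.data.data import from_directory
    from dtaianomaly.data.UCRLoader import UCRLoader

    # One UCRLoader per file found in the (hypothetical) directory; caching stays
    # at its default (do_caching=False) since from_directory does not expose it here.
    loaders = from_directory('data/UCR', UCRLoader)
    data_sets = [loader.load() for loader in loaders]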
34 changes: 22 additions & 12 deletions dtaianomaly/workflow/Workflow.py
@@ -1,10 +1,10 @@
from functools import partial

import multiprocessing
import time
import tracemalloc
from typing import Dict, List, Union

import pandas as pd
from typing import Dict, List, Union
from functools import partial

from dtaianomaly.data import LazyDataLoader
from dtaianomaly.evaluation import Metric, BinaryMetric
@@ -14,6 +14,7 @@
from dtaianomaly.pipeline import EvaluationPipeline

from dtaianomaly.workflow.utils import build_pipelines, convert_to_proba_metrics, convert_to_list
from dtaianomaly.workflow.error_logging import log_error


class Workflow:
@@ -22,7 +23,10 @@ class Workflow:
    Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``,
    and ``metrics``. The metrics requiring a thresholding operation are
    combined with every element of ``thresholds``.
    combined with every element of ``thresholds``. If an error occurs in any
    execution of an anomaly detector or loading of data, then the error will
    be written to an error file, which is an executable Python file to reproduce
    the error.
    Parameters
    ----------
@@ -60,12 +64,16 @@ class Workflow:
        Whether or not memory usage of each run is reported. While this
        might give additional insights into the models, their runtime
        will be higher due to additional internal bookkeeping.
    error_log_path: str, default='./error_logs'
        The path in which the error logs should be saved.
    """
    dataloaders: List[LazyDataLoader]
    pipelines: List[EvaluationPipeline]
    provided_preprocessors: bool
    n_jobs: int
    trace_memory: bool
    error_log_path: str

    def __init__(self,
                 dataloaders: Union[LazyDataLoader, List[LazyDataLoader]],
@@ -74,7 +82,8 @@ def __init__(self,
                 preprocessors: Union[Preprocessor, List[Preprocessor]] = None,
                 thresholds: Union[Thresholding, List[Thresholding]] = None,
                 n_jobs: int = 1,
                 trace_memory: bool = False):
                 trace_memory: bool = False,
                 error_log_path: str = './error_logs'):

        # Make sure the inputs are lists.
        dataloaders = convert_to_list(dataloaders)
@@ -113,6 +122,7 @@ def __init__(self,
        self.dataloaders = dataloaders
        self.n_jobs = n_jobs
        self.trace_memory = trace_memory
        self.error_log_path = error_log_path

    def run(self) -> pd.DataFrame:
        """
@@ -137,9 +147,9 @@ def run(self) -> pd.DataFrame:

        # Execute the jobs
        if self.n_jobs == 1:
            result = [_single_job(*job, trace_memory=self.trace_memory) for job in unit_jobs]
            result = [_single_job(*job, trace_memory=self.trace_memory, error_log_path=self.error_log_path) for job in unit_jobs]
        else:
            single_run_function = partial(_single_job, trace_memory=self.trace_memory)
            single_run_function = partial(_single_job, trace_memory=self.trace_memory, error_log_path=self.error_log_path)
            with multiprocessing.Pool(processes=self.n_jobs) as pool:
                result = pool.starmap(single_run_function, unit_jobs)

@@ -160,7 +170,7 @@ def run(self) -> pd.DataFrame:
        return results_df


def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_memory: bool) -> Dict[str, Union[str, float]]:
def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_memory: bool, error_log_path: str) -> Dict[str, Union[str, float]]:

    # Initialize the results, and by default everything went wrong ('Error')
    results = {'Dataset': str(dataloader)}
@@ -172,8 +182,8 @@ def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_
    # Try to load the data set, if this fails, return the results
    try:
        dataset = dataloader.load()
    except Exception as e:
        print(e)
    except Exception as exception:
        results['Error file'] = log_error(error_log_path, exception, dataloader)
        return results

    # We can already save the used preprocessor and detector
@@ -188,8 +198,8 @@ def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_
    start = time.time()
    try:
        results.update(pipeline.run(X=dataset.x, y=dataset.y))
    except Exception as e:
        print(e)
    except Exception as exception:
        results['Error file'] = log_error(error_log_path, exception, dataloader, pipeline.pipeline)
    stop = time.time()

    # Save the runtime
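
The new ``dtaianomaly/workflow/error_logging.py`` module itself is not shown in this excerpt. Purely as an illustration of the behaviour described above (an executable error file containing the traceback and reproduction code), a helper with the same call signature could look roughly like the sketch below; none of these details are taken from the actual implementation.

    # Illustrative sketch only: NOT the actual dtaianomaly implementation.
    # Mirrors the call sites above: log_error(error_log_path, exception, dataloader, pipeline=None).
    import datetime
    import os
    import traceback


    def log_error(error_log_path, exception, dataloader, pipeline=None):
        os.makedirs(error_log_path, exist_ok=True)
        timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
        file_path = os.path.join(error_log_path, f'error-{timestamp}.py')
        with open(file_path, 'w') as error_file:
            # Keep the traceback as comments so the file remains a runnable script.
            trace = ''.join(traceback.format_exception(type(exception), exception, exception.__traceback__))
            for line in trace.splitlines():
                error_file.write(f'# {line}\n')
            # Reproduction code: assumes str() on these objects pretty-prints a constructor call.
            error_file.write('# (imports for the reproduction code omitted in this sketch)\n')
            error_file.write(f'\ndataloader = {dataloader}\n')
            error_file.write('data_set = dataloader.load()\n')
            if pipeline is not None:
                error_file.write(f'pipeline = {pipeline}\n')
        return file_path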