Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windstats monthly #170

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions merlion/models/anomaly/windstats_monthly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# Copyright (c) 2023 salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#
"""
Window Statistics anomaly detection model for data with monthly seasonality.
"""
import datetime
import logging

import numpy
import pandas as pd

from merlion.evaluate.anomaly import TSADMetric
from merlion.models.anomaly.base import DetectorConfig, DetectorBase
from merlion.post_process.threshold import AggregateAlarms
from merlion.transform.moving_average import DifferenceTransform
from merlion.utils import UnivariateTimeSeries, TimeSeries

logger = logging.getLogger(__name__)


class WindStatsConfig(DetectorConfig):
"""
Config class for `WindStats`.
"""

_default_transform = DifferenceTransform()

@property
def _default_threshold(self):
t = 3.0 if self.enable_calibrator else 8.8
return AggregateAlarms(
alm_threshold=t, alm_window_minutes=self.wind_sz, alm_suppress_minutes=120, min_alm_in_window=1
)

def __init__(self, wind_sz=30, max_day=4, **kwargs):
"""
:param wind_sz: the window size in minutes, default is 30 minute window
:param max_day: maximum number of month days stored in memory (only mean
and std of each window are stored). Here, the days are first
bucketed by month day and then by window id.
"""
self.wind_sz = wind_sz
self.max_day = max_day
super().__init__(**kwargs)


class MonthlyWindStats(DetectorBase):
"""
Sliding Window Statistics based Anomaly Detector.
This detector assumes the time series comes with a monthly seasonality.
It divides the month into buckets of the specified size (in minutes). For
a given (t, v) it computes an anomaly score by comparing the current
value v against the historical values (mean and standard deviation) for
that window of time.
Note that if multiple matches (specified by the parameter max_day) can be
found in history with the same day and same time window, then the
minimum of the scores is returned.
"""

config_class = WindStatsConfig

def __init__(self, config: WindStatsConfig = None):
"""
config.wind_sz: the window size in minutes, default is 30 minute window
config.max_days: maximum number of days stored in memory (only mean and std of each window are stored), default is 4 days
here the days are first bucketized and then bucketized by window id.
"""
super().__init__(WindStatsConfig() if config is None else config)
self.table = {}

@property
def require_even_sampling(self) -> bool:
return False

@property
def require_univariate(self) -> bool:
return True

@property
def _default_post_rule_train_config(self):
return dict(metric=TSADMetric.F1, unsup_quantile=None)

def _get_anomaly_score(self, time_series: pd.DataFrame, time_series_prev: pd.DataFrame = None) -> pd.DataFrame:
times, scores = [], []
for t, (x,) in zip(time_series.index, time_series.values):
t = t.timetuple()
key = (t.tm_mday, (t.tm_hour * 60 + t.tm_min) // self.config.wind_sz)
if key in self.table:
stats = self.table[key]
score = []
for d, mu, sigma in stats:
if sigma == 0: # handle missing value
score.append(0)
else:
score.append((x - mu) / sigma)
else:
score = [0]
scores.append(min(score, key=abs))

return pd.DataFrame(scores, index=time_series.index)

def _train(self, train_data: pd.DataFrame, train_config=None) -> pd.DataFrame:
# first build a hashtable with (day in the month, yearofday, and window id of the day) as key.
# the value is a list of metrics
table = {}
for time, x in zip(train_data.index, train_data.values):
t = time.timetuple()
code = (t.tm_mday, t.tm_yday, (t.tm_hour * 60 + t.tm_min) // self.config.wind_sz)
if code in table:
table[code].append(x)
else:
table[code] = [x]

# for each bucket, compute the mean and standard deviation
for t, x in table.items():
md, d, h = t
key = (md, h)
v1 = numpy.array(x)
mu = numpy.mean(v1)
sigma = numpy.std(v1)
if key in self.table:
self.table[key].append((d, mu, sigma))
else:
self.table[key] = [(d, mu, sigma)]

# cut out maximum number of days saved in the table. only store the latest max_day
for t, x in self.table.items():
self.table[t] = sorted(x, key=lambda x: x[0])
if len(self.table[t]) > self.config.max_day:
self.table[t] = self.table[t][-self.config.max_day :]

return self._get_anomaly_score(train_data)


54 changes: 54 additions & 0 deletions merlion/models/anomaly/windstats_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This is the running file that implements windstats with both weekly and monthly seasonalities.
For the implementation of only weekly/monthly seasonality, specify "enable_weekly" of "enable_monthly" arguments of RunWindStats().
"""

from windstats import WindStats, WindStatsConfig
from windstats_monthly import MonthlyWindStats, MonthlyWindStatsConfig
from ts_datasets.anomaly import NAB
from merlion.utils import TimeSeries
from merlion.post_process.threshold import AggregateAlarms

class RunWindStats:
def __init__(self, threshold, enable_weekly = True, enable_monthly = True, WeeklyWindStatsConfig = WindStatsConfig(), MonthlyWindStatsConfig = MonthlyWindStatsConfig()):
"""
Users can customize the configuration for weekly or monthly-based windstats. If not, then the default configuration will apply.
"""

self.enable_weekly = enable_weekly
self.enable_monthly = enable_monthly
assert self.enable_weekly == True or self.enable_monthly == True, "Must enable either weekly or monthly seasonality, or both!"

# Threshold on identifying anomaly based on anomaly score.
self.threshold = threshold

if self.enable_weekly:
self.model_weekly = WindStats(WeeklyWindStatsConfig)

if self.enable_monthly:
self.model_monthly = MonthlyWindStats(MonthlyWindStatsConfig)

def anomalyByScore(self, scores, threshold):
scores.loc[abs(scores["anom_score"]) <= threshold] = 0
scores.loc[abs(scores["anom_score"]) > threshold] = 1

scores.rename(columns = {"anom_score": "anomaly"}, inplace = True)
return scores

def run(self, ts):
if self.enable_weekly:
scores_weekly = self.model_weekly.train(ts).to_pd()
scores_weekly = self.anomalyByScore(scores_weekly, self.threshold)

if self.enable_monthly:
scores_monthly = self.model_monthly.train(ts).to_pd()
scores_monthly = self.anomalyByScore(scores_monthly, self.threshold)

if self.enable_weekly and self.enable_monthly:
return scores_weekly * scores_monthly
elif self.enable_weekly:
return scores_weekly
else:
return scores_monthly
3 changes: 2 additions & 1 deletion tests/anomaly/test_dpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_full(self):
self.model.save(dirname=join(rootdir, "tmp", "dpad"))
loaded_model = DeepPointAnomalyDetector.load(dirname=join(rootdir, "tmp", "dpad"))
loaded_alarms = loaded_model.get_anomaly_label(self.test_data)
n_loaded_alarms = sum(loaded_alarms.to_pd().values != 0)
n_loaded_alarms = np.sum(loaded_alarms.to_pd().values != 0)
self.assertAlmostEqual(n_loaded_alarms, n_alarms, delta=1)

# Evaluation
Expand All @@ -94,3 +94,4 @@ def test_full(self):
format="%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s", stream=sys.stdout, level=logging.DEBUG
)
unittest.main()

6 changes: 3 additions & 3 deletions tests/transform/test_resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ def test_two_month(self):
logger.info("Testing start-of-month resampling with an offset...")
self._test_granularity(granularity="2MS", offset=pd.Timedelta(days=3, hours=6, minutes=30))
logger.info("Testing end-of-month resampling...")
self._test_granularity(granularity="2M")
self._test_granularity(granularity="2ME")
logger.info("Testing end-of-month resampling...")
self._test_granularity(granularity="2M", offset=-pd.Timedelta(days=7, hours=7))
self._test_granularity(granularity="2ME", offset=-pd.Timedelta(days=7, hours=7))

def test_yearly(self):
logger.info("Testing start-of-year resampling...")
self._test_granularity(granularity="12MS", offset=pd.to_timedelta(0))
logger.info("Testing end-of-year resampling...")
self._test_granularity(granularity="12M", offset=pd.to_timedelta(0))
self._test_granularity(granularity="12ME", offset=pd.to_timedelta(0))


class TestShingle(unittest.TestCase):
Expand Down
Loading