Source code for adtk.detector._detector_hd

"""Module for high-dimensional detectors.

High-dimensional detectors detect anomalies from high-dimensional time series,
i.e. from pandas DataFrame.
"""

from collections import Counter
from typing import Any, Callable, Dict, Optional, Tuple

import pandas as pd

from .._detector_base import _TrainableMultivariateDetector
from ..aggregator import AndAggregator
from ..detector import InterQuartileRangeAD, ThresholdAD
from ..pipe import Pipeline, Pipenet
from ..transformer import (
    CustomizedTransformer1D,
    PcaReconstructionError,
    RegressionResidual,
)


[docs]class CustomizedDetectorHD(_TrainableMultivariateDetector): """Multivariate detector derived from a user-given function and parameters. Parameters ---------- detect_func: function A function detecting anomalies from multivariate time series. The first input argument must be a pandas DataFrame, optional input argument may be accepted through parameter `detect_func_params` and the output of `fit_func`, and the output must be a binary pandas Series with the same index as input. detect_func_params: dict, optional Parameters of `detect_func`. Default: None. fit_func: function, optional A function training parameters of `detect_func` with multivariate time series. The first input argument must be a pandas Series, optional input argument may be accepted through parameter `fit_func_params`, and the output must be a dict that can be used by `detect_func` as parameters. Default: None. fit_func_params: dict, optional Parameters of `fit_func`. Default: None. """ def __init__( self, detect_func: Callable, detect_func_params: Optional[Dict[str, Any]] = None, fit_func: Optional[Callable] = None, fit_func_params: Optional[Dict[str, Any]] = None, ) -> None: self._fitted_detect_func_params = {} # type: Dict super().__init__() self.detect_func = detect_func self.detect_func_params = detect_func_params self.fit_func = fit_func self.fit_func_params = fit_func_params if self.fit_func is None: self._fitted = 1 @property def _param_names(self) -> Tuple[str, ...]: return ( "detect_func", "detect_func_params", "fit_func", "fit_func_params", ) def _fit_core(self, df: pd.DataFrame) -> None: if self.fit_func is not None: if self.fit_func_params is not None: fit_func_params = self.fit_func_params else: fit_func_params = {} self._fitted_detect_func_params = self.fit_func( df, **fit_func_params ) def _predict_core(self, df: pd.DataFrame) -> pd.Series: if self.detect_func_params is not None: detect_func_params = self.detect_func_params else: detect_func_params = {} if self.fit_func is not None: return self.detect_func( df, **{**self._fitted_detect_func_params, **detect_func_params} ) else: return self.detect_func(df, **detect_func_params)
[docs]class MinClusterDetector(_TrainableMultivariateDetector): """Detector that detects anomaly based on clustering of historical data. This detector peforms clustering using a clustering model, and identifies a time points as anomalous if it belongs to the minimal cluster. Parameters ---------- model: object A clustering model to be used for clustering time series values. Same as a clustering model in scikit-learn, the model should minimally have a `fit` method and a `predict` method. The `predict` method should return an array of cluster labels. """ def __init__(self, model: Any) -> None: super().__init__() self.model = model @property def _param_names(self) -> Tuple[str, ...]: return ("model",) def _fit_core(self, df: pd.DataFrame) -> None: if df.dropna().empty: raise RuntimeError("Valid values are not enough for training.") clustering_result = self.model.fit_predict(df.dropna()) cluster_count = Counter(clustering_result) # type: Counter self._anomalous_cluster_id = cluster_count.most_common()[-1][0] def _predict_core(self, df: pd.DataFrame) -> pd.Series: cluster_id = pd.Series(float("nan"), index=df.index) if not df.dropna().empty: cluster_id.loc[df.dropna().index] = self.model.predict(df.dropna()) predicted = pd.Series( cluster_id == self._anomalous_cluster_id, index=df.index ) predicted[cluster_id.isna()] = float("nan") return predicted
[docs]class OutlierDetector(_TrainableMultivariateDetector): """Detector that detects anomaly based on a outlier detection model. This detector peforms time-independent outlier detection using given model, and identifies a time points as anomalous if it is labelled as an outlier. Parameters ---------- model: object An outlier detection model to be used. Same as a outlier detection model in scikit-learn (e.g. EllipticEnvelope, IsolationForest, LocalOutlierFactor), the model should minimally have a `fit_predict` method, or `fit` and `predict` methods. The `fit_predict` or `predict` method should return an array of outlier indicators where outliers are marked by -1. """ def __init__(self, model: Any) -> None: super().__init__() self.model = model @property def _param_names(self) -> Tuple[str, ...]: return ("model",) def _fit_core(self, df: pd.DataFrame) -> None: if hasattr(self.model, "fit"): if df.dropna().empty: raise RuntimeError("Valid values are not enough for training.") self.model.fit(df.dropna()) def _predict_core(self, df: pd.DataFrame) -> pd.Series: is_outliers = pd.Series(float("nan"), index=df.index) if not df.dropna().empty: if hasattr(self.model, "predict"): is_outliers.loc[df.dropna().index] = ( self.model.predict(df.dropna()) == -1 ) else: is_outliers.loc[df.dropna().index] = ( self.model.fit_predict(df.dropna()) == -1 ) predicted = pd.Series(is_outliers == 1, index=df.index) predicted[is_outliers.isna()] = float("nan") return predicted
# ============================================================================= # PLEASE PUT PIPE-DERIVED DETECTOR CLASSES BELOW THIS LINE # =============================================================================
[docs]class RegressionAD(_TrainableMultivariateDetector): """Detector that detects anomalous inter-series relationship. This detector performs regression to build relationship between a target series and the rest of series, and identifies a time point as anomalous when the residual of regression is anomalously large. This detector is internally implemented as a `Pipenet` object. Advanced users may learn more details by checking attribute `pipe_`. Parameters ---------- target: str Name of the column to be regarded as target variable. regressor: object Regressor to be used. Same as a scikit-learn regressor, it should minimally have `fit` and `predict` methods. c: float, optional Factor used to determine the bound of normal range based on historical interquartile range. Default: 3.0. side: str, optional - If "both", to detect anomalous positive and negative residuals; - If "positive", to only detect anomalous positive residuals; - If "negative", to only detect anomalous negative residuals. Default: "both". Attributes ---------- pipe_: adtk.pipe.Pipenet Internal pipenet object. """ def __init__( self, regressor: Any, target: str, c: float = 3.0, side: str = "both" ) -> None: self.pipe_ = Pipenet( { "regression_residual": { "model": RegressionResidual( regressor=regressor, target=target ), "input": "original", }, "abs_residual": { "model": CustomizedTransformer1D(transform_func=abs), "input": "regression_residual", }, "iqr_ad": { "model": InterQuartileRangeAD((None, c)), "input": "abs_residual", }, "sign_check": { "model": ThresholdAD( high=( 0.0 if side == "positive" else ( float("inf") if side == "negative" else -float("inf") ) ), low=( 0.0 if side == "negative" else ( -float("inf") if side == "positive" else float("inf") ) ), ), "input": "regression_residual", }, "and": { "model": AndAggregator(), "input": ["iqr_ad", "sign_check"], }, } ) super().__init__() self.regressor = regressor self.target = target self.side = side self.c = c self._sync_params() @property def _param_names(self) -> Tuple[str, ...]: return ("regressor", "target", "c", "side") def _sync_params(self) -> None: if self.side not in ["both", "positive", "negative"]: raise ValueError( "Parameter `side` must be 'both', 'positive' or 'negative'." ) self.pipe_.steps["regression_residual"][ "model" ].regressor = self.regressor self.pipe_.steps["regression_residual"]["model"].set_params( target=self.target ) self.pipe_.steps["iqr_ad"]["model"].set_params(c=(None, self.c)) self.pipe_.steps["sign_check"]["model"].set_params( high=( 0.0 if self.side == "positive" else ( float("inf") if self.side == "negative" else -float("inf") ) ), low=( 0.0 if self.side == "negative" else ( -float("inf") if self.side == "positive" else float("inf") ) ), ) def _fit_core(self, s: pd.DataFrame) -> None: self._sync_params() self.pipe_.fit(s) def _predict_core(self, s: pd.DataFrame) -> pd.Series: self._sync_params() return self.pipe_.detect(s)
[docs]class PcaAD(_TrainableMultivariateDetector): """Detector that detects outlier point with principal component analysis. This detector performs principal component analysis (PCA) to the multivariate time series (every time point is treated as a point in high- dimensional space), measures reconstruction error at every time point, and identifies a time point as anomalous when the recontruction error is beyond anomalously large. This detector is internally implemented as a `Pipeline` object. Advanced users may learn more details by checking attribute `pipe_`. Parameters ---------- k: int, optional Number of principal components to use. Default: 1. c: float, optional Factor used to determine the bound of normal range based on historical interquartile range. Default: 5.0. Attributes ---------- pipe_: adtk.pipe.Pipenet Internal pipenet object. """ def __init__(self, k: int = 1, c: float = 5.0) -> None: self.pipe_ = Pipeline( [ ("pca_reconstruct_error", PcaReconstructionError(k=k)), ("ad", InterQuartileRangeAD(c=c)), ] ) super().__init__() self.k = k self.c = c self._sync_params() @property def _param_names(self) -> Tuple[str, ...]: return ("k", "c") def _sync_params(self) -> None: self.pipe_.steps[0][1].set_params(k=self.k) self.pipe_.steps[1][1].set_params(c=self.c) def _fit_core(self, s: pd.DataFrame) -> None: self._sync_params() self.pipe_.fit(s) def _predict_core(self, s: pd.DataFrame) -> pd.Series: self._sync_params() return self.pipe_.detect(s)