# Source code for adtk.aggregator._aggregator

"""Module for aggregators.

An aggregator combines multiple lists of anomalies into one.

"""

from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import pandas as pd

from .._aggregator_base import _Aggregator
from ..data import validate_events


class CustomizedAggregator(_Aggregator):
    """Aggregator derived from a user-given function and parameters.

    Parameters
    ----------
    aggregate_func: function
        A function aggregating multiple types of anomaly.

        The first input argument must be a pandas DataFrame, a dict of pandas
        Series/DataFrame, or a dict of event lists.

        - If a pandas DataFrame, every column is a binary Series representing
          a type of anomaly.
        - If a dict of pandas Series/DataFrame, every value of the dict is a
          binary Series/DataFrame representing a type or some types of
          anomaly.
        - If a dict of lists, every value of the dict is a type of anomaly as
          a list of events, where each event is represented as a pandas
          Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps
          if it is a closed time interval.

        Optional input arguments may be accepted through parameter
        `aggregate_func_params`.

        The type of the output must match the type of the input:

        - If input is a pandas DataFrame or a dict of Series/DataFrame, return
          a single binary pandas Series.
        - If input is a dict of lists, return a single list of events.

    aggregate_func_params: dict, optional
        Keyword arguments passed to `aggregate_func` after the first
        argument. Default: None, meaning no extra arguments are passed.

    """

    def __init__(
        self,
        aggregate_func: Callable,
        aggregate_func_params: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__()
        self.aggregate_func = aggregate_func
        self.aggregate_func_params = aggregate_func_params

    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("aggregate_func", "aggregate_func_params")

    def _predict_core(
        self,
        lists: Union[
            pd.DataFrame,
            Dict[str, Union[pd.Series, pd.DataFrame]],
            Dict[
                str,
                List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
            ],
        ],
    ) -> Union[
        pd.Series, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    ]:
        """Call `aggregate_func` on `lists` with the stored keyword params."""
        # None stands for "no extra keyword arguments".
        aggregate_func_params = (
            {}
            if self.aggregate_func_params is None
            else self.aggregate_func_params
        )
        return self.aggregate_func(lists, **aggregate_func_params)


class OrAggregator(_Aggregator):
    """Aggregator that identifies a time point as anomalous as long as it is
    included in one of the input anomaly lists.

    """

    def __init__(self) -> None:
        super().__init__()

    @property
    def _param_names(self) -> Tuple[str, ...]:
        return tuple()

    def _predict_core(
        self,
        lists: Union[
            pd.DataFrame,
            Dict[str, Union[pd.Series, pd.DataFrame]],
            Dict[
                str,
                List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
            ],
        ],
    ) -> Union[
        pd.Series, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    ]:
        """Compute the union of the input anomalies.

        - dict of event lists: every list is normalized with
          `validate_events`, then all events are pooled and normalized again.
        - dict of Series/DataFrame: the values are outer-joined column-wise
          into one DataFrame, which is then handled as below.
        - DataFrame: a row is True if any non-missing value is truthy; NaN if
          no value is truthy but at least one is missing; False otherwise.
        """
        if isinstance(lists, dict):
            # The type of the first value decides which input form this is.
            if isinstance(next(iter(lists.values())), list):
                clean_lists = {
                    key: validate_events(value) for key, value in lists.items()
                }
                return validate_events(
                    [
                        window
                        for clean_predict in clean_lists.values()
                        for window in clean_predict
                    ]
                )
            # a dict of pandas Series/DataFrame
            return self._predict_core(pd.concat(lists, join="outer", axis=1))

        # pandas DataFrame
        predicted = lists.any(axis=1)
        # Rows with no positive vote but some missing values are unknown,
        # not normal.
        undetermined = ~predicted & lists.isna().any(axis=1)
        if undetermined.any():
            # A bool Series cannot hold NaN. Upcast explicitly instead of
            # relying on implicit setitem upcasting (deprecated in pandas 2.1).
            predicted = predicted.astype(object)
            predicted[undetermined] = float("nan")
        return predicted


class AndAggregator(_Aggregator):
    """Aggregator that identifies a time point as anomalous only if it is
    included in all the input anomaly lists.

    """

    def __init__(self) -> None:
        super().__init__()

    @property
    def _param_names(self) -> Tuple[str, ...]:
        return tuple()

    def _predict_core(
        self,
        lists: Union[
            pd.DataFrame,
            Dict[str, Union[pd.Series, pd.DataFrame]],
            Dict[
                str,
                List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
            ],
        ],
    ) -> Union[
        pd.Series, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    ]:
        """Compute the intersection of the input anomalies.

        - dict of event lists: see `_intersect_event_lists`.
        - dict of Series/DataFrame: the values are outer-joined column-wise
          into one DataFrame, which is then handled as below.
        - DataFrame: a row is False if any non-missing value is falsy; NaN if
          every non-missing value is truthy but some value is missing; True
          otherwise.
        """
        if isinstance(lists, dict):
            # The type of the first value decides which input form this is.
            if isinstance(next(iter(lists.values())), list):
                return self._intersect_event_lists(lists)
            # a dict of pandas Series/DataFrame
            return self._predict_core(pd.concat(lists, join="outer", axis=1))

        # pandas DataFrame
        predicted = lists.all(axis=1)
        # Rows that pass on every observed value but have missing values are
        # unknown rather than anomalous.
        undetermined = predicted & lists.isna().any(axis=1)
        if undetermined.any():
            # A bool Series cannot hold NaN. Upcast explicitly instead of
            # relying on implicit setitem upcasting (deprecated in pandas 2.1).
            predicted = predicted.astype(object)
            predicted[undetermined] = float("nan")
        return predicted

    @staticmethod
    def _intersect_event_lists(
        lists: Dict[
            str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        ],
    ) -> List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]:
        """Intersect several event lists into a single event list.

        Each list is turned into a 0/1 step function sampled at its change
        points. The step functions are aligned on a common time index, and the
        maximal runs where all of them are 1 become the output events.
        """
        one_ns = pd.Timedelta("1ns")

        step_functions = {}  # type: Dict[str, pd.Series]
        for key, value in lists.items():
            # Instantaneous events become (t, t) intervals so that every
            # window has a start and an end.
            windows = validate_events(value, point_as_interval=True)
            starts = [window[0] for window in windows]
            ends = [window[1] for window in windows]
            n_windows = len(windows)
            # 0 just before each start, 1 at start and end, 0 just after end.
            step = pd.Series(
                [0] * n_windows + [1] * 2 * n_windows + [0] * n_windows,
                index=(
                    [t - one_ns for t in starts]
                    + starts
                    + ends
                    + [t + one_ns for t in ends]
                ),
                dtype=int,
            )
            # Collapse duplicate timestamps deterministically and return them
            # sorted. A timestamp that is a boundary of any window is covered
            # by that window, so the max (1) is the correct value. Keeping the
            # "first" duplicate after an unstable sort was arbitrary.
            step_functions[key] = step.groupby(level=0).max()

        # Align on the union of all change points:
        # - each step function keeps its last value between change points
        #   (ffill);
        # - before its own first change point it is backfilled (bfill);
        # - a list with no events yields an all-NaN column, which becomes 0.
        coverage = (
            pd.concat(step_functions, axis=1, join="outer")
            .ffill()
            .bfill()
            .fillna(0)
        )
        in_all = coverage.all(axis=1)

        # Extract maximal runs of True as closed intervals.
        aggregated_predict = []  # type: List[Tuple[pd.Timestamp, pd.Timestamp]]
        in_window = False
        start = None
        prev_t = None
        for t, covered in in_all.items():
            if not in_window and covered:
                start = t
                in_window = True
            elif in_window and not covered:
                # The run ended at the previous change point.
                aggregated_predict.append((start, prev_t))
                in_window = False
            prev_t = t
        # By construction, the latest change point is the "just after the
        # end" sample of some list, where that list is 0. So no run is left
        # open here.
        return validate_events(aggregated_predict)