# Source code for adtk.metrics.metrics

"""Module for metrics that scores predicted anomaly list against ground truth.
"""

import pandas as pd

from ..aggregator import AndAggregator, OrAggregator
from ..data import validate_events

__all__ = ["recall", "precision", "f1_score", "iou"]


def recall(y_true, y_pred, thresh=0.5):
    """Recall score of prediction.

    Recall, a.k.a. sensitivity, hit rate, or true positive rate (TPR), is the
    percentage of true anomalies that are detected successfully.

    When the input is anomaly labels, metric calculation treats every time
    point with positive label as an independent event. An anomalous time
    point that is detected as anomalous is considered a successful detection.

    When the input is lists of anomalous time windows, metric calculation
    treats every anomalous time window as an independent event. A true event
    is considered successfully detected if the percentage of this time window
    included in the detected list is greater or equal to `thresh`. Note that
    input time windows will be merged first if overlapped time windows exist
    in the list.

    Parameters
    ----------
    y_true: pandas Series or DataFrame, list, or dict
        Labels or lists of true anomalies.

        - If pandas Series, it is treated as binary labels along time index.
        - If pandas DataFrame, each column is treated as a type of anomaly.
        - If list, it is treated as a list of anomalous time windows.
        - If dict of lists, each item is treated as a type of anomaly.

    y_pred: pandas Series or DataFrame, list, or dict
        Labels or lists of predicted anomalies. Same accepted forms as
        `y_true`; must be the same type as `y_true`.

    thresh: float, optional
        Threshold of a hit. Only used if input is list or dict. Default: 0.5.

    Returns
    -------
    float or dict
        Score(s) for each type of anomaly.

    Raises
    ------
    ValueError
        If `thresh` is out of (0, 1], or indexes/columns of the two inputs
        do not match.
    TypeError
        If the two inputs have different types, or an unsupported type.

    """
    if (thresh <= 0) or (thresh > 1):
        raise ValueError(
            "Parameter `thresh` must be a positive number less or equal to 1."
        )
    if type(y_true) != type(y_pred):
        raise TypeError("y_true and y_pred must have same type.")
    if isinstance(y_true, pd.Series):
        try:
            pd.testing.assert_index_equal(y_true.index, y_pred.index)
        except AssertionError:
            raise ValueError("y_true and y_pred must have identical index")
        # Binarize labels: clip to [0, 1] then round, so any positive value
        # counts as anomalous.
        true_bin = y_true.clip(0, 1).round()
        pred_bin = y_pred.clip(0, 1).round()
        if true_bin.sum() != 0:
            return (true_bin * pred_bin).sum() / true_bin.sum()
        else:
            # no true anomaly: recall is undefined
            return float("nan")
    elif isinstance(y_true, pd.DataFrame):
        if set(y_true.columns) != set(y_pred.columns):
            raise ValueError("y_true and y_pred must have identical columns.")
        # Propagate `thresh` so the DataFrame branch stays consistent with
        # the dict branch (columns are Series, which ignore `thresh`).
        return {
            col: recall(y_true[col], y_pred[col], thresh=thresh)
            for col in y_true.columns
        }
    elif isinstance(y_true, list):
        y_true = validate_events(y_true)
        if not y_true:
            # no true event: recall is undefined
            return float("nan")
        # time segments covered by both true and predicted events
        y_int = AndAggregator().aggregate({"y_true": y_true, "y_pred": y_pred})
        n_hit = 0
        for w_true in y_true:
            true_start, true_end = _window_bounds(w_true)
            len_w_true = true_end - true_start
            if len_w_true > pd.Timedelta(0):
                # windowed event: hit iff the covered fraction >= thresh
                len_overlap = pd.Timedelta(0)
                for w_int in y_int:
                    int_start, int_end = _window_bounds(w_int)
                    len_overlap += max(
                        pd.Timedelta(0),
                        min(true_end, int_end) - max(true_start, int_start),
                    )
                if len_overlap >= thresh * len_w_true:
                    n_hit += 1
            else:
                # zero-length (instant) event: hit iff contained in some
                # detected window
                for w_int in y_int:
                    int_start, int_end = _window_bounds(w_int)
                    if (int_start <= true_start) and (int_end >= true_end):
                        n_hit += 1
                        break
        return n_hit / len(y_true)
    elif isinstance(y_true, dict):
        return {
            key: recall(y_true[key], y_pred[key], thresh=thresh)
            for key in y_true.keys()
        }
    else:
        raise TypeError(
            "y_true and y_pred must be pandas Series or DataFrame, list, or a "
            "dict of lists"
        )


def _window_bounds(w):
    """Return (start, end) of a time window; an instant `t` yields (t, t)."""
    if isinstance(w, tuple):
        return w[0], w[1]
    return w, w
def precision(y_true, y_pred, thresh=0.5):
    """Precision score of prediction.

    Precision, a.k.a. positive predictive value (PPV), is the percentage of
    predicted anomalies that are true anomalies.

    When the input is anomaly labels, metric calculation treats every time
    point with positive label as an independent event. An anomalous time
    point that is detected as anomalous is considered a successful detection.

    When the input is lists of anomalous time windows, metric calculation
    treats every anomalous time window as an independent event. A detected
    event is considered a successful detection if the percentage of this
    time window included in the true anomaly list is greater or equal to
    `thresh`. Note that input time windows will be merged first if overlapped
    time windows exist in the list.

    Parameters
    ----------
    y_true: pandas Series or DataFrame, list, or dict
        Labels or lists of true anomalies.

        - If pandas Series, it is treated as binary labels along time index.
        - If pandas DataFrame, each column is treated as a type of anomaly.
        - If list, it is treated as a list of anomalous time windows.
        - If dict of lists, each item is treated as a type of anomaly.

    y_pred: pandas Series or DataFrame, list, or dict
        Labels or lists of predicted anomalies. Same accepted forms as
        `y_true`; must be the same type as `y_true`.

    thresh: float, optional
        Threshold of a hit. Only used if input is list or dict. Default: 0.5.

    Returns
    -------
    float or dict
        Score(s) for each type of anomaly.

    """
    # Precision is recall with the roles of truth and prediction swapped:
    # the fraction of predicted events "hit" by the true anomaly list.
    return recall(y_pred, y_true, thresh=thresh)
def f1_score(y_true, y_pred, recall_thresh=0.5, precision_thresh=0.5):
    """F1 score of prediction.

    F1 score is the harmonic mean of precision and recall. For details on
    those two components, please refer to functions `precision` and `recall`.

    Parameters
    ----------
    y_true: pandas Series or DataFrame, list, or dict
        Labels or lists of true anomalies.

        - If pandas Series, it is treated as binary labels along time index.
        - If pandas DataFrame, each column is treated as a type of anomaly.
        - If list, it is treated as a list of anomalous time windows.
        - If dict of lists, each item is treated as a type of anomaly.

    y_pred: pandas Series or DataFrame, list, or dict
        Labels or lists of predicted anomalies. Same accepted forms as
        `y_true`; must be the same type as `y_true`.

    recall_thresh: float, optional
        Threshold of recall calculation. Only used if input is list or dict.
        For more details, please refer to function `recall`. Default: 0.5.

    precision_thresh: float, optional
        Threshold of precision calculation. Only used if input is list or
        dict. For more details, please refer to function `precision`.
        Default: 0.5.

    Returns
    -------
    float or dict
        Score(s) for each type of anomaly.

    """

    def _combine(r, p):
        # Harmonic mean of the two scores; undefined (NaN) when both are
        # zero. A NaN in either score propagates through naturally.
        return 2 * r * p / (r + p) if (r + p) != 0 else float("nan")

    r = recall(y_true, y_pred, recall_thresh)
    p = precision(y_true, y_pred, precision_thresh)
    if isinstance(r, dict):
        # multi-type input: combine the scores type by type
        return {key: _combine(r[key], p[key]) for key in r}
    return _combine(r, p)
def iou(y_true, y_pred):
    """IoU (Intersection over Union) score of prediction.

    Intersection over union is the length ratio between time segments that
    are identified as anomalous in both lists and those identified by at
    least one of the two lists.

    When the input is anomaly labels, metric calculation counts the number of
    anomalous time points. When the input is lists of anomalous time windows,
    metric calculation measures the length of time segments.

    Parameters
    ----------
    y_true: pandas Series or DataFrame, list, or dict
        Labels or lists of true anomalies.

        - If pandas Series, it is treated as binary labels along time index.
        - If pandas DataFrame, each column is treated as a type of anomaly.
        - If list, it is treated as a list of anomalous time windows.
        - If dict of lists, each item is treated as a type of anomaly.

    y_pred: pandas Series or DataFrame, list, or dict
        Labels or lists of predicted anomalies. Same accepted forms as
        `y_true`; must be the same type as `y_true`.

    Returns
    -------
    float or dict
        Score(s) for each type of anomaly.

    """
    if type(y_true) != type(y_pred):
        raise TypeError("y_true and y_pred must have same type.")

    if isinstance(y_true, pd.Series):
        try:
            pd.testing.assert_index_equal(y_true.index, y_pred.index)
        except AssertionError:
            raise ValueError("y_true and y_pred must have identical index")
        # Binarize labels (any positive value counts as anomalous), then
        # count points in the intersection and in the union.
        true_bin = y_true.clip(0, 1).round()
        pred_bin = y_pred.clip(0, 1).round()
        n_union = (true_bin + pred_bin).clip(0, 1).sum()
        if n_union != 0:
            return (true_bin * pred_bin).sum() / n_union
        # neither list contains any anomaly: ratio is undefined
        return float("nan")

    if isinstance(y_true, pd.DataFrame):
        if set(y_true.columns) != set(y_pred.columns):
            raise ValueError("y_true and y_pred must have identical columns.")
        return {col: iou(y_true[col], y_pred[col]) for col in y_true.columns}

    if isinstance(y_true, list):
        # segments flagged by both lists / by at least one list
        both = AndAggregator().aggregate({"y_true": y_true, "y_pred": y_pred})
        either = OrAggregator().aggregate(
            {"y_true": y_true, "y_pred": y_pred}
        )

        def _total_length(windows):
            # Total duration in seconds; instants (non-tuple entries) have
            # zero duration and contribute nothing.
            return sum(
                (w[1] - w[0]).total_seconds()
                for w in windows
                if isinstance(w, tuple)
            )

        len_union = _total_length(either)
        if len_union == 0:
            return float("nan")
        return _total_length(both) / len_union

    if isinstance(y_true, dict):
        return {key: iou(y_true[key], y_pred[key]) for key in y_true}

    raise TypeError(
        "y_true and y_pred must be pandas Series or DataFrame, list, or a "
        "dict of lists"
    )