"""Module for model pipeline and pipenet.
Pipeline or Pipenet connects multiple models (transformers, detectors, and/or
aggregators) into a "super" model that may perform a complex anomaly
detection process.
"""
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.collections import PatchCollection
from matplotlib.lines import Line2D
from matplotlib.patches import Circle
from tabulate import tabulate
from .._aggregator_base import _Aggregator
from .._base import _Model, _TrainableModel
from .._detector_base import ( # _NonTrainableMultivariateDetector,
_NonTrainableUnivariateDetector,
_TrainableMultivariateDetector,
_TrainableUnivariateDetector,
)
from .._transformer_base import (
_NonTrainableMultivariateTransformer,
_NonTrainableUnivariateTransformer,
_TrainableMultivariateTransformer,
_TrainableUnivariateTransformer,
)
from ..metrics import f1_score, iou, precision, recall
_Detector = (
_NonTrainableUnivariateDetector,
# _NonTrainableMultivariateDetector,
_TrainableUnivariateDetector,
_TrainableMultivariateDetector,
)
_Transformer = (
_NonTrainableUnivariateTransformer,
_NonTrainableMultivariateTransformer,
_TrainableUnivariateTransformer,
_TrainableMultivariateTransformer,
)
class Pipeline:
"""A Pipeline object chains transformers and a detector sequentially.
Parameters
----------
steps: list of 2-tuples (str, object)
Components of this pipeline. Each 2-tuple represents a step in the
pipeline (step name, model object).
Examples
--------
    >>> from adtk.detector import QuantileAD
    >>> from adtk.transformer import RollingAggregate
    >>> steps = [('moving average', RollingAggregate(agg='mean', window=10)),
    ...          ('filter quantile 0.99', QuantileAD(high=0.99))]
>>> myPipeline = Pipeline(steps)
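    A typical workflow then trains the pipeline and detects anomalies in one
    call (a sketch; `s` is assumed to be a pandas Series with a
    DatetimeIndex):
    >>> anomalies = myPipeline.fit_detect(s)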
"""
def __init__(self, steps: List[Tuple[str, _Model]]) -> None:
self.steps = steps
self._pipenet = Pipenet()
self._update_internal_pipenet()
def _update_internal_pipenet(self) -> None:
pipenet_steps = dict()
last_name = "original"
for pipeline_step in self.steps:
pipenet_steps.update(
{
pipeline_step[0]: {
"model": pipeline_step[1],
"input": last_name,
}
}
)
last_name = pipeline_step[0]
self._pipenet.steps = pipenet_steps
    def fit(
self,
ts: Union[pd.Series, pd.DataFrame],
skip_fit: Optional[List[str]] = None,
return_intermediate: bool = False,
) -> Optional[Dict[str, Optional[Union[pd.Series, pd.DataFrame]]]]:
"""Train all models in the pipeline sequentially.
Parameters
----------
ts: pandas Series or DataFrame
Time series used to train models.
        skip_fit: list, optional
            Models to skip training. This can be used when the pipeline
            contains models that were already trained on the same time
            series and re-training would be time-consuming. It must be a
            list of strings where each element is a model name. Default:
            None.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
Returns
-------
dict, optional
If return_intermediate=True, return intermediate results generated
during training as a dictionary where keys are step names. If a
step does not perform transformation or detection, the result of
that step will be None.
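        Examples
        --------
        A sketch, assuming `s` is a pandas Series and `myPipeline` is the
        pipeline from the class docstring:
        >>> intermediate = myPipeline.fit(s, return_intermediate=True)
        >>> ma = intermediate['moving average']  # series used to fit step 2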
"""
self._update_internal_pipenet()
return self._pipenet.fit(
ts=ts, skip_fit=skip_fit, return_intermediate=return_intermediate
)
    def detect(
self,
ts: Union[pd.Series, pd.DataFrame],
return_intermediate: bool = False,
return_list: bool = False,
) -> Union[
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
Dict[
str,
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[
Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]
],
],
],
],
]:
"""Transform time series sequentially along pipeline, and detect
anomalies with the last detector.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
return_list: bool, optional
Whether to return a list of anomalous events, or a binary series
indicating normal/anomalous. Default: False.
Returns
-------
pandas Series, pandas DataFrame, list, or dict
Detected anomalies.
- If return_intermediate=False, return detected anomalies, i.e.
result from last detector.
- If return_intermediate=True, return results of all models in
pipeline as a dict where each item represents the result of a
model.
- If return_list=False, result from a detector or an aggregator
will be a binary pandas Series indicating normal/anomalous.
- If return_list=True, result from a detector or an aggregator
will be a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
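        Examples
        --------
        A sketch, assuming `s` is a pandas Series and the pipeline has been
        fitted:
        >>> anomalies = myPipeline.detect(s)  # binary series
        >>> events = myPipeline.detect(s, return_list=True)  # event list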
"""
self._update_internal_pipenet()
return self._pipenet.detect(
ts=ts,
return_intermediate=return_intermediate,
return_list=return_list,
)
    def fit_detect(
self,
ts: Union[pd.Series, pd.DataFrame],
skip_fit: Optional[List[str]] = None,
return_intermediate: bool = False,
return_list: bool = False,
) -> Union[
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
Dict[
str,
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[
Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]
],
],
],
],
]:
"""Train models in pipeline sequentially, transform time series along
pipeline, and use the last detector to detect anomalies.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
        skip_fit: list, optional
            Models to skip training. This can be used when the pipeline
            contains models that were already trained on the same time
            series and re-training would be time-consuming. It must be a
            list of strings where each element is a model name. Default:
            None.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
return_list: bool, optional
Whether to return a list of anomalous events, or a binary series
indicating normal/anomalous. Default: False.
Returns
-------
pandas Series, pandas DataFrame, list, or dict
Detected anomalies.
- If return_intermediate=False, return detected anomalies, i.e.
result from last detector.
- If return_intermediate=True, return results of all models in
pipeline as a dict where each item represents the result of a
model.
- If return_list=False, result from a detector or an aggregator
will be a binary pandas Series indicating normal/anomalous.
- If return_list=True, result from a detector or an aggregator
will be a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
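        Examples
        --------
        A sketch of skipping a step that is already trained (assuming `s` is
        a pandas Series):
        >>> anomalies = myPipeline.fit_detect(
        ...     s, skip_fit=['filter quantile 0.99'])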
"""
self._update_internal_pipenet()
return self._pipenet.fit_detect(
ts=ts,
skip_fit=skip_fit,
return_intermediate=return_intermediate,
return_list=return_list,
)
    def score(
self,
ts: Union[pd.Series, pd.DataFrame],
anomaly_true: Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
scoring: str = "recall",
**kwargs: Any
) -> Union[float, Dict[str, float]]:
"""Detect anomalies and score the results against true anomalies.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
anomaly_true: pandas Series or list
True anomalies.
- If pandas Series, it is treated as a series of binary labels.
- If list, a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
scoring: str, optional
Scoring function to use. Must be one of "recall", "precision",
"f1", and "iou". See module `metrics` for more information.
Default: "recall"
**kwargs
Optional parameters for scoring function. See module `metrics` for
more information.
Returns
-------
        float or dict
            Score(s) of the detection result.
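        Examples
        --------
        A sketch with hypothetical ground-truth events (assuming `s` is a
        pandas Series):
        >>> true_events = [pd.Timestamp('2020-01-05'),
        ...                (pd.Timestamp('2020-01-10'),
        ...                 pd.Timestamp('2020-01-12'))]
        >>> f1 = myPipeline.score(s, true_events, scoring='f1')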
"""
if scoring == "recall":
scoring_func = recall # type: Callable
elif scoring == "precision":
scoring_func = precision
elif scoring == "f1":
scoring_func = f1_score
elif scoring == "iou":
scoring_func = iou
else:
raise ValueError(
"Argument `scoring` must be one of 'recall', 'precision', "
"'f1' and 'iou'."
)
if isinstance(anomaly_true, pd.Series):
return scoring_func(
y_true=anomaly_true,
y_pred=self.detect(ts, return_list=False),
**kwargs
)
else:
return scoring_func(
y_true=anomaly_true,
y_pred=self.detect(ts, return_list=True),
**kwargs
)
    def get_params(self) -> Dict[str, Dict[str, Any]]:
"""Get parameters of models in pipeline.
Returns
-------
dict
A dictionary of model name and model parameters.
"""
self._update_internal_pipenet()
return self._pipenet.get_params()
class Pipenet:
"""A Pipenet object connects transformers, detectors and aggregators.
Parameters
----------
    steps: dict
        Components of the pipenet. Each key-value pair represents a step
        (transformer, detector, or aggregator), where the key is the unique
        name of the step and the value is a dict with the following
        key-value pairs:
        - input (str or list of str): Input to the model, which must be
          either 'original' (i.e. the input time series) or the name of an
          upstream component.
- subset (str, list of str, or list of lists of str, optional): If
a model does not use all series from an input component, use this
field to specify which series should be included. If not given or
"all", all series from the input component will be used.
- model (object): A detector, transformer, or aggregator object.
Attributes
----------
    steps_graph_: OrderedDict
        Order of steps to be executed. Keys are step names, values are
        2-tuples (i, j) where i is the index of the execution round and j is
        the index within a round.
final_step_: str
Name of the final step to be executed. It is the single step in the
last round of execution in attribute `steps_graph_`.
Examples
--------
    The following example shows how to use a Pipenet to build a level shift
    detector with some basic transformers, detectors, and an aggregator.
>>> from adtk.detector import QuantileAD, ThresholdAD
>>> from adtk.transformer import DoubleRollingAggregate
>>> from adtk.aggregator import AndAggregator
>>> from adtk.pipe import Pipenet
    >>> steps = {
    ...     "diff_abs": {
    ...         "input": "original",
    ...         "model": DoubleRollingAggregate(
    ...             agg="median",
    ...             window=20,
    ...             center=True,
    ...             diff="l1",
    ...         ),
    ...     },
    ...     "quantile_ad": {
    ...         "input": "diff_abs",
    ...         "model": QuantileAD(high=0.99, low=0),
    ...     },
    ...     "diff": {
    ...         "input": "original",
    ...         "model": DoubleRollingAggregate(
    ...             agg="median",
    ...             window=20,
    ...             center=True,
    ...             diff="diff",
    ...         ),
    ...     },
    ...     "sign_check": {
    ...         "input": "diff",
    ...         "model": ThresholdAD(high=0.0, low=-float("inf")),
    ...     },
    ...     "and": {
    ...         "model": AndAggregator(),
    ...         "input": ["quantile_ad", "sign_check"],
    ...     },
    ... }
    >>> myPipenet = Pipenet(steps)
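    After construction, the resolved execution order is available in
    attribute `steps_graph_`; for the pipenet above, "diff_abs" and "diff"
    run in the first round, "quantile_ad" and "sign_check" in the second,
    and "and" in the third. A sketch of training and applying it (assuming
    `s` is a pandas Series):
    >>> anomalies = myPipenet.fit_detect(s)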
"""
def __init__(
self, steps: Optional[Dict[str, Dict[str, Any]]] = None
) -> None:
if steps is None:
self.steps = dict()
else:
self.steps = steps
self._validate()
def _validate(self) -> None:
"""
Check the following issues and raise error if found
- steps is not a list of dict
- some step does not have required keys
- some step has invalid keys other than required and optional keys
- some step's `name` is not str
- some steps have duplicated name
- some step uses reserved name
- some step's `input` is not str or list of str
- some step's `input` is not a step name or "original"
- some step's `subset` (optional) is not a str ("all"),
or a list of (str ("all") or list of str)
- some step's `subset` does not have the same length as `input`
- aggregator has a `subset` field
- some step's `model` is not a valid ADTK model object
- upstream of a transformer or a detector must be a transformer
- upstream of aggregator must be a detector or an aggregator
Create a directed graph of model execution flowchart, when following
issues will be checked:
- some step cannot be reached
- more than one outlet steps found
"""
# check if step is a dict
if not isinstance(self.steps, dict):
            raise TypeError("`steps` must be a dict object.")
if not all([isinstance(value, dict) for value in self.steps.values()]):
raise TypeError("Values of dict `steps` must be dict objects.")
# check if each step has valid keys
if not all(
[
{"model", "input"}
<= step.keys()
<= {"model", "input", "subset"}
for step in self.steps.values()
]
):
raise KeyError(
"Each step must be a dict object with keys `model`, `input`, "
"and `subset` (optional)."
)
# check if each step has valid name
if not all(
[isinstance(step_name, str) for step_name in self.steps.keys()]
):
raise TypeError("Model name must be a string.")
if any([step_name == "original" for step_name in self.steps.keys()]):
raise ValueError(
"'original' is a reserved name for original time series input "
"and you may not use it as a model name."
)
# check if each step has valid input
def islistofstr(li: Any) -> bool:
if not isinstance(li, list):
return False
if not all([isinstance(x, str) for x in li]):
return False
return True
if not all(
[
isinstance(step["input"], str) or islistofstr(step["input"])
for step in self.steps.values()
]
):
raise TypeError(
"Field `input` must be a string or a list of strings."
)
name_set = set(self.steps.keys()).union({"original"})
for step in self.steps.values():
if (
isinstance(step["input"], str)
and (step["input"] not in name_set)
) or (
isinstance(step["input"], list)
and (not set(step["input"]) <= name_set)
):
raise ValueError(
"Field `input` must be 'original' or name of a model."
)
# check if only one step is not used as input (so it is the output)
for step in self.steps.values():
if isinstance(step["input"], str):
name_set = name_set - {step["input"]}
else:
name_set = name_set - set(step["input"])
if len(name_set) > 1:
raise ValueError(
"Pipenet output is ambiguous: "
"found more than one steps with no downstream step: {}.".format(
name_set
)
)
        # check if each step has a valid `subset` field; a missing `subset`
        # is treated as "all"
for step_name, step in self.steps.items():
if isinstance(step["input"], str):
if "subset" not in step.keys():
pass
elif not (
(
isinstance(step["subset"], str)
or islistofstr(step["subset"])
)
):
raise TypeError(
"Field `subset` at step '{}' is invalid.".format(
step_name
)
)
elif isinstance(step["subset"], str) and (
step["subset"] != "all"
):
raise ValueError(
"A subset corresponding to an input source must be "
"'all' or a list of strings (even if there is only "
"one element)."
)
else:
if "subset" not in step.keys():
pass
elif isinstance(step["subset"], str):
if step["subset"] == "all":
pass
else:
raise ValueError(
"Field `subset` at step '{}' is invalid.".format(
step_name
)
)
elif isinstance(step["subset"], list):
if len(step["input"]) != len(step["subset"]):
raise ValueError(
"Fields `input` and `subset` are inconsistent at "
"step '{}'.".format(step_name)
)
for subset in step["subset"]:
if not (
(isinstance(subset, str) or islistofstr(subset))
):
raise TypeError(
"Field `subset` at step '{}' is invalid.".format(
step_name
)
)
if isinstance(subset, str) and (subset != "all"):
raise ValueError(
"A subset corresponding to an input source "
"must be 'all' or a list of strings (even if "
"there is only one element)."
)
else:
raise TypeError(
"Field `subset` at step '{}' is invalid.".format(
step_name
)
)
# check if each step has valid model
if not all(
[isinstance(step["model"], _Model) for step in self.steps.values()]
):
raise ValueError(
"Model must be a detector, transformer, or aggregator object."
)
        # check:
        # 1. upstream of a transformer or detector must be a transformer or
        #    the original input
        # 2. upstream of an aggregator must be a detector or an aggregator
for step_name, step in self.steps.items():
if isinstance(step["model"], (_Detector, _Transformer)):
if isinstance(step["input"], str):
if step["input"] == "original":
pass
elif not isinstance(
self.steps[step["input"]]["model"], _Transformer
):
raise TypeError(
"Model in step '{}' cannot accept output from "
"step '{}'.".format(step_name, step["input"])
)
else:
for input in step["input"]:
if input == "original":
pass
elif not isinstance(
self.steps[input]["model"], _Transformer
):
raise TypeError(
"Model in step '{}' cannot accept output from "
"step '{}'.".format(step_name, input)
)
elif isinstance(step["model"], _Aggregator):
if isinstance(step["input"], str):
if (step["input"] == "original") or (
not isinstance(
self.steps[step["input"]]["model"],
(_Detector, _Aggregator),
)
):
raise TypeError(
"Model in step '{}' cannot accept output from "
"step '{}'.".format(step_name, step["input"])
)
else:
for input in step["input"]:
if (input == "original") or (
not isinstance(
self.steps[input]["model"],
(_Detector, _Aggregator),
)
):
raise TypeError(
"Model in step '{}' cannot accept output from "
"step '{}'.".format(step_name, input)
)
# sort out graph
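        # breadth-first scheduling: each round schedules every step whose
        # inputs were all completed in earlier rounds; stop once all steps
        # are scheduled, and raise if a round makes no progress (i.e. some
        # step is unreachable)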
done_step = OrderedDict({"original": (0, 0)})
counter = 0
while True:
counter += 1
sub_counter = 0
done_step_name_up_to_last_round = set(done_step.keys())
for step_name, step in self.steps.items():
if step_name in done_step_name_up_to_last_round:
continue
if isinstance(step["input"], str):
if step["input"] in done_step_name_up_to_last_round:
done_step.update({step_name: (counter, sub_counter)})
sub_counter += 1
else:
if set(step["input"]) <= done_step_name_up_to_last_round:
done_step.update({step_name: (counter, sub_counter)})
sub_counter += 1
if len(done_step) == len(self.steps) + 1:
break
if sub_counter == 0:
raise ValueError(
"The following step(s) cannot be reached: {}.".format(
set(self.steps.keys()) - set(done_step.keys())
)
)
self.steps_graph_ = done_step.copy()
self.final_step_ = list(self.steps_graph_.keys())[-1]
@staticmethod
def _get_input(
step: Dict[str, Any], results: Dict[str, Any]
) -> Dict[str, Any]:
"""
Given a step block and an intermediate results dict, get the input of
this step based on fields `input` and `subset`.
"""
if isinstance(step["model"], (_Detector, _Transformer)):
if isinstance(step["input"], str):
if ("subset" not in step.keys()) or (step["subset"] == "all"):
input = results[step["input"]]
else:
if len(step["subset"]) == 1:
input = results[step["input"]][step["subset"][0]]
else:
input = results[step["input"]][step["subset"]]
else:
if ("subset" not in step.keys()) or (step["subset"] == "all"):
input = pd.concat(
[results[input_name] for input_name in step["input"]],
axis=1,
)
else:
input = pd.concat(
[
results[input_name]
if subset == "all"
else (
results[input_name][subset[0]]
if len(subset) == 1
else results[input_name][subset]
)
for input_name, subset in zip(
step["input"], step["subset"]
)
],
axis=1,
)
elif isinstance(step["model"], _Aggregator):
if isinstance(step["input"], str):
input = {step["input"]: results[step["input"]]}
else:
input = {
input_name: results[input_name]
for input_name in step["input"]
}
return input
    def fit(
self,
ts: Union[pd.Series, pd.DataFrame],
skip_fit: Optional[List[str]] = None,
return_intermediate: bool = False,
) -> Optional[Dict[str, Optional[Union[pd.Series, pd.DataFrame]]]]:
"""Train models in the pipenet.
Parameters
----------
ts: pandas Series or DataFrame
Time series used to train models.
        skip_fit: list, optional
            Models to skip training. This can be used when the pipenet
            contains models that were already trained on the same time
            series and re-training would be time-consuming. It must be a
            list of strings where each element is a model name. Default:
            None.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
Returns
-------
dict, optional
If return_intermediate=True, return intermediate results generated
during training as a dictionary where keys are step names. If a
step does not perform transformation or detection, the result of
that step will be None.
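        Examples
        --------
        A sketch, assuming `s` is a pandas Series and `myPipenet` is the
        pipenet from the class docstring:
        >>> res = myPipenet.fit(s, return_intermediate=True)
        >>> res["quantile_ad"]  # fitted but not predicted, so this is None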
"""
self._validate()
if skip_fit is None:
skip_fit = []
if not isinstance(skip_fit, list):
raise TypeError("Parameter `skip_fit` must be a list.")
if not set(skip_fit) <= set(self.steps.keys()):
raise ValueError("Name(s) in `skip_fit` is not valid model name.")
        # determine which steps need fit and/or predict
need_fit = {
step_name: (
isinstance(step["model"], _TrainableModel)
& (step_name not in skip_fit)
)
for step_name, step in self.steps.items()
}
need_predict = {step_name: False for step_name in self.steps.keys()}
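        # walk the execution graph backwards (skipping 'original'): any step
        # feeding a step that needs fit or predict must itself be predicted,
        # so that its output is available downstream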
for step_name in list(self.steps_graph_.keys())[:0:-1]:
if need_fit[step_name] or need_predict[step_name]:
if isinstance(self.steps[step_name]["input"], str):
input = self.steps[step_name]["input"]
if input != "original":
need_predict[input] = True
else:
for input in self.steps[step_name]["input"]:
if input != "original":
need_predict[input] = True
        # run fit, predict, or fit_predict as needed
results = {"original": ts.copy()}
for step_name in list(self.steps_graph_.keys())[1:]:
step = self.steps[step_name]
if not (need_fit[step_name] or need_predict[step_name]):
results.update({step_name: None})
continue
input = self._get_input(step, results)
if need_fit[step_name] and (not need_predict[step_name]):
step["model"].fit(input)
results.update({step_name: None})
elif (not need_fit[step_name]) and need_predict[step_name]:
results.update({step_name: step["model"].predict(input)})
else:
results.update({step_name: step["model"].fit_predict(input)})
# return intermediate results
if return_intermediate:
return results
else:
return None
def _predict(
self,
ts: Union[pd.Series, pd.DataFrame],
fit: bool,
detect: bool,
skip_fit: Optional[List[str]],
return_intermediate: bool,
return_list: bool = False,
) -> Union[
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
Dict[
str,
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[
Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]
],
],
],
],
]:
"""
Private method for detect, transform, fit_detect, fit_transform
Parameters
----------
fit: bool
Whether this call is for fit_detect/fit_transform or
detect/transform.
detect: bool
Whether this call is for detect/fit_detect or
transform/fit_transform.
Others:
Same as higher-level calls
Returns
-------
Same as higher-level calls
"""
self._validate()
if skip_fit is None:
skip_fit = []
if not isinstance(skip_fit, list):
raise TypeError("Parameter `skip_fit` must be a list.")
if not set(skip_fit) <= set(self.steps.keys()):
raise ValueError("Name(s) in `skip_fit` is not valid model name.")
last_step_name = list(self.steps_graph_.keys())[-1]
if detect:
            if isinstance(self.steps[last_step_name]["model"], _Transformer):
                raise RuntimeError(
                    "This seems to be a transformation pipenet, "
                    "because the model at the final step '{}' is a "
                    "transformer. "
                    "Please use method `{}transform` instead.".format(
                        last_step_name, "fit_" if fit else ""
                    )
                )
        else:
            if isinstance(
                self.steps[last_step_name]["model"], (_Detector, _Aggregator)
            ):
                raise RuntimeError(
                    "This seems to be a detection pipenet, "
                    "because the model at the final step '{}' is "
                    "either a detector or an aggregator. "
                    "Please use method `{}detect` instead.".format(
                        last_step_name, "fit_" if fit else ""
                    )
                )
results = {
"original": ts.copy()
} # type: Dict[str,Union[pd.Series,pd.DataFrame,List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]], Dict[str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]]]]
        for step_name in list(self.steps_graph_.keys())[1:]:
            step = self.steps[step_name]
            input = self._get_input(step, results)
            model = step["model"]
            if fit and (step_name not in skip_fit):
                # training pass: fit trainable models as the series flows
                # through the graph
                if isinstance(model, (_Detector, _Aggregator)):
                    if isinstance(
                        model,
                        (
                            _TrainableUnivariateDetector,
                            _TrainableMultivariateDetector,
                        ),
                    ):
                        result = model.fit_predict(
                            input, return_list=return_list
                        )
                    else:
                        result = model.predict(
                            input, return_list=return_list
                        )
                elif isinstance(
                    model,
                    (
                        _TrainableUnivariateTransformer,
                        _TrainableMultivariateTransformer,
                    ),
                ):
                    result = model.fit_predict(input)
                else:
                    result = model.predict(input)
            else:
                # prediction-only pass
                if isinstance(model, (_Detector, _Aggregator)):
                    result = model.predict(input, return_list=return_list)
                else:
                    result = model.predict(input)
            results.update({step_name: result})
if return_intermediate:
return results
else:
return results[last_step_name]
    def detect(
self,
ts: Union[pd.Series, pd.DataFrame],
return_intermediate: bool = False,
return_list: bool = False,
) -> Union[
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
Dict[
str,
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[
Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]
],
],
],
],
]:
"""Detect anomaly from time series using the pipenet.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
return_list: bool, optional
Whether to return a list of anomalous events, or a binary series
indicating normal/anomalous. Default: False.
Returns
-------
pandas Series, pandas DataFrame, list, or dict
Detected anomalies.
- If return_intermediate=False, return detected anomalies, i.e.
result from last detector.
- If return_intermediate=True, return results of all models in
pipenet as a dict where each item represents the result of a
model.
- If return_list=False, result from a detector or an aggregator
will be a binary pandas Series indicating normal/anomalous.
- If return_list=True, result from a detector or an aggregator
will be a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
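        Examples
        --------
        A sketch of inspecting an intermediate step (assuming `s` is a
        pandas Series and the pipenet has been fitted):
        >>> results = myPipenet.detect(s, return_intermediate=True)
        >>> diff_abs = results["diff_abs"]  # an intermediate transformer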
"""
return self._predict(
ts,
fit=False,
detect=True,
skip_fit=None,
return_intermediate=return_intermediate,
return_list=return_list,
)
    def fit_detect(
self,
ts: Union[pd.Series, pd.DataFrame],
skip_fit: Optional[List[str]] = None,
return_intermediate: bool = False,
return_list: bool = False,
) -> Union[
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
Dict[
str,
Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[
Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]
],
],
],
],
]:
"""Train models in the pipenet and detect anomaly with it.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
        skip_fit: list, optional
            Models to skip training. This can be used when the pipenet
            contains models that were already trained on the same time
            series and re-training would be time-consuming. It must be a
            list of strings where each element is a model name. Default:
            None.
return_intermediate: bool, optional
Whether to return intermediate results. Default: False.
return_list: bool, optional
Whether to return a list of anomalous events, or a binary series
indicating normal/anomalous. Default: False.
Returns
-------
pandas Series, pandas DataFrame, list, or dict
Detected anomalies.
- If return_intermediate=False, return detected anomalies, i.e.
result from last detector.
- If return_intermediate=True, return results of all models in
pipenet as a dict where each item represents the result of a
model.
- If return_list=False, result from a detector or an aggregator
will be a binary pandas Series indicating normal/anomalous.
- If return_list=True, result from a detector or an aggregator
will be a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
"""
return self._predict(
ts,
fit=True,
detect=True,
skip_fit=skip_fit,
return_intermediate=return_intermediate,
return_list=return_list,
)
    def score(
self,
ts: Union[pd.Series, pd.DataFrame],
anomaly_true: Union[
pd.Series,
pd.DataFrame,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
Dict[
str,
List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
],
],
scoring: str = "recall",
**kwargs: Any
) -> Union[float, Dict[str, float]]:
"""Detect anomalies and score the results against true anomalies.
Parameters
----------
ts: pandas Series or DataFrame
Time series to detect anomalies from.
anomaly_true: Series, or a list of Timestamps or Timestamp tuple
True anomalies.
- If pandas Series, it is treated as a series of binary labels.
- If list, a list of events where an event is a pandas Timestamp if
it is instantaneous or a 2-tuple of pandas Timestamps if it is a
closed time interval.
scoring: str, optional
Scoring function to use. Must be one of "recall", "precision",
"f1", and "iou". See module `metrics` for more information.
Default: "recall"
**kwargs
Optional parameters for scoring function. See module `metrics` for
more information.
Returns
-------
        float or dict
            Score(s) of the detection result.
"""
if scoring == "recall":
scoring_func = recall # type: Callable
elif scoring == "precision":
scoring_func = precision
elif scoring == "f1":
scoring_func = f1_score
elif scoring == "iou":
scoring_func = iou
else:
raise ValueError(
"Argument `scoring` must be one of 'recall', 'precision', "
"'f1' and 'iou'."
)
if isinstance(anomaly_true, pd.Series):
return scoring_func(
y_true=anomaly_true,
y_pred=self.detect(ts, return_list=False),
**kwargs
)
else:
return scoring_func(
y_true=anomaly_true,
y_pred=self.detect(ts, return_list=True),
**kwargs
)
    def get_params(self) -> Dict[str, Dict[str, Any]]:
"""Get parameters of models in pipenet.
Returns
-------
dict
A dictionary of model name and model parameters.
"""
return {
step_name: step["model"].get_params()
for step_name, step in self.steps.items()
}
    def summary(self) -> None:
"""Print a summary of the pipenet."""
        rows = []
        for step_name in self.steps_graph_.keys():
            if step_name == "original":
                continue
            rows.append(
                {
                    "name": step_name,
                    "model": self.steps[step_name][
                        "model"
                    ].__class__.__name__,
                    "input": self.steps[step_name]["input"],
                    "subset": self.steps[step_name].get("subset", "all"),
                }
            )
        # build the table in one pass (pandas removed DataFrame.append in
        # version 2.0)
        df = pd.DataFrame(rows, columns=["name", "model", "input", "subset"])
print(tabulate(df, headers="keys", tablefmt="simple", showindex=False))
    def plot_flowchart(
self,
ax: Optional[plt.Axes] = None,
figsize: Optional[Tuple[float, float]] = None,
radius: float = 1.0,
) -> plt.Axes: # pragma: no cover
"""Plot flowchart of this pipenet.
Parameters
----------
ax: matplotlib axes object, optional
Axes to plot at. If not given, the method will create a matplotlib
figure and axes. Default: None.
figsize: tuple, optional
Width and height of the figure to plot at. Only to be used if `ax`
is not given. Default: None.
radius: float, optional
Relative size of components in the chart. Default: 1.0.
Returns
-------
matplotlib axes object
Axes where the flowchart is plotted.
"""
self._validate()
radius = radius * 0.1
# create empty plot
if (ax is None) and (figsize is None):
figsize = (14, 16)
if ax is None:
_, ax = plt.subplots(figsize=figsize)
# get coordinate of components
layers = [] # type: List
for graph_step, values in self.steps_graph_.items():
if values[1] == 0:
layers.append([])
layers[-1].append(graph_step)
n_layer = len(layers)
max_n_comp = 0
coord = dict()
for layer in layers:
n_comp = len(layer)
max_n_comp = max(max_n_comp, n_comp)
for comp in layer:
x = self.steps_graph_[comp][0]
y = self.steps_graph_[comp][1] - (n_comp - 1) / 2
coord[comp] = (x, y)
# plot connection lines, and gather patches
io_patches = [] # type: List
detector_patches = [] # type: List
transformer_patches = [] # type: List
aggregator_patches = [] # type: List
for step_name, step in self.steps.items():
end_coord = coord[step_name]
if isinstance(step["model"], _Detector):
detector_patches.append(Circle(xy=end_coord, radius=radius))
elif isinstance(step["model"], _Transformer):
transformer_patches.append(Circle(xy=end_coord, radius=radius))
elif isinstance(step["model"], _Aggregator):
aggregator_patches.append(Circle(xy=end_coord, radius=radius))
if isinstance(step["input"], str):
input = step["input"]
start_coord = coord[input]
if ("subset" not in step.keys()) or (step["subset"] == "all"):
linestyle = "-"
else:
linestyle = "--"
            ax.plot(
[start_coord[0], end_coord[0]],
[start_coord[1], end_coord[1]],
color="k",
linestyle=linestyle,
zorder=1,
)
else:
for counter, input in enumerate(step["input"]):
start_coord = coord[input]
if (
("subset" not in step.keys())
or (step["subset"] == "all")
or (step["subset"][counter] == "all")
):
linestyle = "-"
else:
linestyle = "--"
                ax.plot(
[start_coord[0], end_coord[0]],
[start_coord[1], end_coord[1]],
color="k",
linestyle=linestyle,
zorder=1,
)
        ax.plot([n_layer - 1, n_layer], [0, 0], color="k", zorder=1)
io_patches.append(Circle(xy=(0, 0), radius=radius))
io_patches.append(Circle(xy=(n_layer, 0), radius=radius))
# draw components
ax.add_collection(PatchCollection(io_patches, zorder=2, color="y"))
ax.add_collection(
PatchCollection(detector_patches, zorder=2, color="g")
)
ax.add_collection(
PatchCollection(transformer_patches, zorder=2, color="c")
)
ax.add_collection(
PatchCollection(aggregator_patches, zorder=2, color="m")
)
# label components with step names
for key, value in coord.items():
ax.annotate(
key, (value[0] - radius, value[1]), zorder=3, color="k"
)
ax.annotate("result", (n_layer - radius, 0), zorder=3, color="k")
# add legend
        ax.legend(
handles=[
Line2D([], [], color="k", label="full connection"),
Line2D(
[],
[],
color="k",
linestyle="--",
label="partial connection",
),
Circle([], radius=radius, color="y", label="input/output"),
Circle([], radius=radius, color="g", label="detector"),
Circle([], radius=radius, color="c", label="transformer"),
Circle([], radius=radius, color="m", label="aggregator"),
],
bbox_to_anchor=(1, 1),
loc=2,
borderaxespad=0.0,
)
# clean up axes
ax.set_xlim([-radius - 0.1, n_layer + radius + 0.1])
ax.set_ylim(
[
-(max_n_comp - 1) / 2 - radius - 0.1,
(max_n_comp - 1) / 2 + radius + 0.1,
]
)
ax.set_axis_off()
ax.set_aspect(1)
return ax