Pipeline and Pipenet¶
Module of model pipeline and pipenet.
Pipeline or Pipenet connects multiple components (transformers, detectors, and/or aggregators) into a model that may perform complex anomaly detection process.
-
class
adtk.pipe.
Pipeline
(steps)[source]¶ A Pipeline object chains transformers and a detector sequentially.
- Parameters
steps (list of 2-tuples (str, object)) – Components of this pipeline. Each 2-tuple represents a step in the pipeline (step name, model object).
- Return type
None
Examples
>>> steps = [('moving average', RollingAggregate(agg='mean', window=10)), ('filter quantile 0.99', QuantileAD(high=0.99))] >>> myPipeline = Pipeline(steps)
-
fit
(ts, skip_fit=None, return_intermediate=False)[source]¶ Train all models in the pipeline sequentially.
- Parameters
ts (pandas Series or DataFrame) – Time series used to train models.
skip_fit (list, optional) – Models to skip training. This could be used when pipeline contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.
- Return type
dict, optional
-
detect
(ts, return_intermediate=False, return_list=False)[source]¶ Transform time series sequentially along pipeline, and detect anomalies with the last detector.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
- Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
- Return type
pandas Series, pandas DataFrame, list, or dict
-
transform
(ts, return_intermediate=False)[source]¶ Transform time series sequentially along pipeline.
- Parameters
ts (pandas Series or DataFrame) – Time series to be transformed
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
- Return type
pandas Series, pandas DataFrame, or dict
-
fit_detect
(ts, skip_fit=None, return_intermediate=False, return_list=False)[source]¶ Train models in pipeline sequentially, transform time series along pipeline, and use the last detector to detect anomalies.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
skip_fit (list, optional) – Models to skip training. This could be used when pipeline contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
- Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
- Return type
pandas Series, pandas DataFrame, list, or dict
-
fit_transform
(ts, skip_fit=None, return_intermediate=False)[source]¶ Train models in pipeline sequentially, and transform time series along pipeline.
- Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
skip_fit (list, optional) – Models to skip training. This could be used when pipeline contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
- Return type
pandas Series, pandas DataFrame, or dict
-
score
(ts, anomaly_true, scoring='recall', **kwargs)[source]¶ Detect anomalies and score the results against true anomalies.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
anomaly_true (pandas Series or list) –
True anomalies.
If pandas Series, it is treated as a series of binary labels.
If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, and “iou”. See module metrics for more information. Default: “recall”
**kwargs – Optional parameters for scoring function. See module metrics for more information.
kwargs (Any) –
- Returns
Score of detection result.
- Return type
float
-
class
adtk.pipe.
Pipenet
(steps=None)[source]¶ A Pipenet object connects transformers, detectors and aggregators.
- Parameters
steps (dicts) –
Components of the pipenet. Each key-value item represents a step ( transformer, detector, or aggregator), where key is the unique name of the step and the value is a dict with the following key-value pairs:
input (str or list of str): Input to the model, which must be either ‘original’ (i.e. the input time series), or the name of a upstream component.
subset (str, list of str, or list of lists of str, optional): If a model does not use all series from an input component, use this field to specify which series should be included. If not given or “all”, all series from the input component will be used.
model (object): A detector, transformer, or aggregator object.
- Return type
None
-
steps_graph_
¶ Order of steps to be executed. Keys are step names, values are 2-tuple (i, j) where i is the index of execution round and j is the the index within a round.
- Type
OrderedDict
-
final_step_
¶ Name of the final step to be executed. It is the single step in the last round of execution in attribute steps_graph_.
- Type
str
Examples
The following example show how to use a Pipenet to build a level shift detector with some basic transformers, detectors, and aggregator.
>>> from adtk.detector import QuantileAD, ThresholdAD >>> from adtk.transformer import DoubleRollingAggregate >>> from adtk.aggregator import AndAggregator >>> from adtk.pipe import Pipenet >>> steps = { "diff_abs": { "input": "original", "model": DoubleRollingAggregate( agg="median", window=20, center=True, diff="l1", ), }, "quantile_ad": { "input": "diff_abs", "model": QuantileAD(high=0.99, low=0), }, "diff": { "input": "original", "model": DoubleRollingAggregate( agg="median", window=20, center=True, diff="diff", ), }, "sign_check": { "input": "diff", "model": ThresholdAD(high=0.0, low=-float("inf")), }, "and": { "model": AndAggregator(), "input": ["quantile_ad", "sign_check"], }, } >>> myPipenet = Pipenet(steps)
-
fit
(ts, skip_fit=None, return_intermediate=False)[source]¶ Train models in the pipenet.
- Parameters
ts (pandas Series or DataFrame) – Time series used to train models.
skip_fit (list, optional) – Models to skip training. This could be used when pipenet contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.
- Return type
dict, optional
-
detect
(ts, return_intermediate=False, return_list=False)[source]¶ Detect anomaly from time series using the pipenet.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
- Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
- Return type
pandas Series, pandas DataFrame, list, or dict
-
transform
(ts, return_intermediate=False)[source]¶ Transform time series using the pipenet.
- Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipnet as a dict where each item represents the result of a model.
- Return type
pandas Series, pandas DataFrame, or dict
-
fit_detect
(ts, skip_fit=None, return_intermediate=False, return_list=False)[source]¶ Train models in the pipenet and detect anomaly with it.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
skip_fit (list, optional) – Models to skip training. This could be used when pipenet contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
- Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
- Return type
pandas Series, pandas DataFrame, list, or dict
-
fit_transform
(ts, skip_fit=None, return_intermediate=False)[source]¶ Train models in the pipenet and transform time series with it.
- Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
skip_fit (list, optional) – Models to skip training. This could be used when pipenet contains models that are already trained by the same time series, and re- training would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
- Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
- Return type
pandas Series, pandas DataFrame, or dict
-
score
(ts, anomaly_true, scoring='recall', **kwargs)[source]¶ Detect anomalies and score the results against true anomalies.
- Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
anomaly_true (Series, or a list of Timestamps or Timestamp tuple) –
True anomalies.
If pandas Series, it is treated as a series of binary labels.
If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, and “iou”. See module metrics for more information. Default: “recall”
**kwargs – Optional parameters for scoring function. See module metrics for more information.
kwargs (Any) –
- Returns
Score of detection result.
- Return type
float
-
get_params
()[source]¶ Get parameters of models in pipenet.
- Returns
A dictionary of model name and model parameters.
- Return type
dict
-
plot_flowchart
(ax=None, figsize=None, radius=1.0)[source]¶ Plot flowchart of this pipenet.
- Parameters
ax (matplotlib axes object, optional) – Axes to plot at. If not given, the method will create a matplotlib figure and axes. Default: None.
figsize (tuple, optional) – Width and height of the figure to plot at. Only to be used if ax is not given. Default: None.
radius (float, optional) – Relative size of components in the chart. Default: 1.0.
- Returns
Axes where the flowchart is plotted.
- Return type
matplotlib axes object