Pipeline and Pipenet
Module of model pipeline and pipenet.
Pipeline or Pipenet connects multiple components (transformers, detectors, and/or aggregators) into a model that can perform a complex anomaly detection process.

class adtk.pipe.Pipeline(steps)
A Pipeline object chains transformers and a detector sequentially.
 Parameters
steps (list of 2-tuples (str, object)) – Components of this pipeline. Each 2-tuple represents a step in the pipeline (step name, model object).
 Return type
None
Examples
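>>> from adtk.detector import QuantileAD
>>> from adtk.transformer import RollingAggregate
>>> from adtk.pipe import Pipeline
>>> steps = [
...     ('moving average', RollingAggregate(agg='mean', window=10)),
...     ('filter quantile 0.99', QuantileAD(high=0.99)),
... ]
>>> myPipeline = Pipeline(steps)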

fit(ts, skip_fit=None, return_intermediate=False)
Train all models in the pipeline sequentially.
 Parameters
ts (pandas Series or DataFrame) – Time series used to train models.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipeline contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.
 Return type
dict, optional
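The effect of skip_fit can be pictured with a minimal sketch. The MeanCenter class and the fit_pipeline helper below are hypothetical stand-ins written for illustration, not ADTK classes, and the sketch assumes that a skipped model is still applied to the data so that downstream models receive transformed input.

```python
class MeanCenter:
    """Toy transformer (hypothetical, not an ADTK class): subtracts the training mean."""

    def __init__(self):
        self.mean_ = None
        self.fit_count = 0

    def fit(self, values):
        self.mean_ = sum(values) / len(values)
        self.fit_count += 1

    def transform(self, values):
        return [v - self.mean_ for v in values]


def fit_pipeline(steps, values, skip_fit=None):
    """Train each step on the output of the previous one, except steps named in skip_fit."""
    skip_fit = skip_fit or []
    current = values
    for name, model in steps:
        if name not in skip_fit:
            model.fit(current)
        # Assumption: a skipped model keeps its earlier fit and still transforms the data.
        current = model.transform(current)


first, second = MeanCenter(), MeanCenter()
steps = [("first", first), ("second", second)]

fit_pipeline(steps, [1.0, 2.0, 3.0])                      # trains both steps
fit_pipeline(steps, [1.0, 2.0, 3.0], skip_fit=["first"])  # retrains only "second"
```

After the second call, the step named "first" has been trained once and the step named "second" twice.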

detect(ts, return_intermediate=False, return_list=False)
Transform time series sequentially along pipeline, and detect anomalies with the last detector.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
 Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
 Return type
pandas Series, pandas DataFrame, list, or dict
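As a rough illustration of sequential chaining and of the return_intermediate switch, the sketch below runs two toy steps on a plain list of floats. The functions moving_average, above_threshold, and run_pipeline are hypothetical and only mimic the behaviour described above; they are not how ADTK implements it.

```python
def moving_average(values, window=3):
    """Trailing mean over `window` points; None until the window is full."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window : i + 1]) / window)
    return out


def above_threshold(values, high=5.0):
    """Flag points strictly above `high`; missing inputs stay missing."""
    return [None if v is None else v > high for v in values]


def run_pipeline(steps, values, return_intermediate=False):
    """Feed the output of each step into the next, in list order."""
    results = {}
    current = values
    for name, func in steps:
        current = func(current)
        results[name] = current
    # Either every step's result keyed by step name, or only the last result.
    return results if return_intermediate else current


steps = [("moving average", moving_average), ("threshold", above_threshold)]
series = [1.0, 1.0, 1.0, 10.0, 10.0, 10.0, 1.0]

final = run_pipeline(steps, series)
everything = run_pipeline(steps, series, return_intermediate=True)
```

Here final holds only the output of the last step, while everything is a dict with one entry per step name.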

transform(ts, return_intermediate=False)
Transform time series sequentially along pipeline.
 Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
 Return type
pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)
Train models in pipeline sequentially, transform time series along pipeline, and use the last detector to detect anomalies.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipeline contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
 Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
 Return type
pandas Series, pandas DataFrame, list, or dict
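The two output shapes selected by return_list can be related with a small sketch. It uses standard-library datetime objects in place of pandas Timestamps, and labels_to_events is a hypothetical helper: it simply groups consecutive anomalous points, so ADTK's own conversion may place interval boundaries differently (for example, by taking the sampling period into account).

```python
from datetime import datetime, timedelta


def labels_to_events(timestamps, labels):
    """Group consecutive anomalous points into events.

    A lone anomalous point becomes a single timestamp; a run of points
    becomes a (start, end) tuple with both ends included.
    """
    events = []
    run_start = None
    previous = None
    for stamp, flag in zip(timestamps, labels):
        if flag and run_start is None:
            run_start = stamp  # a new run begins here
        elif not flag and run_start is not None:
            # The run ended at the previous timestamp.
            events.append(run_start if run_start == previous else (run_start, previous))
            run_start = None
        previous = stamp
    if run_start is not None:
        # The series ended while a run was still open.
        events.append(run_start if run_start == previous else (run_start, previous))
    return events


start = datetime(2020, 1, 1)
timestamps = [start + timedelta(days=i) for i in range(6)]
labels = [0, 1, 0, 1, 1, 1]

events = labels_to_events(timestamps, labels)
```

With these labels, the result contains one instantaneous event (2 January) and one closed interval (4 to 6 January).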

fit_transform(ts, skip_fit=None, return_intermediate=False)
Train models in pipeline sequentially, and transform time series along pipeline.
 Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipeline contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.
 Return type
pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)
Detect anomalies and score the results against true anomalies.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
anomaly_true (pandas Series or list) –
True anomalies.
If pandas Series, it is treated as a series of binary labels.
If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.
**kwargs – Optional parameters for scoring function. See module metrics for more information.
 Returns
Score of detection result.
 Return type
float
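For binary label series, the four scoring options can be understood through point-wise counts of true positives, false negatives, and false positives. The sketch below is an assumption about what the names mean in that simple case; score_labels is a hypothetical helper, and module metrics remains the reference for the exact definitions, especially for event lists.

```python
def score_labels(y_true, y_pred, scoring="recall"):
    """Point-wise score of predicted binary labels against true binary labels."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t and p)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t and not p)
    fp = sum(1 for t, p in zip(y_true, y_pred) if p and not t)
    # Note: each formula divides by zero if the relevant counts are all zero.
    if scoring == "recall":
        return tp / (tp + fn)          # share of true anomalies that were detected
    if scoring == "precision":
        return tp / (tp + fp)          # share of detections that are true anomalies
    if scoring == "f1":
        return 2 * tp / (2 * tp + fp + fn)
    if scoring == "iou":
        return tp / (tp + fp + fn)     # intersection over union of anomalous points
    raise ValueError("scoring must be 'recall', 'precision', 'f1', or 'iou'")


y_true = [0, 1, 1, 0, 1, 0]
y_pred = [0, 1, 0, 1, 1, 1]

recall = score_labels(y_true, y_pred, "recall")
precision = score_labels(y_true, y_pred, "precision")
f1 = score_labels(y_true, y_pred, "f1")
iou = score_labels(y_true, y_pred, "iou")
```

In this example there are two true positives, one false negative, and two false positives.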

class adtk.pipe.Pipenet(steps=None)
A Pipenet object connects transformers, detectors and aggregators.
 Parameters
steps (dict, optional) –
Components of the pipenet. Each key-value item represents a step (transformer, detector, or aggregator), where the key is the unique name of the step and the value is a dict with the following key-value pairs:
input (str or list of str): Input to the model, which must be either ‘original’ (i.e. the input time series) or the name of an upstream component.
subset (str, list of str, or list of lists of str, optional): If a model does not use all series from an input component, use this field to specify which series should be included. If not given or “all”, all series from the input component will be used.
model (object): A detector, transformer, or aggregator object.
 Return type
None

steps_graph_
Order of steps to be executed. Keys are step names; values are 2-tuples (i, j), where i is the index of the execution round and j is the index within that round.
 Type
OrderedDict
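One way to picture the (i, j) values is to assign each step to the earliest round in which all of its inputs are available. The sketch below does this for a steps dict that carries only the "input" field. The function execution_rounds is hypothetical, and the ordering within a round (dict order here) is an assumption, so the j values ADTK reports may differ.

```python
from collections import OrderedDict


def as_list(inputs):
    """Normalise the "input" field to a list of names."""
    return [inputs] if isinstance(inputs, str) else list(inputs)


def execution_rounds(steps):
    """Map each step name to (round index, position within that round)."""
    finished = {"original"}  # the input time series is available from the start
    pending = list(steps)
    graph = OrderedDict()
    round_index = 0
    while pending:
        # A step is ready once every one of its inputs has been produced.
        ready = [n for n in pending if set(as_list(steps[n]["input"])) <= finished]
        if not ready:
            raise ValueError("steps contain a cycle or an unknown input")
        for position, name in enumerate(ready):
            graph[name] = (round_index, position)
        finished.update(ready)
        pending = [n for n in pending if n not in ready]
        round_index += 1
    return graph


steps = {
    "diff_abs": {"input": "original"},
    "quantile_ad": {"input": "diff_abs"},
    "diff": {"input": "original"},
    "sign_check": {"input": "diff"},
    "and": {"input": ["quantile_ad", "sign_check"]},
}
graph = execution_rounds(steps)
```

In this layout the two steps that read the original series share round 0, the two detectors share round 1, and the step named "and" is alone in round 2.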

final_step_
Name of the final step to be executed. It is the single step in the last round of execution in attribute steps_graph_.
 Type
str
Examples
The following example shows how to use a Pipenet to build a level shift detector with some basic transformers, detectors, and an aggregator.
>>> from adtk.detector import QuantileAD, ThresholdAD
>>> from adtk.transformer import DoubleRollingAggregate
>>> from adtk.aggregator import AndAggregator
>>> from adtk.pipe import Pipenet
>>> steps = {
...     "diff_abs": {
...         "input": "original",
...         "model": DoubleRollingAggregate(
...             agg="median",
...             window=20,
...             center=True,
...             diff="l1",
...         ),
...     },
...     "quantile_ad": {
...         "input": "diff_abs",
...         "model": QuantileAD(high=0.99, low=0),
...     },
...     "diff": {
...         "input": "original",
...         "model": DoubleRollingAggregate(
...             agg="median",
...             window=20,
...             center=True,
...             diff="diff",
...         ),
...     },
...     "sign_check": {
...         "input": "diff",
...         "model": ThresholdAD(high=0.0, low=-float("inf")),
...     },
...     "and": {
...         "model": AndAggregator(),
...         "input": ["quantile_ad", "sign_check"],
...     },
... }
>>> myPipenet = Pipenet(steps)

fit(ts, skip_fit=None, return_intermediate=False)
Train models in the pipenet.
 Parameters
ts (pandas Series or DataFrame) – Time series used to train models.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipenet contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.
 Return type
dict, optional

detect(ts, return_intermediate=False, return_list=False)
Detect anomalies in a time series using the pipenet.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
 Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
 Return type
pandas Series, pandas DataFrame, list, or dict
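Unlike a pipeline, a pipenet step may read from several upstream steps. The sketch below evaluates a small net of toy functions by computing each step's inputs before the step itself. All names (run_net, is_high, is_rising, both) are hypothetical, the "model" entries are plain callables rather than ADTK objects, and the recursive evaluation is only one possible strategy.

```python
def run_net(steps, values):
    """Evaluate every step, resolving each step's inputs before the step itself."""
    results = {"original": values}

    def evaluate(name):
        if name not in results:
            spec = steps[name]
            inputs = spec["input"]
            if isinstance(inputs, str):
                inputs = [inputs]
            # Compute (or reuse) every upstream result, then apply this step's model.
            results[name] = spec["model"](*[evaluate(i) for i in inputs])
        return results[name]

    for name in steps:
        evaluate(name)
    return results


def is_high(values):
    """Toy detector: flag values above 5."""
    return [v > 5 for v in values]


def is_rising(values):
    """Toy detector: flag points larger than their predecessor."""
    return [False] + [b > a for a, b in zip(values, values[1:])]


def both(*flag_lists):
    """Toy aggregator: a point is anomalous only if every input flags it."""
    return [all(flags) for flags in zip(*flag_lists)]


# The aggregator is listed first on purpose: declaration order does not matter.
steps = {
    "and": {"input": ["high", "rising"], "model": both},
    "high": {"input": "original", "model": is_high},
    "rising": {"input": "original", "model": is_rising},
}
results = run_net(steps, [1, 2, 9, 8, 3])
```

Only the third point is both high and rising, so it is the only point flagged by the step named "and".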

transform(ts, return_intermediate=False)
Transform time series using the pipenet.
 Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
 Return type
pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)
Train models in the pipenet and detect anomalies with it.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipenet contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.
 Returns
Detected anomalies.
If return_intermediate=False, return detected anomalies, i.e. result from last detector.
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.
If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
 Return type
pandas Series, pandas DataFrame, list, or dict

fit_transform(ts, skip_fit=None, return_intermediate=False)
Train models in the pipenet and transform time series with it.
 Parameters
ts (pandas Series or DataFrame) – Time series to be transformed.
skip_fit (list, optional) – Names of models to skip during training. This can be used when the pipenet contains models that have already been trained on the same time series and retraining would be time consuming. It must be a list of strings where each element is a model name. Default: None.
return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
 Returns
Transformed time series.
If return_intermediate=False, return transformed series, i.e. result from last transformer;
If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.
 Return type
pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)
Detect anomalies and score the results against true anomalies.
 Parameters
ts (pandas Series or DataFrame) – Time series to detect anomalies from.
anomaly_true (pandas Series or list) –
True anomalies.
If pandas Series, it is treated as a series of binary labels.
If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.
scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.
**kwargs – Optional parameters for scoring function. See module metrics for more information.
 Returns
Score of detection result.
 Return type
float

get_params()
Get parameters of models in pipenet.
 Returns
A dictionary mapping model names to model parameters.
 Return type
dict

plot_flowchart(ax=None, figsize=None, radius=1.0)
Plot flowchart of this pipenet.
 Parameters
ax (matplotlib axes object, optional) – Axes to plot on. If not given, the method will create a matplotlib figure and axes. Default: None.
figsize (tuple, optional) – Width and height of the figure to plot on. Only used if ax is not given. Default: None.
radius (float, optional) – Relative size of components in the chart. Default: 1.0.
 Returns
Axes where the flowchart is plotted.
 Return type
matplotlib axes object