Pipeline and Pipenet

Module for model pipelines and pipenets.

A Pipeline or Pipenet connects multiple components (transformers, detectors, and/or aggregators) into a single model that can carry out a complex anomaly detection process.

class adtk.pipe.Pipeline(steps=None)

A Pipeline object chains transformers and a detector sequentially.

Parameters: steps (list of 2-tuples) – Components of this pipeline. Each 2-tuple represents a step in the pipeline as (step name, model object).

Examples

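>>> from adtk.transformer import RollingAggregate
>>> from adtk.detector import QuantileAD
>>> from adtk.pipe import Pipeline
>>> steps = [('moving average', RollingAggregate(agg='mean', window=10)),
...          ('filter quantile 0.99', QuantileAD(high=0.99))]
>>> myPipeline = Pipeline(steps)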
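Conceptually, this two-step pipeline smooths the series with a moving average and then flags smoothed values above the 0.99 quantile learned during training. The plain-Python sketch below mimics that flow without ADTK. The function names are hypothetical, and it assumes a trailing window, a linearly interpolated quantile, a strict "greater than" comparison, and None for points without a full window, any of which may differ from the library's actual behavior.

```python
from typing import List, Optional


def rolling_mean(values: List[float], window: int) -> List[Optional[float]]:
    """Trailing moving average; the first window-1 points have no full window."""
    out: List[Optional[float]] = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            chunk = values[i + 1 - window : i + 1]
            out.append(sum(chunk) / window)
    return out


def quantile(values: List[float], q: float) -> float:
    """Linear-interpolation quantile (an assumption made for this sketch)."""
    data = sorted(values)
    pos = q * (len(data) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(data) - 1)
    return data[lo] + (data[hi] - data[lo]) * (pos - lo)


def toy_fit_detect(values: List[float], window: int = 10, high: float = 0.99) -> List[Optional[int]]:
    """Hypothetical stand-in for the two-step pipeline above."""
    smoothed = rolling_mean(values, window)              # step 1: transformer
    valid = [v for v in smoothed if v is not None]
    threshold = quantile(valid, high)                    # "training" the detector
    # Step 2: detector; 1 = anomalous, 0 = normal, None = undefined.
    return [None if v is None else int(v > threshold) for v in smoothed]


labels = toy_fit_detect([1.0] * 30 + [11.0])  # a jump at the last point
print(labels[-1])  # 1
```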

fit(ts, skip_fit=None, return_intermediate=False)

Train all models in the pipeline sequentially.

Parameters:
  • ts (pandas Series or DataFrame) – Time series used to train models.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipeline contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=True, return the intermediate results generated during training as a dictionary keyed by step name. If a step does not perform transformation or detection, its result is None.

Return type:

dict
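The training loop described above can be sketched as follows: each step is fitted on the output of the step before it, steps named in skip_fit keep their existing state, and intermediate outputs are collected per step name. The toy classes MaxScaler and AboveHalf and the function fit_pipeline are hypothetical stand-ins, not ADTK classes, and recording None for the final step (whose output no later step needs) is an assumption of this sketch.

```python
from typing import Dict, List, Optional, Sequence


class MaxScaler:
    """Hypothetical transformer: learns the maximum, then divides by it."""

    def __init__(self) -> None:
        self.max_: Optional[float] = None

    def fit(self, xs: Sequence[float]) -> None:
        self.max_ = max(xs)

    def transform(self, xs: Sequence[float]) -> List[float]:
        return [x / self.max_ for x in xs]


class AboveHalf:
    """Hypothetical detector with nothing to learn."""

    def fit(self, xs: Sequence[float]) -> None:
        pass

    def detect(self, xs: Sequence[float]) -> List[int]:
        return [int(x > 0.5) for x in xs]


def fit_pipeline(steps, xs, skip_fit=None, return_intermediate=False):
    """Train (name, model) steps in order; optionally return per-step outputs."""
    skip = set(skip_fit or [])
    results: Dict[str, Optional[List[float]]] = {}
    current = list(xs)
    for i, (name, model) in enumerate(steps):
        if name not in skip:
            model.fit(current)  # trained on the previous step's output
        if i < len(steps) - 1:
            current = model.transform(current)  # input for the next step
            results[name] = current
        else:
            results[name] = None  # assumption: final output is not computed during fit
    return results if return_intermediate else None


steps = [("scale", MaxScaler()), ("above half", AboveHalf())]
print(fit_pipeline(steps, [1.0, 2.0, 4.0], return_intermediate=True))
# {'scale': [0.25, 0.5, 1.0], 'above half': None}
```

Passing skip_fit=["scale"] on a later call would leave the learned maximum untouched while still running the scaler's transform.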

detect(ts, return_intermediate=False, return_list=False)

Transform time series sequentially along pipeline, and detect anomalies with the last detector.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
  • return_list (bool, optional) – Whether to return a list of anomalous timestamps instead of a binary series indicating normal/anomalous. Default: False.
Returns:

If return_intermediate=False, return the detected anomalies, i.e. the result of the last detector. If return_intermediate=True, return a dictionary of results for all models in the pipeline. If return_list=True, the result of a detector or an aggregator is a list of pandas Timestamps; if return_list=False, it is a binary pandas Series indicating normal/anomalous.

Return type:

list, pandas Series, or dict
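The return_list switch changes only how a detection result is represented. In the sketch below, a plain dict from datetime to 0/1 stands in for a binary pandas Series (an assumption made to keep the example dependency-free), and the hypothetical helper labels_to_list keeps the anomalous timestamps in time order. ADTK's own conversion may differ in detail.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def labels_to_list(labels: Dict[datetime, int]) -> List[datetime]:
    """Return the timestamps labelled 1 (anomalous), sorted by time."""
    return [t for t, flag in sorted(labels.items()) if flag == 1]


start = datetime(2020, 1, 1)
# Hourly binary labels with anomalies at hours 3, 4 and 7.
labels = {start + timedelta(hours=h): int(h in (3, 4, 7)) for h in range(10)}
anomalies = labels_to_list(labels)
print([t.hour for t in anomalies])  # [3, 4, 7]
```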

transform(ts, return_intermediate=False)

Transform time series sequentially along pipeline.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to be transformed.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=False, return the transformed time series, i.e. the result of the last transformer. If return_intermediate=True, return a dictionary of results for all models in the pipeline.

Return type:

pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)

Train models in pipeline sequentially, transform time series along pipeline, and use the last detector to detect anomalies.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipeline contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
  • return_list (bool, optional) – Whether to return a list of anomalous timestamps instead of a binary series indicating normal/anomalous. Default: False.
Returns:

If return_intermediate=False, return the detected anomalies, i.e. the result of the last detector. If return_intermediate=True, return a dictionary of results for all models in the pipeline. If return_list=True, the result of a detector or an aggregator is a list of pandas Timestamps; if return_list=False, it is a binary pandas Series indicating normal/anomalous.

Return type:

list, pandas Series, or dict

fit_transform(ts, skip_fit=None, return_intermediate=False)

Train models in pipeline sequentially, and transform time series along pipeline.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to be transformed.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipeline contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=False, return the transformed time series, i.e. the result of the last transformer. If return_intermediate=True, return a dictionary of results for all models in the pipeline.

Return type:

pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)

Detect anomalies and score the results against true anomalies.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from. If a DataFrame with k columns, k univariate detectors will be applied to them respectively.
  • anomaly_true (pandas Series, or a list of Timestamps or Timestamp tuples) – True anomalies. If a Series, it is a series of binary labels indicating anomalous time points; if a list, it is a list of anomalous events, each a Timestamp or a (start, end) tuple of Timestamps representing a time window.
  • scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.
  • **kwargs – Optional parameters for scoring function. See module metrics for more information.
Returns:

Score of detection result.

Return type:

float
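For binary label series, the four scoring options reduce to counts of true positives (tp), false positives (fp), and false negatives (fn). The hypothetical helper below computes them point by point. It does not handle event lists, which score() also accepts, and its convention of returning 0.0 when a denominator is zero is an assumption, not necessarily what the metrics module does.

```python
from typing import Dict, Sequence


def pointwise_scores(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[str, float]:
    """Point-wise recall, precision, F1 and IoU for aligned 0/1 label sequences."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    recall = tp / (tp + fn) if tp + fn else 0.0
    precision = tp / (tp + fp) if tp + fp else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    iou = tp / (tp + fp + fn) if tp + fp + fn else 0.0  # |true ∩ pred| / |true ∪ pred|
    return {"recall": recall, "precision": precision, "f1": f1, "iou": iou}


scores = pointwise_scores([0, 1, 1, 1, 0, 0], [0, 0, 1, 1, 1, 0])
print(scores["iou"])  # 0.5
```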

get_params()

Get parameters of models in pipeline.

Returns: A dictionary mapping step names to model parameters.
Return type: dict

class adtk.pipe.Pipenet(steps=None)

A Pipenet object connects transformers, detectors and aggregators.

Parameters: steps (dict) –

Components of the pipenet. Each key-value item represents a step (a transformer, detector, or aggregator), where the key is the unique name of the step and the value is a dict with the following key-value pairs:

  • input (str or list of str): Input to the model, which must be either ‘original’ (i.e. the input time series) or the name of an upstream component.
  • subset (str, list of str, or list of lists of str, optional): If a model does not use all series from an input component, use this field to specify which series should be included. If not given or “all”, all series from the input component will be used.
  • model (object): A detector, transformer, or aggregator object.
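The rule for the input field can be checked before a pipenet is built. The hypothetical helper check_inputs below (not ADTK's own validation) reports any input that is neither 'original' nor the name of another step; the model values are placeholders.

```python
from typing import Dict, List


def check_inputs(steps: Dict[str, dict]) -> List[str]:
    """Return a message for every step input that cannot be resolved."""
    problems: List[str] = []
    for name, spec in steps.items():
        sources = spec["input"]
        if isinstance(sources, str):
            sources = [sources]  # a single input may be given as a plain string
        for src in sources:
            if src == name:
                problems.append(f"{name}: a step cannot take itself as input")
            elif src != "original" and src not in steps:
                problems.append(f"{name}: unknown input {src!r}")
    return problems


steps = {
    "diff": {"input": "original", "model": None},
    "sign_check": {"input": "diff", "model": None},
    "and": {"input": ["sign_check", "quantile_ad"], "model": None},
}
print(check_inputs(steps))  # ["and: unknown input 'quantile_ad'"]
```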
steps_graph_

Order of steps to be executed. Keys are step names, and values are 2-tuples (i, j), where i is the index of the execution round and j is the index within that round.

Type: OrderedDict
final_step_

Name of the final step to be executed. It is the only step in the last execution round recorded in steps_graph_.

Type: str
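The round structure of steps_graph_ can be illustrated by grouping steps into layers: a step joins the first round in which all of its inputs are already available. The function execution_rounds below is a hypothetical sketch, not ADTK's implementation; in particular, ordering the steps within a round alphabetically is an assumption.

```python
from collections import OrderedDict
from typing import Dict, List, Tuple, Union


def execution_rounds(inputs: Dict[str, Union[str, List[str]]]) -> Dict[str, Tuple[int, int]]:
    """Map each step name to (round index, index within the round)."""
    deps = {n: [s] if isinstance(s, str) else list(s) for n, s in inputs.items()}
    available = {"original"}
    remaining = set(deps)
    graph = OrderedDict()
    i = 0
    while remaining:
        # A step is ready once every one of its inputs has been produced.
        ready = sorted(n for n in remaining if all(d in available for d in deps[n]))
        if not ready:
            raise ValueError("unresolvable inputs: " + ", ".join(sorted(remaining)))
        for j, name in enumerate(ready):
            graph[name] = (i, j)
        available.update(ready)
        remaining.difference_update(ready)
        i += 1
    return graph


# Inputs of the steps in the Pipenet example of this section.
graph = execution_rounds({
    "diff_abs": "original",
    "quantile_ad": "diff_abs",
    "diff": "original",
    "sign_check": "diff",
    "and": ["quantile_ad", "sign_check"],
})
last_round = max(i for i, _ in graph.values())
final = [n for n, (i, _) in graph.items() if i == last_round]
print(graph["and"], final)  # (2, 0) ['and']
```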

Examples

The following example shows how to use a Pipenet to build a level shift detector from basic transformers, detectors, and an aggregator.

>>> from adtk.detector import QuantileAD, ThresholdAD
>>> from adtk.transformer import DoubleRollingAggregate
>>> from adtk.aggregator import AndAggregator
>>> from adtk.pipe import Pipenet
>>> steps = {
...     "diff_abs": {
...         "input": "original",
...         "model": DoubleRollingAggregate(
...             agg="median",
...             window=20,
...             center=True,
...             diff="l1",
...         ),
...     },
...     "quantile_ad": {
...         "input": "diff_abs",
...         "model": QuantileAD(high=0.99, low=0),
...     },
...     "diff": {
...         "input": "original",
...         "model": DoubleRollingAggregate(
...             agg="median",
...             window=20,
...             center=True,
...             diff="diff",
...         ),
...     },
...     "sign_check": {
...         "input": "diff",
...         "model": ThresholdAD(high=0.0, low=-float("inf")),
...     },
...     "and": {
...         "model": AndAggregator(),
...         "input": ["quantile_ad", "sign_check"],
...     },
... }
>>> myPipenet = Pipenet(steps)
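In this example, quantile_ad marks points where the absolute difference between the medians of the two adjacent windows is unusually large, and sign_check marks points where the signed difference is positive, so combining them with AND keeps only large shifts in one direction. The AND step can be sketched in plain Python as below. Here and_aggregate is a hypothetical helper, lists of 1/0/None stand in for the binary Series, and returning None whenever any input is undefined is an assumption rather than documented AndAggregator behavior.

```python
from typing import List, Optional

Label = Optional[int]  # 1 = anomalous, 0 = normal, None = undefined


def and_aggregate(*label_lists: List[Label]) -> List[Label]:
    """Point-wise AND of aligned label lists."""
    out: List[Label] = []
    for flags in zip(*label_lists):
        if any(f is None for f in flags):
            out.append(None)  # assumption: undefined if any input is undefined
        else:
            out.append(int(all(f == 1 for f in flags)))
    return out


large_change = [None, 0, 1, 1, 1, 0, None]   # stands in for the "quantile_ad" output
positive_sign = [None, 1, 1, 0, 1, 1, None]  # stands in for the "sign_check" output
print(and_aggregate(large_change, positive_sign))  # [None, 0, 1, 0, 1, 0, None]
```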

fit(ts, skip_fit=None, return_intermediate=False)

Train models in the pipenet.

Parameters:
  • ts (pandas Series or DataFrame) – Time series used to train models.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipenet contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.

Return type:

dict

detect(ts, return_intermediate=False, return_list=False)

Detect anomalies in a time series using the pipenet.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
  • return_list (bool, optional) – Whether to return a list of anomalous timestamps instead of a binary series indicating normal/anomalous. Default: False.
Returns:

If return_intermediate=False, return the detected anomalies, i.e. the result of the final step. If return_intermediate=True, return a dictionary of results for all models in the pipenet. If return_list=True, the result of a detector or an aggregator is a list of pandas Timestamps; if return_list=False, it is a binary pandas Series indicating normal/anomalous.

Return type:

list, pandas Series, or dict

transform(ts, return_intermediate=False)

Transform time series using the pipenet.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to be transformed.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=False, return the transformed time series, i.e. the result of the final step. If return_intermediate=True, return a dictionary of results for all models in the pipenet.

Return type:

pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)

Train models in the pipenet and detect anomalies with it.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipenet contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
  • return_list (bool, optional) – Whether to return a list of anomalous timestamps instead of a binary series indicating normal/anomalous. Default: False.
Returns:

If return_intermediate=False, return the detected anomalies, i.e. the result of the final step. If return_intermediate=True, return a dictionary of results for all models in the pipenet. If return_list=True, the result of a detector or an aggregator is a list of pandas Timestamps; if return_list=False, it is a binary pandas Series indicating normal/anomalous.

Return type:

list, pandas Series, or dict

fit_transform(ts, skip_fit=None, return_intermediate=False)

Train models in the pipenet and transform time series with it.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to be transformed.
  • skip_fit (list, optional) – Models to skip training. This is useful when the pipenet contains models that have already been trained on the same time series and re-training would be time-consuming. It must be a list of strings, each of which is the name of a step. Default: None.
  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.
Returns:

If return_intermediate=False, return the transformed time series, i.e. the result of the final step. If return_intermediate=True, return a dictionary of results for all models in the pipenet.

Return type:

pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)

Detect anomalies and score the results against true anomalies.

Parameters:
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from. If a DataFrame with k columns, k univariate detectors will be applied to them respectively.
  • anomaly_true (pandas Series, or a list of Timestamps or Timestamp tuples) – True anomalies. If a Series, it is a series of binary labels indicating anomalous time points; if a list, it is a list of anomalous events, each a Timestamp or a (start, end) tuple of Timestamps representing a time window.
  • scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.
  • **kwargs – Optional parameters for scoring function. See module metrics for more information.
Returns:

Score of detection result.

Return type:

float

get_params()

Get parameters of models in pipenet.

Returns: A dictionary mapping step names to model parameters.
Return type: dict

summary()

Print a summary of the pipenet.

plot_flowchart(ax=None, figsize=None, radius=1.0)

Plot flowchart of this pipenet.

Parameters:
  • ax (matplotlib axes object, optional) – Axes to plot at. If not given, the method will create a matplotlib figure and axes. Default: None.
  • figsize (tuple, optional) – Width and height of the figure to create. Only used if ax is not given. Default: None.
  • radius (float, optional) – Relative size of components in the chart. Default: 1.0.
Returns:

Axes where the flowchart is plotted.

Return type:

matplotlib axes object