Pipeline and Pipenet

Module of model pipeline and pipenet.

A Pipeline or Pipenet connects multiple components (transformers, detectors, and/or aggregators) into a single model that can perform a complex anomaly detection process.

class adtk.pipe.Pipeline(steps)[source]

A Pipeline object chains transformers and a detector sequentially.

Parameters

steps (list of 2-tuples (str, object)) – Components of this pipeline. Each 2-tuple represents a step in the pipeline (step name, model object).

Return type

None

Examples

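>>> from adtk.detector import QuantileAD
>>> from adtk.transformer import RollingAggregate
>>> from adtk.pipe import Pipeline
>>> steps = [('moving average', RollingAggregate(agg='mean', window=10)),
...          ('filter quantile 0.99', QuantileAD(high=0.99))]
>>> myPipeline = Pipeline(steps)
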
fit(ts, skip_fit=None, return_intermediate=False)[source]

Train all models in the pipeline sequentially.

Parameters
  • ts (pandas Series or DataFrame) – Time series used to train models.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipeline contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.

Return type

dict, optional
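
As a rough illustration of the sequential training described above, the sketch below chains two toy steps in plain Python. The classes MeanCenter and AboveThreshold and the helper fit_chain are hypothetical stand-ins written for this example, not ADTK code. The sketch assumes that every step except the last must transform its input so the next step can be trained, and that the last step is only trained and therefore reports None.

```python
class MeanCenter:
    """Toy transformer (hypothetical): subtracts the mean learned in fit()."""

    def fit(self, values):
        self.mean_ = sum(values) / len(values)

    def transform(self, values):
        return [v - self.mean_ for v in values]


class AboveThreshold:
    """Toy detector (hypothetical): learns a limit from the training data."""

    def __init__(self, factor):
        self.factor = factor

    def fit(self, values):
        self.limit_ = self.factor * max(abs(v) for v in values)


def fit_chain(steps, values, skip_fit=None, return_intermediate=False):
    """Fit each step on the output of the previous one (illustration only)."""
    skip_fit = skip_fit or []
    results = {}
    current = values
    last = len(steps) - 1
    for i, (name, model) in enumerate(steps):
        if name not in skip_fit:
            model.fit(current)
        if i < last:
            # Intermediate steps transform so that the next step has input.
            current = model.transform(current)
            results[name] = current
        else:
            # Assumption: the final step is only trained, so it yields None.
            results[name] = None
    return results if return_intermediate else None


steps = [("center", MeanCenter()), ("threshold", AboveThreshold(factor=0.5))]
intermediate = fit_chain(steps, [1.0, 2.0, 3.0, 10.0], return_intermediate=True)
print(intermediate)

# Train again on new data, but leave the already trained "center" step as is.
fit_chain(steps, [0.0, 0.0], skip_fit=["center"])
print(steps[0][1].mean_, steps[1][1].limit_)
```

In the second call the "center" step keeps the mean it learned earlier, while the "threshold" step is retrained on the newly centered values.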

detect(ts, return_intermediate=False, return_list=False)[source]

Transform time series sequentially along pipeline, and detect anomalies with the last detector.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

  • return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.

Returns

Detected anomalies.

  • If return_intermediate=False, return detected anomalies, i.e. result from last detector.

  • If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.

  • If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.

  • If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

Return type

pandas Series, pandas DataFrame, list, or dict
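
The two result formats selected by return_list can be related with a small sketch. The helper labels_to_events below is hypothetical and uses standard-library datetime objects as stand-ins for pandas Timestamps. It groups consecutive anomalous points into a single event, emitting a lone timestamp for an isolated point and a 2-tuple for a run of points. The library's own conversion may choose interval endpoints differently.

```python
from datetime import datetime, timedelta


def labels_to_events(timestamps, labels):
    """Group consecutive anomalous points into events (illustration only)."""
    events = []
    start = None  # first timestamp of the run currently being collected
    prev = None   # most recent anomalous timestamp in that run
    for t, flag in zip(timestamps, labels):
        if flag:
            if start is None:
                start = t
            prev = t
        elif start is not None:
            # The run has ended: a single point or a closed interval.
            events.append(start if start == prev else (start, prev))
            start = None
    if start is not None:
        events.append(start if start == prev else (start, prev))
    return events


t0 = datetime(2020, 1, 1)
timestamps = [t0 + timedelta(days=i) for i in range(6)]
labels = [False, True, False, True, True, True]
events = labels_to_events(timestamps, labels)
print(events)
```

Here the isolated anomaly on 2020-01-02 becomes a single timestamp, and the run from 2020-01-04 to 2020-01-06 becomes one 2-tuple.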

transform(ts, return_intermediate=False)[source]

Transform time series sequentially along pipeline.

Parameters
  • ts (pandas Series or DataFrame) – Time series to be transformed.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

Transformed time series.

  • If return_intermediate=False, return transformed series, i.e. result from last transformer;

  • If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.

Return type

pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)[source]

Train models in pipeline sequentially, transform time series along pipeline, and use the last detector to detect anomalies.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipeline contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

  • return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.

Returns

Detected anomalies.

  • If return_intermediate=False, return detected anomalies, i.e. result from last detector.

  • If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.

  • If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.

  • If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

Return type

pandas Series, pandas DataFrame, list, or dict

fit_transform(ts, skip_fit=None, return_intermediate=False)[source]

Train models in pipeline sequentially, and transform time series along pipeline.

Parameters
  • ts (pandas Series or DataFrame) – Time series to be transformed.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipeline contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

Transformed time series.

  • If return_intermediate=False, return transformed series, i.e. result from last transformer;

  • If return_intermediate=True, return results of all models in pipeline as a dict where each item represents the result of a model.

Return type

pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)[source]

Detect anomalies and score the results against true anomalies.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • anomaly_true (pandas Series or list) –

    True anomalies.

    • If pandas Series, it is treated as a series of binary labels.

    • If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

  • scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.

  • **kwargs – Optional parameters for scoring function. See module metrics for more information.

Returns

Score of detection result.

Return type

float
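
For binary labels, three of the scoring options follow the usual definitions of recall, precision, and F1. The sketch below computes them point by point on plain Python lists. The function binary_scores is a hypothetical helper, and it does not attempt to reproduce how the metrics module handles missing values or lists of events.

```python
def binary_scores(y_true, y_pred):
    """Point-wise recall, precision, and F1 for binary labels (sketch)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t and p)
    fp = sum(1 for t, p in zip(y_true, y_pred) if not t and p)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t and not p)
    nan = float("nan")
    return {
        # Share of true anomalies that were detected.
        "recall": tp / (tp + fn) if tp + fn else nan,
        # Share of detected anomalies that are true anomalies.
        "precision": tp / (tp + fp) if tp + fp else nan,
        # Harmonic mean of precision and recall.
        "f1": 2 * tp / (2 * tp + fp + fn) if 2 * tp + fp + fn else nan,
    }


y_true = [0, 1, 1, 0, 1, 0]
y_pred = [0, 1, 0, 1, 1, 0]
scores = binary_scores(y_true, y_pred)
print(scores)
```

With two true positives, one false positive, and one false negative, all three scores in this example equal 2/3.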

get_params()[source]

Get parameters of models in pipeline.

Returns

A dictionary mapping each model name to its model parameters.

Return type

dict

class adtk.pipe.Pipenet(steps=None)[source]

A Pipenet object connects transformers, detectors and aggregators.

Parameters

steps (dict) –

Components of the pipenet. Each key-value item represents a step (transformer, detector, or aggregator), where the key is the unique name of the step and the value is a dict with the following key-value pairs:

  • input (str or list of str): Input to the model, which must be either ‘original’ (i.e. the input time series) or the name of an upstream component.

  • subset (str, list of str, or list of lists of str, optional): If a model does not use all series from an input component, use this field to specify which series should be included. If not given or “all”, all series from the input component will be used.

  • model (object): A detector, transformer, or aggregator object.

Return type

None
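
The rule that every input must be either ‘original’ or the name of another step can be checked before a pipenet is built. The sketch below is a hypothetical pre-check in plain Python, not part of ADTK. The "model" values are None placeholders because only the wiring is inspected.

```python
def find_unknown_inputs(steps):
    """Return (step name, input name) pairs that point at nothing (sketch)."""
    problems = []
    for name, spec in steps.items():
        inputs = spec["input"]
        if isinstance(inputs, str):
            inputs = [inputs]  # a single input may be given as a plain string
        for source in inputs:
            if source != "original" and source not in steps:
                problems.append((name, source))
    return problems


# Placeholder models: a real pipenet would hold detector, transformer,
# or aggregator objects here.
steps = {
    "diff_abs": {"input": "original", "model": None},
    "quantile_ad": {"input": "diff_abs", "model": None},
    "and": {"input": ["quantile_ad", "sign_check"], "model": None},
}
problems = find_unknown_inputs(steps)
print(problems)
```

In this example the "and" step refers to "sign_check", which is not defined, so the check reports that pair.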

steps_graph_

Order of steps to be executed. Keys are step names, and values are 2-tuples (i, j) where i is the index of the execution round and j is the index within that round.

Type

OrderedDict

final_step_

Name of the final step to be executed. It is the single step in the last round of execution in attribute steps_graph_.

Type

str
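
To make the (round, index) pairs concrete, the sketch below derives an execution order from a steps dict by repeatedly collecting every step whose inputs are already available. This is one plausible way to compute such an ordering and is not taken from ADTK's source. The function execution_rounds is hypothetical, the order of steps within a round is an assumption, and the "model" values are None placeholders.

```python
from collections import OrderedDict


def execution_rounds(steps):
    """Assign each step a (round, position) pair (illustration only)."""

    def as_list(value):
        return [value] if isinstance(value, str) else list(value)

    available = {"original"}   # outputs that already exist
    remaining = list(steps)    # steps not yet scheduled, in dict order
    graph = OrderedDict()
    round_index = 0
    while remaining:
        ready = [
            name for name in remaining
            if all(src in available for src in as_list(steps[name]["input"]))
        ]
        if not ready:
            raise ValueError("steps contain a cycle or an unknown input")
        for position, name in enumerate(ready):
            graph[name] = (round_index, position)
        available.update(ready)
        remaining = [name for name in remaining if name not in ready]
        round_index += 1
    return graph


steps = {
    "diff_abs": {"input": "original", "model": None},
    "quantile_ad": {"input": "diff_abs", "model": None},
    "diff": {"input": "original", "model": None},
    "sign_check": {"input": "diff", "model": None},
    "and": {"input": ["quantile_ad", "sign_check"], "model": None},
}
graph = execution_rounds(steps)
final_step = list(graph)[-1]
print(graph, final_step)
```

With this wiring the two transformers run in round 0, the two detectors in round 1, and the aggregator "and" alone in round 2, which makes it the final step.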

Examples

The following example shows how to use a Pipenet to build a level shift detector from some basic transformers, detectors, and an aggregator.

>>> from adtk.detector import QuantileAD, ThresholdAD
>>> from adtk.transformer import DoubleRollingAggregate
>>> from adtk.aggregator import AndAggregator
>>> from adtk.pipe import Pipenet
>>> steps = {
        "diff_abs": {
            "input": "original",
            "model": DoubleRollingAggregate(
                agg="median",
                window=20,
                center=True,
                diff="l1",
            ),
        },
        "quantile_ad": {
            "input": "diff_abs",
            "model": QuantileAD(high=0.99, low=0),
        },
        "diff": {
            "input": "original",
            "model": DoubleRollingAggregate(
                agg="median",
                window=20,
                center=True,
                diff="diff",
            ),
        },
        "sign_check": {
            "input": "diff",
            "model": ThresholdAD(high=0.0, low=-float("inf")),
        },
        "and": {
            "model": AndAggregator(),
            "input": ["quantile_ad", "sign_check"],
        },
    }
>>> myPipenet = Pipenet(steps)
fit(ts, skip_fit=None, return_intermediate=False)[source]

Train models in the pipenet.

Parameters
  • ts (pandas Series or DataFrame) – Time series used to train models.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipenet contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

If return_intermediate=True, return intermediate results generated during training as a dictionary where keys are step names. If a step does not perform transformation or detection, the result of that step will be None.

Return type

dict, optional

detect(ts, return_intermediate=False, return_list=False)[source]

Detect anomalies in a time series using the pipenet.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

  • return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.

Returns

Detected anomalies.

  • If return_intermediate=False, return detected anomalies, i.e. result from last detector.

  • If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.

  • If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.

  • If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

Return type

pandas Series, pandas DataFrame, list, or dict

transform(ts, return_intermediate=False)[source]

Transform time series using the pipenet.

Parameters
  • ts (pandas Series or DataFrame) – Time series to be transformed.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

Transformed time series.

  • If return_intermediate=False, return transformed series, i.e. result from last transformer;

  • If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.

Return type

pandas Series, pandas DataFrame, or dict

fit_detect(ts, skip_fit=None, return_intermediate=False, return_list=False)[source]

Train models in the pipenet and detect anomalies with it.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipenet contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

  • return_list (bool, optional) – Whether to return a list of anomalous events, or a binary series indicating normal/anomalous. Default: False.

Returns

Detected anomalies.

  • If return_intermediate=False, return detected anomalies, i.e. result from last detector.

  • If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.

  • If return_list=False, result from a detector or an aggregator will be a binary pandas Series indicating normal/anomalous.

  • If return_list=True, result from a detector or an aggregator will be a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

Return type

pandas Series, pandas DataFrame, list, or dict

fit_transform(ts, skip_fit=None, return_intermediate=False)[source]

Train models in the pipenet and transform time series with it.

Parameters
  • ts (pandas Series or DataFrame) – Time series to be transformed.

  • skip_fit (list, optional) – Names of models to skip during training. This is useful when the pipenet contains models that were already trained on the same time series and retraining would be time-consuming. It must be a list of strings where each element is a model name. Default: None.

  • return_intermediate (bool, optional) – Whether to return intermediate results. Default: False.

Returns

Transformed time series.

  • If return_intermediate=False, return transformed series, i.e. result from last transformer;

  • If return_intermediate=True, return results of all models in pipenet as a dict where each item represents the result of a model.

Return type

pandas Series, pandas DataFrame, or dict

score(ts, anomaly_true, scoring='recall', **kwargs)[source]

Detect anomalies and score the results against true anomalies.

Parameters
  • ts (pandas Series or DataFrame) – Time series to detect anomalies from.

  • anomaly_true (pandas Series or list) –

    True anomalies.

    • If pandas Series, it is treated as a series of binary labels.

    • If list, a list of events where an event is a pandas Timestamp if it is instantaneous or a 2-tuple of pandas Timestamps if it is a closed time interval.

  • scoring (str, optional) – Scoring function to use. Must be one of “recall”, “precision”, “f1”, or “iou”. See module metrics for more information. Default: “recall”.

  • **kwargs – Optional parameters for scoring function. See module metrics for more information.

Returns

Score of detection result.

Return type

float
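
When anomalies are given as lists of time intervals, the “iou” option refers to intersection over union. The sketch below shows one way to compute it from interval durations, using standard-library datetime objects as stand-ins for pandas Timestamps. The helpers are hypothetical, instantaneous events (single timestamps) are not handled, and the metrics module may treat interval boundaries differently.

```python
from datetime import datetime


def merge(intervals):
    """Merge overlapping (start, end) intervals into disjoint ones."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def total_seconds(intervals):
    return sum((end - start).total_seconds() for start, end in intervals)


def intersect(a, b):
    """Pairwise overlaps between two lists of disjoint intervals."""
    overlaps = []
    for start_a, end_a in a:
        for start_b, end_b in b:
            start, end = max(start_a, start_b), min(end_a, end_b)
            if start < end:
                overlaps.append((start, end))
    return overlaps


def interval_iou(true_events, pred_events):
    """Duration of the overlap divided by duration of the union (sketch)."""
    true_merged, pred_merged = merge(true_events), merge(pred_events)
    inter = total_seconds(intersect(true_merged, pred_merged))
    union = total_seconds(true_merged) + total_seconds(pred_merged) - inter
    return inter / union if union else float("nan")


true_events = [(datetime(2020, 1, 1, 0), datetime(2020, 1, 1, 10))]
pred_events = [(datetime(2020, 1, 1, 5), datetime(2020, 1, 1, 15))]
iou = interval_iou(true_events, pred_events)
print(iou)
```

The two intervals overlap for 5 hours and together cover 15 hours, so the score in this example is 1/3.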

get_params()[source]

Get parameters of models in pipenet.

Returns

A dictionary mapping each model name to its model parameters.

Return type

dict

summary()[source]

Print a summary of the pipenet.

Return type

None

plot_flowchart(ax=None, figsize=None, radius=1.0)[source]

Plot flowchart of this pipenet.

Parameters
  • ax (matplotlib axes object, optional) – Axes to plot on. If not given, the method will create a matplotlib figure and axes. Default: None.

  • figsize (tuple, optional) – Width and height of the figure to plot on. Only used if ax is not given. Default: None.

  • radius (float, optional) – Relative size of components in the chart. Default: 1.0.

Returns

Axes where the flowchart is plotted.

Return type

matplotlib axes object