# Detector¶

## ThresholdAD¶

`ThresholdAD`

compares each time series value with given thresholds.

In the following example, we detect time points when temperature is above 30C or below 15C.

```
[1]:
```

```
import pandas as pd
s = pd.read_csv('./data/temperature.csv', index_col="Time", parse_dates=True, squeeze=True)
from adtk.data import validate_series
s = validate_series(s)
```

```
[2]:
```

```
from adtk.detector import ThresholdAD
threshold_ad = ThresholdAD(high=30, low=15)
anomalies = threshold_ad.fit_detect(s)
```

```
[3]:
```

```
from adtk.visualization import plot
plot(s, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_markersize=5, ap_color='red', ap_marker_on_curve=True);
```

## QuantileAD¶

`QuantileAD`

compares each time series value with historical quantiles.

In the following example, we detect time points when temperature is above 99% percentile or below 1% percentile.

```
[4]:
```

```
from adtk.detector import QuantileAD
quantile_ad = QuantileAD(high=0.99, low=0.01)
anomalies = quantile_ad.fit_detect(s)
```

```
[5]:
```

```
plot(s, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_markersize=5, ap_color='red', ap_marker_on_curve=True);
```

## InterQuartileRangeAD¶

`InterQuartileRangeAD`

is another widely used detector based on simple historical statistics is based on interquartile range (IQR). When a value is out of the range defined by \([Q_1 - c \times IQR,\ Q_3 + c \times IQR]\) where \(IQR = Q_3 - Q_1\) is the difference between 25% and 75% quantiles. This detector is usually preferred to QuantileAD in the case where only a tiny portion or even none of training data is anomalous.

```
[6]:
```

```
from adtk.detector import InterQuartileRangeAD
iqr_ad = InterQuartileRangeAD(c=1.5)
anomalies = iqr_ad.fit_detect(s)
```

```
[7]:
```

```
plot(s, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_markersize=5, ap_color='red', ap_marker_on_curve=True);
```

## GeneralizedESDTestAD¶

`GeneralizedESDTestAD`

detects anomaly based on generalized extreme Studentized deviate (ESD) test.

Please note a key assumption of generalized ESD test is that normal values follow an approximately normal distribution. Please only use this detector when this assumption holds.

```
[8]:
```

```
from adtk.detector import GeneralizedESDTestAD
esd_ad = GeneralizedESDTestAD(alpha=0.3)
anomalies = esd_ad.fit_detect(s)
```

```
[9]:
```

```
plot(s, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_markersize=5, ap_color='red', ap_marker_on_curve=True);
```

## PersistAD¶

`PersistAD`

compares each time series value with its previous values. Internally, it is implemented as a pipenet with transformer DoubleRollingAggregate.

In the following example, we detect anomalous positive changes of price.

```
[10]:
```

```
s = pd.read_csv('./data/price_short.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[11]:
```

```
from adtk.detector import PersistAD
persist_ad = PersistAD(c=3.0, side='positive')
anomalies = persist_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red');
```

By default, `PersistAD`

only check one previous value, which is good at capturing additive anomaly in short-term scale, but not in long-term scale because it is too near-sighted.

In the following example, it fails to capture meaningful drops of price in a longer time scale.

```
[12]:
```

```
s = pd.read_csv('./data/price_long.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[13]:
```

```
persist_ad = PersistAD(c=1.5, side='negative')
anomalies = persist_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ts_markersize=3, ap_color='red');
```

We may change the parameter `window`

to a number greater than 1, and the detector will compare a value to the median or mean of its preceding time window. This will capture anomalous changes in mid- to long-term scale, because it is less near-sighted. In the same example as above, it detects drops of price in the long-term scale successfully.

```
[14]:
```

```
persist_ad.window = 24
anomalies = persist_ad.fit_detect(s)
```

```
[15]:
```

```
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ts_markersize=3, ap_color='red');
```

## LevelShiftAD¶

`LevelShiftAD`

detects shift of value level by tracking the difference between median values at two sliding time windows next to each other. It is not sensitive to instantaneous spikes and could be a good choice if noisy outliers happen frequently. Internally, it is implemented as a pipenet with transformer DoubleRollingAggregate.

In the following example, we detect shift point of CPU usage.

```
[16]:
```

```
s = pd.read_csv('./data/cpu.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[17]:
```

```
from adtk.detector import LevelShiftAD
level_shift_ad = LevelShiftAD(c=6.0, side='both', window=5)
anomalies = level_shift_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ts_markersize=3, ap_color='red');
```

## VolatilityShiftAD¶

`VolatilityShiftAD`

detects shift of volatility level by tracking the difference between standard deviations at two sliding time windows next to each other. Internally, it is implemented as a pipenet with transformer DoubleRollingAggregate.

In the following example, we detect positive shift of volatility of seismic amplitude which indicates start of earthquake.

```
[18]:
```

```
s = pd.read_csv('./data/seismic.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[19]:
```

```
from adtk.detector import VolatilityShiftAD
volatility_shift_ad = VolatilityShiftAD(c=6.0, side='positive', window=30)
anomalies = volatility_shift_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ts_markersize=3, ap_color='red');
```

## SeasonalAD¶

`SeasonalAD`

detects anomalous violations of seasonal pattern. Internally, it is implemented as a pipenet with transformer ClassicSeasonalDecomposition.

In the following example, we detect unusual traffic, which mostly happened on major holidays.

```
[20]:
```

```
s = pd.read_csv('./data/seasonal.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
plot(s, ts_linewidth=1);
```

```
[21]:
```

```
from adtk.detector import SeasonalAD
seasonal_ad = SeasonalAD(c=3.0, side="both")
anomalies = seasonal_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ap_color='red', ap_marker_on_curve=True);
```

## AutoregressionAD¶

`AutoregressionAD`

detects anomalous changes of autoregressive behavior in time series. Internally, it is implemented as a pipenet with transformers Retrospect and RegressionResidual.

For the same example as above, we detect violation of usual regressive behavior in traffic history.

```
[22]:
```

```
s = pd.read_csv('./data/seasonal.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[23]:
```

```
from adtk.detector import AutoregressionAD
autoregression_ad = AutoregressionAD(n_steps=7*2, step_size=24, c=3.0)
anomalies = autoregression_ad.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ap_color='red', ap_marker_on_curve=True);
```

## MinClusterDetector¶

`MinClusterDetector`

treats multivariate time series as independent points in a high-dimensional space, divides them into clusters, and identifies values in the smallest cluster as anomalous. This may help capturing outliers in high-dimensional space.

In the following example, we detect anomalous changes of relationship between the speed of a generator and power generated. The violation of regular relationship indicates equipment failure.

```
[24]:
```

```
df = pd.read_csv('./data/generator.csv', index_col="Time", parse_dates=True, squeeze=True)
df = validate_series(df)
```

```
[25]:
```

```
from adtk.detector import MinClusterDetector
from sklearn.cluster import KMeans
min_cluster_detector = MinClusterDetector(KMeans(n_clusters=3))
anomalies = min_cluster_detector.fit_detect(df)
plot(df, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red', ap_alpha=0.3, curve_group='all');
```

## OutlierDetector¶

`OutlierDetector`

performs multivariate time-independent outlier detection and identifies outliers as anomalies. The multivariate outlier detection algorithm could be those in scikit-learn or other packages following same API.

For the same example as above, we apply `OutlierDetector`

with a scikit-learn local outlier factor model.

```
[26]:
```

```
from adtk.detector import OutlierDetector
from sklearn.neighbors import LocalOutlierFactor
outlier_detector = OutlierDetector(LocalOutlierFactor(contamination=0.05))
anomalies = outlier_detector.fit_detect(df)
plot(df, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red', ap_alpha=0.3, curve_group='all');
```

## RegressionAD¶

`RegressionAD`

detects anomalous violation of usual relationship between multivariate series by tracking regressive error. Internally, it is implemented as a pipenet with transformer RegressionResidual.

For the same example as above, we apply `RegressionAD`

with linear regression model.

```
[27]:
```

```
from adtk.detector import RegressionAD
from sklearn.linear_model import LinearRegression
regression_ad = RegressionAD(regressor=LinearRegression(), target="Speed (kRPM)", c=3.0)
anomalies = regression_ad.fit_detect(df)
plot(df, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red', ap_alpha=0.3, curve_group='all');
```

## PcaAD¶

`PcaAD`

performs principal component analysis (PCA) to multivariate time series (every time point as a vector in a high-dimensional space) and tracks reconstruction error of those vectors. This detector may be helpful when normal points are supposed to be at a lower-rank manifold while anomalous points are not. Internally, it is implemented as a pipeline with transformer PcaReconstructionError.

We apply `PcaAD`

to the same example as above.

```
[28]:
```

```
from adtk.detector import PcaAD
pca_ad = PcaAD(k=1)
anomalies = pca_ad.fit_detect(df)
plot(df, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red', ap_alpha=0.3, curve_group='all');
```

## CustomizedDetector¶

`CustomizedDetector1D`

and `CustomizedDetectorHD`

help a user convert a function into a customized detector object that could be used (by a `Pipeline`

object, for example) as other detector object.

In the following example, we detect when the power generated (in kW) is less than 1.2 times generator speed (in kRPM).

```
[29]:
```

```
df = pd.read_csv('./data/generator.csv', index_col="Time", parse_dates=True)
df = validate_series(df)
df.head()
```

```
[29]:
```

Speed (kRPM) | Power (kW) | |
---|---|---|

Time | ||

2017-05-02 17:08:37 | 6.066579 | 10.308257 |

2017-05-02 18:08:37 | 6.035764 | 9.186763 |

2017-05-02 19:08:37 | 5.922730 | 10.128382 |

2017-05-02 20:08:37 | 5.999581 | 10.290300 |

2017-05-02 21:08:37 | 6.031067 | 8.910037 |

```
[30]:
```

```
def myDetectionFunc(df):
return (df["Speed (kRPM)"] * 1.2 > df["Power (kW)"])
```

```
[31]:
```

```
from adtk.detector import CustomizedDetectorHD
customized_detector = CustomizedDetectorHD(detect_func=myDetectionFunc)
anomalies = customized_detector.detect(df)
plot(df, anomaly_pred=anomalies, ts_linewidth=2, ts_markersize=3, ap_color='red', ap_alpha=0.3, curve_group='all');
```

# Transformer¶

## RollingAggregate¶

`RollingAggregate`

rolls a sliding window along a time series and aggregates using a selected operation. Common examples including moving average, rolling standard deviation, etc.

In the following example, we track the rolling count of valid values along a time series, which may help raise alarm when missing values occur frequently.

```
[32]:
```

```
s = pd.read_csv('./data/pressure.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
plot(s, ts_linewidth=2, ts_markersize=3);
```

```
[33]:
```

```
from adtk.transformer import RollingAggregate
s_transformed = RollingAggregate(agg='count', window=5).transform(s)
plot(s_transformed.rename("Rolling count of valid values"), ts_linewidth=2, ts_markersize=3);
```

## DoubleRollingAggregate¶

`DoubleRollingAggregate`

rolls two sliding windows side-by-side along a time series, aggregates using a selected operation, and tracks difference of the aggregated metrics between the two windows. This may help tracking changes of statistical behavior in a time series.

In the following example, we track the change of statistical distribution of series values.

```
[34]:
```

```
s = pd.read_csv('./data/seismic.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
from adtk.transformer import DoubleRollingAggregate
s_transformed = DoubleRollingAggregate(
agg="quantile",
agg_params={"q": [0.1, 0.5, 0.9]},
window=50,
diff="l2").transform(s).rename("Diff rolling quantiles (mm)")
plot(pd.concat([s, s_transformed], axis=1), ts_linewidth=1, ts_markersize=3);
```

In the following example, we track the shift of value level.

```
[35]:
```

```
s = pd.read_csv('./data/cpu.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[36]:
```

```
from adtk.transformer import DoubleRollingAggregate
s_transformed = DoubleRollingAggregate(
agg="median",
window=5,
diff="diff").transform(s).rename("Diff rolling median (mm)")
plot(pd.concat([s, s_transformed], axis=1), ts_linewidth=1, ts_markersize=3);
```

## ClassicSeasonalDecomposition¶

`ClassicSeasonalDecomposition`

decomposes a series into trend part (optional), seasonal part and residual part. The residual part may help identifying anomalous deviation from seasonal pattern.

In the following example, we calculate the deviation from usual traffic pattern along the series, which may help identify unusual traffic.

```
[37]:
```

```
s = pd.read_csv('./data/seasonal.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[38]:
```

```
from adtk.transformer import ClassicSeasonalDecomposition
s_transformed = ClassicSeasonalDecomposition().fit_transform(s).rename("Seasonal decomposition residual")
plot(pd.concat([s, s_transformed], axis=1), ts_linewidth=1);
```

`ClassicSeasonalDecomposition`

without extracting trend could not handle cases where long-term trend is mixed with seasonal pattern as well as noise. In those cases, option `trend`

should be turned on.

In the following example, `ClassicSeasonalDecomposition`

fails to decompose residual series and long-term trend from a synthetic series without `trend`

option turned on.

```
[39]:
```

```
s = pd.read_csv('./data/seasonal+trend.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[40]:
```

```
s_transformed = ClassicSeasonalDecomposition(freq=7).fit_transform(s).rename("Seasonal decomposition residual and trend")
plot(pd.concat([s, s_transformed], axis=1), ts_linewidth=2, ts_markersize=3);
```

We turn on the `trend`

option of the model and re-apply to the same example as above, where residual is separated from trend.

```
[41]:
```

```
s_transformed = ClassicSeasonalDecomposition(freq=7, trend=True).fit_transform(s).rename("Seasonal decomposition residual")
plot(pd.concat([s, s_transformed], axis=1), ts_linewidth=2, ts_markersize=3);
```

## Retrospect¶

`Retrospect`

returns data frame with retrospective values, i.e. a row at time t includes value at (t-k)’s where k’s are specified by user. This transformer may be useful for cases where lagging effect should be taken into account.

In the following example, we create a retrospective data frame for a synthetic series.

```
[42]:
```

```
s = pd.read_csv('./data/sin.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
```

```
[43]:
```

```
from adtk.transformer import Retrospect
df = Retrospect(n_steps=4, step_size=20, till=50).transform(s)
plot(pd.concat([s, df], axis=1), curve_group="all", ts_linewidth=2);
```

## RegressionResidual¶

`RegressionResidual`

performs regression to a multivariate series and returns regressive residual. This may help identifying anomalous violation of usual relationship between series.

In the following example, we track deviation of series from usual correlation between generator speed and power.

```
[44]:
```

```
df = pd.read_csv('./data/generator.csv', index_col="Time", parse_dates=True)
df = validate_series(df)
```

```
[45]:
```

```
from adtk.transformer import RegressionResidual
s = RegressionResidual(regressor=LinearRegression(), target="Speed (kRPM)").fit_transform(df).rename("Regression residual (kRPM)")
plot(pd.concat([df, s], axis=1), ts_linewidth=2, ts_markersize=3, curve_group=[[0,1], [2]]);
```

## PcaProjection¶

`PcaProjection`

transforms a multivariate series into representation with first k principal components.

In the following example, we transform the 2-dimensional series used in the previous example into 1-dimension by only keeping its first principal coefficient.

```
[46]:
```

```
from adtk.transformer import PcaProjection
s = PcaProjection(k=1).fit_transform(df)
plot(pd.concat([df, s], axis=1), ts_linewidth=2, ts_markersize=3, curve_group=[[0,1], [2]]);
```

## PcaReconstruction¶

`PcaReconstruction`

projects a multivariate series onto the hyperplane spanned by its first k principal components.

In the following example, we project the 2-dimensional series used in previous example onto the line of its first principal component.

```
[47]:
```

```
from adtk.transformer import PcaReconstruction
df_transformed = PcaReconstruction(k=1).fit_transform(df).rename(columns={"Speed (kRPM)": "Speed reconstruction (kRPM)", "Power (kW)": "Power reconstruction (kW)"})
plot(pd.concat([df, df_transformed], axis=1), ts_linewidth=2, ts_markersize=3, curve_group=[[0,1], [2, 3]]);
```

## PcaReconstructionError¶

`PcaReconstructionError`

projects a multivariate series onto the hyperplane spanned by its first k principal components and returns the error that indicates the distance between the data point and the lower-rank subspace of the first k principal components.

In the following example, we project the 2-dimensional series used in previous example onto the line of its first principal component.

```
[48]:
```

```
from adtk.transformer import PcaReconstructionError
s = PcaReconstructionError(k=1).fit_transform(df).rename("PCA Reconstruction Error")
plot(pd.concat([df, s], axis=1), ts_linewidth=2, ts_markersize=3, curve_group=[[0,1], [2]]);
```

## CustomizedTransformer¶

Like a customized detector, a user may convert a function to a customized transformer with `CustomizedTransformer1D`

or `CustomizedTransformerHD`

, so that it can be used by a `Pipe`

object.

In the following example, we obtained a calculated series of generated power (in kW) divided by generator speed (in kRPM).

```
[49]:
```

```
df = pd.read_csv('./data/generator.csv', index_col="Time", parse_dates=True)
df = validate_series(df)
```

```
[50]:
```

```
def myTransformationFunc(df):
return (df["Power (kW)"] / df["Speed (kRPM)"]).rename("Power/Speed (kW/kRPM)")
```

```
[51]:
```

```
from adtk.transformer import CustomizedTransformerHD
customized_transformer = CustomizedTransformerHD(transform_func=myTransformationFunc)
s = customized_transformer.transform(df)
plot(pd.concat([df, s], axis=1), ts_linewidth=2, curve_group=([[0, 1], [2]]));
```

# Aggregator¶

## OrAggregator¶

`OrAggregator`

identifies a time point as anomalous as long as it is included in one of the input anomaly lists.

It could be applied to merge anomalies in the format of binary series.

```
[52]:
```

```
import numpy as np
anomaly = pd.DataFrame(
np.array(
[[False, True, True, False, False],
[False, False, True, True, False]]).T,
columns=["overcurrent", "overheating"],
index=pd.date_range(start="2019-12-7 14:00:00", periods=5, freq="min"))
anomaly
```

```
[52]:
```

overcurrent | overheating | |
---|---|---|

2019-12-07 14:00:00 | False | False |

2019-12-07 14:01:00 | True | False |

2019-12-07 14:02:00 | True | True |

2019-12-07 14:03:00 | False | True |

2019-12-07 14:04:00 | False | False |

```
[53]:
```

```
from adtk.aggregator import OrAggregator
OrAggregator().aggregate(anomaly)
```

```
[53]:
```

```
2019-12-07 14:00:00 False
2019-12-07 14:01:00 True
2019-12-07 14:02:00 True
2019-12-07 14:03:00 True
2019-12-07 14:04:00 False
Freq: T, dtype: bool
```

It could also be applied to merge anomalies in the format of event list.

```
[54]:
```

```
overcurrent = [
(pd.Timestamp("2019-12-7 14:00:30"), pd.Timestamp("2019-12-7 14:01:45")),
(pd.Timestamp("2019-12-7 14:02:00"), pd.Timestamp("2019-12-7 14:02:45")),
(pd.Timestamp("2019-12-7 14:03:15"), pd.Timestamp("2019-12-7 14:03:50")),
]
overheating = [
(pd.Timestamp("2019-12-7 14:01:10"), pd.Timestamp("2019-12-7 14:02:55")),
(pd.Timestamp("2019-12-7 14:03:30"), pd.Timestamp("2019-12-7 14:04:25")),
]
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overcurrent, ts_linewidth=0, legend=False, figsize=(16, 1));
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overheating, ts_linewidth=0, legend=False, figsize=(16, 1));
```

```
[55]:
```

```
overcurrent_or_overheating = OrAggregator().aggregate(
{"overcurrent": overcurrent,
"overheating": overheating})
display(overcurrent_or_overheating)
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overcurrent_or_overheating, ts_linewidth=0, legend=False, figsize=(16, 1));
```

```
[(Timestamp('2019-12-07 14:00:30'), Timestamp('2019-12-07 14:02:55')),
(Timestamp('2019-12-07 14:03:15'), Timestamp('2019-12-07 14:04:25'))]
```

## AndAggregator¶

`AndAggregator`

identifies a time point as anomalous only if it is included in all the input anomaly lists.

It could be applied to merge anomalies in the format of binary series.

```
[56]:
```

```
import numpy as np
anomaly = pd.DataFrame(
np.array(
[[False, True, True, False, False],
[False, False, True, True, False]]).T,
columns=["overcurrent", "overheating"],
index=pd.date_range(start="2019-12-7 14:00:00", periods=5, freq="min"))
anomaly
```

```
[56]:
```

overcurrent | overheating | |
---|---|---|

2019-12-07 14:00:00 | False | False |

2019-12-07 14:01:00 | True | False |

2019-12-07 14:02:00 | True | True |

2019-12-07 14:03:00 | False | True |

2019-12-07 14:04:00 | False | False |

```
[57]:
```

```
from adtk.aggregator import AndAggregator
AndAggregator().aggregate(anomaly)
```

```
[57]:
```

```
2019-12-07 14:00:00 False
2019-12-07 14:01:00 False
2019-12-07 14:02:00 True
2019-12-07 14:03:00 False
2019-12-07 14:04:00 False
Freq: T, dtype: bool
```

It could also be applied to merge anomalies in the format of event list.

```
[58]:
```

```
overcurrent = [
(pd.Timestamp("2019-12-7 14:00:30"), pd.Timestamp("2019-12-7 14:01:45")),
(pd.Timestamp("2019-12-7 14:02:00"), pd.Timestamp("2019-12-7 14:02:45")),
(pd.Timestamp("2019-12-7 14:03:15"), pd.Timestamp("2019-12-7 14:03:50")),
]
overheating = [
(pd.Timestamp("2019-12-7 14:01:10"), pd.Timestamp("2019-12-7 14:02:55")),
(pd.Timestamp("2019-12-7 14:03:30"), pd.Timestamp("2019-12-7 14:04:25")),
]
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overcurrent, ts_linewidth=0, legend=False, figsize=(16, 1));
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overheating, ts_linewidth=0, legend=False, figsize=(16, 1));
```

```
[59]:
```

```
overcurrent_or_overheating = AndAggregator().aggregate(
{"overcurrent": overcurrent,
"overheating": overheating})
display(overcurrent_or_overheating)
plot(pd.Series(0, index=pd.DatetimeIndex(["2019-12-7 14:00:00", "2019-12-7 14:05:00"])), overcurrent_or_overheating, ts_linewidth=0, legend=False, figsize=(16, 1));
```

```
[(Timestamp('2019-12-07 14:01:10'), Timestamp('2019-12-07 14:01:45')),
(Timestamp('2019-12-07 14:02:00'), Timestamp('2019-12-07 14:02:45')),
(Timestamp('2019-12-07 14:03:30'), Timestamp('2019-12-07 14:03:50'))]
```

# Pipe¶

## Pipeline¶

`Pipeline`

chains transformers and a detector sequentially.

In the following example, we connect a `ClassicSeasonalDecomposition`

transformer with a `QuantileAD`

detector, which aims at detecting anomalous deviation from regular seasonal pattern in time series.

```
[60]:
```

```
from adtk.pipe import Pipeline
steps = [
("deseasonal", ClassicSeasonalDecomposition()),
("quantile_ad", QuantileAD(high=0.995, low=0.005))
]
pipeline = Pipeline(steps)
```

```
[61]:
```

```
s = pd.read_csv('./data/seasonal.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
anomalies = pipeline.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ap_marker_on_curve=True, ap_color='red');
```

A user has the option to get intermediate series and anomaly lists by setting parameter `return_intermediate`

to True when calling fitting or predicting methods of a `Pipeline`

object.

```
[62]:
```

```
results = pipeline.fit_detect(s, return_intermediate=True)
plot(results["deseasonal"]);
```

## Pipenet¶

`Pipenet`

is a generalization of `Pipeline`

to non-sequential combination of model components. Some detectors in detector module are internally implemented by `Pipenet`

.

For example, a single-side level shift detector (`LevelShiftAD`

with parameter `side`

as “positive” or “negative”) is implemented as the following `Pipenet`

. A `DoubleRollingAggregate`

transformer with parameter `diff`

as “l1” is connected to a `InterQuartileRangeAD`

detector to detect level shift on both sides, while another `DoubleRollingAggregate`

transformer with parameter `diff`

as “diff” is connected to a `ThresholdAD`

detector with parameter `upper_thresh`

as 0 to
detect positive increase level change. Positive level shifts is then obtained with an `AndAggregator`

by selecting level shifts that are positive.

```
[63]:
```

```
from adtk.pipe import Pipenet
steps = {
"abs_level_change": {
"model": DoubleRollingAggregate(
agg="median",
window=10,
center=True,
diff="l1"
),
"input": "original"
},
"level_shift": {
"model": InterQuartileRangeAD(c=3.0),
"input": "abs_level_change"
},
"level_change": {
"model": DoubleRollingAggregate(
agg="median",
window=10,
center=True,
diff="diff"
),
"input": "original",
},
"positive_level_change": {
"model": ThresholdAD(high=0),
"input": "level_change"
},
"positive_level_shift": {
"model": AndAggregator(),
"input": ["level_shift", "positive_level_change"]
}
}
pipenet = Pipenet(steps)
```

The flowchart of a `Pipenet`

object can be visualized by method `plot_flowchart`

.

```
[64]:
```

```
pipenet.plot_flowchart()
```

Method `summary`

and `get_params`

can be applied to get information of the pipenet object.

```
[65]:
```

```
pipenet.summary()
```

```
name model input subset
--------------------- ---------------------- ---------------------------------------- --------
abs_level_change DoubleRollingAggregate original all
level_change DoubleRollingAggregate original all
level_shift InterQuartileRangeAD abs_level_change all
positive_level_change ThresholdAD level_change all
positive_level_shift AndAggregator ['level_shift', 'positive_level_change'] all
```

```
[66]:
```

```
pipenet.get_params()
```

```
[66]:
```

```
{'abs_level_change': {'agg': 'median',
'agg_params': None,
'window': 10,
'center': True,
'min_periods': None,
'diff': 'l1'},
'level_shift': {'c': 3.0},
'level_change': {'agg': 'median',
'agg_params': None,
'window': 10,
'center': True,
'min_periods': None,
'diff': 'diff'},
'positive_level_change': {'low': None, 'high': 0},
'positive_level_shift': {}}
```

```
[67]:
```

```
s = pd.read_csv('./data/cpu.csv', index_col="Time", parse_dates=True, squeeze=True)
s = validate_series(s)
anomalies = pipenet.fit_detect(s)
plot(s, anomaly_pred=anomalies, ts_linewidth=1, ts_markersize=3, ap_color='red');
```

Similar to `Pipeline`

, a user has the option to get intermediate series and anomaly lists by setting parameter `return_intermediate`

to True when calling fitting or predicting methods of a `Pipenet`

object.

```
[68]:
```

```
results = pipenet.fit_detect(s, return_intermediate=True)
plot(results["level_change"], ts_linewidth=1, ts_markersize=3);
```

# Metrics¶

Module `metrics`

contains four types of scoring function that measure the quality of a detection results against true anomalies.

All scoring functions support two modes: 1. If inputs are in the form of binary labels, every positive time point is considered as an independent anomalous event. This is equivalent to the definition of scoring functions for time-independent binary classification.

- If inputs are in the form of event lists, every anomaly time segment is considered as an event. The scoring function calculation will be based on the length of segment that appears in both detected list and true list.

For example, `recall`

, a.k.a. sensitivity, hit rate, or true positive rate (TPR), is the percentage of true anomalies that are detected successfully. In the following example, there are 13 data points that are known anomalies. In the detection results, 9 out of 13 are detected successfully. Therefore, the recall score is equal to 9/13.

```
[69]:
```

```
s_constant = pd.Series(0, pd.date_range(start='2017-1-1', periods=24, freq="1d"), name="Time Series")
label_true = pd.Series(
[0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0],
pd.date_range(start='2017-1-1', periods=24, freq="1d"),
)
label_pred = pd.Series(
[0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1],
pd.date_range(start='2017-1-1', periods=24, freq="1d"),
)
```

```
[70]:
```

```
plot(s_constant, ts_markersize=8,
anomaly_true=label_true, at_markersize=12, at_marker_on_curve=True,
anomaly_pred=label_pred, ap_markersize=12, ap_marker_on_curve=True);
```

```
[71]:
```

```
from adtk.metrics import recall
recall(label_true, label_pred)
```

```
[71]:
```

```
0.6923076923076923
```

In the following example, there are 6 anomalous events (including 3 instantaneous events). By default, an event is considered successfully detected if no less than 50% of the segment is covered in the detection result. In this example, the first 4 events are detected according to this setting, and the last 2 events are not detected. Therefore, the recall score is equal to 4/6.

```
[72]:
```

```
list_true = [
(pd.Timestamp("2017-01-03 00:00:00"), pd.Timestamp("2017-01-03 23:59:59.999999999")),
pd.Timestamp("2017-01-06 00:00:00"),
(pd.Timestamp("2017-01-08 00:00:00"), pd.Timestamp("2017-01-09 23:59:59.999999999")),
pd.Timestamp("2017-01-12 00:00:00"),
(pd.Timestamp("2017-01-14 00:00:00"), pd.Timestamp("2017-01-17 23:59:59.999999999")),
pd.Timestamp("2017-01-22 00:00:00"),
]
list_pred = [
(pd.Timestamp("2017-01-02 00:00:00"), pd.Timestamp("2017-01-06 23:59:59.999999999")),
(pd.Timestamp("2017-01-09 00:00:00"), pd.Timestamp("2017-01-09 23:59:59.999999999")),
pd.Timestamp("2017-01-12 00:00:00"),
pd.Timestamp("2017-01-15 00:00:00"),
(pd.Timestamp("2017-01-17 00:00:00"), pd.Timestamp("2017-01-18 23:59:59.999999999")),
pd.Timestamp("2017-01-21 00:00:00"),
(pd.Timestamp("2017-01-23 00:00:00"), pd.Timestamp("2017-01-23 23:59:59.999999999")),
]
```

```
[73]:
```

```
plot(s_constant, list_true, list_pred);
```

```
[74]:
```

```
recall(list_true, list_pred)
```

```
[74]:
```

```
0.6666666666666666
```

If we restrict the criteria of successful detection such that an event must have more than 90% time segment covered to be considered as detected successfully, the third event is then not a successful detection because only half of its segment are detected, and therefore recall score is equal to 3/6.

```
[75]:
```

```
recall(list_true, list_pred, thresh=0.9)
```

```
[75]:
```

```
0.5
```

Scoring function `precision`

is similar to `recall`

, except it calculates the percentage of predicted anomalies that are true anomalies. It is also called positive predictive value (PPV) in some context.

If a detector is too tolerant, `recall`

score will be high, while `precision`

will be low. If a detector is too strict, `recall`

score will be low, while `recall`

will be high.

`f1_score`

is a compromise between these two, where the harmonic means of recall and precision is returned.

Intersect over union (IoU) score `iou`

calculated the length ratio between time segments that are identified as anomalous in both lists and those identified by at least one of the two lists. In the previous example, the union between true anomalous segments and detected segments are 13 days, while the intersect is 3 days. Therefore the IoU score is 3/13.

```
[76]:
```

```
from adtk.metrics import iou
iou(list_true, list_pred)
```

```
[76]:
```

```
0.2307692307692289
```

# Data Processing¶

Module `data`

provides some functions for common time series processing operations.

## Validate time series¶

Function `validate_series`

checks some common critical issues that may cause problems if anomaly detection is performed to time series without fixing them. The function will automatically fix some of them, while it will raise errors when detect others.

Issues will be checked and automatically fixed include:

- Time index is not monotonically increasing;
- Time index contains duplicated time stamps (fix by keeping first values);
- (optional) Time index attribute
`freq`

is missed; - (optional) Time series include categorical (non-binary) label columns (to fix by converting categorical labels into binary indicators).

Issues will be checked and raise error include:

- Wrong type of time series object (must be pandas Series or DataFrame);
- Wrong type of time index object (must be pandas DatetimeIndex).

In the following example, we validate a series with duplicated time point, unsorted index, and missed frequency.

```
[77]:
```

```
df = pd.read_csv('./data/invalid_series.csv', index_col="time", parse_dates=True)
df
```

```
[77]:
```

value | category | |
---|---|---|

time | ||

2017-01-02 | 2.0 | even |

2017-01-01 | 1.0 | odd |

2017-01-03 | 3.0 | odd |

2017-01-03 | 3.5 | odd |

2017-01-06 | 6.0 | even |

2017-01-04 | 4.0 | even |

2017-01-05 | 5.0 | odd |

2017-01-07 | 7.0 | odd |

2017-01-04 | 4.5 | even |

2017-01-08 | 8.0 | even |

```
[78]:
```

```
from adtk.data import validate_series
validate_series(df)
```

```
[78]:
```

value | category | |
---|---|---|

time | ||

2017-01-01 | 1.0 | odd |

2017-01-02 | 2.0 | even |

2017-01-03 | 3.0 | odd |

2017-01-04 | 4.0 | even |

2017-01-05 | 5.0 | odd |

2017-01-06 | 6.0 | even |

2017-01-07 | 7.0 | odd |

2017-01-08 | 8.0 | even |

Optionally, the function may automatically switch categorical series into binary series.

```
[79]:
```

```
from adtk.data import validate_series
validate_series(df, check_categorical=True)
```

```
[79]:
```

value | category_even | category_odd | |
---|---|---|---|

time | |||

2017-01-01 | 1.0 | 0 | 1 |

2017-01-02 | 2.0 | 1 | 0 |

2017-01-03 | 3.0 | 0 | 1 |

2017-01-04 | 4.0 | 1 | 0 |

2017-01-05 | 5.0 | 0 | 1 |

2017-01-06 | 6.0 | 1 | 0 |

2017-01-07 | 7.0 | 0 | 1 |

2017-01-08 | 8.0 | 1 | 0 |

## Validate event list¶

Function `validate_events`

checks the validity of an event list (a list of time windows). Common issues includes invalid time window, overlapped or consecutive time windows, unsorted events, etc.

```
[80]:
```

```
event_list = [
(pd.Timestamp('2017-1-2 23:59:59.999999999'), pd.Timestamp('2017-1-1 00:00:00.000000000')),
(pd.Timestamp('2017-1-2 00:00:00.000000000'), pd.Timestamp('2017-1-3 23:59:59.999999999')),
(pd.Timestamp('2017-1-8 00:00:00.000000000'), pd.Timestamp('2017-1-9 23:59:59.999999999')),
pd.Timestamp('2017-1-5 00:00:00.000000000'),
(pd.Timestamp('2017-1-9 00:00:00.000000000'), pd.Timestamp('2017-1-10 23:59:59.999999999')),
(pd.Timestamp('2017-1-4 00:00:00.000000000'), pd.Timestamp('2017-1-5 23:59:59.999999999')),
pd.Timestamp('2017-1-7 00:00:00.000000000')
]
event_list
```

```
[80]:
```

```
[(Timestamp('2017-01-02 23:59:59.999999999'),
Timestamp('2017-01-01 00:00:00')),
(Timestamp('2017-01-02 00:00:00'),
Timestamp('2017-01-03 23:59:59.999999999')),
(Timestamp('2017-01-08 00:00:00'),
Timestamp('2017-01-09 23:59:59.999999999')),
Timestamp('2017-01-05 00:00:00'),
(Timestamp('2017-01-09 00:00:00'),
Timestamp('2017-01-10 23:59:59.999999999')),
(Timestamp('2017-01-04 00:00:00'),
Timestamp('2017-01-05 23:59:59.999999999')),
Timestamp('2017-01-07 00:00:00')]
```

```
[81]:
```

```
from adtk.data import validate_events
validated_event_list = validate_events(event_list)
validated_event_list
```

```
[81]:
```

```
[(Timestamp('2017-01-02 00:00:00'),
Timestamp('2017-01-05 23:59:59.999999999')),
Timestamp('2017-01-07 00:00:00'),
(Timestamp('2017-01-08 00:00:00'),
Timestamp('2017-01-10 23:59:59.999999999'))]
```

## Convert a binary series into an event list¶

Function `to_events`

converts a binary series into an event list.

```
[82]:
```

```
s = pd.Series([True, False, False, True, True, True, False, False, True, False],
index=pd.date_range(start='2017-1-1', periods=10, freq='D'))
```

```
[83]:
```

```
from adtk.data import to_events
to_events(s)
```

```
[83]:
```

```
[(Timestamp('2017-01-01 00:00:00', freq='D'),
Timestamp('2017-01-01 23:59:59.999999999', freq='D')),
(Timestamp('2017-01-04 00:00:00', freq='D'),
Timestamp('2017-01-06 23:59:59.999999999', freq='D')),
(Timestamp('2017-01-09 00:00:00', freq='D'),
Timestamp('2017-01-09 23:59:59.999999999', freq='D'))]
```

For a series with regular frequency, every time item in the series is assumed to represent a period instead of an instantaneous time point by default. A user may turn this option off by setting `freq_as_period`

to False.

```
[84]:
```

```
to_events(s, freq_as_period=False)
```

```
[84]:
```

```
[Timestamp('2017-01-01 00:00:00'),
Timestamp('2017-01-04 00:00:00'),
Timestamp('2017-01-05 00:00:00'),
Timestamp('2017-01-06 00:00:00'),
Timestamp('2017-01-09 00:00:00')]
```

When time points are treated as instantaneous instants, the function will not merge “consecutive” points by default, because “consecutive” is a concept on continuous time line. By setting `merge_consecutive`

to True, a user may force the function to return “consecutive” time window where positive time points next to each other in the time index are merged into a same event. A user should be careful of the meaning of such time windows.

```
[85]:
```

```
to_events(s, freq_as_period=False, merge_consecutive=True)
```

```
[85]:
```

```
[Timestamp('2017-01-01 00:00:00', freq='D'),
(Timestamp('2017-01-04 00:00:00', freq='D'),
Timestamp('2017-01-06 00:00:00', freq='D')),
Timestamp('2017-01-09 00:00:00', freq='D')]
```

## Convert an event list into a binary series¶

Function `to_labels`

converts an event lists into a binary series following a given time index.

Similarly to `to_events`

, argument `freq_as_period`

controls whether an element in a time index with regular frequency represent a period or a time instant. If a period, the period is marked positive as long as an event overlaps with it. Otherwise, a time instant is marked as positive if an event covered it.

```
[86]:
```

```
time_index = pd.date_range(start='2017-1-1', periods=5, freq='D')
time_index
```

```
[86]:
```

```
DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03', '2017-01-04',
'2017-01-05'],
dtype='datetime64[ns]', freq='D')
```

```
[87]:
```

```
event_list = [(pd.Timestamp('2017-1-2 18:30:00'), pd.Timestamp('2017-1-3 02:00:00')),
(pd.Timestamp('2017-1-4 20:00:00'), pd.Timestamp('2017-1-5 09:30:00'))]
```

```
[88]:
```

```
from adtk.data import to_labels
to_labels(event_list, time_index, freq_as_period=True)
```

```
[88]:
```

```
2017-01-01 False
2017-01-02 True
2017-01-03 True
2017-01-04 True
2017-01-05 True
Freq: D, dtype: bool
```

```
[89]:
```

```
to_labels(event_list, time_index, freq_as_period=False)
```

```
[89]:
```

```
2017-01-01 False
2017-01-02 False
2017-01-03 True
2017-01-04 False
2017-01-05 True
Freq: D, dtype: bool
```

## Expand during of events¶

When calculating the quality of a detection result, sometimes slight offset is acceptable. Function `expand_events`

expands the duration of events in a list by a given factor, such that scoring is more tolerant.

In the following example, we have a detection result with 2 ~ 3 minutes offset from the true anomaly list. If we accept 5 minutes offset forward and backward, the score of the result is more meaningful.

```
[90]:
```

```
true_anomaly_list = [
pd.Timestamp('2017-1-2 18:00:00'),
pd.Timestamp('2017-1-4 20:00:00')]
```

```
[91]:
```

```
detected_anomaly_list = [
pd.Timestamp('2017-1-2 18:02:00'),
pd.Timestamp('2017-1-4 19:57:00')]
```

```
[92]:
```

```
from adtk.metrics import precision
precision(true_anomaly_list, detected_anomaly_list)
```

```
[92]:
```

```
0.0
```

```
[93]:
```

```
from adtk.data import expand_events
expanded_true_anomaly_list = expand_events(
true_anomaly_list, left_expand=pd.Timedelta('5m'), right_expand=pd.Timedelta('5m'))
expanded_true_anomaly_list
```

```
[93]:
```

```
[(Timestamp('2017-01-02 17:55:00'), Timestamp('2017-01-02 18:05:00')),
(Timestamp('2017-01-04 19:55:00'), Timestamp('2017-01-04 20:05:00'))]
```

```
[94]:
```

```
precision(expanded_true_anomaly_list, detected_anomaly_list)
```

```
[94]:
```

```
1.0
```

## Resample a time series with new spacing¶

Function `resample`

samples time series with given new spacing. Resampled values are determined by time-weighted linear interpolation.

In following example, we resample the given time series with spacing of 1 day.

```
[95]:
```

```
s = pd.Series([1, 2.5, 3, 4.25, 6, 9],
index=pd.DatetimeIndex(['2017-1-1 00:00:00', '2017-1-2 12:00:00', '2017-1-3 00:00:00',
'2017-1-4 06:00:00', '2017-1-6 00:00:00', '2017-1-9 00:00:00']))
```

```
[96]:
```

```
from adtk.data import resample
resample(s, dT=pd.Timedelta("1 day"))
```

```
[96]:
```

```
2017-01-01 1.0
2017-01-02 2.0
2017-01-03 3.0
2017-01-04 4.0
2017-01-05 5.0
2017-01-06 6.0
2017-01-07 7.0
2017-01-08 8.0
2017-01-09 9.0
Freq: D, dtype: float64
```

If no spacing is specified, the greatest common divider of original time steps will be used, which makes the refinement of time line a minimal refinement subject to keeping all original time points still included in the resampled time series. Please note that this may dramatically increase the size of time series and memory usage.

For the previous example, such a refinement yields constant spacing of 6 hours. All original time points are kept in the refined series.

```
[97]:
```

```
resample(s)
```

```
[97]:
```

```
2017-01-01 00:00:00 1.00
2017-01-01 06:00:00 1.25
2017-01-01 12:00:00 1.50
2017-01-01 18:00:00 1.75
2017-01-02 00:00:00 2.00
2017-01-02 06:00:00 2.25
2017-01-02 12:00:00 2.50
2017-01-02 18:00:00 2.75
2017-01-03 00:00:00 3.00
2017-01-03 06:00:00 3.25
2017-01-03 12:00:00 3.50
2017-01-03 18:00:00 3.75
2017-01-04 00:00:00 4.00
2017-01-04 06:00:00 4.25
2017-01-04 12:00:00 4.50
2017-01-04 18:00:00 4.75
2017-01-05 00:00:00 5.00
2017-01-05 06:00:00 5.25
2017-01-05 12:00:00 5.50
2017-01-05 18:00:00 5.75
2017-01-06 00:00:00 6.00
2017-01-06 06:00:00 6.25
2017-01-06 12:00:00 6.50
2017-01-06 18:00:00 6.75
2017-01-07 00:00:00 7.00
2017-01-07 06:00:00 7.25
2017-01-07 12:00:00 7.50
2017-01-07 18:00:00 7.75
2017-01-08 00:00:00 8.00
2017-01-08 06:00:00 8.25
2017-01-08 12:00:00 8.50
2017-01-08 18:00:00 8.75
2017-01-09 00:00:00 9.00
Freq: 6H, dtype: float64
```

## Split a time series into training and testing segments¶

When building a time series anomaly detection model, splitting a historical series into training and testing segments is a common operation needed.

Function `split_train_test`

provides four modes of splitting. A user may choose from it according to the specific case.

- Divide time series into n_splits folds of equal length, split each fold into training and testing based on train_ratio.
- Create n_splits folds, where each fold starts at t_0 and ends at t_(n/n_splits), where n goes from 0 to n_splits and the first train_ratio of the fold is for training.
- Create n_splits folds, where each fold starts at t_0. Each fold has len(ts)/(1 + n_splits) test points at the end. Each fold is n * len(ts)/(1 + n_splits) long, where n ranges from 1 to n_splits.
- Create n_splits folds, where each fold starts at t_0. Each fold has n * len(ts)/(1 + n_splits) training points at the beginning of the time series, where n ranges from 1 to n_splits and the remaining points are testing points.

```
[98]:
```

```
s = pd.read_csv('./data/seasonal.csv', index_col="Time", parse_dates=True, squeeze=True)
```

```
[99]:
```

```
from adtk.data import split_train_test
s_train, s_test = split_train_test(s, mode=1, n_splits=3)
plot(pd.concat(s_train+s_test, axis=1), curve_group=[[0, 3], [1, 4], [2, 5]], figsize=(16, 8));
```

```
[100]:
```

```
s_train, s_test = split_train_test(s, mode=2, n_splits=3)
plot(pd.concat(s_train+s_test, axis=1), curve_group=[[0, 3], [1, 4], [2, 5]], figsize=(16, 8));
```

```
[101]:
```

```
s_train, s_test = split_train_test(s, mode=3, n_splits=3)
plot(pd.concat(s_train+s_test, axis=1), curve_group=[[0, 3], [1, 4], [2, 5]], figsize=(16, 8));
```

```
[102]:
```

```
s_train, s_test = split_train_test(s, mode=4, n_splits=3)
plot(pd.concat(s_train+s_test, axis=1), curve_group=[[0, 3], [1, 4], [2, 5]], figsize=(16, 8));
```