"""Module for 1-dimensional transformers.
1-dimensional transformers transform 1-dimensional time series, i.e. pandas
Series, into different series, to extract useful information out of the
original time series.
"""
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import statsmodels
from packaging.version import parse
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import acf

from .._transformer_base import (
    _NonTrainableUnivariateTransformer,
    _TrainableUnivariateTransformer,
)
from .._utils import PandasBugError
class StandardScale(_NonTrainableUnivariateTransformer):
    """Transformer that scales time series such that mean is equal to 0 and
    standard deviation is equal to 1.

    A series whose standard deviation is exactly 0 is only centered: the
    divisor falls back to 1 so that the output is all zeros instead of the
    result of a division by zero.

    """

    def __init__(self) -> None:
        # NOTE(review): the body of `__init__` was missing from the extracted
        # source; delegating to the base class is assumed -- confirm.
        super().__init__()

    # NOTE(review): `@property` restored by assumption -- confirm against the
    # base class contract.
    @property
    def _param_names(self) -> Tuple[str, ...]:
        # This transformer has no tunable parameters.
        return tuple()

    def _predict_core(self, s: pd.Series) -> pd.Series:
        """Return `s` shifted by its mean and divided by its sample standard
        deviation."""
        mean = s.mean()
        std = s.std()

        # Avoid dividing by zero for a constant series.
        if std == 0:
            std = 1

        return (s - mean) / std
class RollingAggregate(_NonTrainableUnivariateTransformer):
    """Transformer that rolls a sliding window along a time series, and
    aggregates using a user-selected operation.

    Parameters
    ----------
    window: int or str
        Size of the rolling time window.

        - If int, it is the number of time point in this time window.
        - If str, it must be able to be converted into a pandas Timedelta
          object.

    agg: str or function
        Aggregation method applied to series.
        If str, must be one of supported built-in methods:

        - 'mean': mean of all values in a rolling window.
        - 'median': median of all values in a rolling window.
        - 'sum': summation of all values in a rolling window.
        - 'min': minimum of all values in a rolling window.
        - 'max': maximum of all values in a rolling window.
        - 'std': sample standard deviation of all values in a rolling window.
        - 'var': sample variance of all values in a rolling window.
        - 'skew': skewness of all values in a rolling window.
        - 'kurt': kurtosis of all values in a rolling window.
        - 'count': number of non-nan values in a rolling window.
        - 'nnz': number of non-zero values in a rolling window.
        - 'nunique': number of unique values in a rolling window.
        - 'quantile': quantile of all values in a rolling window. Require
          percentile parameter `q` in parameter `agg_params`, which is a
          float or a list of float between 0 and 1 inclusive.
        - 'iqr': interquartile range, i.e. difference between 75% and 25%
          quantiles.
        - 'idr': interdecile range, i.e. difference between 90% and 10%
          quantiles.
        - 'hist': histogram of all values in a rolling window. Require
          parameter `bins` in parameter `agg_params` to define the bins. `bins`
          is either a list of floats, b1, ..., bn, which defines n-1 bins
          [b1, b2), [b2, b3), ..., [b{n-2}, b{n-1}), [b{n-1}, bn], or an
          integer that defines the number of equal-width bins in the range of
          input series.

        If function, it should accept a rolling window in form of a pandas
        Series, and return either a scalar or a 1D numpy array. To specify
        names of outputs, specify a list of strings as a parameter `names` in
        parameter `agg_params`.

        Default: 'mean'

    agg_params: dict, optional
        Parameters of aggregation function. Default: None.

    center: bool, optional
        Whether the calculation is at the center of time window or on the right
        edge. Default: False.

    min_periods: int, optional
        Minimum number of observations in window required to have a value.
        Default: None, i.e. all observations must have values.

    """

    def __init__(
        self,
        window: Union[int, str],
        agg: Union[
            str, Callable[[pd.Series], Union[float, np.ndarray]]
        ] = "mean",
        agg_params: Optional[Dict[str, Any]] = None,
        center: bool = False,
        min_periods: Optional[int] = None,
    ) -> None:
        # NOTE(review): `super().__init__()` restored by assumption -- confirm.
        super().__init__()
        self.agg = agg
        self.agg_params = agg_params
        self.window = window
        self.center = center
        self.min_periods = min_periods
        # Not a public parameter: forwarded as `closed` to `Series.rolling`.
        self._closed = None  # type: Any

    # NOTE(review): `@property` restored by assumption -- confirm against the
    # base class contract.
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("window", "agg", "agg_params", "center", "min_periods")

    def _predict_core(self, s: pd.Series) -> Union[pd.Series, pd.DataFrame]:
        """Apply the configured rolling aggregation to `s`.

        Returns a Series for scalar aggregations and a DataFrame (one column
        per output) for vector aggregations such as 'hist' or a list of
        quantiles.

        Raises
        ------
        ValueError
            If the index of `s` is not monotonic, or `agg` is neither a
            supported name nor a callable.
        RuntimeError
            If a callable `agg` fails with TypeError as a scalar aggregation
            and no `names` are given in `agg_params`.

        """
        if not (
            s.index.is_monotonic_increasing or s.index.is_monotonic_decreasing
        ):
            raise ValueError("Time series must have a monotonic time index. ")
        agg = self.agg
        agg_params = self.agg_params if (self.agg_params is not None) else {}
        window = self.window
        center = self.center
        min_periods = self.min_periods
        closed = self._closed

        rolling = s.rolling(
            window=window,
            center=center,
            min_periods=min_periods,
            closed=closed,
        )  # type: Union[pd.Series, pd.DataFrame]

        def getRollingVector(
            rolling: Union[pd.Series, pd.DataFrame],
            aggFunc: Any,
            output_names: List[str],
        ) -> Union[pd.Series, pd.DataFrame]:
            # we use this function to trick pandas to get vector rolling agg:
            # the wrapped function stashes the real (vector) result and hands
            # pandas a dummy scalar, so rows of complete windows become 0 and
            # all other rows stay NaN.
            s_rolling_raw = []  # type: Union[List, np.array]

            def agg_wrapped(x: Any) -> int:
                s_rolling_raw.append(aggFunc(x))
                return 0

            s_rolling = rolling.agg(agg_wrapped)
            s_rolling_raw = np.array(s_rolling_raw)
            # Size the frame from the requested names so that a series with
            # no complete window yields an all-NaN frame instead of failing
            # on `shape[1]` of an empty array.
            df = pd.concat([s_rolling] * len(output_names), axis=1)
            if len(s_rolling_raw) > 0:
                df[s_rolling.notna()] = s_rolling_raw
            s_rolling = df
            s_rolling.columns = output_names
            return s_rolling

        aggList = [
            "mean",
            "median",
            "sum",
            "min",
            "max",
            "quantile",
            "iqr",
            "idr",
            "count",
            "nnz",
            "nunique",
            "std",
            "var",
            "skew",
            "kurt",
            "hist",
        ]
        if agg in [
            "mean",
            "median",
            "sum",
            "min",
            "max",
            "count",
            "std",
            "var",
            "skew",
            "kurt",
        ]:
            s_rolling = rolling.agg(agg)
        elif agg == "nunique":
            s_rolling = rolling.agg(lambda x: len(np.unique(x.dropna())))
        elif agg == "nnz":
            s_rolling = rolling.agg(np.count_nonzero)
        elif agg == "quantile":
            if hasattr(agg_params["q"], "__iter__"):
                # NOTE(review): column naming ("q<value>") reconstructed --
                # confirm against upstream.
                s_rolling = pd.concat(
                    [
                        rolling.quantile(q).rename("q{}".format(q))
                        for q in agg_params["q"]
                    ],
                    axis=1,
                )
            else:
                s_rolling = rolling.quantile(agg_params["q"])
        elif agg == "iqr":
            s_rolling = rolling.quantile(0.75) - rolling.quantile(0.25)
        elif agg == "idr":
            s_rolling = rolling.quantile(0.9) - rolling.quantile(0.1)
        elif agg == "hist":
            if isinstance(agg_params["bins"], int):
                # Fix the bin edges once from the whole series so that every
                # window is counted against the same bins.
                _, bins = np.histogram(
                    s.dropna().values, bins=agg_params["bins"]
                )
            else:
                bins = agg_params["bins"]
            s_rolling = getRollingVector(
                rolling,
                lambda x: np.histogram(x, bins=bins)[0],
                [
                    "[{}, {}{}".format(
                        bins[i],
                        bins[i + 1],
                        ")" if i < len(bins) - 2 else "]",
                    )
                    for i in range(len(bins) - 1)
                ],
            )
        elif callable(agg):
            try:
                s_rolling = rolling.agg(agg)
            except TypeError:
                # Fall back to the vector path when the scalar path fails.
                if "names" in agg_params:
                    s_rolling = getRollingVector(
                        rolling, agg, agg_params["names"]
                    )
                else:
                    raise RuntimeError(
                        "Names of vector output are not specified."
                    )
        else:
            raise ValueError("Attribute agg must be one of {}".format(aggList))

        if isinstance(s_rolling, pd.Series):
            s_rolling.name = s.name

        return s_rolling
class DoubleRollingAggregate(_NonTrainableUnivariateTransformer):
    """Transformer that rolls two sliding windows side-by-side along a time
    series, aggregates using a user-given operation, and calculates the
    difference of aggregated metrics between two sliding windows.

    Parameters
    ----------
    window: int or str, or 2-tuple of int or str
        Size of the rolling time window.

        - If int, it is the number of time point in this time window.
        - If str, it must be able to be converted into a pandas Timedelta
          object.
        - If tuple, it defines the size of left and right window respectively.

    agg: str or function, or 2-tuple of str or function
        Aggregation method applied to series.
        If str, must be one of supported built-in methods:

        - 'mean': mean of all values in a rolling window.
        - 'median': median of all values in a rolling window.
        - 'sum': summation of all values in a rolling window.
        - 'min': minimum of all values in a rolling window.
        - 'max': maximum of all values in a rolling window.
        - 'std': sample standard deviation of all values in a rolling window.
        - 'var': sample variance of all values in a rolling window.
        - 'skew': skewness of all values in a rolling window.
        - 'kurt': kurtosis of all values in a rolling window.
        - 'count': number of non-nan values in a rolling window.
        - 'nnz': number of non-zero values in a rolling window.
        - 'nunique': number of unique values in a rolling window.
        - 'quantile': quantile of all values in a rolling window. Require
          percentile parameter `q` in parameter `agg_params`, which is a
          float or a list of float between 0 and 1 inclusive.
        - 'iqr': interquartile range, i.e. difference between 75% and 25%
          quantiles.
        - 'idr': interdecile range, i.e. difference between 90% and 10%
          quantiles.
        - 'hist': histogram of all values in a rolling window. Require
          parameter `bins` in parameter `agg_params` to define the bins. `bins`
          is either a list of floats, b1, ..., bn, which defines n-1 bins
          [b1, b2), [b2, b3), ..., [b{n-2}, b{n-1}), [b{n-1}, bn], or an
          integer that defines the number of equal-width bins in the range of
          input series.

        If function, it should accept a rolling window in form of a pandas
        Series, and return either a scalar or a 1D numpy array. To specify
        names of outputs, specify a list of strings as a parameter `names` in
        parameter `agg_params`.

        If tuple, elements correspond left and right window respectively.

        Default: 'mean'

    agg_params: dict or 2-tuple of dict, optional
        Parameters of aggregation function. If tuple, elements correspond left
        and right window respectively. Default: None.

    center: bool, optional
        If True, the two windows sit on either side of the current point: the
        left window ends right before it and the right window starts at it.
        Otherwise, the current point is the right edge of the right window,
        and the left window ends where the right window begins.
        Default: True.

    min_periods: int or 2-tuple of int, optional
        Minimum number of observations in window required to have a value. If
        tuple, elements correspond left and right window respectively. Default:
        None, i.e. all observations must have values.

    diff: str or function, optional
        Difference method applied between aggregated metrics from the two
        sliding windows.
        If str, choose from supported built-in methods:

        - 'diff': Difference between values of aggregated metric (right minus
          left). Only applicable if the aggregated metric is scalar.
        - 'rel_diff': Relative difference between values of aggregated metric
          (right minus left divided left). Only applicable if the aggregated
          metric is scalar.
        - 'abs_rel_diff': Absolute relative difference between values of
          aggregated metric (right minus left divided left). Only applicable if
          the aggregated metric is scalar.
        - 'l1': Absolute difference if aggregated metric is scalar, or sum of
          elementwise absolute difference if it is a vector.
        - 'l2': Square root of sum of elementwise squared difference.

        If function, it accepts two input arguments that are the two outputs of
        applying aggregation method to the two windows, and returns a float
        number measuring the difference.

        Default: 'l1'

    """

    def __init__(
        self,
        window: Union[int, str, Tuple[Union[int, str], Union[int, str]]],
        agg: Union[
            str,
            Callable[[pd.Series], Union[float, np.ndarray]],
            Tuple[
                Union[str, Callable[[pd.Series], Union[float, np.ndarray]]],
                Union[str, Callable[[pd.Series], Union[float, np.ndarray]]],
            ],
        ] = "mean",
        agg_params: Union[
            Optional[Dict[str, Any]],
            Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]],
        ] = None,
        center: bool = True,
        min_periods: Union[
            Optional[int], Tuple[Optional[int], Optional[int]]
        ] = None,
        diff: Union[
            str,
            Callable[
                [Union[float, np.ndarray], Union[float, np.ndarray]], float
            ],
        ] = "l1",
    ) -> None:
        # NOTE(review): `super().__init__()` restored by assumption -- confirm.
        super().__init__()
        self.agg = agg
        self.agg_params = agg_params
        self.window = window
        self.min_periods = min_periods
        self.center = center
        self.diff = diff

    # NOTE(review): `@property` restored by assumption -- confirm against the
    # base class contract.
    @property
    def _param_names(self) -> Tuple[str, ...]:
        # "diff" is a constructor parameter and must be listed with the rest.
        return (
            "window",
            "agg",
            "agg_params",
            "center",
            "min_periods",
            "diff",
        )

    def _predict_core(self, s: pd.Series) -> pd.Series:
        """Aggregate the left and right windows and return their difference.

        Raises
        ------
        ValueError
            If the index of `s` is not monotonic, or `diff` is not applicable
            to the kind of aggregated output (scalar or vector).
        PandasBugError
            If `center` is True, the left window is an offset string, and the
            installed pandas is older than 0.25.

        """
        if not (
            s.index.is_monotonic_increasing or s.index.is_monotonic_decreasing
        ):
            raise ValueError("Time series must have a monotonic time index. ")

        agg = self.agg
        agg_params = self.agg_params if (self.agg_params is not None) else {}
        window = self.window
        min_periods = self.min_periods
        center = self.center
        diff = self.diff

        # Broadcast scalar settings to (left, right) pairs.
        if not isinstance(agg, tuple):
            agg = (agg, agg)
        if not isinstance(agg_params, tuple):
            agg_params = (agg_params, agg_params)
        if not isinstance(window, tuple):
            window = (window, window)
        if not isinstance(min_periods, tuple):
            min_periods = (min_periods, min_periods)

        def one_sided(side: int) -> "RollingAggregate":
            # side 0 is the left window, side 1 the right window.
            # NOTE(review): the keyword arguments of these constructor calls
            # were missing from the extracted source and are reconstructed --
            # confirm.
            return RollingAggregate(
                agg=agg[side],
                agg_params=agg_params[side],
                window=window[side],
                min_periods=min_periods[side],
                center=False,
            )

        # NOTE(review): the `.transform(...)` arguments below (`s.shift(1)`,
        # `s.iloc[::-1]`, `s.shift(window[1])`, ...) were missing from the
        # extracted source and are reconstructed -- confirm.
        if center:
            if isinstance(window[0], int):
                # Left window ends one step before the current point.
                s_rolling_left = one_sided(0).transform(s.shift(1))
            else:
                ra = one_sided(0)
                if parse(pd.__version__) < parse("0.25"):
                    raise PandasBugError()
                # Exclude the current point from the left window.
                ra._closed = "left"
                s_rolling_left = ra.transform(s)
            if isinstance(window[1], int):
                # Rolling over the reversed series gives a forward-looking
                # window that starts at the current point.
                s_rolling_right = (
                    one_sided(1).transform(s.iloc[::-1]).iloc[::-1]
                )
            else:
                # Mirror the timestamps so that the reversed values sit on an
                # increasing index with the original spacing.
                s_reversed = pd.Series(
                    s.values[::-1],
                    index=pd.DatetimeIndex(
                        [
                            s.index[0] + (s.index[-1] - s.index[i])
                            for i in range(len(s) - 1, -1, -1)
                        ]
                    ),
                )
                right_reversed = one_sided(1).transform(s_reversed).iloc[::-1]
                if isinstance(right_reversed, pd.DataFrame):
                    # Vector aggregations return one column per output.
                    s_rolling_right = pd.DataFrame(
                        right_reversed.values,
                        index=s.index,
                        columns=right_reversed.columns,
                    )
                else:
                    s_rolling_right = pd.Series(
                        right_reversed.values, index=s.index
                    )
                    s_rolling_right.name = s.name
        else:
            if isinstance(window[1], int):
                # Push the left window back by the size of the right window.
                s_rolling_left = one_sided(0).transform(s.shift(window[1]))
            else:
                s_shifted = pd.Series(
                    s.values, s.index + pd.Timedelta(window[1])
                )
                # Add the original timestamps (as NaN) so the shifted result
                # can be read back at them. `pd.concat` replaces
                # `Series.append`, which pandas 2.0 removed.
                s_shifted = pd.concat(
                    [s_shifted, pd.Series(index=s.index, dtype="float64")]
                )
                s_shifted = s_shifted.iloc[~s_shifted.index.duplicated()]
                s_shifted = s_shifted.sort_index()
                s_shifted.name = s.name
                s_rolling_left = one_sided(0).transform(s_shifted)
                if isinstance(s_rolling_left, pd.Series):
                    s_rolling_left = s_rolling_left[s.index]
                else:
                    s_rolling_left = s_rolling_left.loc[s.index, :]
            s_rolling_right = one_sided(1).transform(s)

        if isinstance(s_rolling_left, pd.Series):
            if diff in ["l1", "l2"]:
                return abs(s_rolling_right - s_rolling_left)
            if diff == "diff":
                return s_rolling_right - s_rolling_left
            if diff == "rel_diff":
                return (s_rolling_right - s_rolling_left) / s_rolling_left
            if diff == "abs_rel_diff":
                # Take the absolute value of the whole ratio: a negative left
                # aggregate must not make the result negative.
                return abs(
                    (s_rolling_right - s_rolling_left) / s_rolling_left
                )

        if isinstance(s_rolling_left, pd.DataFrame):
            if diff == "l1":
                return abs(s_rolling_right - s_rolling_left).sum(
                    axis=1, skipna=False
                )
            if diff == "l2":
                return ((s_rolling_right - s_rolling_left) ** 2).sum(
                    axis=1, skipna=False
                ) ** 0.5

        if callable(diff):
            # Build a fresh float series rather than writing floats into a
            # copy of `s`, which would keep the dtype of `s`.
            return pd.Series(
                [
                    diff(s_rolling_left.iloc[i], s_rolling_right.iloc[i])
                    for i in range(len(s))
                ],
                index=s.index,
                name=s.name,
                dtype="float64",
            )

        raise ValueError("Invalid value of diff")
class ClassicSeasonalDecomposition(_TrainableUnivariateTransformer):
    """Transformer that performs classic seasonal decomposition to the time
    series, and returns residual series.

    Classic seasonal decomposition assumes time series is the sum of trend,
    seasonal pattern, and noise (residual). This transformer calculates and
    removes trend component with moving average, extracts seasonal pattern by
    taking average over seasonal periods of the detrended series, and returns
    residual series.

    The `fit` method fits seasonal frequency (if not specified) and seasonal
    pattern with the training series. The `transform` (or its alias `predict`)
    method extracts the trend by moving average, but will NOT re-calculate the
    seasonal pattern. Instead, it uses the trained seasonal pattern and
    extracts it from the detrended series to obtain the residual series. This
    implicitly assumes the seasonal property does not change over time.

    Parameters
    ----------
    freq: int, optional
        Length of a seasonal cycle as the number of time points in a cycle. If
        None, the model will determine based on autocorrelation of the training
        series. Default: None.

    trend: bool, optional
        Whether to extract and remove trend of the series with moving average.
        If False, the time series will be assumed the sum of seasonal pattern
        and residual. Default: False.

    Attributes
    ----------
    freq_: int
        Length of seasonal cycle. Equal to parameter `freq` if it is given.
        Otherwise, calculated based on autocorrelation of the training series.

    seasonal_: pandas.Series
        Seasonal pattern extracted from training series.

    """

    def __init__(
        self, freq: Optional[int] = None, trend: bool = False
    ) -> None:
        # NOTE(review): `super().__init__()` restored by assumption -- confirm.
        super().__init__()
        self.freq = freq
        self.trend = trend

    # NOTE(review): `@property` restored by assumption -- confirm against the
    # base class contract.
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("freq", "trend")

    def _fit_core(self, s: pd.Series) -> None:
        """Learn the seasonal period (if not given) and seasonal pattern.

        Raises
        ------
        ValueError
            If the index is not monotonic or NaNs appear between valid values.
        RuntimeError
            If the index follows no known frequency, or no significant
            seasonality is found when `freq` is None.

        """
        if not (
            s.index.is_monotonic_increasing or s.index.is_monotonic_decreasing
        ):
            raise ValueError("Time series must have a monotonic time index. ")
        # remove starting and ending nans
        s = s.loc[s.first_valid_index() : s[::-1].first_valid_index()].copy()
        if pd.isna(s).any():
            raise ValueError(
                "Found NaN in time series among valid values. "
                "NaNs starting or ending a time series are allowed, "
                "but those among valid values are not."
            )
        # get datum time
        self._datumTimestamp = s.index[0]
        # get series_freq
        if s.index.freq is not None:
            self._series_freq = s.index.freqstr
        else:
            self._series_freq = s.index.inferred_freq
        if self._series_freq is None:
            raise RuntimeError(
                "Series does not follow any known frequency "
                "(e.g. second, minute, hour, day, week, month, year, etc."
            )
        # get average dT
        self._dT = pd.Series(s.index).diff().mean()
        # get seasonal freq
        if self.freq is None:
            identified_freq = _identify_seasonal_period(s)
            if identified_freq is None:
                # RuntimeError is a subclass of Exception, so handlers written
                # for the previous bare Exception keep working.
                raise RuntimeError("Could not find significant seasonality.")
            self.freq_ = identified_freq
        else:
            self.freq_ = self.freq
        # get seasonal pattern
        if self.trend:
            seasonal_decompose_results = (
                seasonal_decompose(s, period=self.freq_)
                if parse(statsmodels.__version__) >= parse("0.11")
                else seasonal_decompose(s, freq=self.freq_)
            )
            self.seasonal_ = getattr(seasonal_decompose_results, "seasonal")[
                : self.freq_
            ]
        else:
            # Average every `period`-th point for each phase of the cycle.
            # Building a new float series avoids writing float means into a
            # slice that carries the dtype of `s`.
            first_cycle_index = s.index[: self.freq_]
            period = len(first_cycle_index)
            self.seasonal_ = pd.Series(
                [s.iloc[i::period].mean() for i in range(period)],
                index=first_cycle_index,
                name=s.name,
                dtype="float64",
            )

    def _predict_core(self, s: pd.Series) -> pd.Series:
        """Remove the learned seasonal pattern (and optionally the trend)
        from `s` and return the residual series.

        Raises
        ------
        ValueError
            If the index is not monotonic.
        RuntimeError
            If the index frequency differs from the training series, or the
            timestamps are not in phase with the training series.

        """
        if not (
            s.index.is_monotonic_increasing or s.index.is_monotonic_decreasing
        ):
            raise ValueError("Time series must have a monotonic time index. ")
        # check if series freq is same
        if self._series_freq not in {s.index.freqstr, s.index.inferred_freq}:
            raise RuntimeError(
                "Model was trained by a series whose index has {} frequency, "
                "but is tranforming a series whose index has {} frequency.".format(
                    self._series_freq, s.index.freq
                )
            )
        # get phase shift
        approx_steps = (s.index[0] - self._datumTimestamp) / self._dT
        # try to find starting_phase
        # NOTE(review): the `start=`/`end=`/`freq=` arguments of both
        # `pd.date_range` calls were missing from the extracted source and are
        # reconstructed -- confirm.
        if approx_steps > 0:
            # Walk forward from the datum; 100 extra periods give slack for
            # calendar frequencies whose step length varies.
            helper_index = pd.date_range(
                start=self._datumTimestamp,
                periods=round(approx_steps) + 100,
                freq=self._series_freq,
            )
            if helper_index[-1] <= s.index[0]:
                raise RuntimeError("You shouldn't have reached here...")
            for i in range(len(helper_index) - 1, -1, -1):
                if helper_index[i] == s.index[0]:
                    starting_phase = i % self.freq_
                    break
                elif helper_index[i] < s.index[0]:
                    raise RuntimeError(
                        "The series to be transformed has different "
                        "phases from the series used to train the model."
                    )
            else:
                raise RuntimeError(
                    "You definitely shouldn't have reached here..."
                )
        elif approx_steps < 0:
            # Walk backward: the helper index ends at the datum.
            helper_index = pd.date_range(
                end=self._datumTimestamp,
                periods=round(-approx_steps) + 100,
                freq=self._series_freq,
            )
            if helper_index[0] >= s.index[0]:
                raise RuntimeError("You shouldn't have reached here...")
            for i in range(len(helper_index)):
                if helper_index[i] == s.index[0]:
                    # Steps before the datum, converted to a forward phase.
                    starting_phase = (len(helper_index) - 1 - i) % self.freq_
                    if starting_phase != 0:
                        starting_phase = self.freq_ - starting_phase
                    break
                elif helper_index[i] > s.index[0]:
                    raise RuntimeError(
                        "The series to be transformed has different "
                        "phases from the series used to train the model."
                    )
            else:
                raise RuntimeError(
                    "You definitely shouldn't have reached here..."
                )
        else:
            starting_phase = 0
        # remove trend
        if self.trend:
            seasonal_decompose_results = (
                seasonal_decompose(s, period=self.freq_)
                if parse(statsmodels.__version__) >= parse("0.11")
                else seasonal_decompose(s, freq=self.freq_)
            )
            s_trend = getattr(seasonal_decompose_results, "trend")
            s_detrended = s - s_trend
        # get seasonal series and remove it from original
        phase_pattern = np.concatenate(
            [np.arange(starting_phase, self.freq_), np.arange(starting_phase)]
        )
        s_seasonal = pd.Series(
            self.seasonal_.values[
                phase_pattern[np.arange(len(s)) % self.freq_]
            ],
            index=s.index,
        )
        if self.trend:
            s_residual = s_detrended - s_seasonal
        else:
            s_residual = s - s_seasonal
        return s_residual
def _identify_seasonal_period(
    s: pd.Series, low_autocorr: float = 0.1, high_autocorr: float = 0.3
) -> Optional[int]:
    """Identify seasonal period of a time series based on autocorrelation.

    Parameters
    ----------
    s: pandas Series or DataFrame
        Time series where to identify seasonal periods.

    low_autocorr: float, optional
        Threshold below which values of autocorrelation are considered low.
        Default: 0.1

    high_autocorr: float, optional
        Threshold above which values of autocorrelation are considered high.
        Default: 0.3

    Returns
    -------
    int or None
        Seasonal period of the time series. If no significant seasonality
        is found, return None.

    Raises
    ------
    ValueError
        If `low_autocorr` exceeds `high_autocorr`, or the time steps of `s`
        are not constant.

    """
    if low_autocorr > high_autocorr:
        raise ValueError("`low_autocorr` must not exceed `high_autocorr`")

    # check if the time series has uniform time step
    if len(np.unique(np.diff(s.index))) > 1:
        raise ValueError("The time steps are not constant. ")

    autocorr = acf(s, nlags=len(s), fft=False)

    # NOTE(review): the autocorrelation at lag 0 is 1, so with `>=` this picks
    # index 0 whenever `low_autocorr` <= 1 and the threshold has no effect --
    # confirm whether `<=` (first lag at which autocorrelation has dropped
    # low) was intended.
    cut_candidates = np.argwhere(autocorr >= low_autocorr)
    if len(cut_candidates) == 0:
        # E.g. NaN autocorrelation: nothing to search, so report
        # "no seasonality" instead of failing on an empty index.
        return None
    cutPos = cut_candidates[0][0]

    # A peak is a lag where the autocorrelation stops rising and starts
    # falling while being above `high_autocorr`.
    diff_autocorr = np.diff(autocorr[cutPos:])
    high_autocorr_peak_pos = (
        cutPos
        + 1
        + np.argwhere(
            (diff_autocorr[:-1] > 0)
            & (diff_autocorr[1:] < 0)
            & (autocorr[cutPos + 1 : -1] > high_autocorr)
        ).flatten()
    )
    if len(high_autocorr_peak_pos) > 0:
        # NOTE(review): choosing the peak with the highest autocorrelation
        # was reconstructed -- confirm. Cast so callers get a plain int.
        return int(
            high_autocorr_peak_pos[
                np.argmax(autocorr[high_autocorr_peak_pos])
            ]
        )
    return None
class Retrospect(_NonTrainableUnivariateTransformer):
    """Transformer that returns dataframe with retrospective values, i.e. a row
    at time t includes value at (t-k)'s where k's are specified by user.

    This transformer may be useful for cases where lagging effect should be
    taken in account. For example, a change of control u may not be reflected
    in outcome y within 2 minutes, and its effect may last for another 3
    minutes. In this case, a dataframe where each row include u_[t-3], u_[t-4],
    u_[t-5], and a series y_t are needed to learn the relationship between
    control and outcome.

    Parameters
    ----------
    n_steps: int, optional
        Number of retrospective steps to take. Default: 1.

    step_size: int, optional
        Length of a retrospective step. Default: 1.

    till: int, optional
        Nearest retrospective step. Default: 0, i.e. the current time step.

    Examples
    --------
    >>> s = pd.Series(
    ...     np.arange(10),
    ...     index=pd.date_range(start='2017-1-1', periods=10, freq='D'))
    >>> s
    2017-01-01    0
    2017-01-02    1
    2017-01-03    2
    2017-01-04    3
    2017-01-05    4
    2017-01-06    5
    2017-01-07    6
    2017-01-08    7
    2017-01-09    8
    2017-01-10    9
    >>> Retrospect(n_steps=3, step_size=2, till=1).transform(s)
                t-1 t-3 t-5
    2017-01-01  NaN NaN NaN
    2017-01-02  0.0 NaN NaN
    2017-01-03  1.0 NaN NaN
    2017-01-04  2.0 0.0 NaN
    2017-01-05  3.0 1.0 NaN
    2017-01-06  4.0 2.0 0.0
    2017-01-07  5.0 3.0 1.0
    2017-01-08  6.0 4.0 2.0
    2017-01-09  7.0 5.0 3.0
    2017-01-10  8.0 6.0 4.0

    """

    def __init__(
        self, n_steps: int = 1, step_size: int = 1, till: int = 0
    ) -> None:
        # NOTE(review): `super().__init__()` restored by assumption -- confirm.
        super().__init__()
        self.n_steps = n_steps
        self.step_size = step_size
        self.till = till

    # NOTE(review): `@property` restored by assumption -- confirm against the
    # base class contract.
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("n_steps", "step_size", "till")

    def _predict_core(self, s: pd.Series) -> pd.DataFrame:
        """Return a DataFrame whose column "t-k" holds `s` shifted by k steps.

        Raises
        ------
        ValueError
            If the index of `s` is not monotonic.
        RuntimeError
            If the index has no frequency set and none can be inferred.

        """
        if not (
            s.index.is_monotonic_increasing or s.index.is_monotonic_decreasing
        ):
            raise ValueError("Time series must have a monotonic time index. ")
        # Reject the series only when no frequency is set AND none can be
        # inferred. The previous check tested the truthiness of
        # `inferred_freq`, so it rejected exactly the series whose frequency
        # could be inferred.
        if (s.index.freq is None) and (s.index.inferred_freq is None):
            raise RuntimeError(
                "Series does not follow any known frequency "
                "(e.g. second, minute, hour, day, week, month, year, etc."
            )
        n_steps = self.n_steps
        till = self.till
        step_size = self.step_size
        df = pd.DataFrame(index=s.index)
        df = df.assign(
            **{
                ("t-{}".format(i)): s.shift(i)
                for i in range(till, till + n_steps * step_size, step_size)
            }
        )
        return df