from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Dict, List, Tuple, Union
import pandas as pd
class _Model(ABC):
"Base class for all models (detectors, transformers, and aggregators)."
def __init__(self, *args: Any, **kwargs: Any) -> None:
pass
def get_params(self) -> Dict[str, Any]:
"""Get the parameters of this model.
Returns
-------
dict
Model parameters.
"""
return {key: getattr(self, key) for key in self._param_names}
def set_params(self, **params: Any) -> None:
"""Set the parameters of this model.
Parameters
----------
**params
Model parameters to set.
"""
        # validate all parameter names before setting any value
        for key in params:
            if key not in self._param_names:
                raise KeyError(
                    "'{}' is not a valid parameter name.".format(key)
                )
for key, value in params.items():
setattr(self, key, value)
@property
@abstractmethod
def _param_names(self) -> Tuple[str, ...]:
return tuple()
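# Illustrative sketch, not part of the library: a hypothetical concrete
# subclass showing the contract that get_params/set_params rely on --
# parameters are stored as attributes in __init__ and their names are listed
# in _param_names. The class name and the `window` parameter are made up.
class _ExampleModel(_Model):
    def __init__(self, window: int = 3) -> None:
        super().__init__()
        self.window = window

    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("window",)

# Hypothetical usage:
#   model = _ExampleModel(window=5)
#   model.get_params()          # {'window': 5}
#   model.set_params(window=7)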
class _NonTrainableModel(_Model):
"Base class of models that do not need training."
@abstractmethod
def _predict(self, input: Any) -> Any:
pass
@abstractmethod
def _predict_core(self, input: Any) -> Any:
pass
@abstractmethod
def predict(self, input: Any) -> Any:
pass
class _TrainableModel(_Model):
"Base class of models that need training."
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
        # fitted state: 0 = not fitted; 1 = fitted; 2 = univariate model
        # fitted column by column on a DataFrame
self._fitted = 0 # type: int
@abstractmethod
def _fit(self, input: Any) -> None:
pass
@abstractmethod
def _fit_core(self, input: Any) -> None:
pass
@abstractmethod
def fit(self, input: Any) -> None:
pass
@abstractmethod
def _predict(self, input: Any) -> Any:
pass
@abstractmethod
def _predict_core(self, input: Any) -> Any:
pass
@abstractmethod
def predict(self, input: Any) -> Any:
pass
@abstractmethod
def fit_predict(self, input: Any) -> Any:
pass
class _NonTrainableUnivariateModel(_NonTrainableModel):
"Base class of univariate detectors and transformers."
def _predict(
self, ts: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if isinstance(ts, pd.Series):
s = ts.copy() # type: pd.Series
if not isinstance(s.index, pd.DatetimeIndex):
raise TypeError(
"Index of the input time series must be a pandas "
"DatetimeIndex object."
)
predicted = self._predict_core(s)
            # if a Series-to-Series operation, preserve the Series name
if isinstance(predicted, pd.Series):
predicted.name = ts.name
elif isinstance(ts, pd.DataFrame):
df = ts.copy() # type: pd.DataFrame
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
# apply the model to each column
predicted_all_cols = []
for col in df.columns:
predicted_this_col = self._predict(df[col])
                # if a Series-to-DataFrame operation, prefix the output
                # columns with the input column name
if isinstance(predicted_this_col, pd.DataFrame):
predicted_this_col = predicted_this_col.rename(
columns={
col1: "{}_{}".format(col, col1)
for col1 in predicted_this_col.columns
}
)
predicted_all_cols.append(predicted_this_col)
predicted = pd.concat(predicted_all_cols, axis=1)
else:
raise TypeError("Input must be a pandas Series or DataFrame.")
        # make sure the index freq is preserved (some pandas operations,
        # e.g. concat, may drop it)
predicted.index.freq = ts.index.freq
return predicted
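# Illustrative sketch, not part of the library: a hypothetical non-trainable
# univariate transformer built on the base class above. Only _predict_core
# (the per-series logic) and predict need to be supplied; _predict handles
# input validation and, for DataFrame input, applies the model column by
# column. The class name and the differencing logic are made up.
class _ExampleDiffTransformer(_NonTrainableUnivariateModel):
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return tuple()

    def _predict_core(self, s: pd.Series) -> pd.Series:
        # first difference of the series
        return s.diff()

    def predict(
        self, ts: Union[pd.Series, pd.DataFrame]
    ) -> Union[pd.Series, pd.DataFrame]:
        return self._predict(ts)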
class _TrainableUnivariateModel(_TrainableModel):
    """Base class of trainable univariate detectors and transformers."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._models = dict() # type: Dict[str, _TrainableUnivariateModel]
def _fit(self, ts: Union[pd.Series, pd.DataFrame]) -> None:
if isinstance(ts, pd.Series):
            s = ts.copy()  # type: pd.Series
            if not isinstance(s.index, pd.DatetimeIndex):
                raise TypeError(
                    "Index of the input time series must be a pandas "
                    "DatetimeIndex object."
                )
            self._fit_core(s)
            self._fitted = 1
elif isinstance(ts, pd.DataFrame):
df = ts.copy()
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError(
"Index of the input time series must be a pandas "
"DatetimeIndex object."
)
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
# create internal models
self._models = {
col: self.__class__(**deepcopy(self.get_params()))
for col in df.columns
}
# fit model for each column
for col in df.columns:
self._models[col].fit(df[col])
self._fitted = 2
else:
raise TypeError("Input must be a pandas Series or DataFrame.")
def _predict(
self, ts: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
if self._fitted == 0:
raise RuntimeError("The model must be trained first.")
if isinstance(ts, pd.Series):
if self._fitted == 2:
                raise RuntimeError(
                    "The model was trained on a pandas DataFrame object, so "
                    "it can only be applied to a pandas DataFrame object "
                    "with the same column names as the one used for "
                    "training."
                )
s = ts.copy()
if not isinstance(s.index, pd.DatetimeIndex):
raise TypeError(
"Index of the input time series must be a pandas "
"DatetimeIndex object."
)
predicted = self._predict_core(s)
            # if a Series-to-Series operation, preserve the Series name
if isinstance(predicted, pd.Series):
predicted.name = ts.name
elif isinstance(ts, pd.DataFrame):
df = ts.copy()
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError(
"Index of the input time series must be a pandas "
"DatetimeIndex object."
)
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
if self._fitted == 1:
# apply the model to each column
predicted_all_cols = []
for col in df.columns:
predicted_this_col = self._predict(df[col])
if isinstance(predicted_this_col, pd.DataFrame):
predicted_this_col = predicted_this_col.rename(
columns={
col1: "{}_{}".format(col, col1)
for col1 in predicted_this_col.columns
}
)
predicted_all_cols.append(predicted_this_col)
predicted = pd.concat(predicted_all_cols, axis=1)
else:
                # predict each column with the sub-model fitted for it
                if not (set(self._models.keys()) >= set(df.columns)):
                    raise ValueError(
                        "The model was trained on a pandas DataFrame with "
                        "columns {}, but the input DataFrame contains "
                        "columns {} which are unknown to the model.".format(
                            list(self._models.keys()),
                            list(set(df.columns) - set(self._models.keys())),
                        )
                    )
predicted = pd.concat(
[
self._models[col]._predict(df[col])
for col in df.columns
],
axis=1,
)
else:
raise TypeError("Input must be a pandas Series or DataFrame.")
        # make sure the index freq is preserved (some pandas operations,
        # e.g. concat, may drop it)
predicted.index.freq = ts.index.freq
return predicted
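# Illustrative sketch, not part of the library: a hypothetical trainable
# univariate detector built on the base class above. _fit_core/_predict_core
# implement the per-series logic; the base class handles DataFrame input by
# cloning the model (via get_params) and fitting one copy per column. The
# class name, the parameter `c`, and the z-score rule are made up.
class _ExampleZScoreDetector(_TrainableUnivariateModel):
    def __init__(self, c: float = 3.0) -> None:
        super().__init__()
        self.c = c

    @property
    def _param_names(self) -> Tuple[str, ...]:
        return ("c",)

    def _fit_core(self, s: pd.Series) -> None:
        # learn the mean and standard deviation of the training series
        self._mean = s.mean()
        self._std = s.std()

    def _predict_core(self, s: pd.Series) -> pd.Series:
        # flag points more than c standard deviations away from the mean
        return (s - self._mean).abs() > self.c * self._std

    def fit(self, ts: Union[pd.Series, pd.DataFrame]) -> None:
        self._fit(ts)

    def predict(
        self, ts: Union[pd.Series, pd.DataFrame]
    ) -> Union[pd.Series, pd.DataFrame]:
        return self._predict(ts)

    def fit_predict(
        self, ts: Union[pd.Series, pd.DataFrame]
    ) -> Union[pd.Series, pd.DataFrame]:
        self.fit(ts)
        return self.predict(ts)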
class _NonTrainableMultivariateModel(_NonTrainableModel):
    """Base class of non-trainable multivariate models."""
def _predict(self, df: pd.DataFrame) -> Union[pd.Series, pd.DataFrame]:
if isinstance(df, pd.DataFrame):
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
df_copy = df.copy()
predicted = self._predict_core(df_copy)
else:
raise TypeError("Input must be a pandas DataFrame.")
        # make sure the index freq is preserved (some pandas operations,
        # e.g. concat, may drop it)
predicted.index.freq = df.index.freq
return predicted
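# Illustrative sketch, not part of the library: a hypothetical non-trainable
# multivariate transformer built on the base class above. _predict_core
# receives the whole DataFrame and may return either a Series or a
# DataFrame; the row-wise sum used here is made up for the example.
class _ExampleRowSumTransformer(_NonTrainableMultivariateModel):
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return tuple()

    def _predict_core(self, df: pd.DataFrame) -> pd.Series:
        # combine all columns into a single row-wise sum
        return df.sum(axis=1)

    def predict(self, df: pd.DataFrame) -> pd.Series:
        return self._predict(df)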
class _TrainableMultivariateModel(_TrainableModel):
    """Base class of trainable multivariate models."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._cols = [] # type: List[str]
def _fit(self, df: pd.DataFrame) -> None:
if isinstance(df, pd.DataFrame):
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
df_copy = df.copy()
self._fit_core(df_copy)
else:
raise TypeError("Input must be a pandas DataFrame.")
self._cols = list(df.columns)
self._fitted = 1
def _predict(self, df: pd.DataFrame) -> Union[pd.Series, pd.DataFrame]:
if self._fitted == 0:
raise RuntimeError("The model must be trained first.")
if isinstance(df, pd.DataFrame):
if df.columns.duplicated().any():
raise ValueError(
"Input DataFrame must have unique column names."
)
if not (set(df.columns) >= set(self._cols)):
raise ValueError(
"The model was trained by a pandas DataFrame with columns "
"{}, but the input DataFrame does not contain columns {}.".format(
self._cols, list(set(self._cols) - set(df.columns))
)
)
            # a customized multivariate model that does not need fitting may
            # leave self._cols empty; in that case use all input columns
            df_copy = (
                df.loc[:, self._cols].copy() if self._cols else df.copy()
            )
predicted = self._predict_core(df_copy)
else:
raise TypeError("Input must be a pandas DataFrame.")
        # make sure the index freq is preserved (some pandas operations,
        # e.g. concat, may drop it)
predicted.index.freq = df.index.freq
return predicted
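# Illustrative sketch, not part of the library: a hypothetical trainable
# multivariate detector built on the base class above. The base class records
# the training columns so that _predict can later select the same columns
# from the input DataFrame; the threshold-on-column-sum rule is made up for
# the example.
class _ExampleSumThresholdDetector(_TrainableMultivariateModel):
    @property
    def _param_names(self) -> Tuple[str, ...]:
        return tuple()

    def _fit_core(self, df: pd.DataFrame) -> None:
        # learn the largest row-wise column sum seen in training
        self._max_sum = df.sum(axis=1).max()

    def _predict_core(self, df: pd.DataFrame) -> pd.Series:
        # flag rows whose column sum exceeds anything seen in training
        return df.sum(axis=1) > self._max_sum

    def fit(self, df: pd.DataFrame) -> None:
        self._fit(df)

    def predict(self, df: pd.DataFrame) -> pd.Series:
        return self._predict(df)

    def fit_predict(self, df: pd.DataFrame) -> pd.Series:
        self.fit(df)
        return self.predict(df)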