# Source code for adtk.data._data

"""Module is for data (time series and anomaly list) processing.
"""

from typing import Dict, List, Optional, Tuple, Union, overload

import numpy as np
import pandas as pd


def validate_series(
    ts: Union[pd.Series, pd.DataFrame],
    check_freq: bool = True,
    check_categorical: bool = False,
) -> Union[pd.Series, pd.DataFrame]:
    """Validate time series.

    This function will check some common critical issues of time series that
    may cause problems if anomaly detection is performed without fixing them.
    The function will automatically fix some of them and raise errors for the
    others.

    Issues will be checked and automatically fixed include:

    - Time index is not monotonically increasing;
    - Time index contains duplicated time stamps (fix by keeping first
      values);
    - (optional) Time index attribute `freq` is missed while the index
      follows a frequency;
    - (optional) Time series include categorical (non-binary) label columns
      (to fix by converting categorical labels into binary indicators).

    Issues will be checked and raise error include:

    - Wrong type of time series object (must be pandas Series or DataFrame);
    - Wrong type of time index object (must be pandas DatetimeIndex).

    Parameters
    ----------
    ts: pandas Series or DataFrame
        Time series to be validated.

    check_freq: bool, optional
        Whether to check time index attribute `freq` is missed. Default:
        True.

    check_categorical: bool, optional
        Whether to check time series include categorical (non-binary) label
        columns. Default: False.

    Returns
    -------
    pandas Series or DataFrame
        Validated time series.

    Raises
    ------
    TypeError
        If the input is not a pandas Series or DataFrame, or its index is
        not a pandas DatetimeIndex.

    """
    # Check the input type BEFORE copying, so a non-pandas input raises the
    # documented TypeError instead of an AttributeError from `.copy`.
    if not isinstance(ts, (pd.Series, pd.DataFrame)):
        raise TypeError("Input is not a pandas Series or DataFrame object")
    ts = ts.copy()
    # check index type
    if not isinstance(ts.index, pd.DatetimeIndex):
        raise TypeError(
            "Index of time series must be a pandas DatetimeIndex object."
        )
    # drop duplicated time stamps, keeping the first occurrence of each
    if ts.index.duplicated(keep="first").any():
        ts = ts[~ts.index.duplicated(keep="first")]
    # check sorted
    if not ts.index.is_monotonic_increasing:
        ts.sort_index(inplace=True)
    # restore the `freq` attribute when the index follows a regular frequency
    if check_freq:
        if (ts.index.freq is None) and (ts.index.inferred_freq is not None):
            ts = ts.asfreq(ts.index.inferred_freq)
    # convert categorical labels into binary indicators
    if check_categorical:
        if isinstance(ts, pd.DataFrame):
            ts = pd.get_dummies(ts)
        if isinstance(ts, pd.Series):
            seriesName = ts.name
            ts = pd.get_dummies(
                ts.to_frame(),
                prefix="" if seriesName is None else seriesName,
                prefix_sep="" if seriesName is None else "_",
            )
            # a single indicator column collapses back to a Series
            if len(ts.columns) == 1:
                ts = ts[ts.columns[0]]
                ts.name = seriesName
    return ts
def validate_events(
    event_list: List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
    point_as_interval: bool = False,
) -> List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]:
    """Validate event list.

    This function will check and fix some common issues in an event list (a
    list of time windows), including invalid time window, overlapped time
    windows, unsorted events, etc.

    Parameters
    ----------
    event_list: list
        A list of events, where an event is a pandas Timestamp if it is
        instantaneous or a 2-tuple of pandas Timestamps if it is a closed
        time interval.

    point_as_interval: bool, optional
        Whether to return all instantaneous event as a close interval with
        identical start point and end point. Default: False.

    Returns
    -------
    list:
        A validated list of events.

    Raises
    ------
    TypeError
        If `event_list` is not a list, or an element of it is neither a
        pandas Timestamp nor a 2-tuple of pandas Timestamps.

    """
    if not isinstance(event_list, list):
        raise TypeError("Argument `event_list` must be a list.")
    for event in event_list:
        if not (
            isinstance(event, pd.Timestamp)
            or (
                isinstance(event, tuple)
                and (len(event) == 2)
                and all([isinstance(event[i], pd.Timestamp) for i in [0, 1]])
            )
        ):
            raise TypeError(
                "Every event in the list must be a pandas Timestamp, "
                "or a 2-tuple of Timestamps."
            )
    # Sweep-line merge: encode each window boundary as +1 (open) / -1
    # (close), sort by time, and take a running sum; positive runs of the
    # sum are merged windows.
    time_window_ends = []  # type: List[pd.Timestamp]
    time_window_type = []  # type: List[int]
    for time_window in event_list:
        if isinstance(time_window, tuple):
            # NOTE: an invalid window (start > end) is silently dropped
            if time_window[0] <= time_window[1]:
                time_window_ends.append(time_window[0])
                time_window_type.append(+1)
                time_window_ends.append(time_window[1])
                time_window_type.append(-1)
        else:
            time_window_ends.append(time_window)
            time_window_type.append(+1)
            time_window_ends.append(time_window)
            time_window_type.append(-1)
    time_window_end_series = pd.Series(
        time_window_type, index=pd.DatetimeIndex(time_window_ends), dtype=int
    )  # type: pd.Series
    # a stable sort keeps +1 before -1 at identical timestamps, so an
    # instantaneous event yields a zero-length positive run
    time_window_end_series.sort_index(kind="mergesort", inplace=True)
    time_window_end_series = time_window_end_series.cumsum()
    status = 0
    merged_event_list = (
        []
    )  # type: List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    # `Series.iteritems` was removed in pandas 2.0; `items` is equivalent.
    for t, v in time_window_end_series.items():  # type: pd.Timestamp, int
        if (status == 0) and (v > 0):
            start = t  # type: pd.Timestamp
            status = 1
        if (status == 1) and (v <= 0):
            end = t  # type: pd.Timestamp
            merged_event_list.append([start, end])
            status = 0
    # merge windows that touch or are within 1ns of each other
    for i in range(1, len(merged_event_list)):
        this_start = merged_event_list[i][0]  # type: pd.Timestamp
        this_end = merged_event_list[i][1]  # type: pd.Timestamp
        last_start = merged_event_list[i - 1][0]  # type: pd.Timestamp
        last_end = merged_event_list[i - 1][1]  # type: pd.Timestamp
        if last_end + pd.Timedelta("1ns") >= this_start:
            merged_event_list[i] = [last_start, this_end]
            merged_event_list[i - 1] = None
    merged_event_list = [
        w[0] if (w[0] == w[1] and not point_as_interval) else tuple(w)
        for w in merged_event_list
        if w is not None
    ]
    return merged_event_list
# Overloaded signatures for `to_events`: a pandas Series input yields one
# event list; a pandas DataFrame input yields a dict of event lists keyed by
# column name. The `...` bodies are intentional — `@overload` stubs exist
# only for static type checkers and are never executed.
@overload
def to_events(
    labels: pd.Series,
    freq_as_period: bool = True,
    merge_consecutive: Optional[bool] = None,
) -> List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]:
    ...


@overload
def to_events(  # type: ignore
    labels: pd.DataFrame,
    freq_as_period: bool = True,
    merge_consecutive: Optional[bool] = None,
) -> Dict[str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]]:
    ...
def to_events(
    labels: Union[pd.Series, pd.DataFrame],
    freq_as_period: bool = True,
    merge_consecutive: Optional[bool] = None,
) -> Union[
    List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
    Dict[str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]],
]:
    """Convert binary label series to event list.

    Parameters
    ----------
    labels: pandas Series or DataFrame
        Binary series of anomaly labels. If a DataFrame, each column is
        regarded as a type of anomaly independently.

    freq_as_period: bool, optional
        Whether to regard time index with regular frequency (i.e. attribute
        `freq` of time index is not None) as time intervals. For example,
        DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03',
        '2017-01-04', '2017-01-05'], dtype='datetime64[ns]', freq='D') has
        daily frequency. If freq_as_period=True, each time point in the
        index represents that day (24 hours). Otherwise, each time point
        represents the instantaneous time instance of 00:00:00 on that day.
        Default: True.

    merge_consecutive: bool, optional
        Whether to merge consecutive events into a single time window. If
        not specified, it is on automatically if the input time index has a
        regular frequency and freq_as_period=True, and it is off otherwise.
        Default: None.

    Returns
    -------
    list or dict
        - If input is a Series, output is a list of events where an event
          is a pandas Timestamp if it is instantaneous or a 2-tuple of
          pandas Timestamps if it is a closed time interval.
        - If input is a DataFrame, every column is treated as an
          independent binary series, and output is a dict where keys are
          column names and values are event lists.

    """
    if isinstance(labels, pd.Series):
        labels = validate_series(
            labels, check_freq=False, check_categorical=False
        )
        labels = labels == 1
        # no labels at all -> no events; also protects the code below from
        # indexing into an empty index
        if len(labels) == 0:
            return []
        if merge_consecutive is None:
            # merge by default only when each time point represents a period
            merge_consecutive = freq_as_period and (
                labels.index.freq is not None
            )
        if not merge_consecutive:
            if freq_as_period and (labels.index.freq is not None):
                period_end = _period_ends(labels.index)
                return [
                    (start, end) if start != end else start
                    for start, end in zip(
                        list(labels.index[labels]), list(period_end[labels])
                    )
                ]
            else:
                return list(labels.index[labels])
        else:
            # locate run boundaries of positive labels via the discrete
            # difference of the 0/1 sequence padded on both sides
            labels_values = labels.values.astype(int).reshape(
                -1, 1
            )  # type: np.ndarray
            mydiff = np.vstack(
                [
                    labels_values[0, :] - 0,
                    np.diff(labels_values, axis=0),
                    0 - labels_values[-1, :],
                ]
            )  # type: np.ndarray
            starts = np.argwhere(mydiff == 1)  # type: np.ndarray
            ends = np.argwhere(mydiff == -1)  # type: np.ndarray
            if freq_as_period and (labels.index.freq is not None):
                period_end = _period_ends(labels.index)
                return [
                    (labels.index[start], period_end[end - 1])
                    if labels.index[start] != period_end[end - 1]
                    else labels.index[start]
                    for start, end in zip(starts[:, 0], ends[:, 0])
                ]
            else:
                return [
                    (labels.index[start], labels.index[end - 1])
                    if start != end - 1
                    else labels.index[start]
                    for start, end in zip(starts[:, 0], ends[:, 0])
                ]
    else:
        if labels.columns.duplicated().any():
            raise ValueError("Input DataFrame must have unique column names.")
        return {
            col: to_events(labels[col], freq_as_period, merge_consecutive)
            for col in labels.columns
        }


def _period_ends(index: pd.DatetimeIndex) -> pd.DatetimeIndex:
    # Inclusive end (at nanosecond resolution) of the period each time point
    # represents: the next period's start minus 1ns. Anchoring the range at
    # index[0] + freq instead of index[1] keeps this valid for a
    # single-point index.
    return pd.date_range(
        start=index[0] + index.freq, periods=len(index), freq=index.freq
    ) - pd.Timedelta("1ns")
# Overloaded signatures for `to_labels`: a single event list maps to a
# binary pandas Series; a dict of event lists maps to a binary DataFrame
# with one column per key. The `...` bodies are intentional — `@overload`
# stubs exist only for static type checkers and are never executed.
@overload
def to_labels(
    lists: List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
    time_index: pd.DatetimeIndex,
    freq_as_period: bool = True,
) -> pd.Series:
    ...


@overload
def to_labels(
    lists: Dict[
        str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    ],
    time_index: pd.DatetimeIndex,
    freq_as_period: bool = True,
) -> pd.DataFrame:
    ...
def to_labels(
    lists: Union[
        List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
        Dict[
            str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        ],
    ],
    time_index: pd.DatetimeIndex,
    freq_as_period: bool = True,
) -> Union[pd.Series, pd.DataFrame]:
    """Convert event list to binary series along a time index.

    Parameters
    ----------
    lists: list or dict
        A list of events, or a dict of lists of events.

        - If list, a list of events where an event is a pandas Timestamp if
          it is instantaneous or a 2-tuple of pandas Timestamps if it is a
          closed time interval.
        - If dict, each key-value pair represents an independent list of
          events.

    time_index: pandas DatetimeIndex
        Time index to build the label series.

    freq_as_period: bool, optional
        Whether to regard time index with regular frequency (i.e. attribute
        `freq` of time index is not None) as time intervals. For example,
        DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03',
        '2017-01-04', '2017-01-05'], dtype='datetime64[ns]', freq='D') has
        daily frequency. If freq_as_period=True, each time point represents
        that day, and that day will be marked positive if an event in the
        event list overlaps with the period of that day (24 hours).
        Otherwise, each time point represents the instantaneous time
        instance of 00:00:00 on that day, and that time point will be
        marked positive if an event in the event list covers it.
        Default: True.

    Returns
    -------
    pandas Series or DataFrame
        Series of binary labels.

        - If input is a single list, the output is a Series.
        - If input is a dict of lists, the output is a DataFrame where each
          column corresponds a list in the dict.

    Raises
    ------
    TypeError
        If `time_index` is not a pandas DatetimeIndex, or `lists` is
        neither a list nor a dict.
    ValueError
        If `time_index` is not monotonically increasing.

    """
    if not isinstance(time_index, pd.DatetimeIndex):
        raise TypeError("Time index must be a pandas DatetimeIndex object.")
    if not time_index.is_monotonic_increasing:
        raise ValueError("Time index must be monotonic increasing.")
    if isinstance(lists, list):
        labels = pd.Series(False, index=time_index)  # type: pd.Series
        lists = validate_events(lists)
        if freq_as_period and (time_index.freq is not None):
            # inclusive end of the period each time point covers; anchored
            # at time_index[0] + freq (not time_index[1]) so a single-point
            # index also works
            period_end = pd.date_range(
                start=time_index[0] + time_index.freq,
                periods=len(time_index),
                freq=time_index.freq,
            ) - pd.Timedelta(
                "1ns"
            )  # type: pd.DatetimeIndex
            for event in lists:
                # a period is positive if it overlaps the closed event
                # interval (or contains the instantaneous event)
                if isinstance(event, tuple):
                    isOverlapped = (time_index <= event[1]) & (
                        period_end >= event[0]
                    )  # type: pd.Series
                else:
                    isOverlapped = (time_index <= event) & (
                        period_end >= event
                    )
                labels.loc[isOverlapped] = True
        else:
            for event in lists:
                if isinstance(event, tuple):
                    labels.loc[
                        (labels.index >= event[0])
                        & (labels.index <= event[1])
                    ] = True
                else:
                    labels.loc[labels.index == event] = True
        return labels
    elif isinstance(lists, dict):
        labels_df = pd.DataFrame(
            False, index=time_index, columns=list(lists.keys())
        )  # type: pd.DataFrame
        for key in lists:
            labels_df[key] = to_labels(lists[key], time_index, freq_as_period)
        return labels_df
    else:
        raise TypeError("Argument `lists` must be a list or a dict.")
# Overloaded signatures for `expand_events`. The parameter is named
# `events` to match the runtime implementation (the stubs previously used
# `lists`, which made keyword calls fail static type checking). The `...`
# bodies are intentional — `@overload` stubs are never executed.
@overload
def expand_events(
    events: List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
    left_expand: Union[pd.Timedelta, str, int] = 0,
    right_expand: Union[pd.Timedelta, str, int] = 0,
    freq_as_period: bool = True,
) -> List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]:
    ...


@overload
def expand_events(
    events: Dict[
        str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
    ],
    left_expand: Union[pd.Timedelta, str, int] = 0,
    right_expand: Union[pd.Timedelta, str, int] = 0,
    freq_as_period: bool = True,
) -> Dict[str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]]:
    ...


@overload
def expand_events(
    events: pd.Series,
    left_expand: Union[pd.Timedelta, str, int] = 0,
    right_expand: Union[pd.Timedelta, str, int] = 0,
    freq_as_period: bool = True,
) -> pd.Series:
    ...


@overload
def expand_events(  # type:ignore
    events: pd.DataFrame,
    left_expand: Union[pd.Timedelta, str, int] = 0,
    right_expand: Union[pd.Timedelta, str, int] = 0,
    freq_as_period: bool = True,
) -> pd.DataFrame:
    ...
def expand_events(  # type:ignore
    events: Union[
        List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
        Dict[
            str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        ],
        pd.Series,
        pd.DataFrame,
    ],
    left_expand: Union[pd.Timedelta, str, int] = 0,
    right_expand: Union[pd.Timedelta, str, int] = 0,
    freq_as_period: bool = True,
) -> Union[
    List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]],
    Dict[str, List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]],
    pd.Series,
    pd.DataFrame,
]:
    """Expand duration of events.

    Parameters
    ----------
    events: list, dict, pandas Series, or pandas DataFrame
        Events to be expanded.

        - If list, a list of events where an event is a pandas Timestamp if
          it is instantaneous or a 2-tuple of pandas Timestamps if it is a
          closed time interval.
        - If dict, each key-value pair represents an independent list of
          events.
        - If pandas Series, it is binary where 1 represents events cover
          this time point.
        - If pandas DataFrame, each column is treated as an independent
          Series.

    left_expand: pandas Timedelta, str, or int, optional
        Time range to expand backward.

        - If str, it must be able to be converted into a pandas Timedelta
          object.
        - If int, it must be in nanosecond.

        Default: 0.

    right_expand: pandas Timedelta, str, or int, optional
        Time range to expand forward.

        - If str, it must be able to be converted into a pandas Timedelta
          object.
        - If int, it must be in nanosecond.

        Default: 0.

    freq_as_period: bool, optional
        Whether to regard time index with regular frequency (i.e. attribute
        `freq` of time index is not None) as time intervals. Only used when
        input events is pandas Series or DataFrame. For example,
        DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03',
        '2017-01-04', '2017-01-05'], dtype='datetime64[ns]', freq='D') has
        daily frequency. If freq_as_period=True, each time point in the
        index represents that day (24 hours). Otherwise, each time point
        represents the instantaneous time instance of 00:00:00 on that day.
        Default: True.

    Returns
    -------
    list, dict, pandas Series, or pandas DataFrame
        Expanded events.

    Raises
    ------
    TypeError
        If `events` is none of the supported input types.

    """
    if not isinstance(left_expand, pd.Timedelta):
        left_expand = pd.Timedelta(left_expand)
    if not isinstance(right_expand, pd.Timedelta):
        right_expand = pd.Timedelta(right_expand)
    if isinstance(events, pd.Series):
        # round-trip: binary labels -> event list -> expanded list -> labels
        labels = validate_series(events)  # type: pd.Series
        lists = to_events(
            labels, freq_as_period=freq_as_period
        )  # type:List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        expanded_lists = expand_events(  # type:ignore
            events=lists, left_expand=left_expand, right_expand=right_expand
        )  # type:List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        expanded_labels = to_labels(
            lists=expanded_lists,
            time_index=labels.index,
            freq_as_period=freq_as_period,
        )  # type: pd.Series
        # positions that were NaN in the input and are not covered by an
        # expanded event stay unknown (NaN)
        expanded_labels.loc[
            (expanded_labels == False) & (labels.isna())
        ] = float("nan")
        expanded_labels = expanded_labels.rename(labels.name)
        expanded_labels.index = labels.index
        return expanded_labels
    elif isinstance(events, pd.DataFrame):
        # expand every column independently;
        # `DataFrame.iteritems` was removed in pandas 2.0 -> use `items`
        expanded_df = pd.concat(
            [
                expand_events(
                    s,
                    left_expand=left_expand,
                    right_expand=right_expand,
                    freq_as_period=freq_as_period,
                )
                for _, s in events.items()
            ],
            axis=1,
        )  # type: pd.DataFrame
        return expanded_df
    elif isinstance(events, list):
        expanded_list = (
            []
        )  # type: List[Union[Tuple[pd.Timestamp, pd.Timestamp], pd.Timestamp]]
        for ano in events:
            if isinstance(ano, tuple):
                expanded_list.append(
                    (ano[0] - left_expand, ano[1] + right_expand)
                )
            else:
                expanded_list.append((ano - left_expand, ano + right_expand))
        # validate_events sorts and merges the expanded windows
        expanded_list = validate_events(expanded_list)
        return expanded_list
    elif isinstance(events, dict):
        return {
            key: expand_events(value, left_expand, right_expand)
            for key, value in events.items()
        }
    else:
        raise TypeError("Argument `events` must be a list or a dict of lists.")
def split_train_test(
    ts: Union[pd.Series, pd.DataFrame],
    mode: int = 1,
    n_splits: int = 1,
    train_ratio: float = 0.7,
) -> List[
    Tuple[Union[pd.Series, pd.DataFrame], Union[pd.Series, pd.DataFrame]]
]:
    """Split time series into training and testing set for cross validation.

    Parameters
    ----------
    ts: pandas Series or DataFrame
        Time series to process.

    mode: int, optional
        The split mode to use. Choose from 1, 2, 3 and 4.

        1. Divide time series into n_splits folds of equal length, split
           each fold into training and testing based on train_ratio.
        2. Create n_splits folds, where each fold starts at t_0 and ends at
           t_(n/n_splits), where n goes from 0 to n_splits and the first
           train_ratio of the fold is for training.
        3. Create n_splits folds, where each fold starts at t_0. Each fold
           has len(ts)/(1 + n_splits) test points at the end. Each fold is
           n * len(ts)/(1 + n_splits) long, where n ranges from 1 to
           n_splits.
        4. Create n_splits folds, where each fold starts at t_0. Each fold
           has n * len(ts)/(1 + n_splits) training points at the beginning
           of the time series, where n ranges from 1 to n_splits and the
           remaining points are testing points.

        Default: 1.

    n_splits: int, optional
        Number of splits. Default: 1.

    train_ratio: float, optional
        Ratio between length of training series and each fold, only used by
        mode 1 and 2. Default: 0.7.

    Returns
    -------
    list of 2-tuples (train, test)
        Splitted training and testing series.

    Raises
    ------
    ValueError
        If `ts` is not a pandas Series or DataFrame, or `mode` is not one
        of 1, 2, 3, 4.

    """
    if not isinstance(ts, (pd.DataFrame, pd.Series)):
        raise ValueError("Argument `ts` must be a pandas Series or DataFrame.")

    n = len(ts)

    if mode == 1:
        # disjoint consecutive folds; the last fold absorbs the remainder
        fold_len = round(n / n_splits)
        folds = []
        for k in range(n_splits):
            begin = k * fold_len
            stop = begin + fold_len if k < n_splits - 1 else n
            cut = begin + round((stop - begin) * train_ratio)
            folds.append((ts.iloc[begin:cut], ts.iloc[cut:stop]))
        return folds

    if mode == 2:
        # growing folds anchored at t_0; the last fold spans the whole series
        base = round(n / n_splits)
        folds = []
        for k in range(n_splits):
            stop = base * (k + 1) if k < n_splits - 1 else n
            cut = round(stop * train_ratio)
            folds.append((ts.iloc[:cut], ts.iloc[cut:stop]))
        return folds

    if mode == 3:
        # growing training prefix, fixed-length test window after it
        fold_len = round(n / (n_splits + 1))
        folds = []
        for k in range(n_splits):
            cut = (k + 1) * fold_len
            stop = (k + 2) * fold_len if k < n_splits - 1 else n
            folds.append((ts.iloc[:cut], ts.iloc[cut:stop]))
        return folds

    if mode == 4:
        # growing training prefix, everything after it is testing
        fold_len = round(n / (n_splits + 1))
        return [
            (ts.iloc[: (k + 1) * fold_len], ts.iloc[(k + 1) * fold_len :])
            for k in range(n_splits)
        ]

    raise ValueError("Argument `mode` must be one of 1, 2, 3, and 4.")