"""
We don't typing the visualization module because there are a lot recursion on
nested tree structure which would be messy if we type rigorously."""
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.plotting import register_matplotlib_converters
from ..data import to_events, to_labels, validate_events
register_matplotlib_converters()
def plot(
    ts=None,
    anomaly=None,
    curve_group="each",
    ts_linewidth=0.5,
    ts_color=None,
    ts_alpha=1.0,
    ts_marker=".",
    ts_markersize=2,
    match_curve_name=True,
    anomaly_tag="span",
    anomaly_color=None,
    anomaly_alpha=0.3,
    anomaly_marker="o",
    anomaly_markersize=4,
    freq_as_period=True,
    axes=None,
    figsize=None,
    legend=True,
):
    """Plot time series and/or anomalies.

    Parameters
    ----------
    ts: pandas Series or DataFrame, optional
        Time series to plot. Its index must be a pandas DatetimeIndex and,
        for a DataFrame, column names must be unique.

    anomaly: list, pandas Series, DataFrame, or (nested) dict of them, optional
        Anomalies to plot.

        - If list, a list of anomalous events (pandas Timestamp for an
          instantaneous event or 2-tuple of pandas Timestamps for an
          interval);
        - If pandas Series, a binary series indicating normal/anomalous;
        - If pandas DataFrame, each column is treated independently as a
          binary Series.
        - If (nested) dict, every leaf node (list, Series, or DataFrame) is
          treated independently as above.

    curve_group: str or list, optional
        Groups of curves to be drawn at same plots.

        - If str, 'each' means every dimension is drawn in a separated plot,
          'all' means all dimensions are drawn in the same plot.
        - If list, each element corresponds to a subplot, which is the name
          of time series to plot in this subplot, or a list of names. For
          example, ["A", ("B", "C")] means two subplots, where the first one
          contains series A, while the second one contains series B and C.

        Default: 'each'.

    ts_linewidth: float or dict, optional
        Line width of each time series curve.

        - If float, all curves have the same line width.
        - If dict, the key is series name, the value is line width of that
          series.

        Default: 0.5.

    ts_color: str or dict, optional
        Color of each time series curve.

        - If str, all curves have the same color.
        - If dict, the key is series name, the value is color of that series.
        - If None, color will be assigned automatically.

        Default: None.

    ts_alpha: float or dict, optional
        Opacity of each time series curve.

        - If float, all curves have the same opacity.
        - If dict, the key is series name, the value is opacity of that
          series.

        Default: 1.0.

    ts_marker: str or dict, optional
        Marker type of each time series curve.

        - If str, all curves have the same marker type.
        - If dict, the key is series name, the value is marker type of that
          series.

        Default: ".".

    ts_markersize: int or dict, optional
        Marker size of each time series curve.

        - If int, all curves have the same marker size.
        - If dict, the key is series name, the value is marker size of that
          series.

        Default: 2.

    match_curve_name: bool, optional
        Whether to plot anomaly with corresponding curve by matching series
        names. If False, plot anomaly with all curves. Ignored (treated as
        False) when `ts` is not given.
        Default: True.

    anomaly_tag: str, or (nested) dict, optional
        Plot anomaly as horizontal spans or markers on curves.

        - If str, either 'span' or 'marker', all anomalies are marked with
          the same type of tag.
        - If (nested) dict, it must have a tree structure identical to or
          smaller than that of (nested) dict argument `anomaly`, which can
          define tags for all leaf nodes in `anomaly`.

        Forced to 'span' when `ts` is not given, because there is no curve
        to put markers on.
        Default: "span".

    anomaly_color: str, or (nested) dict, optional
        Color of each anomaly tag.

        - If str, all anomalies are marked with the same color.
        - If (nested) dict, it must have a tree structure identical to or
          smaller than that of (nested) dict argument `anomaly`, which can
          define colors for all leaf nodes in `anomaly`.
        - If None, color will be assigned automatically.

        Default: None.

    anomaly_alpha: float, or (nested) dict, optional
        Opacity of each anomaly tag. Only used for anomaly drawn as
        horizontal spans.

        - If float, all anomalies are marked with the same opacity.
        - If (nested) dict, it must have a tree structure identical to or
          smaller than that of (nested) dict argument `anomaly`, which can
          define opacity for all leaf nodes in `anomaly`.

        Default: 0.3.

    anomaly_marker: str, or (nested) dict, optional
        Marker type of each anomaly marker. Only used for anomaly drawn as
        markers on curves.

        - If str, all anomalies are marked with the same type of marker.
        - If (nested) dict, it must have a tree structure identical to or
          smaller than that of (nested) dict argument `anomaly`, which can
          define marker types for all leaf nodes in `anomaly`.

        Default: "o".

    anomaly_markersize: int, or (nested) dict, optional
        Marker size of each anomaly marker. Only used for anomaly drawn as
        markers on curves.

        - If int, all anomalies are marked with the same size of marker.
        - If (nested) dict, it must have a tree structure identical to or
          smaller than that of (nested) dict argument `anomaly`, which can
          define marker sizes for all leaf nodes in `anomaly`.

        Default: 4.

    freq_as_period: bool, optional
        Whether to regard time index with regular frequency (i.e. attribute
        `freq` of time index is not None) as time intervals. Only used when
        anomaly is given as binary series.
        For example, DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03',
        '2017-01-04', '2017-01-05'], dtype='datetime64[ns]', freq='D') has
        daily frequency. If freq_as_period=True, each time point in the index
        represents that day (24 hours). Otherwise, each time point represents
        the instantaneous time instance of 00:00:00 on that day.
        Default: True.

    axes: matplotlib Axes object, or array of Axes objects, optional
        Axes to plot at. The number of Axes objects should be equal to the
        number of plots. If not specified, figure axes will be automatically
        generated. Default: None.

    figsize: tuple, optional
        Size of the figure. If not specified, the size of each subplot is
        16 x 4. Default: None.

    legend: bool, optional
        Whether to show legend in the plot. Default: True.

    Returns
    -------
    list or array of matplotlib Axes objects
        Axes where the plot(s) is drawn. A single Axes object is wrapped in
        a list.

    Raises
    ------
    TypeError
        If `ts` is neither a pandas Series nor a DataFrame, or its index is
        not a pandas DatetimeIndex.
    ValueError
        If `ts` is a DataFrame with duplicated column names.

    """
    # setup style; the seaborn style sheets were renamed with a "v0_8" infix
    # in newer matplotlib releases, so prefer the new name when it exists and
    # fall back to the historical one otherwise
    style_name = "seaborn-v0_8-whitegrid"
    if style_name not in plt.style.available:
        style_name = "seaborn-whitegrid"
    plt.style.use(style_name)

    # initialize color generator (shared by curves and anomalies so that
    # automatically assigned colors do not collide)
    color_generator = ColorGenerator()

    # plot time series
    if ts is not None:
        # type check for ts
        if isinstance(ts, pd.Series):
            if ts.name is None:
                df = ts.to_frame("Time Series")
            else:
                df = ts.to_frame()
        elif isinstance(ts, pd.DataFrame):
            df = ts.copy()
        else:
            raise TypeError(
                "Argument `ts` must be a pandas Series or DataFrame."
            )

        # check series index
        if not isinstance(df.index, pd.DatetimeIndex):
            raise TypeError(
                "Index of the input time series must be a pandas "
                "DatetimeIndex object."
            )

        # check duplicated column names
        if df.columns.duplicated().any():
            raise ValueError("Input DataFrame must have unique column names.")

        # set up curve groups
        if curve_group == "each":
            curve_group = list(df.columns)
        elif curve_group == "all":
            curve_group = [tuple(df.columns)]

        # validate curve groups
        curve2axes = _validate_curve_group(df, curve_group)

        # set up default figure size
        if figsize is None:
            figsize = (16, 4 * len(curve_group))

        # setup axes
        if axes is None:
            _, axes = plt.subplots(
                nrows=len(curve_group), figsize=figsize, sharex=True
            )
        if not isinstance(axes, (list, np.ndarray)):
            axes = [axes]
        for ax in axes:
            ax.xaxis_date()

        # expand ts properties to a dict, if not yet
        ts_color = _assign_properties(ts_color, df)
        ts_linewidth = _assign_properties(ts_linewidth, df, 0.5)
        ts_marker = _assign_properties(ts_marker, df, ".")
        ts_markersize = _assign_properties(ts_markersize, df, 2)
        ts_alpha = _assign_properties(ts_alpha, df, 1.0)

        # plot curves
        _plot_curve(
            df,
            axes,
            curve2axes,
            ts_color,
            ts_linewidth,
            ts_marker,
            ts_markersize,
            ts_alpha,
            color_generator,
        )
    else:  # no time series, just event
        df = pd.DataFrame(dtype=int)
        curve2axes = dict()

        # never try to match curve name, because there is no curve anyway
        match_curve_name = False

        # never try to plot on curve, because there is no curve anyway
        anomaly_tag = "span"

        # setup figure
        if figsize is None:
            figsize = (16, 4)

        # setup axes
        if axes is None:
            _, axes = plt.subplots(figsize=figsize)
        if not isinstance(axes, (list, np.ndarray)):
            axes = [axes]
        for ax in axes:
            ax.xaxis_date()

    # plot anomaly
    if anomaly is not None:
        # validate anomaly
        _validate_anomaly(anomaly)

        # this is for showing a legend even if the series does not have a key
        if isinstance(anomaly, (list, pd.Series)):
            anomaly = {"Anomaly": anomaly}

        # expand tree struct of anomaly properties to match that of `anomaly`
        anomaly_tag = _assign_properties(anomaly_tag, anomaly, "span")
        anomaly_color = _assign_properties(anomaly_color, anomaly)
        anomaly_alpha = _assign_properties(anomaly_alpha, anomaly, 0.3)
        anomaly_marker = _assign_properties(anomaly_marker, anomaly, "o")
        anomaly_markersize = _assign_properties(anomaly_markersize, anomaly, 4)

        # plot anomalies
        _plot_anomaly(
            anomaly,
            axes,
            df,
            curve2axes,
            anomaly_tag,
            anomaly_color,
            anomaly_marker,
            anomaly_markersize,
            anomaly_alpha,
            match_curve_name,
            freq_as_period,
            color_generator,
        )

    # display legend (only when something was actually requested to plot)
    if legend and ((ts is not None) or (anomaly is not None)):
        for ax in axes:
            ax.legend()

    return axes
def _validate_curve_group(df, curve_group):
    """Validate curve groups and return the inverse (curve -> groups) map.

    Parameters
    ----------
    df: pandas DataFrame
        Time series whose column names are the valid curve names.

    curve_group: iterable
        One element per subplot. An element is either a single curve name or
        a list/tuple of curve names drawn together in that subplot.

    Returns
    -------
    dict
        Maps every column of `df` to the set of positions in `curve_group`
        (i.e. subplot indices) the column appears in. Columns that appear in
        no group map to an empty set.

    Raises
    ------
    ValueError
        If a name in `curve_group` is not a column of `df`.

    """
    # build the lookup set once instead of once per membership test
    known_curves = set(df.columns)
    curve2group = {col: set() for col in df.columns}
    for ind, group in enumerate(curve_group):
        # a bare name is a group holding a single curve
        curves = group if isinstance(group, (list, tuple)) else (group,)
        for curve in curves:
            if curve not in known_curves:
                raise ValueError(
                    "{} is not a series in input `ts`.".format(curve)
                )
            curve2group[curve].add(ind)
    return curve2group
def _plot_curve(
    df,
    axes,
    curve2axes,
    ts_color,
    ts_linewidth,
    ts_marker,
    ts_markersize,
    ts_alpha,
    color_generator,
):
    """Plot every curve of `df` on the axes it is assigned to.

    Parameters
    ----------
    df: pandas DataFrame
        Time series to draw; one curve per column.

    axes: sequence of Axes-like objects
        Candidate axes, indexed by the values found in `curve2axes`.

    curve2axes: dict
        Maps a column name to the collection of indices into `axes` where
        that column is drawn.

    ts_color, ts_linewidth, ts_marker, ts_markersize, ts_alpha: dict
        Per-column drawing properties, keyed by column name. A color of None
        is replaced by whatever `color_generator.emit` returns for it.

    color_generator: object with an `emit(color)` method
        Resolves the color of each curve; called once per column so that a
        curve keeps the same color on every axes it appears in.

    """
    for col, axes_inds in curve2axes.items():
        color = color_generator.emit(ts_color[col])
        for axes_ind in axes_inds:
            ax = axes[axes_ind]
            # `Axes.plot_date` is deprecated in matplotlib; marking the x
            # axis as dates and calling `plot` is what it did internally
            ax.xaxis_date()
            ax.plot(
                df.index,
                df[col],
                "-",
                color=color,
                linewidth=ts_linewidth[col],
                marker=ts_marker[col],
                markersize=ts_markersize[col],
                alpha=ts_alpha[col],
                label=str(col),
            )
def _plot_anomaly(
    anomaly,
    axes,
    df,
    curve2axes,
    anomaly_tag,
    anomaly_color,
    anomaly_marker,
    anomaly_markersize,
    anomaly_alpha,
    match_curve_name,
    freq_as_period,
    color_generator,
    anomaly_name=None,
    anomaly_label=None,
):
    """Recursively draw every leaf of the `anomaly` tree onto `axes`.

    A leaf (list or pandas Series) is drawn either as vertical spans
    (`anomaly_tag == "span"`) or as markers on curves
    (`anomaly_tag == "marker"`). A DataFrame or dict is walked key by key,
    recursing with the matching sub-tree of every property argument.

    Parameters
    ----------
    anomaly: list, pandas Series, DataFrame, or (nested) dict of them
        Anomalies to draw.

    axes: sequence of Axes objects
        Axes to draw on.

    df: pandas DataFrame
        Time series already drawn; its columns are the curve names.

    curve2axes: dict
        Maps a curve name to the indices into `axes` where it is drawn.

    anomaly_tag, anomaly_color, anomaly_marker, anomaly_markersize,
    anomaly_alpha:
        Drawing properties with the same tree structure as `anomaly`; at a
        leaf they are plain values.

    match_curve_name: bool
        If True and the leaf's key is a column of `df`, draw the leaf only
        on the axes (and, for markers, the curve) of that column. Otherwise
        draw it on all axes / all curves.

    freq_as_period: bool
        Forwarded to `to_events` / `to_labels` when a leaf has to be
        converted between list and binary-series form.

    color_generator: object with an `emit(color)` method
        Resolves the color of each leaf.

    anomaly_name: hashable, optional
        Key under which the current node was found in its parent.

    anomaly_label: optional
        Legend label accumulated along the path from the root.

    Raises
    ------
    ValueError
        If a leaf's tag is neither 'span' nor 'marker', or a Series drawn as
        markers does not share the index of `df`.
    TypeError
        If a node is not a list, Series, DataFrame, or dict.

    """
    if isinstance(anomaly, (list, pd.Series)):
        anomaly_color = color_generator.emit(anomaly_color)

        # computed once per leaf instead of once per axes
        name_is_curve = anomaly_name in set(df.columns)
        plot_on_matched_curve = match_curve_name and name_is_curve

        # A label that is just the name of a curve gets an "Anomaly - "
        # prefix (presumably so the legend entry differs from the curve's
        # own entry); any other label is used as is.
        if name_is_curve and (anomaly_label == anomaly_name):
            label = "Anomaly - {}".format(anomaly_name)
        else:
            label = anomaly_label

        if anomaly_tag == "span":
            # turn anomaly into list, if not yet
            if isinstance(anomaly, pd.Series):
                anomaly = to_events(anomaly, freq_as_period=freq_as_period)
            anomaly = validate_events(anomaly, point_as_interval=True)

            if plot_on_matched_curve:  # match found, plot on it
                target_axes = [
                    axes[axes_ind] for axes_ind in curve2axes[anomaly_name]
                ]
            else:  # no match found or don't match, plot on all
                target_axes = axes
            for ax in target_axes:
                _add_anomaly_list_to_axes(
                    anomaly, ax, anomaly_color, anomaly_alpha, label
                )
        elif anomaly_tag == "marker":
            # turn anomaly into binary series, if not yet
            if isinstance(anomaly, list):
                anomaly = to_labels(
                    anomaly, df.index, freq_as_period=freq_as_period
                )
            else:
                try:
                    pd.testing.assert_index_equal(
                        anomaly.index, df.index, check_names=False
                    )
                except AssertionError as err:
                    # keep the index diff reachable through __cause__
                    raise ValueError(
                        "Series index in argument `anomaly` must be the same "
                        "as the input time series."
                    ) from err

            if plot_on_matched_curve:  # match found, plot on it
                for axes_ind in curve2axes[anomaly_name]:
                    _add_anomaly_series_to_curve(
                        anomaly,
                        axes[axes_ind],
                        df[anomaly_name],
                        anomaly_color,
                        anomaly_marker,
                        anomaly_markersize,
                        label,
                    )
            else:  # no match found or don't match, plot on all
                # axes that already carry this anomaly's legend entry, so
                # that it appears only once per axes
                labelled_axes = set()
                for curve, axes_inds in curve2axes.items():
                    for axes_ind in axes_inds:
                        _add_anomaly_series_to_curve(
                            anomaly,
                            axes[axes_ind],
                            df[curve],
                            anomaly_color,
                            anomaly_marker,
                            anomaly_markersize,
                            None if axes_ind in labelled_axes else label,
                        )
                        labelled_axes.add(axes_ind)
        else:
            raise ValueError(
                "An anomaly tag must be either 'span' or 'marker'."
            )
    elif isinstance(anomaly, (pd.DataFrame, dict)):
        if isinstance(anomaly, pd.DataFrame):
            keys = anomaly.columns
        else:
            keys = anomaly.keys()
        for key in keys:
            # extend the legend label with the path walked so far
            if anomaly_name is not None:
                child_label = "{} - {}".format(anomaly_name, key)
            else:
                child_label = key
            _plot_anomaly(
                anomaly[key],
                axes,
                df,
                curve2axes,
                anomaly_tag[key],
                anomaly_color[key],
                anomaly_marker[key],
                anomaly_markersize[key],
                anomaly_alpha[key],
                match_curve_name,
                freq_as_period,
                color_generator,
                anomaly_name=key,
                anomaly_label=child_label,
            )
    else:
        raise TypeError(
            "Argument `anomaly` must be a list, pandas Series, DataFrame, or "
            "a (nested) dict of them."
        )
def _add_anomaly_list_to_axes(
    anomaly, ax, anomaly_color, anomaly_alpha, anomaly_label
):
    """Draw each event of `anomaly` on `ax` as a vertical span.

    Every event is indexed as `event[0]` (span start) and `event[1]` (span
    end). Only the first span carries `anomaly_label`; the rest are drawn
    with a label of None so the legend lists the anomaly a single time.
    """
    span_label = anomaly_label
    for event in anomaly:
        ax.axvspan(
            xmin=event[0],
            xmax=event[1],
            color=anomaly_color,
            alpha=anomaly_alpha,
            label=span_label,
        )
        # every span after the first goes unlabelled
        span_label = None
def _add_anomaly_series_to_curve(
    anomaly,
    ax,
    s,
    anomaly_color,
    anomaly_marker,
    anomaly_markersize,
    anomaly_label,
):
    """Add anomalies given as a binary series as markers on a curve.

    Parameters
    ----------
    anomaly: pandas Series
        Binary series; the points where it equals 1 are marked.

    ax: Axes object
        Axes to draw the markers on.

    s: pandas Series
        The curve the markers sit on; marker heights are its values at the
        anomalous points.

    anomaly_color, anomaly_marker, anomaly_markersize:
        Color, marker type and marker size of the markers.

    anomaly_label:
        Legend label of the markers; None leaves them out of the legend.

    """
    anomaly_curve = s.loc[anomaly == 1]
    # `Axes.plot_date` is deprecated in matplotlib; marking the x axis as
    # dates and calling `plot` is what it did internally
    ax.xaxis_date()
    ax.plot(
        anomaly_curve.index,
        anomaly_curve,
        "-",
        linewidth=0,  # markers only, no connecting line
        marker=anomaly_marker,
        markersize=anomaly_markersize,
        color=anomaly_color,
        label=anomaly_label,
    )
def _validate_anomaly(anomaly):
    """Check that `anomaly` is a supported (possibly nested) structure.

    Lists and pandas Series are accepted as they are. A DataFrame must have
    unique column names. A dict is accepted when each of its values passes
    this same check. Anything else is rejected.

    Raises
    ------
    ValueError
        If a DataFrame has duplicated column names.
    TypeError
        If a node is not a list, Series, DataFrame, or dict.

    """
    if isinstance(anomaly, dict):
        # walk the tree: every child has to be valid on its own
        for child in anomaly.values():
            _validate_anomaly(child)
        return

    if isinstance(anomaly, pd.DataFrame):
        has_duplicates = anomaly.columns.duplicated().any()
        if has_duplicates:
            raise ValueError(
                "DataFrame in argument `anomaly` must have unique column names."
            )
        return

    if not isinstance(anomaly, (list, pd.Series)):
        raise TypeError(
            "Argument `anomaly` must be a list, pandas Series, DataFrame, or "
            "a (nested) dict of them."
        )
def _assign_properties(prop, anomaly, default=None):
    """Expand the tree structure of `prop` to that of `anomaly`.

    Parameters
    ----------
    prop: value or (nested) dict
        A property value, or a (nested) dict of property values whose keys
        at every level are a subset of the keys (or columns) of `anomaly` at
        that level.

    anomaly: object, DataFrame, or (nested) dict
        Structure to mirror. A dict or DataFrame is an inner node whose
        children are its keys / columns; anything else is a leaf.

    default: optional
        Value used for every node of `anomaly` that `prop` does not cover,
        at any depth. Default: None.

    Returns
    -------
    value or (nested) dict
        `prop` itself when `anomaly` is a leaf, otherwise a dict with the
        same keys as `anomaly`, filled recursively.

    Raises
    ------
    ValueError
        If `prop` is a dict where `anomaly` is a leaf, or `prop` has a key
        that `anomaly` does not have.

    """
    anomaly_is_tree = isinstance(anomaly, (dict, pd.DataFrame))

    if not isinstance(prop, dict):
        if not anomaly_is_tree:
            return prop
        # a plain value applies to the whole sub-tree
        keys = anomaly.keys() if isinstance(anomaly, dict) else anomaly.columns
        return {
            key: _assign_properties(prop, anomaly[key], default) for key in keys
        }

    if not anomaly_is_tree:
        raise ValueError("Property dict and anomaly dict are inconsistent.")

    keys = anomaly.keys() if isinstance(anomaly, dict) else anomaly.columns
    if not set(prop.keys()) <= set(keys):
        raise ValueError("Property dict and anomaly dict are inconsistent.")

    # `default` has to travel down with the recursion; otherwise keys
    # missing from a nested property dict would silently become None
    return {
        key: _assign_properties(
            (prop[key] if (key in prop) else default), anomaly[key], default
        )
        for key in keys
    }
class ColorGenerator:
    """Hand out colors, generating one when the caller has no preference.

    Automatic colors are the strings "C0", "C1", "C2", ... in order;
    `latest_auto_color` holds the number used in the most recent automatic
    color, and is -1 before any has been generated.
    """

    def __init__(self):
        self.latest_auto_color = -1

    def emit(self, color=None):
        """Return `color` unchanged, or the next automatic color if None."""
        if color is None:
            # advance the counter only when a color is actually generated
            self.latest_auto_color += 1
            color = "C{}".format(self.latest_auto_color)
        return color