lrhao:时间序列中的异常检测&孤立森林&异常可视化介绍
指标值的突然上升或下降是一种异常行为,这两种情况都需要注意。如果我们在建模之前就有异常行为的信息,那么异常检测可以通过监督学习算法来解决,但在没有反馈的情况下,最初很难识别这些点。因此,我们可以使用孤立森林(Isolation Forest)、支持向量机和LSTM等算法将其建模为一个无监督问题。下面使用孤立森林识别异常点。
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory
import warnings
warnings.filterwarnings('ignore')
import os
print(os.listdir("../input"))
# Any results you write to the current directory are saved as output.
这里的数据是一个用例(如收益、流量等),每天有12个指标。我们必须首先确定在用例级别上是否存在异常。然后,为了获得更好的可操作性,我们深入到单个指标,并识别其中的异常情况。
df=pd.read_csv("../input/metric_data.csv")
df.head()
现在在数据框中做一个透视,来创建一个数据框,在一个日期级别上包含所有指标。
metrics_df=pd.pivot_table(df,values='actuals',index='load_date',columns='metric_name')
metrics_df.head()
将数据框展平,并填充nan值为0
metrics_df.reset_index(inplace=True)
metrics_df.fillna(0,inplace=True)
metrics_df.head()
定义孤立森林并指定参数:
隔离森林试图分离数据中的每个点。在2D的情况下,它随机创建一条线,并试图选出一个点。在这种情况下,一个异常点只需几步就可以分离出来,而距离较近的正常点则需要相当多的步骤才能分离出来。
这里举例几个重要的参数。Contamination在这里是一个重要的参数,但是没有为它指定任何值,因为它是无监督的,我们没有关于异常值百分比的信息。
这里还可以通过在2D绘图中使用离群值验证其结果时的试错来指定它,或者如果数据是有监督的,则使用该信息来指定它,代表数据中离群点的百分比。
这里使用sklearn中自带的孤立森林,因为它是一个只有几个月数据的小数据集,而最近h2o的孤立森林也可用,它在高容量数据集上更可扩展,值得探索。
这个算法的更多细节可以参考这篇论文:https://cs.nju.edu.cn/zhouzh/zhouzh.files/publication/icdm08b.pdf
h2o孤立森林更多细节可以参考这个github链接:https://github.com/h2oai/h2o-tutorials/tree/master/tutorials/isolation-forest
metrics_df.columns
Index(['load_date', 'metric_1', 'metric_10', 'metric_11', 'metric_12',
'metric_2', 'metric_3', 'metric_4', 'metric_5', 'metric_6', 'metric_7',
'metric_8', 'metric_9'],
dtype='object', name='metric_name')
#specify the 12 metrics column names to be modelled
to_model_columns=metrics_df.columns[1:13]
from sklearn.ensemble import IsolationForest
clf=IsolationForest(n_estimators=100, max_samples='auto', \
max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0)
clf.fit(metrics_df[to_model_columns])
IsolationForest(behaviour='old', bootstrap=False, contamination='legacy',
max_features=1.0, max_samples='auto', n_estimators=100, n_jobs=-1,
random_state=42, verbose=0)
pred = clf.predict(metrics_df[to_model_columns])
metrics_df['anomaly']=pred
outliers=metrics_df.loc[metrics_df['anomaly']==-1]
outlier_index=list(outliers.index)
#print(outlier_index)
#Find the number of anomalies and normal points here points classified -1 are anomalous
print(metrics_df['anomaly'].value_counts())
1 109
-1 12
Name: anomaly, dtype: int64
/opt/conda/lib/python3.6/site-packages/sklearn/ensemble/iforest.py:417: DeprecationWarning: threshold_ attribute is deprecated in 0.20 and will be removed in 0.22.
" be removed in 0.22.", DeprecationWarning)
现在我们有了12个指标根据孤立森林的情况对异常情况进行了分类。我们将尝试将结果可视化,并检查分类是否有意义。
将指标归一化并拟合到PCA上,以减少维数,然后以3D方式将其绘制出来,突出显示异常。
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D
pca = PCA(n_components=3) # Reduce to k=3 dimensions
scaler = StandardScaler()
#normalize the metrics
X = scaler.fit_transform(metrics_df[to_model_columns])
X_reduce = pca.fit_transform(X)
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.set_zlabel("x_composite_3")
# Plot the compressed data points
ax.scatter(X_reduce[:, 0], X_reduce[:, 1], zs=X_reduce[:, 2], s=4, lw=1, label="inliers",c="green")
# Plot x's for the ground truth outliers
ax.scatter(X_reduce[outlier_index,0],X_reduce[outlier_index,1], X_reduce[outlier_index,2],
lw=2, s=60, marker="x", c="red", label="outliers")
ax.legend()
plt.show()
我们可以看到3D点,异常点大多是集群正常点,但一个2D点将帮助我们更好地判断。接下来,我们试着把同样的绘制成缩小到二维的主成分分析。
from sklearn.decomposition import PCA
pca = PCA(2)
pca.fit(metrics_df[to_model_columns])
res=pd.DataFrame(pca.transform(metrics_df[to_model_columns]))
Z = np.array(res)
figsize=(12, 7)
plt.figure(figsize=figsize)
plt.title("IsolationForest")
plt.contourf( Z, cmap=plt.cm.Blues_r)
b1 = plt.scatter(res[0], res[1], c='blue',
s=40,label="normal points")
b1 = plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='red',
s=40, edgecolor="red",label="predicted outliers")
plt.legend(loc="upper right")
plt.show()
因此,2D绘图为我们提供了一幅清晰的画面,表明算法正确地分类了用例中的异常点。
异常用红色边缘突出显示,正常点用绿色点表示。
在这里,Contamination参数起着很大的作用。我们的想法是捕获系统中所有的异常点。因此,最好是识别几个可能是正常的异常点(假阳性),但不要错过捕捉异常点(真阴性)。(所以我指定了12%作为Contamination,这取决于具体用例)
#Installing specific version of plotly to avoid Invalid property for color error in recent version which needs change in layout
!pip install plotly==2.7.0
现在我们已经发现了用例级别的异常行为。但是,要对异常采取行动,重要的是识别并提供信息,单独说明哪些指标标准是异常的。
当业务用户可以直观地看到(突然的下降/峰值)算法识别的异常时,就可以对其采取行动。所以在这个过程中,创造一个好的视觉效果也同样重要。
这个函数在时间序列上创建实际绘图,并在其上突出显示异常点。还有一个表,它提供了实际数据、更改和基于异常的条件格式化。
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.plotly as py
import matplotlib.pyplot as plt
from matplotlib import pyplot
import plotly.graph_objs as go
init_notebook_mode(connected=True)
def plot_anomaly(df,metric_name):
df.load_date = pd.to_datetime(df['load_date'].astype(str), format="%Y%m%d")
dates = df.load_date
#identify the anomaly points and create a array of its values for plot
bool_array = (abs(df['anomaly']) > 0)
actuals = df["actuals"][-len(bool_array):]
anomaly_points = bool_array * actuals
anomaly_points[anomaly_points == 0] = np.nan
#A dictionary for conditional format table based on anomaly
color_map = {0: "'rgba(228, 222, 249, 0.65)'", 1: "yellow", 2: "red"}
#Table which includes Date,Actuals,Change occured from previous point
table = go.Table(
domain=dict(x=[0, 1],
y=[0, 0.3]),
columnwidth=[1, 2],
# columnorder=[0, 1, 2,],
header=dict(height=20,
values=[[&#39;<b>Date</b>&#39;], [&#39;<b>Actual Values </b>&#39;], [&#39;<b>% Change </b>&#39;],
],
font=dict(color=[&#39;rgb(45, 45, 45)&#39;] * 5, size=14),
fill=dict(color=&#39;#d562be&#39;)),
cells=dict(values=[df.round(3)[k].tolist() for k in [&#39;load_date&#39;, &#39;actuals&#39;, &#39;percentage_change&#39;]],
line=dict(color=&#39;#506784&#39;),
align=[&#39;center&#39;] * 5,
font=dict(color=[&#39;rgb(40, 40, 40)&#39;] * 5, size=12),
# format = [None] + [&#34;,.4f&#34;] + [&#39;,.4f&#39;],
# suffix=[None] * 4,
suffix=[None] + [&#39;&#39;] + [&#39;&#39;] + [&#39;%&#39;] + [&#39;&#39;],
height=27,
fill=dict(color=[test_df[&#39;anomaly_class&#39;].map(color_map)],#map based on anomaly level from dictionary
)
))
#Plot the actuals points
Actuals = go.Scatter(name=&#39;Actuals&#39;,
x=dates,
y=df[&#39;actuals&#39;],
xaxis=&#39;x1&#39;, yaxis=&#39;y1&#39;,
mode=&#39;line&#39;,
marker=dict(size=12,
line=dict(width=1),
color=&#34;blue&#34;))
#Highlight the anomaly points
anomalies_map = go.Scatter(name=&#34;Anomaly&#34;,
showlegend=True,
x=dates,
y=anomaly_points,
mode=&#39;markers&#39;,
xaxis=&#39;x1&#39;,
yaxis=&#39;y1&#39;,
marker=dict(color=&#34;red&#34;,
size=11,
line=dict(
color=&#34;red&#34;,
width=2)))
axis = dict(
showline=True,
zeroline=False,
showgrid=True,
mirror=True,
ticklen=4,
gridcolor=&#39;#ffffff&#39;,
tickfont=dict(size=10))
layout = dict(
width=1000,
height=865,
autosize=False,
title=metric_name,
margin=dict(t=75),
showlegend=True,
xaxis1=dict(axis, **dict(domain=[0, 1], anchor=&#39;y1&#39;, showticklabels=True)),
yaxis1=dict(axis, **dict(domain=[2 * 0.21 + 0.20, 1], anchor=&#39;x1&#39;, hoverformat=&#39;.2f&#39;)))
fig = go.Figure(data=[table, anomalies_map, Actuals], layout=layout)
iplot(fig)
pyplot.show()
#return res
一个helper函数来查找百分比变化,根据严重程度对异常进行分类。
预测函数根据决策函数的结果,对数据进行异常分类。如果企业需要发现下一个可能产生影响的异常,可以使用这个来识别这些点。
前12个分位数为识别异常(高严重性),根据决策函数识别12-24个分位数点,将其分类为低严重性异常。
def classify_anomalies(df,metric_name):
df[&#39;metric_name&#39;]=metric_name
df = df.sort_values(by=&#39;load_date&#39;, ascending=False)
#Shift actuals by one timestamp to find the percentage chage between current and previous data point
df[&#39;shift&#39;] = df[&#39;actuals&#39;].shift(-1)
df[&#39;percentage_change&#39;] = ((df[&#39;actuals&#39;] - df[&#39;shift&#39;]) / df[&#39;actuals&#39;]) * 100
#Categorise anomalies as 0-no anomaly, 1- low anomaly , 2 - high anomaly
df[&#39;anomaly&#39;].loc[df[&#39;anomaly&#39;] == 1] = 0
df[&#39;anomaly&#39;].loc[df[&#39;anomaly&#39;] == -1] = 2
df[&#39;anomaly_class&#39;] = df[&#39;anomaly&#39;]
max_anomaly_score = df[&#39;score&#39;].loc[df[&#39;anomaly_class&#39;] == 2].max()
medium_percentile = df[&#39;score&#39;].quantile(0.24)
df[&#39;anomaly_class&#39;].loc[(df[&#39;score&#39;] > max_anomaly_score) & (df[&#39;score&#39;] <= medium_percentile)] = 1
return df
识别单个指标的异常并绘制结果。
X轴-日期,Y轴-实际值和异常点。
指标的实际值显示在蓝线中,异常点以红点突出显示。在表中,背景红色表示高异常,黄色表示低异常。
import warnings
warnings.filterwarnings(&#39;ignore&#39;)
for i in range(1,len(metrics_df.columns)-1):
clf.fit(metrics_df.iloc[:,i:i+1])
pred = clf.predict(metrics_df.iloc[:,i:i+1])
test_df=pd.DataFrame()
test_df[&#39;load_date&#39;]=metrics_df[&#39;load_date&#39;]
#Find decision function to find the score and classify anomalies
test_df[&#39;score&#39;]=clf.decision_function(metrics_df.iloc[:,i:i+1])
test_df[&#39;actuals&#39;]=metrics_df.iloc[:,i:i+1]
test_df[&#39;anomaly&#39;]=pred
#Get the indexes of outliers in order to compare the metrics with use case anomalies if required
outliers=test_df.loc[test_df[&#39;anomaly&#39;]==-1]
outlier_index=list(outliers.index)
test_df=classify_anomalies(test_df,metrics_df.columns)
plot_anomaly(test_df,metrics_df.columns)
从图中,我们能够捕捉到指标的突然峰值和低谷,并将它们投射出来。
此外,条件格式的表(可以运行代码获取,这里太占用篇幅没有展现)还可以让我们了解一些情况,比如数据不存在(值为零),这可能是数据处理过程中pipeline破裂的潜在结果,需要修复,并突出显示高级别和低级别异常。
如何使用呢?
- 如果当前时间戳对于用例来说是异常的,那么深入到指标,找出时间戳中有高度异常的指标集,以便在其上执行PCA。
- 此外,业务用户的反馈可以更新回数据中,这将有助于将其转换为监督/半监督学习问题,并比较其结果。
- 这里的一种增强是将不断发生的反常行为结合起来。如,大促销日(游戏绑定:可能会导致数天内的指标飙升)可以显示为单一行为。
如果这对你有用,点赞关注一键三连呀,支持我的工作。 |