代码较为简单。
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from matplotlib import pyplot as plt
# Load the pre-computed statistical features of channel 1 (test 2) and
# standardise them before they are fed to the autoencoder.
from sklearn.preprocessing import StandardScaler

# Feature columns used as model input, in the order they are fed to the network.
feature_columns = ['Min', 'Max', 'Kurt', 'ImpFactor', 'RMS', 'MargFactor', 'Skewness',
                   'ShapeFactor', 'PeakToPeak', 'CrestFactor']

df_stats_Ch1_test2 = pd.read_csv("estadisticos_test2_ch1.csv", sep=',')
X_Ch1 = df_stats_Ch1_test2[feature_columns].values

# NOTE(review): the scaler is fitted on the complete signal, not only on the
# first quarter that is later used for training -- confirm this is intended.
scaler = StandardScaler()
scaler.fit(X_Ch1)
X_Ch1 = scaler.transform(X_Ch1)

# First quarter of the (standardised) rows.
X_Ch1_primerCuarto = X_Ch1[:int(len(X_Ch1)/4)]
# Encoder: three dense ReLU layers with 10, 10 and 5 units.
encoder = keras.Sequential(
    [
        layers.Dense(units=10, activation='relu'),
        layers.Dense(units=10, activation='relu'),
        layers.Dense(units=5, activation='relu'),
    ],
    name='encoder',
)

# Decoder: mirrors the encoder (5, 10, 10 units).
# NOTE(review): the last layer uses tanh, whose output is bounded to [-1, 1],
# while the targets are standardised features -- confirm this is intended.
decoder = keras.Sequential(
    [
        layers.Dense(units=5, activation='relu'),
        layers.Dense(units=10, activation='relu'),
        layers.Dense(units=10, activation='tanh'),
    ],
    name='decoder',
)

# Chain both halves and train with Adam on the mean squared error.
autoencoder = keras.Sequential([encoder, decoder])
autoencoder.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss="mse",
)
# Seed the random number generators before training.
# numpy's seed alone does not cover the Keras/TensorFlow generators, so
# keras.utils.set_random_seed is called as well (it seeds Python's `random`,
# numpy and the backend in one go).
from numpy.random import seed
seed(1)
keras.utils.set_random_seed(1)

# Train the autoencoder to reproduce its own input (input == target),
# holding out the last 10% of the training rows for validation.
history = autoencoder.fit(
    X_Ch1_primerCuarto,
    X_Ch1_primerCuarto,
    epochs=1000,
    batch_size=128,
    validation_split=0.1,
    # Early stopping is currently disabled; to enable it pass:
    # callbacks=[
    #     keras.callbacks.EarlyStopping(monitor="val_loss", patience=20, mode="min")
    # ],
)

# Reconstruction of the training subset (displayed when run as a notebook cell).
x_train = X_Ch1_primerCuarto
x_train_pred = autoencoder.predict(x_train)
x_train_pred
array([[ 0.35000628, -0.17480692, -0.24695596, ..., -0.5259716 ,
-0.24781543, 0.99913836],
[ 0.3868077 , -0.31456742, -0.31951994, ..., -0.49435195,
-0.36288378, 0.00516775],
[ 0.28412798, -0.18568926, -0.29336867, ..., -0.58107275,
-0.2037016 , 0.9999859 ],
...,
[ 0.3468851 , -0.39986593, -0.4437119 , ..., -0.610753 ,
-0.36369088, 0.92058384],
[ 0.39271683, -0.3842552 , -0.39312315, ..., -0.54627967,
-0.39505664, -0.22686467],
[ 0.37656352, -0.37563044, -0.38502413, ..., -0.53825307,
-0.38425663, -0.05728036]], dtype=float32)
# Reconstruct the complete signal and take the element-wise absolute error.
x_test = X_Ch1
x_test_pred = autoencoder.predict(x_test)
error_test = np.abs(x_test - x_test_pred)
error_test

# Feature names, in column order.
params = ['Min', 'Max', 'Kurt', 'ImpFactor', 'RMS', 'MargFactor', 'Skewness',
          'ShapeFactor', 'PeakToPeak', 'CrestFactor']

# One error series per feature: column k of `error_test` belongs to params[k].
(error_min, error_max, error_kurt, error_if, error_rms,
 error_mf, error_skew, error_sf, error_ptp, error_cf) = (
    error_test[:, column] for column in range(len(params))
)
import seaborn as sns
import warnings

# NOTE(review): this silences every warning process-wide; kept because other
# parts of the file may rely on it -- consider narrowing the filter.
warnings.filterwarnings("ignore")

# Density of the reconstruction error of every feature, one subplot each.
# (error series, label shown in the subplot title), in display order.
feature_errors = [
    (error_min, 'Min'),
    (error_max, 'Max'),
    (error_kurt, 'Kurtosis'),
    (error_if, 'Impulse Factor'),
    (error_rms, 'RMS'),
    (error_mf, 'Margin Factor'),
    (error_skew, 'Skewness'),
    (error_sf, 'Shape Factor'),
    (error_ptp, 'Peak to Peak'),
    (error_cf, 'Crest Factor'),
]

fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6), (ax7, ax8), (ax9, ax10)) = plt.subplots(nrows=5, ncols=2, figsize=(20, 30))
feature_axes = (ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8, ax9, ax10)

for feature_ax, (feature_error, feature_label) in zip(feature_axes, feature_errors):
    # kdeplot(fill=True) replaces the deprecated distplot(hist=False,
    # kde_kws={'shade': True}) call and draws the same filled density curve.
    sns.kdeplot(
        feature_error,
        fill=True,
        linewidth=1,
        color='blue',
        ax=feature_ax,
    )
    feature_ax.set_title(f'Distribution of reconstruction errors - {feature_label} (Autoencoders)')
    feature_ax.set_xlabel('Reconstruction errors')
    # Same x range on every subplot so the features can be compared visually.
    feature_ax.set_xlim([-1, 12])
# First quarter of the standardised signal.
X_primerCuarto = X_Ch1[:int(len(X_Ch1)/4)]

# Reconstructions of the first quarter and of the complete signal.
reconstruccion_train = autoencoder.predict(X_primerCuarto)
reconstruccion = autoencoder.predict(X_Ch1)

# RMSE per row: root of the mean squared difference across the feature axis.
error_reconstruccion_train = np.sqrt(
    np.mean(np.square(reconstruccion_train - X_primerCuarto), axis=1)
)
error_reconstruccion = np.sqrt(
    np.mean(np.square(reconstruccion - X_Ch1), axis=1)
)
# Side-by-side densities of the RMSE: training subset vs. complete signal.
# sns.kdeplot(fill=True) is used instead of the deprecated
# sns.distplot(hist=False, kde_kws={'shade': True}).
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(15, 6))
sns.kdeplot(
    error_reconstruccion_train,
    fill=True,
    linewidth=1,
    color='blue',
    ax=ax1,
)
ax1.set_title('Distribution of reconstruction errors (Autoencoders) - Train')
ax1.set_xlabel('Reconstruction error')
sns.kdeplot(
    error_reconstruccion,
    fill=True,
    linewidth=1,
    color='red',
    ax=ax2,
)
ax2.set_title('Distribution of reconstruction errors (Autoencoders) - Complete signal')
ax2.set_xlabel('Reconstruction error')

# Both densities overlaid on a single axis for direct comparison.
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(15, 6))
sns.kdeplot(
    error_reconstruccion,
    fill=True,
    linewidth=1,
    color='red',
    ax=ax,
    label='Complete signal',
)
sns.kdeplot(
    error_reconstruccion_train,
    fill=True,
    linewidth=1,
    color='blue',
    ax=ax,
    label='Train',
)
ax.set_title('Distribution of reconstruction errors (Autoencoders) - Train vs Complete signal')
ax.set_xlabel('Reconstruction error')
ax.legend()
from sklearn.mixture import GaussianMixture

# Fit a two-component Gaussian mixture on the RMSE of everything after the
# first quarter of the signal.
first_quarter_len = int(len(error_reconstruccion)/4)
error_after_first_quarter = error_reconstruccion[first_quarter_len:]
gm = GaussianMixture(n_components=2, random_state=33).fit(error_after_first_quarter.reshape(-1, 1))
gm.means_

# The order of the mixture components is arbitrary, so sort the two means
# instead of assuming that component 0 is the low-error one.
gm_mean_low, gm_mean_high = (float(m) for m in np.sort(gm.means_.ravel()))

fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(9, 6))
# kdeplot(fill=True) replaces the deprecated distplot(..., kde_kws={'shade': True}).
sns.kdeplot(
    error_after_first_quarter,
    fill=True,
    linewidth=1,
    color='orange',
    ax=ax,
)

# Mark both component means; each arrow points at the actual mean rather than
# at a hard-coded x position.
ax.axvline(gm_mean_low, ls='--', color='black')
ax.annotate(str(round(gm_mean_low, 8)), xy=(gm_mean_low, 0.5), xytext=(2, 0.6),
            arrowprops=dict(facecolor='black', shrink=0.05))
ax.axvline(gm_mean_high, ls=':', color='black')
ax.annotate(str(round(gm_mean_high, 8)), xy=(gm_mean_high, 0.1), xytext=(2.8, 0.2),
            arrowprops=dict(facecolor='black', shrink=0.05))
ax.set_title('Distribution of reconstruction errors (Autoencoders) - Complete signal except the first quarter')
ax.set_xlabel('Reconstruction error')
# Label every signal with the Gaussian mixture: 0 = normal, 1 = anomaly.
# The first quarter is assumed to be normal and is labelled 0 without
# consulting the mixture.
first_quarter_len = int(len(error_reconstruccion)/4)

# GaussianMixture numbers its components arbitrarily, so the component with
# the larger mean reconstruction error is the one treated as "anomaly".
anomalous_component = int(np.argmax(gm.means_.ravel()))
raw_components = gm.predict(error_reconstruccion[first_quarter_len:].reshape(-1, 1))
pred_GM_3cuartos = (raw_components == anomalous_component).astype(int)
pred_GM = np.concatenate([np.zeros(first_quarter_len, dtype=int), pred_GM_3cuartos])

colores = ["#00cc44", "#f73e05"]
n_signal = np.arange(len(pred_GM))

# Split signal indices and errors by predicted label.
signals_0 = n_signal[pred_GM == 0]
error_rec_0 = error_reconstruccion[pred_GM == 0]
signals_1 = n_signal[pred_GM == 1]
error_rec_1 = error_reconstruccion[pred_GM == 1]

plt.figure(figsize=(9,6))
plt.scatter(signals_0, error_rec_0, c = colores[0], label = 'Normal')
plt.scatter(signals_1, error_rec_1, c = colores[1], label = 'Anomalies')
plt.title('Reconstruction error (Autoencoders) - Ch1 test2')
plt.xlabel('Signal')
plt.ylabel('Error')
plt.legend()
# Find the first run of consecutive anomalous signals according to the
# Gaussian mixture and shade the plot before/after that point.

# GaussianMixture numbers its components arbitrarily: the component with the
# larger mean reconstruction error is the one treated as "anomaly" (label 1).
anomalous_component = int(np.argmax(gm.means_.ravel()))
pred_GM = (gm.predict(error_reconstruccion.reshape(-1, 1)) == anomalous_component).astype(int)

# Number of consecutive anomalous signals required.
# Presumably 6 signals span one hour ("1hora") -- TODO confirm sampling interval.
consecutive_needed = 6

comienzo_1hora_anomalias = 'NA'
# Stop early enough that every inspected window is complete; a shorter slice
# at the tail must not count as a full run.
for i in range(len(pred_GM) - consecutive_needed + 1):
    if pred_GM[i:i + consecutive_needed].all():
        comienzo_1hora_anomalias = i
        break

# When no run exists, everything is treated as normal instead of failing on
# the 'NA' sentinel.
run_found = comienzo_1hora_anomalias != 'NA'
first_anomalous = comienzo_1hora_anomalias if run_found else len(pred_GM)
pred_GM_1hora_anomalias = [0] * first_anomalous + [1] * (len(pred_GM) - first_anomalous)

colores = ["#00cc44", "#f73e05"]
x = np.arange(-10, len(df_stats_Ch1_test2)+10, 0.02)
n_signal = list(range(len(pred_GM_1hora_anomalias)))

# Vertical extent of the shaded bands, slightly larger than the data range.
band_bottom = min(error_reconstruccion) - 0.5
band_top = max(error_reconstruccion) + 1

plt.figure(figsize=(9,6))
plt.scatter(n_signal, error_reconstruccion, c = np.take(colores, pred_GM_1hora_anomalias))
if run_found:
    plt.axvline(comienzo_1hora_anomalias, color = 'r', label = 'Beginning of anomalies')
    plt.fill_between(x, band_bottom, band_top, where = x < comienzo_1hora_anomalias,
                     facecolor = 'green', alpha = 0.2, label = 'Normal')
    plt.fill_between(x, band_bottom, band_top, where = x > comienzo_1hora_anomalias,
                     facecolor = 'red', alpha = 0.5, label = 'Anomalies ')
else:
    plt.fill_between(x, band_bottom, band_top,
                     facecolor = 'green', alpha = 0.2, label = 'Normal')
plt.title('Reconstruction error (Autoencoders) - Ch1 test2')
plt.xlabel('Signal')
plt.ylabel('Error')
plt.legend(loc = 2)
Z-Scores
# Z-score of every reconstruction error relative to the training errors.
mean = np.mean(error_reconstruccion_train)
std = np.std(error_reconstruccion_train)
zscore = (error_reconstruccion - mean) / std
threshold = 3

# Training signals are always labelled 0; the remaining ones are labelled 1
# when their absolute z-score exceeds the threshold.
train_len = len(error_reconstruccion_train)
outlier = np.concatenate([
    np.zeros(train_len, dtype=int),
    (np.abs(zscore[train_len:]) > threshold).astype(int),
])

n_signal = np.arange(len(error_reconstruccion))

# Split signal indices and errors by label.
is_outlier = outlier == 1
is_normal = outlier == 0
signals_0 = n_signal[is_normal]
error_rec_0 = error_reconstruccion[is_normal]
signals_1 = n_signal[is_outlier]
error_rec_1 = error_reconstruccion[is_outlier]

plt.figure(figsize=(9,6))
plt.scatter(signals_0, error_rec_0, c = "#00cc44", label = 'Normal')
plt.scatter(signals_1, error_rec_1, c = "#f73e05", label = 'Anomalies')
plt.title('Reconstruction error (Autoencoders) - Ch1 test2')
plt.xlabel('Signal')
plt.ylabel('Error')
plt.legend()
# Find the first run of consecutive z-score outliers after the training
# subset and shade the plot before/after that point.
z = (error_reconstruccion - mean) / std
exceeds_threshold = np.abs(z) > threshold

# Number of consecutive outliers required.
# Presumably 6 signals span one hour ("1hora") -- TODO confirm sampling interval.
consecutive_needed = 6

comienzo_1hora_ouliers = 'NA'
# Stop early enough that every inspected window is complete; a shorter slice
# at the tail must not count as a full run.
for i in range(len(error_reconstruccion_train), len(error_reconstruccion) - consecutive_needed + 1):
    if exceeds_threshold[i:i + consecutive_needed].all():
        comienzo_1hora_ouliers = i
        break

# When no run exists, everything is treated as normal instead of failing on
# the 'NA' sentinel.
run_found = comienzo_1hora_ouliers != 'NA'
first_anomalous = comienzo_1hora_ouliers if run_found else len(z)

colores = ["#00cc44", "#f73e05"]
zscores_1hora_anomalias = [0] * first_anomalous + [1] * (len(z) - first_anomalous)
x = np.arange(-10, len(df_stats_Ch1_test2) + 10, 0.02)
n_signal = list(range(len(zscores_1hora_anomalias)))

# Vertical extent of the shaded bands, slightly larger than the data range.
band_bottom = min(error_reconstruccion) - 0.5
band_top = max(error_reconstruccion) + 1

plt.figure(figsize=(9,6))
plt.scatter(n_signal, error_reconstruccion, c = np.take(colores, zscores_1hora_anomalias))
if run_found:
    plt.axvline(comienzo_1hora_ouliers, color = 'r', label = 'Beginning of anomalies')
    plt.fill_between(x, band_bottom, band_top, where = x < comienzo_1hora_ouliers,
                     facecolor = 'green', alpha = 0.2, label = 'Normal')
    plt.fill_between(x, band_bottom, band_top, where = x > comienzo_1hora_ouliers,
                     facecolor = 'red', alpha = 0.5, label = 'Anomalies ')
else:
    plt.fill_between(x, band_bottom, band_top,
                     facecolor = 'green', alpha = 0.2, label = 'Normal')
plt.title('Reconstruction error (Autoencoders) - Ch1 test2')
plt.xlabel('Signal')
plt.ylabel('Error')
plt.legend(loc = 2)
知乎学术咨询:
https://www.zhihu.com/consult/people/792359672131756032?isMe=1
工学博士,担任《Mechanical Systems and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。