ECG Signal Anomaly Detection with an Autoencoder (Python)

The dataset comes from the PTB Diagnostic ECG Database: 14,552 ECG heartbeat records in two classes, normal and abnormal, sampled at 125 Hz, with each record a single beat of 187 samples.

import os

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, recall_score, precision_score,
                             confusion_matrix, f1_score, classification_report)

np.set_printoptions(suppress=True)
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")

isGPU = tf.config.list_physical_devices('GPU')  # non-empty list when a GPU is visible

directory_path = r'ECG Heartbeat Categorization Dataset'
for dirname, _, filenames in os.walk(directory_path):
    for filename in filenames:
        print(os.path.join(dirname, filename))
# NOTE: these CSVs ship without a header row, so pandas consumes the first record
# of each file as column names (hence 4045/10505 rows below rather than 4046/10506).
# The last column holds the class label and is dropped here.
normal_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, :-1]
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)
Shape of Normal data (4045, 187)
Shape of Abnormal data (10505, 187)
def plot_sample(normal, anomaly):
    index = np.random.randint(0, len(normal), 2)  # two random record indexes

    fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
    ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[0].set_title("Normal")

    ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[1].set_title("Anomaly")

    plt.tight_layout()
    plt.show()
plot_sample(normal_df, anomaly_df)

CLASS_NAMES = ["Normal", "Anomaly"]

normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))

# Rename the 187 sample columns to 1..187 and attach a class label to each copy.
normal_df_copy = normal_df_copy.set_axis(range(1, 188), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 188), axis=1)
normal_df_copy = normal_df_copy.assign(target=CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target=CLASS_NAMES[1])

df = pd.concat((normal_df_copy, anomaly_df_copy))
def plot_smoothed_mean(data, class_name="normal", step_size=5, ax=None):
    df = pd.DataFrame(data)
    roll_df = df.rolling(step_size)
    smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)
    smoothed_std = roll_df.std().dropna().reset_index(drop=True)
    margin = 3 * smoothed_std  # band of +/- 3 rolling standard deviations
    lower_bound = (smoothed_mean - margin).values.flatten()
    upper_bound = (smoothed_mean + margin).values.flatten()

    ax.plot(smoothed_mean.index, smoothed_mean)
    ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")
    ax.set_title(class_name, fontsize=9)

fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
data_group = df.groupby("target")
for i, label in enumerate(CLASS_NAMES):
    data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()
    plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()
plt.show()

# The "target" column was only added to the copies, so these drops are defensive
# no-ops that keep normal_df / anomaly_df purely numeric before array conversion.
normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()


Only normal heartbeats are used for training: the autoencoder learns to reconstruct normal beats well, so abnormal beats should stand out later through a larger reconstruction error.

X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
Train shape: (3438, 187), Test shape: (607, 187), anomaly shape: (10505, 187)
class AutoEncoder(Model):
    def __init__(self, input_dim, latent_dim):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Reshape((input_dim, 1)),  # reshape to 3D (steps, channels) for Conv1D
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
        ])
        # Transposed convolutions instead of the UpSampling1D layers used previously.
        # With strides=1 they keep the 24 encoded time steps; the final Dense layer
        # maps the flattened features back to the 187-sample signal.
        self.decoder = tf.keras.Sequential([
            layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(input_dim)
        ])

    def call(self, X):
        encoded = self.encoder(X)
        decoded = self.decoder(encoded)
        return decoded




input_dim = X_train.shape[-1]
latent_dim = 32


model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()
Model: "auto_encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 sequential (Sequential)     (None, 24, 32)            63264     
                                                                 
 sequential_1 (Sequential)   (None, 187)               640603    
                                                                 
=================================================================
Total params: 703867 (2.69 MB)
Trainable params: 702715 (2.68 MB)
Non-trainable params: 1152 (4.50 KB)
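
As a quick sanity check on those shapes (a small sketch, not part of the original notebook), a couple of records can be pushed through each half of the model separately:

# Run two training records through the encoder and the full autoencoder.
sample = X_train[:2].astype("float32")
print(model.encoder(sample).shape)  # expected: (2, 24, 32)
print(model(sample).shape)          # expected: (2, 187)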
epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)

history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
                    validation_split=0.1, callbacks=[early_stopping])

plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()
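
Since EarlyStopping restores the best validation weights, it can be worth persisting them so the model can be reused without retraining. A minimal sketch; the filename is an assumption:

# Save the restored best weights (filename is arbitrary).
model.save_weights("ecg_autoencoder.weights.h5")
# To reload later: rebuild AutoEncoder(input_dim, latent_dim),
# call model.build((None, input_dim)), then model.load_weights(...).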

train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly, anomaly, verbose=0)

print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anomaly dataset error: ", anomaly_mae)

Training dataset error:  0.014224529266357422
Testing dataset error:  0.01488062646239996
Anomaly dataset error:  0.043484628200531006
def predict(model, X):
    pred = model.predict(X, verbose=False)
    loss = mae(pred, X)  # per-record mean absolute reconstruction error
    return pred, loss

_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)
# Threshold for separating normal from anomalous records:
# mean plus one standard deviation of the training reconstruction error.
threshold = np.mean(train_loss) + np.std(train_loss)


bins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="Anomaly")


ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),
             arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()
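
The mean-plus-one-standard-deviation rule is only a heuristic. As an alternative sketch (the 95th percentile here is an assumption for illustration, not a tuned value), a percentile of the training losses makes the normal-data acceptance rate explicit:

# Alternative cut point: keep roughly 95% of training records below the threshold.
# In practice the percentile should be chosen on a validation set to trade off
# precision against recall.
percentile_threshold = np.percentile(train_loss, 95)
print(f"mean+std: {threshold:.4f}, 95th percentile: {percentile_threshold:.4f}")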

def plot_examples(model, data, ax, title):
    pred, loss = predict(model, data)
    ax.plot(data.flatten(), label="Actual")
    ax.plot(pred[0], label="Predicted")
    ax.fill_between(range(187), data.flatten(), pred[0], alpha=0.3, color="r")
    ax.legend(shadow=True, frameon=True,
              facecolor="inherit", loc=1, fontsize=7)
    ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)
fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)


for i, idx in enumerate(random_indexes):
    data = X_train[[idx]]
    plot_examples(model, data, ax=axes[0, i], title="Normal")


for i, idx in enumerate(random_indexes):
    data = anomaly[[idx]]
    plot_examples(model, data, ax=axes[1, i], title="Anomaly")

plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png", bbox_inches="tight")  # keep the suptitle inside the saved image
plt.show()

Model Evaluation

def evaluate_model(model, data, is_anomaly=False):
    _, loss = predict(model, data)
    if is_anomaly:
        accuracy = np.sum(loss > threshold) / len(data)   # anomalies should exceed the threshold
    else:
        accuracy = np.sum(loss <= threshold) / len(data)  # normal records should fall below it
    return f"Accuracy: {accuracy:.2%}"

print("Training", evaluate_model(model, X_train))
print("Testing", evaluate_model(model, X_test))
print("Anomaly", evaluate_model(model, anomaly, is_anomaly=True))
Training Accuracy: 88.66%
Testing Accuracy: 84.51%
Anomaly Accuracy: 77.34%
def prepare_labels(model, train, test, anomaly, threshold=threshold):
    # 1 = Normal, 0 = Anomaly
    ytrue = np.concatenate((np.ones(len(train) + len(test), dtype=int),
                            np.zeros(len(anomaly), dtype=int)))
    _, train_loss = predict(model, train)
    _, test_loss = predict(model, test)
    _, anomaly_loss = predict(model, anomaly)
    train_pred = (train_loss <= threshold).numpy().astype(int)
    test_pred = (test_loss <= threshold).numpy().astype(int)
    anomaly_pred = (anomaly_loss <= threshold).numpy().astype(int)
    ypred = np.concatenate((train_pred, test_pred, anomaly_pred))

    return ytrue, ypred
def plot_confusion_matrix(model, train, test, anomaly, threshold=threshold):
    ytrue, ypred = prepare_labels(model, train, test, anomaly, threshold=threshold)
    accuracy = accuracy_score(ytrue, ypred)
    precision = precision_score(ytrue, ypred)
    recall = recall_score(ytrue, ypred)
    f1 = f1_score(ytrue, ypred)
    print(f"Accuracy: {accuracy:.2%}\n"
          f"Precision: {precision:.2%}\n"
          f"Recall: {recall:.2%}\n"
          f"f1: {f1:.2%}\n")

    cm = confusion_matrix(ytrue, ypred)
    cm_norm = confusion_matrix(ytrue, ypred, normalize="true")
    data = np.array([f"{count}\n({pct:.2%})" for count, pct in zip(cm.ravel(), cm_norm.ravel())]).reshape(cm.shape)
    labels = ["Anomaly", "Normal"]  # label 0 = Anomaly, label 1 = Normal

    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=data, fmt="", xticklabels=labels, yticklabels=labels)
    plt.ylabel("Actual")
    plt.xlabel("Predicted")
    plt.title("Confusion Matrix", weight="bold")
    plt.tight_layout()
    plt.show()
plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
Accuracy: 80.32%
Precision: 59.94%
Recall: 88.03%
f1: 71.32%

ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
# Label 0 is Anomaly and label 1 is Normal, so the class names must be passed in that order.
print(classification_report(ytrue, ypred, target_names=["Anomaly", "Normal"]))

              precision    recall  f1-score   support

     Anomaly       0.94      0.77      0.85     10505
      Normal       0.60      0.88      0.71      4045

    accuracy                           0.80     14550
   macro avg       0.77      0.83      0.78     14550
weighted avg       0.85      0.80      0.81     14550
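
All of the numbers above depend on the single threshold chosen earlier. A threshold-free complement is the ROC-AUC computed directly from the reconstruction errors; a minimal sketch reusing the losses computed above, with the sign flipped so that higher scores mean "more normal" and match ytrue:

from sklearn.metrics import roc_auc_score

# Scores ordered as in ytrue: train, test, then anomaly records.
scores = -np.concatenate((train_loss.numpy(), test_loss.numpy(), anomaly_loss.numpy()))
print(f"ROC-AUC: {roc_auc_score(ytrue, scores):.3f}")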
