简单的基于小波分解和独立分量分析的脑电信号降噪(Python)

脑电信号是一种典型的非平稳随机信号且存在一定的非高斯性和非线性。传统的分析处理方法是将脑电信号近似看做线性、准平稳、高斯分布的随机信号,这使得分析结果往往不能令人满意,实用性较差。现代的小波变换方法和独立分量分析方法的提出为有效地分析脑电信号提供了新的途径。由于所要提取的特征波频率不精确并受到噪声的影响,如果单独应用小波提取出的特征信号往往特征不够明显。独立分量分析是根据信号的多元统计特性进行分析处理,可以将多道混合信号进行独立分离。考虑到所要提取的特征波就是混合脑电信号中的一个独立分量,应用独立分量分析在一定程度上可以分离出特征波。

鉴于此,采用简单的小波分解和独立分量分析对脑电信号降噪,完整代码如下:

import numpy as np
import matplotlib.pyplot as plt
import pyedflib
# import sklearn.linear_model as slm
from sklearn import metrics
from statsmodels.tsa.ar_model import AutoReg
# import scipy
# import scipy.signal as signal
from sklearn.decomposition import FastICA
import pywt

class EEGSignalProcessing:
    def __init__(self) -> None:
        pass

    def read_signal(filename, number_of_samples = None, offset = 0):
        file = pyedflib.EdfReader(filename)
        if number_of_samples is None:
            number_of_samples = file.getNSamples()[0]
        number_of_signals = file.signals_in_file
        signal = np.zeros((number_of_signals, number_of_samples))

        for i in range(number_of_signals):
            signal[i, :] = file.readSignal(i)[offset:offset + number_of_samples]
        
        file.close()
        return signal
    
    def plot_signal(data, sampling_frequency, title, number_of_channels = None, channel_labels = None, yaxis_label = None, xaxis_label = None):       
        plt.rcParams['font.size'] = '16'
        fig = plt.figure()
        ax = fig.add_subplot(1,1,1)
        lenght = len(data)
        
        if number_of_channels is None:
            number_of_channels_useful = range(0, lenght)
        else:
            if isinstance(number_of_channels, str):
                number_of_channels_useful = range(0, lenght-1)
            else:
                number_of_channels_useful = number_of_channels
        
        for channel in number_of_channels_useful:
            if channel_labels is None:
                label = 'ch' + str(channel + 1)
            else:
                label = channel_labels[channel]

            limit = data[channel, :].size
            x_values = [num/sampling_frequency for num in range(0, limit)]
            ax.plot(x_values, data[channel, :], label = label)
        
        fig.set_size_inches(15,5)
        plt.title(title)
        plt.legend()

        if yaxis_label is not None:
            plt.ylabel(yaxis_label)
        if xaxis_label is not None:
            plt.xlabel(xaxis_label)
        
        plt.show(block = True)

    def channel_desynchronize(data_1d, delay, value = 0):
        number_of_samples = len(data_1d)
        if delay > 0:
            for i in range(number_of_samples - 1, delay - 1, -1):
                data_1d[i] = data_1d[i - delay]
            for i in range(0, delay):
                data_1d[i] = value
        if delay < 0:
            delay = -delay
            for i in range(0, number_of_samples - delay):
                data_1d[i] = data_1d[i + delay]
            for i in range(number_of_samples - delay, number_of_samples):
                data_1d[i] = value        

    def all_channels_desynchronize(data, delay, value = 0):
        for i in range(0, len(data)):
            EEGSignalProcessing.channel_desynchronize(data[i], delay, value)        

    class NoiseReduction:

        def autoregression(data, delay):
            signals_number = len(data)
            samples_number = len(data[0])
            output = np.zeros((signals_number, samples_number))

            for i in range(0, signals_number):
                model = AutoReg(data[i], lags=delay)
                model_fit = model.fit()
                predictions = model_fit.predict(start=0, end=samples_number-1, dynamic=False)
                output[i, :samples_number] = predictions
            return output

        def wavelet(linear_array):
            name = 'bior3.1'

            # Create wavelet object and define parameters
            wav = pywt.Wavelet(name)
            max_level = pywt.dwt_max_level(len(linear_array) + 1, wav.dec_len)
            # print("Maximum level is " + str(max_level))
            threshold = 0.04  # Threshold for filtering

            # Decompose into wavelet components, to the level selected:
            coeffs = pywt.wavedec(linear_array, name, level=5)
            plt.figure()
            for i in range(1, len(coeffs)):
                plt.subplot(max_level, 1, i)
                plt.plot(coeffs[i])
                coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))
                plt.plot(coeffs[i])
            plt.show()
            datarec = pywt.waverec(coeffs, name)
            return np.array(datarec)

        def wavelet_all_channels(data):
            output = []
            for c in data:
                output.append(EEGSignalProcessing.NoiseReduction.wavelet(c))
            return np.stack(output)
        
        def ica(data, mask=None):
            # maska do wyboru składowych
            reduce_level = [True, True, True, True, True, True, True, True, True]
            reduce_level[7] = False

            if mask is not None:
                reduce_level = mask

            sigT = data.T
            n = data.shape[0]

            # obliczanie ICA
            ica = FastICA(n_components=n)
            sig_ica = ica.fit_transform(sigT)
            # Macierz mmieszania
            A_ica = ica.mixing_
            # Przycięcie macierzy mieszającej, aby odrzucić najmniej znaczące wartości
            A_ica_reduced = A_ica
            sig_ica = sig_ica[:, reduce_level]
            X_reduced = np.dot(sig_ica, A_ica_reduced.T[reduce_level, :]) + ica.mean_
            ica_reconstruct = X_reduced.T
            return ica_reconstruct

    class Noise:
        def add_uniform_noise(data, low, high, seed=None):
            signals_number = len(data)
            samples_number = len(data[0])
            output = np.zeros((signals_number, samples_number))

            if seed is not None:
                np.random.seed(seed)

            for i in range(signals_number):
                if isinstance(low, str):
                    if low == "min_value":
                        low = min(data[i])

                if isinstance(high, str):
                    if high == "max_value":
                        high = max(data[i])
                noise = np.random.uniform(low, high, samples_number)
                output[i] = data[i] + noise
            return output

        def add_normal_noise(data, mean, std, amplitude=1, seed=None):
            signals_number = len(data)
            samples_number = len(data[0])
            output = np.zeros((signals_number, samples_number))

            if seed is not None:
                np.random.seed(seed)

            for i in range(signals_number):
                noise = np.random.normal(mean, std, samples_number)
                output[i] = data[i] + noise
            return amplitude*output
        
        def add_triangular_noise(data, left, peak, right, seed=None):
            signals_number = len(data)
            samples_number = len(data[0])
            output = np.zeros((signals_number, samples_number))

            if seed is not None:
                np.random.seed(seed)
                
            for i in range(signals_number):
                noise = np.random.triangular(left, peak, right, samples_number)
                output[i] = data[i] + noise
            return output

class Metrics:
    def __init__(self) -> None:
        pass

    def evaluate_signal(signal, prediction, cut_left=100, cut_right=100):
        signal_cut = signal[cut_left:-cut_right]
        predicted_cut = prediction[cut_left:-cut_right]

        # metryki z sklearn
        mae = metrics.mean_absolute_error(signal_cut, predicted_cut)
        mse = metrics.mean_squared_error(signal_cut, predicted_cut)

        # wyświetlanie
        print('MAE z biblioteki sklearn: {}'.format(round(mae, 2)))
        print('MSE z biblioteki sklearn: {}'.format(round(mse, 2)))

    def differantial(sigA, sigB, cutleft=100, cutright=100):
        differential = sigA[:,cutleft:-cutright] - sigB[:,cutleft:-cutright]
        return differential


def main():
    channels_to_plot = [0,1,2,3,4]
    # signal = EEGSignalProcessing.read_signal(filename = "Subject00_2.edf", number_of_samples=1000)
    # signal = EEGSignalProcessing.read_signal(filename = "Subject00_1.edf", number_of_samples=1000)
    signal = EEGSignalProcessing.read_signal(filename = "rsvp_10Hz_08b.edf", number_of_samples=1000)

    EEGSignalProcessing.plot_signal(signal,sampling_frequency= 2048, title = "Orginalne sygnały EEG", number_of_channels = channels_to_plot,
                                    yaxis_label='Wartosc sygnalu', xaxis_label='Czas [s]')

    low = 2
    high = 4
    sig_noise_uniform = EEGSignalProcessing.Noise.add_uniform_noise(signal, low=low, high=high, seed=100)
    EEGSignalProcessing.plot_signal(sig_noise_uniform, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Jednostajny Low={}, High={})".format(
        low, high), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')

    mean = 0
    std = 2
    ampl = 2
    sig_noise_normal = EEGSignalProcessing.Noise.add_normal_noise(signal, mean=mean, std=std, amplitude=ampl, seed=100)
    EEGSignalProcessing.plot_signal(sig_noise_normal, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Normalny Low={}, High={})".format(
        mean, std), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')

    sig_n3_left = 0
    sig_n3_peak = 4
    sig_n3_right = 6
    sig_noise_triangular = EEGSignalProcessing.Noise.add_triangular_noise(signal, left=sig_n3_left, peak=sig_n3_peak, right=sig_n3_right, seed=100)
    EEGSignalProcessing.plot_signal(sig_noise_triangular, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Trójkątny Left={}, Peak={}, High={})".format(
        sig_n3_left, sig_n3_peak, sig_n3_right), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')
    

    # Odszumianie sygnałów
    # Autoregresja
    AR_lag = 10
    signal_autoregresion = EEGSignalProcessing.NoiseReduction.autoregression(sig_noise_triangular, delay = AR_lag)
    EEGSignalProcessing.plot_signal(signal_autoregresion,
                               title="5 odszumionych kanałów EEG - regresja liniowa delay={}".format(
                                   AR_lag), sampling_frequency=2048, number_of_channels=channels_to_plot,
                               yaxis_label='wartość sygnału', xaxis_label='czas [s]')
    
    # Wavelet
    signal_wavelet = EEGSignalProcessing.NoiseReduction.wavelet_all_channels(sig_noise_triangular)
    EEGSignalProcessing.plot_signal(signal_wavelet,
                               title="5 odszumionych kanałów EEG - Wavelet", sampling_frequency=2048, number_of_channels=channels_to_plot,
                               yaxis_label='wartość sygnału', xaxis_label='czas [s]')
    
    # ICA
    signal_ICA = EEGSignalProcessing.NoiseReduction.ica(sig_noise_triangular)
    EEGSignalProcessing.plot_signal(signal_ICA,
                               title="5 odszumionych kanałów EEG - ICA", sampling_frequency=2048, number_of_channels=channels_to_plot,
                               yaxis_label='wartość sygnału', xaxis_label='czas [s]')
    
    # Metryki
    ch = 4
    noise_signal = sig_noise_normal

    print('\nORYGINALNY')
    Metrics.evaluate_signal(signal[ch], signal[ch])

    print('\nNIEODSZUMIONY, dodano szum rozkład normalny')
    Metrics.evaluate_signal(signal[ch], noise_signal[ch])

    print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem autoregresja')
    Metrics.evaluate_signal(signal[ch], signal_autoregresion[ch])

    print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem ICA')
    Metrics.evaluate_signal(signal[ch], signal_ICA[ch])

    print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem wavelet')
    Metrics.evaluate_signal(signal[ch], signal_wavelet[ch])

    # Sygnał różnicowy
    differential_noise = Metrics.differantial(sig_noise_normal, signal)
    differential_AR = Metrics.differantial(signal_autoregresion, signal)
    differential_ICA = Metrics.differantial(signal_ICA, signal)
    differential_Wavelet = Metrics.differantial(signal_wavelet, signal)

    EEGSignalProcessing.plot_signal(differential_noise, title="Sygnał różnicowy, zaszumiony-orginalny",
                                    sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",
                                    xaxis_label="Czas [s]")
    
    EEGSignalProcessing.plot_signal(differential_AR, title="Sygnał różnicowy, AR-orginalny",
                                    sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",
                                    xaxis_label="Czas [s]")
    
    EEGSignalProcessing.plot_signal(differential_ICA, title="Sygnał różnicowy, ICA-orginalny",
                                    sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",
                                    xaxis_label="Czas [s]")
    
    EEGSignalProcessing.plot_signal(differential_Wavelet, title="Sygnał różnicowy, wavelet-orginalny",
                                    sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",
                                    xaxis_label="Czas [s]")
完整代码:https://mbd.pub/o/bread/mbd-ZZWYlJlp

if __name__ == '__main__':
    main()

图片

图片

图片

图片

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/670728.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode---字符串

344. 反转字符串 编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。 不要给另外的数组分配额外的空间&#xff0c;你必须原地修改输入数组、使用 O(1) 的额外空间解决这一问题。 代码示例&#xff1a; //时间复杂度: O(n) //空间…

职场思考-在行业坚守中实现个人增值(13)

滚石不生苔&#xff0c;转行不聚财 在自己工作几年后&#xff0c;职业竞争力会由专业能力向行业经验进行转化 如果你不具备足够的行业积累&#xff0c;即使在某个专业上有足够的能力&#xff0c;你也难以得到待遇或职位的提升&#xff0c;陷入高不成低不就的局面 掌握完成岗位工…

使用pikachu Xss后台出现的问题

在进行xss-x漏洞实验的时候&#xff0c;一直出现上述错误&#xff0c;查找了很多&#xff0c;终于找到问题所在 pikachu使用的数据库为同一个数据库&#xff0c;千万别被pkxss误导&#xff0c;以为pikachu还有一个数据库为pkxss,所以在配置的时候写下如下图的

Python知识点5---字符串的使用

提前说一点&#xff1a;如果你是专注于Python开发&#xff0c;那么本系列知识点只是带你入个门再详细的开发点就要去看其他资料了&#xff0c;而如果你和作者一样只是操作其他技术的Python API那就足够了。 Python的字符串在使用上和其他语言的差别不大&#xff0c;常规操作都…

Nginx实战:nginx支持带下划线的header

nginx对header 的名字字符做了限制&#xff0c;默认 underscores_in_headers 为off&#xff0c;表示如果header name中包含下划线&#xff0c;则忽略掉&#xff0c;后端服务就获取不到该请求头。 为了支持header带下划线的参数&#xff0c;可以在http内或者server内设置如下参数…

FreeRTOS基础(七):临界段代码保护及调度器挂起与恢复

上一篇博客我们详细介绍了FreeRTOS是怎么管理中断的&#xff0c;其实&#xff0c;从本质上来讲就是将就是利用的BASEPRI这个寄存器&#xff0c;来屏蔽优先级低于某一个阈值的中断&#xff0c;当设置为0的时候&#xff0c;就是打开所有中断&#xff0c;所有中断都可以响应。这样…

【VMware虚拟机中ubuntu系列】—— 在虚拟机中使用本机摄像头的详细教程与常见问题分析及解决

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、虚拟机调用本机摄像头(1) 启动VMware USB 服务(2) 连接本机摄像头(3) 测试摄像头的连接 二、安装usb驱动二、运行usb_cam.launch时出现select timeout的报错…

希捷硬盘怎么恢复数据? 5 个免费希捷数据恢复软件

希捷已迅速成为全球最大的数字存储提供商。许多人选择并使用希捷外置硬盘来存储他们的媒体文件、学校或工作文件以及其他重要数据。有时&#xff0c;希捷硬盘中的数据会丢失。 如果您丢失了希捷硬盘上的数据&#xff0c;请不要惊慌。在专业的希捷数据恢复软件的帮助下&#xf…

【c++进阶(一)】STL之string接口介绍

&#x1f493;博主CSDN主页:Am心若依旧&#x1f493; ⏩专栏分类c从入门到精通⏪ &#x1f69a;代码仓库:青酒余成&#x1f69a; &#x1f339;关注我&#x1faf5;带你学习更多c   &#x1f51d;&#x1f51d; 1.前言 本章重点 本章着重讲解string中一些重要的接口函数&…

SOUI Combobox 实现半透明弹出下拉框

SOUI默认情况下combobox的弹出框不是半透明的&#xff0c;这个时候如果背景透明时&#xff0c;滚动条会出现黑色背景&#xff0c;这个时候只需要在在combobox下添加一个子节点 <dropdownStyle translucent"1"></dropdownStyle> 这样一个窗口默认即实现…

Nature Communications|柔性自驱动仿生眼(离子凝胶/仿生眼/柔性电子)

2024年4月10日,黄维(Wei Huang)院士、南京工业大学刘举庆(Juqing Liu)教授和刘正东(Zhengdong Liu)副教授课题组,在《Nature Communications》上发布了一篇题为“A bionic self-driven retinomorphic eye with ionogel photosynaptic retina”的论文,罗旭(Xu Luo)、陈晨(…

vscode过滤器@modified(查看配置了哪些设置)

文档 visualstudio•docs•getstarted•settingshttps://code.visualstudio.com/docs/getstarted/settings 说明 使用modified可以过滤出&#xff1a; 配置过的设置&#xff08;和默认值不同&#xff09;&#xff1b; 在 settings.json 文件中配置了值的设置 步骤 1.打开…

Golang省市二级联动实现 从数据收集、清洗到数据存储

1.背景&#xff1a; 最近在写项目&#xff0c;在项目中有一个需求是获取用户的地理位置&#xff0c;一开始是打算让前端使用JSON包的形式去实现&#xff0c;但是考虑到后期可能需要对省市的数据做一些修改和控制操作&#xff0c;所以改为后端实现&#xff0c;并向后台暴露一套…

六一去哪儿,跟着蒙自源开启一段关于童年记忆与美味奇妙旅程

夏日微风轻拂&#xff0c;童心随风起舞。在这个充满欢声笑语的季节里&#xff0c;蒙自源诚挚地邀请您和您的家人&#xff0c;一同参加为六一儿童节精心准备的庆祝活动&#xff0c;共同开启一段关于童年记忆与美味的奇妙旅程。 从5月25日起&#xff0c;蒙自源的各大门店将化身为…

Vue3实战笔记(56)—实战:DefineModel的使用方法细节

文章目录 前言一、实战DefineModel二、思考原理总结 前言 今天写个小例子&#xff0c;实战DefineModel的使用方法细节 一、实战DefineModel 上文官方说的挺清楚&#xff0c;实战验证一下&#xff0c;新建DefineModel.vue&#xff08;这是儿子&#xff09;&#xff1a; <te…

mac油猴Safari浏览器插件:Tampermonkey for Mac下载

Tampermonkey 是一款用于浏览器的用户脚本管理器插件&#xff0c;它允许用户安装、管理和运行用户脚本&#xff0c;从而可以自定义网页的功能和外观。该插件支持在谷歌浏览器、火狐浏览器、Safari等主流浏览器上使用。提供了丰富的用户脚本管理和自定义功能&#xff0c;使用户可…

基于小波分析的一维时间序列多重分形分析(MATLAB R2018a)

分形与小波变换在尺度性能上具有很多相似性&#xff0c;因此小波变换被认为是分析、刻画分形现象一个有力的工具。在分析分形的一般方法中&#xff0c;需要按照“盒维数”的计算思想&#xff0c;首先要将研究序列进行不同长度的分割&#xff0c;然后建立起结构函数&#xff0c;…

【人工智能】第一部分:ChatGPT的基本概念和技术背景

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

【因果推断python】10_分组和虚拟变量回归1

目录 分组数据回归 分组数据回归 并非所有数据点都是一样的。 如果我们再次查看我们的 ENEM 数据集&#xff0c;相比小规模学校的分数&#xff0c;我们更相信规模较大的学校的分数。 这并不是说大型学校更好或什么&#xff0c; 而只是因为它们的较大规模意味着更小的方差。 i…

【CVE-2021-3156】——漏洞复现、原理分析以及漏洞修复

文章目录 前言1、漏洞概述2、漏洞复现2.1、漏洞复现测试环境2.2、漏洞复现具体步骤 3、漏洞原理3.1、前置知识3.1.1、sudo3.1.2、sudoedit3.1.3、转义字符 3.2、漏洞分析 4、漏洞修复5、参考文献总结 前言 2021年01月27日&#xff0c;RedHat官方发布了Sudo缓冲区/栈溢出漏洞的风…