需要的库
mne numpy scipy scikit-learn
pip install mne numpy scipy scikit-learn
数据下载
对Data sets 2a ‹4-class motor imagery› 四分类的运动想象来进行mne的处理。
BCI Competition IV
数据的说明如下
[22 EEG channels (0.5-100Hz; notch filtered), 3 EOG channels, 250Hz sampling rate, 4 classes, 9 subjects]
在此处下载数据集和数据的说明
数据是gdf格式的
除了这个官网的,还有github版本csv格式的谷歌云盘存储的数据
这个数据经过了共同平均参考(CAR)和8-30HZ的带通滤波器(只有alpha和beta频段了)
说明,有九个患者,T是训练集的版本,E是测试集的版本。
加载数据
import mne
import os
import numpy as np
# 设置数据路径
data_path = '/path/to/your/data/' # 替换为你的数据路径
# 加载训练和测试数据
subject = 'A01' # 替换为所需的受试者编号
training_data = mne.io.read_raw_gdf(os.path.join(data_path, f'{subject}T.gdf'), preload=True)
test_data = mne.io.read_raw_gdf(os.path.join(data_path, f'{subject}E.gdf'), preload=True)
取单人的训练数据,mne的版本不能太高,否则会报错
查看数据的信息
可以使用training_data.info来查看信息,
使用jupyterlab的界面可以直接查看到下面的信息
一个读取gdf格式文件的函数
def read_and_show_gdf_info(data_path):
"""
读取GDF文件并显示基本信息
参数:
data_path: GDF文件的完整路径
返回:
raw: MNE Raw对象
"""
try:
# 读取GDF文件
raw = mne.io.read_raw_gdf(data_path, preload=True)
# 获取并打印基本信息
print("\n=== 数据基本信息 ===")
print(f"采样率: {raw.info['sfreq']} Hz")
print(f"通道数: {len(raw.ch_names)}")
print(f"数据时长: {raw.n_times / raw.info['sfreq']:.2f} 秒")
print(f"通道名称: {raw.ch_names}")
# 获取事件信息
events, event_dict = mne.events_from_annotations(raw)
print("\n=== 事件信息 ===")
print(f"事件类型: {event_dict}")
print(f"事件总数: {len(events)}")
return raw
except FileNotFoundError:
print(f"错误: 文件不存在 - {data_path}")
return None
except Exception as e:
print(f"错误: {str(e)}")
return None
#现在先测试单个文件的处理流程
data_dir = r"E:\data\运动想象\IV\BCICIV_2a_gdf\BCICIV_2a_gdf"
#使用路径拼接函数
data_path = os.path.join(data_dir, 'A01T.gdf')
raw = read_and_show_gdf_info(data_path)
获取通道的名字的信息
training_data.ch_names
获取数据
training_data.get_data()
(25,672528)
按照采样率是250hz,可以得到采样的时间是2690s
training_data.times
通过时间的函数,我们可以看到
简单查看一些图像
设置matplotlib的后端
%matplotlib
inline:将图形内嵌在Notebook中显示。这是默认选项。
notebook:将图形显示在Notebook的一个单独的单元格中。
plt:将图形显示在一个专用的Figure窗口中,可以通过菜单栏或工具栏进行缩放、保存等操作。
qt:将图形显示在一个QT窗口中,支持交互式操作。
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Qt5Agg')#设置使用qt来作为处理的后端
training_data.plot()
会出现下面的界面
因为这里通道的名称不是标准的命名
通道信息的补全
因为在进行滤波算法和一些处理的过程中,可能会使用到空间信息,所以,要先把对应的通道信息给补充上
修改通道名称到标准的命名
raw.info[“ch_names”]
因为要电极进行自动的定位,使用标准的系统,使用 montage
如果不进行名称的修正,会报错。
standard_montage= mne.channels.make_standard_montage('standard_1020')
standard_montage.plot()
按照对应的电极的点位,
单个通道的{‘EEG Fp1’:‘Fp1’}
针对这个建立下面这个映射表,是根据标准的10-20的电极图确定的。
mapping = {'EEG-Fz':'Fz',
'EEG-0':'FC3',
'EEG-1':'FC1',
'EEG-2':'FCz',
'EEG-3':'FC2',
'EEG-4':'FC4',
'EEG-5': 'C5',
'EEG-C3': 'C3',
'EEG-6': 'C1',
'EEG-Cz': 'Cz',
'EEG-7':'C2',
'EEG-C4':'C4',
'EEG-8':'C6',
'EEG-9':'CP3',
'EEG-10':'CP1',
'EEG-11':'CPz',
'EEG-12':'CP2',
'EEG-13':'CP4',
'EEG-14':'P1',
'EEG-Pz':'Pz',
'EEG-15':'P2',
'EEG-16':'POz',
'EOG-left':'Fp1',
'EOG-central': 'Fpz',
'EOG-right':'Fp2'
}
我们可以看到有22个EEG电极和3个EOG电极,先确定位置,然后在修改通道类型。
#重命名通道,设置电极的标准
training_data.rename_channels(mapping)
#使用标准的通道,就
training_data.set_montage(standard_montage)
# 绘制通道位置
training_data.plot_sensors(show_names=True)
绘制出对应电极的位置
查看通道的类型,因为只能对齐已有的类别,所以先不对
raw.ch_names
raw.get_channel_types()
# 修改类型
# 指定要修改的通道名称
ch_names = raw.info['ch_names']
channels_to_modify = ch_names[-3:]
# 设置这些通道的新类型为 'eog'
raw.set_channel_types({ch: 'eog' for ch in channels_to_modify})
# 验证修改是否成功
print(raw.get_channel_types())
通道电极的属性的说明
查看电极的坐标
print(raw.info['chs'][0]['loc']) # 打印第一个通道的位置信息,现在loc的信息还是空的
前三个值:通道在笛卡尔坐标系中的位置(x, y, z)。
接下来的三个值:通道方向的单位向量(x, y, z)。
最后六个值:协方差矩阵,用于描述通道噪声的空间分布。
#绘制3D位置
training_data.get_montage().plot(kind='3d')
#绘制2d的,这里raw和train_data混用了。
raw.get_montage().plot()
再次绘制波形图
查看内置的电极布局
builtin_montages = mne.channels.get_builtin_montages(descriptions=True)
for montage_name, montage_description in builtin_montages:
print(f"{montage_name}: {montage_description}")
保存电极文件
raw.get_montage().save('my_montage.set')
如果想要去除一些通道,使用下面的方式,我们去除掉眼电数据。
training_data.drop_channels(['Fpz'])
滤波的处理操作
使用陷波处理工频噪声,某个固定hz的噪声,使用滤波进行高通和低通
#此处使用50Hz的陷波滤波器
raw = raw.notch_filter(freqs=(60))
raw = raw.filter(l_freq=0.1, h_freq=30)
training_data.filter(7.,35.,fir_design='firwin')
介绍两种fir的区别
firwin
函数用于设计基于窗口方法的FIR滤波器。其基本原理是通过选择合适的窗口函数和滤波器长度来得到一个理想频率响应逼近。firwin
函数的主要特点包括:
-
窗口方法设计:
firwin
采用窗口方法设计FIR滤波器,通过截断理想的无限脉冲响应(IIR)滤波器并应用窗函数来减少边缘效应。 -
频率响应平滑:窗口函数有助于减少频率响应中的旁瓣(sidelobes),从而获得较为平滑的频率响应。
-
简单易用:
firwin
设计的滤波器通常具有较少的参数,需要用户指定滤波器长度和截止频率(或频带)。 -
支持多种窗函数:
firwin
支持多种窗函数,如汉宁窗、海明窗、布莱克曼窗等。 -
应用:适用于大多数标准低通、高通、带通和带阻滤波器设计。
from scipy.signal import firwin, freqz
import matplotlib.pyplot as plt
import numpy as np
# 设计一个低通滤波器
numtaps = 51 # 滤波器长度
cutoff = 0.3 # 截止频率(相对于Nyquist频率)
# 使用汉宁窗设计滤波器
b = firwin(numtaps, cutoff, window='hanning')
# 频率响应
w, h = freqz(b, worN=8000)
plt.plot(0.5 * np.fft.fftfreq(len(w)), np.abs(h), 'b')
plt.show()
firwin2
firwin2
函数用于设计基于频率采样方法的FIR滤波器。与firwin
不同,firwin2
允许用户直接指定滤波器的任意频率响应特性。
-
频率采样设计:
firwin2
基于频率采样法设计滤波器,通过在频率域上指定目标频率响应来生成滤波器系数。 -
灵活性高:用户可以精确控制在不同频段上的频率响应,这使得
firwin2
在设计自定义滤波器时更加灵活。 -
复杂频率响应:可以设计具有非标准响应的滤波器,例如多通带或多阻带滤波器。
-
应用:适用于需要自定义频率响应的滤波器设计场景。
示例代码
from scipy.signal import firwin2, freqz
import matplotlib.pyplot as plt
import numpy as np
# 定义频率和相应的响应
freq = [0.0, 0.1, 0.2, 0.5, 0.7, 1.0] # 相对于Nyquist频率的频率点
gain = [0.0, 0.0, 1.0, 1.0, 0.0, 0.0] # 对应频率点的增益
# 设计滤波器
numtaps = 51 # 滤波器长度
b = firwin2(numtaps, freq, gain)
# 频率响应
w, h = freqz(b, worN=8000)
plt.plot(0.5 * np.fft.fftfreq(len(w)), np.abs(h), 'b')
plt.show()
共同平均参考
#去除眼电数据之后,进行平均参考
raw.ch_names[-3:]
raw.drop_channels(raw.ch_names[-3:])
raw_car = raw.set_eeg_reference('average', projection=True)
#使用这种投射
raw.apply_proj()
因为原csv好像没有进行什么复杂的操作,接下来进行如何从gdf文件提取到对应的csv文件的。
周期分割
读取文件注释标签
这是文件的事件的描述。
#取一下相关的时间和事件标签
events, event_dict = mne.events_from_annotations(raw)
了解什么是annotation
# onsets 和 durations 是以秒为单位的数组,descriptions 是一个字符串数组
onsets = [10, 20, 30] # 事件开始时间(秒)
durations = [1, 2, 1] # 事件持续时间(秒)
descriptions = ['Event1', 'Event2', 'Event3'] # 事件描述
annotations = mne.Annotations(onsets, durations, descriptions)
这三个行的一列,构成了events的信息,一般标签信息可能只有开始的时间,和事件描述组成的
event_dict,是对应的记录吧
events
#选中通道类型,选择什么通道作为要取的数据
picks = mne.pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False,
exclude='bads')
#选择要取的事件对应的字典
event_id ={'769': 7,
'770': 8,
'771': 9,
'772': 10}
#定义取出的时间起止点
tmin, tmax = 2., 5.5
epochs = mne.Epochs(raw, events, event_id, tmin, tmax, proj=True,
picks=picks, baseline=None, preload=True,
reject=dict(eeg=220e-6)
)
baseline参数用于指定数据基线的时间范围,以便进行基线校正。基线校正是一种预处理步骤,用于将数据重新缩放,使基线均值为0。这有助于消除数据中的直流偏移,并使不同epoch之间的数据具有可比性。
baseline=(None, 0): 使用事件前的时间段作为基线。这里None表示基线的起始时间与tmin相同。
baseline=(-0.2, 0): 使用事件前200毫秒到事件时刻作为基线。
执行完上述代码,就变成了一个epochs 的类了
epochs类的介绍
数组的形状必须是(n_epochs, n_chans, n_times)
import numpy as np
import neo
import mne
import matplotlib.pyplot as plt
"""
设置event id,用来识别events.
"""
event_id = 1
# 第一列表示样本编号
events = np.array([[200, 0, event_id],
[1200, 0, event_id],
[2000, 0, event_id]]) # List of three arbitrary events
sfreq = 1000 # 采样频率
times = np.arange(0, 10, 0.001) # Use 10000 samples (10s)
sin = np.sin(times * 10) # 乘以 10 缩短周期
cos = np.cos(times * 10)
"""
利用sin和cos创建一个2个通道的700 ms epochs的数据集
只要是(n_epochs, n_channels, n_times)形状的数据,都可以被用来创建
"""
epochs_data = np.array([[sin[:700], cos[:700]],
[sin[1000:1700], cos[1000:1700]],
[sin[1800:2500], cos[1800:2500]]])
ch_names = ['sin', 'cos']
ch_types = ['mag', 'mag']
#info需要通道,采样率,通道类型
info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)
#有原始数据,有信息,有events,还有event的id
epochs = mne.EpochsArray(epochs_data, info=info, events=events,
event_id={'arbitrary': 1})
epochs.plot(scalings='auto' )
plt.show()
取事件段并进行初步的分析
对应的csv文件是把对应的数据给存储为了(288,feature)的特征形式了。
a=epochs.get_data().reshape(288,-1)
a.shape
a.tocsv('eeg.csv')
#当然这样的形式,感觉一定程度
我们取对应的事件的所有数据
epochs_event1 = epochs['769']
epochs_event2 = epochs['770']
epochs_event3 = epochs['771']
epochs_event4 = epochs['772']
可视化分段后的数据
epochs_event1.plot(n_epochs=4)
绘制功率谱图(逐导联)
绘制功率谱图Theta、Alpha和Beta频段
#因为处理的时候,进行了滤波,只剩了8-30Hz的,就没取第一个
bands = [(4, 8, 'Theta'), (8, 12, 'Alpha'), (12, 30, 'Beta')]
epochs_event1.plot_psd_topomap(bands=bands, normalize=True, show=True)
epochs_event3.compute_psd().plot_topomap(bands=bands, normalize=True, show=True)
叠加平均
数据叠加平均
可视化叠加平均后的数据
绘制逐导联的时序信号
import numpy as np
evoked= epochs_event1.average()
evoked.plot()
#绘制地形图
times = np.linspace(2, 5.5, 5)
evoked.plot_topomap(times=times, colorbar=True)
绘制联合图
evoked.plot_joint()
绘制逐导联热力图
evoked.plot_image()
绘制拓扑时序信号图
evoked.plot_topo()
绘制平均所有电极后的ERP
mne.viz.plot_compare_evokeds(evokeds=evoked, combine='mean')
mne.viz.plot_compare_evokeds(evokeds=evoked, picks=['P1', 'Pz', 'P2'], combine='mean')
微状态的处理
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import seaborn as sns
from mne.viz import plot_topomap
# import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 中文无衬线字体(黑体)
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示为方块的问题
def visualize_microstate_distribution(evoked, time_points, n_microstates=4):
"""
可视化微状态的降维分布
参数:
evoked: mne.Evoked对象
time_points: array, 时间点
n_microstates: int, 微状态数量
"""
# 提取数据
data = np.array([extract_timepoint_data(evoked, t) for t in time_points]).T
# 数据标准化和微状态分析
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data.T).T
# 执行微状态分析
kmeans = KMeans(n_clusters=n_microstates, random_state=42)
labels = kmeans.fit_predict(data_scaled.T)
# PCA降维
pca = PCA(n_components=2)
data_pca = pca.fit_transform(data_scaled.T)
# t-SNE降维
tsne = TSNE(n_components=2, random_state=42)
data_tsne = tsne.fit_transform(data_scaled.T)
# 创建图形
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# PCA散点图
scatter1 = ax1.scatter(data_pca[:, 0], data_pca[:, 1], c=labels,
cmap='viridis', alpha=0.6)
ax1.set_title('PCA降维的微状态分布')
ax1.set_xlabel('第一主成分')
ax1.set_ylabel('第二主成分')
plt.colorbar(scatter1, ax=ax1, label='微状态类别')
# t-SNE散点图
scatter2 = ax2.scatter(data_tsne[:, 0], data_tsne[:, 1], c=labels,
cmap='viridis', alpha=0.6)
ax2.set_title('t-SNE降维的微状态分布')
ax2.set_xlabel('t-SNE维度1')
ax2.set_ylabel('t-SNE维度2')
plt.colorbar(scatter2, ax=ax2, label='微状态类别')
plt.tight_layout()
return fig, (data_pca, data_tsne, labels)
def plot_typical_microstates(evoked, microstate_patterns, info):
"""
可视化典型微状态地形图
参数:
evoked: mne.Evoked对象
microstate_patterns: array, 微状态模式
info: mne.Info对象,电极位置信息
"""
n_states = len(microstate_patterns)
fig, axes = plt.subplots(1, n_states, figsize=(4*n_states, 4))
if n_states == 1:
axes = [axes]
# 绘制每个微状态的地形图
for idx, pattern in enumerate(microstate_patterns):
plot_topomap(pattern, info, axes=axes[idx], show=False)
axes[idx].set_title(f'微状态 {idx+1}')
plt.tight_layout()
return fig
def analyze_microstate_transitions(labels, time_points):
"""
分析微状态转换
参数:
labels: array, 微状态标签
time_points: array, 时间点
"""
# 计算转换矩阵
transitions = np.zeros((len(np.unique(labels)), len(np.unique(labels))))
for i in range(len(labels)-1):
transitions[labels[i], labels[i+1]] += 1
# 归一化
row_sums = transitions.sum(axis=1)
transitions_norm = transitions / row_sums[:, np.newaxis]
# 可视化转换矩阵
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(transitions_norm, annot=True, cmap='YlOrRd', ax=ax)
ax.set_title('微状态转换概率矩阵')
ax.set_xlabel('目标微状态')
ax.set_ylabel('起始微状态')
return fig, transitions_norm
# 使用示例
def run_complete_analysis(evoked, time_points, n_microstates=4):
"""
运行完整的微状态分析和可视化
"""
# 执行微状态分析
results = perform_microstate_analysis(evoked, time_points, n_microstates)
# 可视化分布
dist_fig, (pca_data, tsne_data, labels) = visualize_microstate_distribution(
evoked, time_points, n_microstates)
# 可视化典型微状态
topo_fig = plot_typical_microstates(
evoked, results['patterns'], evoked.info)
# 分析转换
trans_fig, trans_matrix = analyze_microstate_transitions(
labels, time_points)
return {
'distribution_plot': dist_fig,
'topography_plot': topo_fig,
'transition_plot': trans_fig,
'pca_data': pca_data,
'tsne_data': tsne_data,
'labels': labels,
'transition_matrix': trans_matrix
}
results = run_complete_analysis(evoked, evoked.times, n_microstates=4)
绘制三维的
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import seaborn as sns
from mne.viz import plot_topomap
plt.rcParams['font.sans-serif'] = ['SimHei'] # 中文无衬线字体(黑体)
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示为方块的问题
def visualize_microstate_distribution_3d(evoked, time_points, n_microstates=4):
"""
可视化微状态的3D降维分布
参数:
evoked: mne.Evoked对象
time_points: array, 时间点
n_microstates: int, 微状态数量
"""
# 提取数据
data = np.array([extract_timepoint_data(evoked, t) for t in time_points]).T
# 数据标准化和微状态分析
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data.T).T
# 执行微状态分析
kmeans = KMeans(n_clusters=n_microstates, random_state=42)
labels = kmeans.fit_predict(data_scaled.T)
# PCA降维到3维
pca = PCA(n_components=3)
data_pca = pca.fit_transform(data_scaled.T)
# t-SNE降维到3维
tsne = TSNE(n_components=3, random_state=42)
data_tsne = tsne.fit_transform(data_scaled.T)
# 创建3D图形
fig = plt.figure(figsize=(15, 6))
# PCA 3D散点图
ax1 = fig.add_subplot(121, projection='3d')
scatter1 = ax1.scatter(data_pca[:, 0], data_pca[:, 1], data_pca[:, 2],
c=labels, cmap='viridis', alpha=0.6)
ax1.set_title('PCA 3D降维的微状态分布')
ax1.set_xlabel('PC1')
ax1.set_ylabel('PC2')
ax1.set_zlabel('PC3')
# t-SNE 3D散点图
ax2 = fig.add_subplot(122, projection='3d')
scatter2 = ax2.scatter(data_tsne[:, 0], data_tsne[:, 1], data_tsne[:, 2],
c=labels, cmap='viridis', alpha=0.6)
ax2.set_title('t-SNE 3D降维的微状态分布')
ax2.set_xlabel('t-SNE1')
ax2.set_ylabel('t-SNE2')
ax2.set_zlabel('t-SNE3')
# 添加颜色条
plt.colorbar(scatter1, ax=ax1, label='微状态类别')
plt.colorbar(scatter2, ax=ax2, label='微状态类别')
# 添加交互性:设置可旋转视角
ax1.view_init(elev=20, azim=45)
ax2.view_init(elev=20, azim=45)
plt.tight_layout()
return fig, (data_pca, data_tsne, labels)
def plot_microstate_trajectories_3d(data_3d, labels, time_points):
"""
绘制微状态在3D空间中的时间轨迹
参数:
data_3d: array, 3D降维后的数据
labels: array, 微状态标签
time_points: array, 时间点
"""
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# 绘制散点
scatter = ax.scatter(data_3d[:, 0], data_3d[:, 1], data_3d[:, 2],
c=labels, cmap='viridis', alpha=0.6)
# 添加时间轨迹线
ax.plot(data_3d[:, 0], data_3d[:, 1], data_3d[:, 2],
'k-', alpha=0.3, linewidth=1)
# 在关键点添加时间标记
n_points = len(time_points)
marker_indices = np.linspace(0, n_points-1, 5, dtype=int)
for idx in marker_indices:
ax.text(data_3d[idx, 0], data_3d[idx, 1], data_3d[idx, 2],
f'{time_points[idx]:.2f}s', fontsize=8)
ax.set_title('微状态时间轨迹')
ax.set_xlabel('维度1')
ax.set_ylabel('维度2')
ax.set_zlabel('维度3')
plt.colorbar(scatter, label='微状态类别')
# 设置初始视角
ax.view_init(elev=20, azim=45)
return fig
def run_complete_analysis_3d(evoked, time_points, n_microstates=4):
"""
运行完整的3D微状态分析和可视化
"""
# 执行微状态分析
results = perform_microstate_analysis(evoked, time_points, n_microstates)
# 3D分布可视化
dist_fig, (pca_data, tsne_data, labels) = visualize_microstate_distribution_3d(
evoked, time_points, n_microstates)
# 绘制PCA轨迹图
traj_fig_pca = plot_microstate_trajectories_3d(pca_data, labels, time_points)
# 绘制t-SNE轨迹图
traj_fig_tsne = plot_microstate_trajectories_3d(tsne_data, labels, time_points)
# 可视化典型微状态
topo_fig = plot_typical_microstates(
evoked, results['patterns'], evoked.info)
# 分析转换
trans_fig, trans_matrix = analyze_microstate_transitions(
labels, time_points)
return {
'distribution_3d_plot': dist_fig,
'pca_trajectory_plot': traj_fig_pca,
'tsne_trajectory_plot': traj_fig_tsne,
'topography_plot': topo_fig,
'transition_plot': trans_fig,
'pca_data': pca_data,
'tsne_data': tsne_data,
'labels': labels,
'transition_matrix': trans_matrix
}