1.目标
题主最开始是想做一个音乐情感分类的模型,但是查阅相关文献发现这个范围太大了,音乐情感特征包括文本,音频,甚至有的还有画面,是一个多模态的范畴。所以退而求其次,找了一个接近的语音情感分类来学习。主要是学习所使用的数据集,数据集格式,一些经典的模型,为后续进一步学习打基础。
2.LSTM
找了很久在码云上找到的一个项目,代码结构很清晰,也有注释,用来学习很适合不过。
代码地址:https://github.com/frank330/Emotion_analysis/tree/main
接下来将以LSTM模型为例来讲述语音情感分类模型代码
2.1 代码目录
data文件中存放的是数据集相关文件,result文件中存放的是训练结果,saved.dict文件存放的是训练好的模型。main.py是唯一可执行文件,代码中包含参数设置,数据获取,模型定义,训练设置等。
代码运行界面
2.2 具体代码
# -*- coding: utf-8 -*-
import numpy as np
import pickle as pkl
from tqdm import tqdm
from datetime import timedelta
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import time
import torch
from sklearn import metrics
from sklearn.model_selection import train_test_split
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, accuracy_score, f1_score, recall_score
# 超参数设置
data_path = './data/data.txt' # 数据集
vocab_path = './data/vocab.pkl' # 词表
save_path = './saved_dict/lstm.ckpt' # 模型训练结果
embedding_pretrained = \
torch.tensor(
np.load(
'./data/embedding_Tencent.npz')
["embeddings"].astype('float32'))
# 预训练词向量
embed = embedding_pretrained.size(1) # 词向量维度
dropout = 0.5 # 随机丢弃
num_classes = 2 # 类别数
num_epochs = 30 # epoch数
batch_size = 128 # mini-batch大小
pad_size = 50 # 每句话处理成的长度(短填长切)
learning_rate = 1e-3 # 学习率
hidden_size = 128 # lstm隐藏层
num_layers = 2 # lstm层数
MAX_VOCAB_SIZE = 10000 # 词表长度限制
UNK, PAD = '<UNK>', '<PAD>' # 未知字,padding符号
def get_data():
tokenizer = lambda x: [y for y in x] # 字级别
vocab = pkl.load(open(vocab_path, 'rb'))
# print('tokenizer',tokenizer)
print('vocab',vocab)
print(f"Vocab size: {len(vocab)}")
train,dev,test = load_dataset(data_path, pad_size, tokenizer, vocab)
return vocab, train, dev, test
def load_dataset(path, pad_size, tokenizer, vocab):
'''
将路径文本文件分词并转为三元组返回
:param path: 文件路径
:param pad_size: 每个序列的大小
:param tokenizer: 转为字级别
:param vocab: 词向量模型
:return: 二元组,含有字ID,标签
'''
contents = []
n=0
with open(path, 'r', encoding='gbk') as f:
# tqdm可以看进度条
for line in tqdm(f):
# 默认删除字符串line中的空格、’\n’、't’等。
lin = line.strip()
if not lin:
continue
# print(lin)
label,content = lin.split(' #### ')
# word_line存储每个字的id
words_line = []
# 分割器,分词每个字
token = tokenizer(content)
# print(token)
# 字的长度
seq_len = len(token)
if pad_size:
# 如果字长度小于指定长度,则填充,否则截断
if len(token) < pad_size:
token.extend([vocab.get(PAD)] * (pad_size - len(token)))
else:
token = token[:pad_size]
seq_len = pad_size
# 将每个字映射为ID
# 如果在词表vocab中有word这个单词,那么就取出它的id;
# 如果没有,就去除UNK(未知词)对应的id,其中UNK表示所有的未知词(out of vocab)都对应该id
for word in token:
words_line.append(vocab.get(word, vocab.get(UNK)))
n+=1
contents.append((words_line, int(label)))
train, X_t = train_test_split(contents, test_size=0.4, random_state=42)
dev,test= train_test_split(X_t, test_size=0.5, random_state=42)
return train,dev,test
# get_data()
class TextDataset(Dataset):
def __init__(self, data):
self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
self.x = torch.LongTensor([x[0] for x in data]).to(self.device)
self.y = torch.LongTensor([x[1] for x in data]).to(self.device)
def __getitem__(self,index):
self.text = self.x[index]
self.label = self.y[index]
return self.text, self.label
def __len__(self):
return len(self.x)
# 以上是数据预处理的部分
def get_time_dif(start_time):
"""获取已使用时间"""
end_time = time.time()
time_dif = end_time - start_time
return timedelta(seconds=int(round(time_dif)))
# 定义LSTM模型
class Model(nn.Module):
def __init__(self):
super(Model, self).__init__()
# 使用预训练的词向量模型,freeze=False 表示允许参数在训练中更新
# 在NLP任务中,当我们搭建网络时,第一层往往是嵌入层,对于嵌入层有两种方式初始化embedding向量,
# 一种是直接随机初始化,另一种是使用预训练好的词向量初始化。
self.embedding = nn.Embedding.from_pretrained(embedding_pretrained, freeze=False)
# bidirectional=True表示使用的是双向LSTM
self.lstm = nn.LSTM(embed, hidden_size, num_layers,
bidirectional=True, batch_first=True, dropout=dropout)
# 因为是双向LSTM,所以层数为config.hidden_size * 2
self.fc = nn.Linear(hidden_size * 2, num_classes)
def forward(self, x):
out = self.embedding(x)
# lstm 的input为[batchsize, max_length, embedding_size],输出表示为 output,(h_n,c_n),
# 保存了每个时间步的输出,如果想要获取最后一个时间步的输出,则可以这么获取:output_last = output[:,-1,:]
out, _ = self.lstm(out)
out = self.fc(out[:, -1, :]) # 句子最后时刻的 hidden state
return out
def get_time_dif(start_time):
"""获取已使用时间"""
end_time = time.time()
time_dif = end_time - start_time
return timedelta(seconds=int(round(time_dif)))
# 权重初始化,默认xavier
# xavier和kaiming是两种初始化参数的方法
def init_network(model, method='xavier', exclude='embedding'):
for name, w in model.named_parameters():
if exclude not in name:
if 'weight' in name:
if method == 'xavier':
nn.init.xavier_normal_(w)
elif method == 'kaiming':
nn.init.kaiming_normal_(w)
else:
nn.init.normal_(w)
elif 'bias' in name:
nn.init.constant_(w, 0)
else:
pass
def plot_acc(train_acc):
sns.set(style='darkgrid')
plt.figure(figsize=(10, 7))
x = list(range(len(train_acc)))
plt.plot(x, train_acc, alpha=0.9, linewidth=2, label='train acc')
plt.xlabel('Epoch')
plt.ylabel('Acc')
plt.legend(loc='best')
plt.savefig('results/acc.png', dpi=400)
def plot_loss(train_loss):
sns.set(style='darkgrid')
plt.figure(figsize=(10, 7))
x = list(range(len(train_loss)))
plt.plot(x, train_loss, alpha=0.9, linewidth=2, label='train acc')
plt.xlabel('Epoch')
plt.ylabel('loss')
plt.legend(loc='best')
plt.savefig('results/loss.png', dpi=400)
# 定义训练的过程
def train( model, dataloaders):
'''
训练模型
:param model: 模型
:param dataloaders: 处理后的数据,包含trian,dev,test
'''
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_function = torch.nn.CrossEntropyLoss()
dev_best_loss = float('inf')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print("Start Training...\n")
plot_train_acc = []
plot_train_loss = []
for i in range(num_epochs):
# 1,训练循环----------------------------------------------------------------
# 将数据全部取完
# 记录每一个batch
step = 0
train_lossi=0
train_acci = 0
for inputs, labels in dataloaders['train']:
# 训练模式,可以更新参数
model.train()
# print(inputs.shape)
inputs = inputs.to(device)
labels = labels.to(device)
# 梯度清零,防止累加
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_function(outputs, labels)
loss.backward()
optimizer.step()
step += 1
true = labels.data.cpu()
predic = torch.max(outputs.data, 1)[1].cpu()
train_lossi += loss.item()
train_acci += metrics.accuracy_score(true, predic)
# 2,验证集验证----------------------------------------------------------------
dev_acc, dev_loss = dev_eval(model, dataloaders['dev'], loss_function,Result_test=False)
if dev_loss < dev_best_loss:
dev_best_loss = dev_loss
torch.save(model.state_dict(), save_path)
train_acc = train_acci/step
train_loss = train_lossi/step
plot_train_acc.append(train_acc)
plot_train_loss.append(train_loss)
print("epoch = {} : train_loss = {:.3f}, train_acc = {:.2%}, dev_loss = {:.3f}, dev_acc = {:.2%}".
format(i+1, train_loss, train_acc, dev_loss, dev_acc))
plot_acc(plot_train_acc)
plot_loss(plot_train_loss)
# 3,验证循环----------------------------------------------------------------
model.load_state_dict(torch.load(save_path))
model.eval()
start_time = time.time()
test_acc, test_loss = dev_eval(model, dataloaders['test'], loss_function,Result_test=True)
print('================'*8)
print('test_loss: {:.3f} test_acc: {:.2%}'.format(test_loss, test_acc))
def result_test(real, pred):
cv_conf = confusion_matrix(real, pred)
acc = accuracy_score(real, pred)
precision = precision_score(real, pred, average='micro')
recall = recall_score(real, pred, average='micro')
f1 = f1_score(real, pred, average='micro')
patten = 'test: acc: %.4f precision: %.4f recall: %.4f f1: %.4f'
print(patten % (acc, precision, recall, f1,))
labels11 = ['negative', 'active']
disp = ConfusionMatrixDisplay(confusion_matrix=cv_conf, display_labels=labels11)
disp.plot(cmap="Blues", values_format='')
plt.savefig("results/reConfusionMatrix.tif", dpi=400)
# 模型评估
def dev_eval(model, data, loss_function,Result_test=False):
'''
:param model: 模型
:param data: 验证集集或者测试集的数据
:param loss_function: 损失函数
:return: 损失和准确率
'''
model.eval()
loss_total = 0
predict_all = np.array([], dtype=int)
labels_all = np.array([], dtype=int)
with torch.no_grad():
for texts, labels in data:
outputs = model(texts)
loss = loss_function(outputs, labels)
loss_total += loss.item()
labels = labels.data.cpu().numpy()
predic = torch.max(outputs.data, 1)[1].cpu().numpy()
labels_all = np.append(labels_all, labels)
predict_all = np.append(predict_all, predic)
acc = metrics.accuracy_score(labels_all, predict_all)
if Result_test:
result_test(labels_all, predict_all)
else:
pass
return acc, loss_total / len(data)
if __name__ == '__main__':
# 设置随机数种子,保证每次运行结果一致,不至于不能复现模型
np.random.seed(1)
torch.manual_seed(1)
torch.cuda.manual_seed_all(1)
torch.backends.cudnn.deterministic = True # 保证每次结果一样
start_time = time.time()
print("Loading data...")
vocab, train_data, dev_data, test_data = get_data()
dataloaders = {
'train': DataLoader(TextDataset(train_data), batch_size, shuffle=True),
'dev': DataLoader(TextDataset(dev_data), batch_size, shuffle=True),
'test': DataLoader(TextDataset(test_data), batch_size, shuffle=True)
}
time_dif = get_time_dif(start_time)
print("Time usage:", time_dif)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = Model().to(device)
init_network(model)
train(model, dataloaders)
2.3 代码结构
超参数设置 数据预处理 LSTM模型定义 权重初始化 训练脚本设置 模型评估 main主函数
2.3.1 超参数设置
# 超参数设置
data_path = './data/data.txt' # 数据集
vocab_path = './data/vocab.pkl' # 词表
save_path = './saved_dict/lstm.ckpt' # 模型训练结果
embedding_pretrained = \
torch.tensor(
np.load(
'./data/embedding_Tencent.npz')
["embeddings"].astype('float32'))
# 预训练词向量
embed = embedding_pretrained.size(1) # 词向量维度
dropout = 0.5 # 随机丢弃
num_classes = 2 # 类别数
num_epochs = 30 # epoch数
batch_size = 128 # mini-batch大小
pad_size = 50 # 每句话处理成的长度(短填长切)
learning_rate = 1e-3 # 学习率
hidden_size = 128 # lstm隐藏层
num_layers = 2 # lstm层数
MAX_VOCAB_SIZE = 10000 # 词表长度限制
UNK, PAD = '<UNK>', '<PAD>' # 未知字,padding符号
(1)embedding_pretrained = \ torch.tensor(
np.load( './data/embedding_Tencent.npz') ["embeddings"].astype('float32'))
embedding_pretrained是一个预训练的嵌入矩阵,它是通过加载Tencent AI Lab发布的中文词向量进行初始化的。这个嵌入矩阵可以用于自然语言处理任务,如文本分类、命名实体识别等。它的维度是根据预训练模型的设置而定,通常是一个较大的矩阵,每一行代表一个词的向量表示。
加载过程中,首先使用np.load函数加载了一个.npz文件,该文件包含了预训练的词向量。然后通过[“embeddings”]索引获取到具体的词向量矩阵。最后,将该矩阵转换为torch.tensor,并指定数据类型为float32。
(2)什么是嵌入矩阵?
嵌入矩阵是在自然语言处理(NLP)中常用的一种技术,用于将离散的词或字符转换为连续的向量表示。它是一种将高维离散数据映射到低维连续空间的方法。
在NLP任务中,我们通常使用词嵌入(word embedding)来表示单词。词嵌入是一种将单词映射到实数向量的技术,它可以捕捉到单词之间的语义和语法关系。嵌入矩阵是用来存储这些词嵌入的矩阵,其中每一行对应一个单词的向量表示。
嵌入矩阵的大小通常由两个参数确定:词汇表大小(vocabulary size)和嵌入维度(embedding dimension)。词汇表大小表示有多少个不同的单词需要进行嵌入,而嵌入维度则表示每个单词的向量长度。
通过嵌入矩阵,我们可以将文本中的每个单词转换为对应的向量表示,从而方便进行后续的机器学习或深度学习任务,如文本分类、情感分析等。
(3) 什么是词嵌入?
词嵌入(Word Embedding)是一种将词语映射到实数向量空间的技术,它在自然语言处理领域中被广泛应用。词嵌入的目标是将词语的语义信息编码为向量表示,使得具有相似语义的词在向量空间中距离较近。通过词嵌入,我们可以将文本数据转化为计算机可以理解和处理的形式。
常见的词嵌入模型有Word2Vec、GloVe和FastText等。这些模型通过分析大规模文本语料库中的上下文关系来学习词语的向量表示。具体而言,这些模型会考虑一个词语周围的上下文词语,并尝试预测该词语出现的概率。通过多次迭代训练,模型会学习到每个词语的向量表示,使得具有相似上下文的词在向量空间中距离较近。
词嵌入的应用非常广泛,例如在文本分类、情感分析、机器翻译和信息检索等任务中,可以利用词嵌入来表示文本数据,并进行进一步的分析和处理。
2.3.2数据预处理
(1)函数 get_data()
def get_data():
tokenizer = lambda x: [y for y in x] # 字级别
vocab = pkl.load(open(vocab_path, 'rb'))
# print('tokenizer',tokenizer)
print('vocab',vocab)
print(f"Vocab size: {len(vocab)}")
train,dev,test = load_dataset(data_path, pad_size, tokenizer, vocab)
return vocab, train, dev, test
这段代码是一个函数
get_data()
的实现。函数的功能是加载数据集并返回相关的数据。首先,函数内部定义了一个
tokenizer
,它是一个匿名函数,将输入的字符串按字符进行切分。然后,通过pkl.load()
函数加载了一个名为vocab_path
的文件,得到了一个词汇表vocab
。接着,打印了词汇表的大小。接下来,调用了
load_dataset()
函数,传入了数据路径data_path
、填充大小pad_size
、刚刚定义的tokenizer
和词汇表vocab
作为参数。这个函数的作用是根据给定的数据路径和词汇表,加载训练集、验证集和测试集。最后,函数返回了词汇表
vocab
以及加载的训练集、验证集和测试集。
打印输出的内容
(2)load_dataset函数
def load_dataset(path, pad_size, tokenizer, vocab):
'''
将路径文本文件分词并转为三元组返回
:param path: 文件路径
:param pad_size: 每个序列的大小
:param tokenizer: 转为字级别
:param vocab: 词向量模型
:return: 二元组,含有字ID,标签
'''
contents = []
n=0
with open(path, 'r', encoding='gbk') as f:
# tqdm可以看进度条
for line in tqdm(f):
# 默认删除字符串line中的空格、’\n’、't’等。
lin = line.strip()
if not lin:
continue
# print(lin)
label,content = lin.split(' #### ')
# word_line存储每个字的id
words_line = []
# 分割器,分词每个字
token = tokenizer(content)
# print(token)
# 字的长度
seq_len = len(token)
if pad_size:
# 如果字长度小于指定长度,则填充,否则截断
if len(token) < pad_size:
token.extend([vocab.get(PAD)] * (pad_size - len(token)))
else:
token = token[:pad_size]
seq_len = pad_size
# 将每个字映射为ID
# 如果在词表vocab中有word这个单词,那么就取出它的id;
# 如果没有,就去除UNK(未知词)对应的id,其中UNK表示所有的未知词(out of vocab)都对应该id
for word in token:
words_line.append(vocab.get(word, vocab.get(UNK)))
n+=1
contents.append((words_line, int(label)))
train, X_t = train_test_split(contents, test_size=0.4, random_state=42)
dev,test= train_test_split(X_t, test_size=0.5, random_state=42)
return train,dev,test
1)contents = [] ;n=0 :定义变量列表contents和int变量n,设初始值为0
2) with open(path, 'r', encoding='gbk') as f:
# tqdm可以看进度条
for line in tqdm(f):
# 默认删除字符串line中的空格、’\n’、't’等。
lin = line.strip()
if not lin:
continue
# print(lin)
label,content = lin.split(' #### ')
# word_line存储每个字的id
words_line = []
# 分割器,分词每个字
token = tokenizer(content)
# print(token)
# 字的长度
seq_len = len(token)
使用
with open(path, 'r', encoding='gbk') as f:
语句打开一个文件,并指定编码为GBK。接下来,使用tqdm
库对文件进行迭代,每次读取一行内容。然后,使用strip()
方法去除字符串中的空格、换行符等特殊字符。如果字符串为空,则跳过该行。接着,使用split()
方法将字符串按照####
进行分割,得到标签和内容
3) if pad_size:
# 如果字长度小于指定长度,则填充,否则截断
if len(token) < pad_size:
token.extend([vocab.get(PAD)] * (pad_size - len(token)))
else:
token = token[:pad_size]
seq_len = pad_size
# 将每个字映射为ID
# 如果在词表vocab中有word这个单词,那么就取出它的id;
# 如果没有,就去除UNK(未知词)对应的id,其中UNK表示所有的未知词(out of vocab)都对应该id
for word in token:
words_line.append(vocab.get(word, vocab.get(UNK)))
n+=1
contents.append((words_line, int(label)))
对内容进行分词处理,将每个字转换为对应的id,并存储在
words_line
列表中。如果指定了pad_size
(填充大小),则根据指定大小进行填充或截断处理。最后,将处理后的内容和标签添加到contents
列表中。
4)train, X_t = train_test_split(contents, test_size=0.4, random_state=42) dev,test= train_test_split(X_t, test_size=0.5, random_state=42)
train_test_split是一个用于将数据集划分为训练集和测试集的函数。它的作用是将给定的数据集按照指定的比例进行划分,并返回划分后的训练集和测试集。
在你提供的代码中,train_test_split函数被用来将contents数据集划分为三个部分:train、dev和test。具体的划分方式如下:
首先,使用train_test_split函数将contents数据集按照test_size参数指定的比例(这里是0.4)进行划分,其中random_state参数用于设置随机种子,保证每次划分结果的一致性。划分后,得到的训练集和测试集被赋值给变量X_t。
接着,再次使用train_test_split函数将X_t数据集按照test_size参数指定的比例(这里是0.5)进行划分,同样使用random_state参数设置随机种子。划分后,得到的训练集和测试集被赋值给变量dev和test。
总结一下,train_test_split函数的作用是将给定的数据集按照指定的比例进行划分,并返回划分后的训练集和测试集。在你提供的代码中,通过两次调用train_test_split函数,将contents数据集划分为了train、dev和test三个部分。
(3)自定义类 TextDataset
class TextDataset(Dataset):
def __init__(self, data):
self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
self.x = torch.LongTensor([x[0] for x in data]).to(self.device)
self.y = torch.LongTensor([x[1] for x in data]).to(self.device)
def __getitem__(self,index):
self.text = self.x[index]
self.label = self.y[index]
return self.text, self.label
def __len__(self):
return len(self.x)
class TextDataset是一个自定义的数据集类,继承自torch.utils.data.Dataset。它用于加载文本数据,并将其转换为模型可以处理的格式。
在初始化方法__init__中,它接受一个参数data,该参数是一个包含文本和标签的列表。首先,它通过torch.cuda.is_available()判断是否有可用的GPU,如果有,则将self.device设置为torch.device('cuda'),否则设置为torch.device('cpu')。接下来,它将文本和标签分别存储在self.x和self.y中,并将它们转换为torch.LongTensor类型,并将其移动到self.device上。
在__getitem__方法中,它接受一个索引index,并从self.x和self.y中获取对应索引的文本和标签。然后,将文本和标签作为元组返回。
在__len__方法中,它返回self.x的长度,即数据集的样本数量。
2.3.3 LSTM模型定义
class Model(nn.Module):
def __init__(self):
super(Model, self).__init__()
# 使用预训练的词向量模型,freeze=False 表示允许参数在训练中更新
# 在NLP任务中,当我们搭建网络时,第一层往往是嵌入层,对于嵌入层有两种方式初始化embedding向量,
# 一种是直接随机初始化,另一种是使用预训练好的词向量初始化。
self.embedding = nn.Embedding.from_pretrained(embedding_pretrained, freeze=False)
# bidirectional=True表示使用的是双向LSTM
self.lstm = nn.LSTM(embed, hidden_size, num_layers,
bidirectional=True, batch_first=True, dropout=dropout)
# 因为是双向LSTM,所以层数为config.hidden_size * 2
self.fc = nn.Linear(hidden_size * 2, num_classes)
def forward(self, x):
out = self.embedding(x)
# lstm 的input为[batchsize, max_length, embedding_size],输出表示为 output,(h_n,c_n),
# 保存了每个时间步的输出,如果想要获取最后一个时间步的输出,则可以这么获取:output_last = output[:,-1,:]
out, _ = self.lstm(out)
out = self.fc(out[:, -1, :]) # 句子最后时刻的 hidden state
return out
这段代码是一个PyTorch中的神经网络模型类,继承自nn.Module。该模型包含了一个嵌入层(embedding)、一个双向LSTM层(lstm)和一个全连接层(fc)。
在初始化方法__init__中,首先调用了父类nn.Module的初始化方法,然后定义了模型的各个层。其中,self.embedding是一个嵌入层,使用预训练的词向量模型进行初始化,并设置freeze=False表示允许参数在训练中更新。self.lstm是一个双向LSTM层,输入为嵌入层的输出,输出为LSTM的输出和最后一个时间步的隐藏状态。self.fc是一个全连接层,将LSTM的输出映射到最终的类别数。
在前向传播方法forward中,首先将输入x通过嵌入层进行词嵌入,然后将词嵌入结果输入到LSTM层中得到输出。最后,将LSTM的输出的最后一个时间步的隐藏状态通过全连接层映射到最终的类别数,并返回结果。
(1)nn.Moudel
nn.Module是PyTorch中的一个基类,用于构建神经网络模型。它提供了一些常用的方法和属性,方便我们定义和管理神经网络的结构。
nn.Module的主要作用是将神经网络的各个层组织起来,并提供了一些方法来管理这些层。通过继承nn.Module类,我们可以定义自己的神经网络模型,并在其中定义各个层的结构和操作。
在nn.Module中,我们可以使用add_module()方法来添加子模块,使用forward()方法来定义前向传播的逻辑。nn.Module还提供了一些其他常用的方法,如parameters()用于获取模型的可学习参数,zero_grad()用于将参数的梯度置零等。
使用nn.Module构建神经网络模型的好处是可以方便地管理和调用各个层,同时也可以利用PyTorch提供的自动求导功能进行反向传播和参数更新。
(2)super(Model, self).__init__()
super(Model, self).init()是Python中用于调用父类构造函数的一种方式。它在子类的构造函数中使用,以便在子类中可以继承并初始化父类的属性和方法。
具体来说,super()函数返回一个临时对象,该对象绑定了父类的方法。通过调用该对象的方法,可以在子类中调用父类的方法。在调用父类构造函数时,通常使用super(Model, self).init()来确保父类的构造函数被正确地调用。
这种方式的好处是可以避免在子类中重复编写父类的构造函数代码,同时确保父类的构造函数被正确地执行。这对于多层继承的情况尤为重要,因为使用super()可以按照方法解析顺序(MRO)调用所有父类的构造函数。
(3)def forward(self, x):
在Python中,
def forward(self, x):
是一个函数定义的语法。通常,这是在类中定义的一个方法,用于执行某些操作或计算。在深度学习中,
forward
方法通常用于定义神经网络模型的前向传播过程。在这个方法中,输入x
会经过一系列的网络层和激活函数,最终得到输出结果。具体来说,
forward
方法接收一个输入x
作为参数,并返回一个输出结果。在这个方法中,你可以使用各种神经网络层(如全连接层、卷积层等)和激活函数(如ReLU、Sigmoid等)来对输入进行处理和转换。
2.3.4 权重初始化
def init_network(model, method='xavier', exclude='embedding'):
for name, w in model.named_parameters():
if exclude not in name:
if 'weight' in name:
if method == 'xavier':
nn.init.xavier_normal_(w)
elif method == 'kaiming':
nn.init.kaiming_normal_(w)
else:
nn.init.normal_(w)
elif 'bias' in name:
nn.init.constant_(w, 0)
else:
pass
这段代码是一个初始化神经网络权重和偏置的函数。它接受一个模型对象和一些可选参数,其中
method
参数用于指定初始化方法,默认为’xavier’,exclude
参数用于指定不需要初始化的部分,默认为’embedding’。函数通过遍历模型的参数来进行初始化。对于权重参数,如果参数名中不包含
exclude
字符串,则根据method
参数选择相应的初始化方法,分别为xavier初始化、kaiming初始化和普通初始化。对于偏置参数,使用常数0进行初始化。如果参数名既不包含’weight’也不包含’bias’,则不进行任何操作。
2.3.5 训练脚本设置
def train( model, dataloaders):
'''
训练模型
:param model: 模型
:param dataloaders: 处理后的数据,包含trian,dev,test
'''
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_function = torch.nn.CrossEntropyLoss()
dev_best_loss = float('inf')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print("Start Training...\n")
plot_train_acc = []
plot_train_loss = []
for i in range(num_epochs):
# 1,训练循环----------------------------------------------------------------
# 将数据全部取完
# 记录每一个batch
step = 0
train_lossi=0
train_acci = 0
for inputs, labels in dataloaders['train']:
# 训练模式,可以更新参数
model.train()
# print(inputs.shape)
inputs = inputs.to(device)
labels = labels.to(device)
# 梯度清零,防止累加
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_function(outputs, labels)
loss.backward()
optimizer.step()
step += 1
true = labels.data.cpu()
predic = torch.max(outputs.data, 1)[1].cpu()
train_lossi += loss.item()
train_acci += metrics.accuracy_score(true, predic)
# 2,验证集验证----------------------------------------------------------------
dev_acc, dev_loss = dev_eval(model, dataloaders['dev'], loss_function,Result_test=False)
if dev_loss < dev_best_loss:
dev_best_loss = dev_loss
torch.save(model.state_dict(), save_path)
train_acc = train_acci/step
train_loss = train_lossi/step
plot_train_acc.append(train_acc)
plot_train_loss.append(train_loss)
print("epoch = {} : train_loss = {:.3f}, train_acc = {:.2%}, dev_loss = {:.3f}, dev_acc = {:.2%}".
format(i+1, train_loss, train_acc, dev_loss, dev_acc))
plot_acc(plot_train_acc)
plot_loss(plot_train_loss)
# 3,验证循环----------------------------------------------------------------
model.load_state_dict(torch.load(save_path))
model.eval()
start_time = time.time()
test_acc, test_loss = dev_eval(model, dataloaders['test'], loss_function,Result_test=True)
print('================'*8)
print('test_loss: {:.3f} test_acc: {:.2%}'.format(test_loss, test_acc))
(1)优化器设置,参数定义
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
:这行代码定义了优化器,使用Adam优化算法来更新模型的参数。model.parameters()
表示需要优化的参数,lr=learning_rate
表示学习率。
loss_function = torch.nn.CrossEntropyLoss()
:这行代码定义了损失函数,使用交叉熵损失函数来计算模型的预测值与真实值之间的差异。
dev_best_loss = float('inf')
:这行代码初始化了一个变量dev_best_loss
,用于保存最佳的验证集损失值。初始值设置为正无穷大。
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
:这行代码根据是否有可用的GPU来选择设备,如果有GPU则使用cuda,否则使用cpu。
print("Start Training...\n")
:这行代码打印出开始训练的提示信息。
plot_train_acc = []
:这行代码初始化了一个空列表plot_train_acc
,用于保存训练过程中的准确率。
plot_train_loss = []
:这行代码初始化了一个空列表plot_train_loss
,用于保存训练过程中的损失值。
(2)模型训练
这段代码是一个训练循环,用于对模型进行训练。下面是对代码的解释:
1. `for i in range(num_epochs):`:这是一个循环,用于指定训练的轮数,`num_epochs`表示总共的训练轮数。
2. `step = 0`:初始化步数,用于记录每一个batch的训练进度。
3. `train_lossi=0`和`train_acci = 0`:初始化训练损失和训练准确率,用于记录每一个batch的训练结果。
4. `for inputs, labels in dataloaders['train']:`:这是一个数据迭代器,用于遍历训练数据集。`inputs`表示输入数据,`labels`表示对应的标签。
5. `model.train()`:将模型设置为训练模式,这样可以更新模型的参数。
6. `inputs = inputs.to(device)`和`labels = labels.to(device)`:将输入数据和标签移动到指定的设备上进行计算,比如GPU。
7. `optimizer.zero_grad()`:梯度清零,防止梯度累加。
8. `outputs = model(inputs)`:将输入数据输入到模型中进行前向传播,得到模型的输出。
9. `loss = loss_function(outputs, labels)`:计算模型输出与真实标签之间的损失。
10. `loss.backward()`:反向传播,计算梯度。
11. `optimizer.step()`:更新模型的参数。
12. `step += 1`:更新步数。
13. `true = labels.data.cpu()`和`predic = torch.max(outputs.data, 1).cpu()`:将标签和预测结果移动到CPU上,并转换为numpy数组。
14. `train_lossi += loss.item()`和`train_acci += metrics.accuracy_score(true, predic)`:累加训练损失和训练准确率。
(3)验证集验证
dev_acc, dev_loss = dev_eval(model, dataloaders['dev'], loss_function,Result_test=False)
if dev_loss < dev_best_loss:
dev_best_loss = dev_loss
torch.save(model.state_dict(), save_path)
train_acc = train_acci/step
train_loss = train_lossi/step
plot_train_acc.append(train_acc)
plot_train_loss.append(train_loss)
print("epoch = {} : train_loss = {:.3f}, train_acc = {:.2%}, dev_loss = {:.3f}, dev_acc = {:.2%}".
format(i+1, train_loss, train_acc, dev_loss, dev_acc))
plot_acc(plot_train_acc)
plot_loss(plot_train_loss)
这段代码是一个训练过程中的评估部分。首先,使用
dev_eval
函数对模型在验证集上进行评估,得到dev_acc
和dev_loss
。然后,通过比较当前的dev_loss
与之前的最佳dev_loss
,如果当前的dev_loss
更小,则更新最佳dev_loss
并保存模型参数到save_path
。接下来,计算训练集上的准确率和损失,并将其存储到train_acc
和train_loss
中。最后,打印出当前的训练损失、训练准确率、验证损失和验证准确率。
(4) 测试集测试
model.load_state_dict(torch.load(save_path)) model.eval() start_time = time.time() test_acc, test_loss = dev_eval(model, dataloaders['test'], loss_function,Result_test=True) print('================'*8) print('test_loss: {:.3f} test_acc: {:.2%}'.format(test_loss, test_acc))
model.load_state_dict(torch.load(save_path))
是一个用于加载模型参数的函数。它将保存在save_path
路径下的模型参数加载到model
中。这个函数通常在训练过程中使用,当我们需要保存和加载模型的参数时,可以使用这个函数。
model.eval()
是一个用于将模型设置为评估模式的函数。在评估模式下,模型会关闭一些具有随机性的操作,例如dropout和batch normalization。这样可以确保在测试集上的结果是稳定和可重复的。
start_time = time.time()
是用于记录代码执行开始时间的语句。
test_acc, test_loss = dev_eval(model, dataloaders['test'], loss_function, Result_test=True)
是调用了一个名为dev_eval
的函数来对模型在测试集上进行评估。它接受四个参数:model
表示要评估的模型,dataloaders['test']
表示测试集的数据加载器,loss_function
表示损失函数,Result_test=True
表示要返回测试集上的结果。最后两行代码用于打印测试集上的损失和准确率。
2.3.6 模型评估
def dev_eval(model, data, loss_function,Result_test=False):
'''
:param model: 模型
:param data: 验证集集或者测试集的数据
:param loss_function: 损失函数
:return: 损失和准确率
'''
model.eval()
loss_total = 0
predict_all = np.array([], dtype=int)
labels_all = np.array([], dtype=int)
with torch.no_grad():
for texts, labels in data:
outputs = model(texts)
loss = loss_function(outputs, labels)
loss_total += loss.item()
labels = labels.data.cpu().numpy()
predic = torch.max(outputs.data, 1)[1].cpu().numpy()
labels_all = np.append(labels_all, labels)
predict_all = np.append(predict_all, predic)
acc = metrics.accuracy_score(labels_all, predict_all)
if Result_test:
result_test(labels_all, predict_all)
else:
pass
return acc, loss_total / len(data)
这是一个用于模型评估的函数。它接受一个模型、数据集、损失函数作为输入,并返回损失和准确率。
函数首先将模型设置为评估模式(model.eval()),然后初始化损失总和(loss_total)和空的预测结果数组(predict_all)和标签数组(labels_all)。
接下来,使用torch.no_grad()上下文管理器,对数据集中的每个样本进行循环。对于每个样本,将输入数据传递给模型并计算输出。然后使用损失函数计算损失,并将其累加到损失总和中。同时,将标签转换为numpy数组,并将模型的预测结果转换为numpy数组。将标签和预测结果分别添加到labels_all和predict_all数组中。
最后,使用metrics.accuracy_score计算准确率,并根据Result_test参数决定是否调用result_test函数进行结果测试。
2.3.7 main主函数
if __name__ == '__main__':
# 设置随机数种子,保证每次运行结果一致,不至于不能复现模型
np.random.seed(1)
torch.manual_seed(1)
torch.cuda.manual_seed_all(1)
torch.backends.cudnn.deterministic = True # 保证每次结果一样
start_time = time.time()
print("Loading data...")
vocab, train_data, dev_data, test_data = get_data()
dataloaders = {
'train': DataLoader(TextDataset(train_data), batch_size, shuffle=True),
'dev': DataLoader(TextDataset(dev_data), batch_size, shuffle=True),
'test': DataLoader(TextDataset(test_data), batch_size, shuffle=True)
}
time_dif = get_time_dif(start_time)
print("Time usage:", time_dif)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = Model().to(device)
init_network(model)
train(model, dataloaders)
(1)if name == ‘main’
if name == ‘main’ 是Python中的一个常用语句,它用于判断当前模块是否是直接被运行的,而不是被导入的。当一个Python文件被直接运行时,其__name__属性的值为’main’,而当它被导入时,__name__属性的值为模块名。
(2)设置随机种子
# 设置随机数种子,保证每次运行结果一致,不至于不能复现模型
np.random.seed(1)
torch.manual_seed(1)
torch.cuda.manual_seed_all(1)
torch.backends.cudnn.deterministic = True # 保证每次结果一样
(3)加载数据
start_time = time.time()
print("Loading data...")
vocab, train_data, dev_data, test_data = get_data()
dataloaders = {
'train': DataLoader(TextDataset(train_data), batch_size, shuffle=True),
'dev': DataLoader(TextDataset(dev_data), batch_size, shuffle=True),
'test': DataLoader(TextDataset(test_data), batch_size, shuffle=True)
}
time_dif = get_time_dif(start_time)
print("Time usage:", time_dif)
这段代码是一个简单的数据加载和处理的过程。首先,通过调用
get_data()
函数获取数据集的词汇表(vocab)以及训练集(train_data)、验证集(dev_data)和测试集(test_data)。然后,使用DataLoader
类将数据集封装成可迭代的数据加载器,其中训练集、验证集和测试集分别对应不同的数据加载器。最后,通过调用get_time_dif()
函数计算代码执行的时间,并将结果打印出来。
(4)调用train函数进行训练
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = Model().to(device)
init_network(model)
train(model, dataloaders)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
这行代码的作用是检查是否有可用的GPU,如果有,则将设备设置为CUDA,否则设置为CPU。这样可以利用GPU的并行计算能力来加速模型训练。
model = Model().to(device)
这行代码创建了一个模型对象,并将其移动到之前设置好的设备上。这样可以确保模型在正确的设备上进行计算。
init_network(model)
这行代码是一个自定义函数,用于初始化模型的权重和其他参数。这个函数可能会根据具体的需求进行不同的操作,比如随机初始化权重、加载预训练的权重等。
train(model, dataloaders)
这行代码是一个自定义函数,用于训练模型。它接受一个模型对象和一个数据加载器作为输入,并在训练过程中对模型进行更新。具体的训练过程可能包括前向传播、计算损失、反向传播、优化器更新等步骤。
最终结果