目录
- 机器翻译
- Seq2seq
- 编码器-解码器细节
- 训练
- 衡量生成序列的好坏的BLEU
- 总结
- 序列到序列学习
- 实现循环神经网络编码器
- 解码器
- 通过零值化屏蔽不相关的项
- 该部分总代码
- 通过扩展softmax交叉熵损失函数来遮蔽不相关的预测
- 训练
- 预测
- BLEU的代码实现
- 该部分总代码
机器翻译
给定一个源语言的句子,自动翻译成目标语言。
这两个句子可以有不用的长度。
Seq2seq
把上一个时刻的翻译作为下一个时刻的输入。
隐藏状态也过来。
编码器-解码器细节
编码器是没有输出的RNN
编码器最后时间步的隐状态用作解码器的初始隐状态。
把最后一层的RNN的最后那个时刻的那个隐藏状态(也就是它的输出)和句子的Embedding作为你的输入。
训练
训练时解码器使用目标句子作为输入。
推理
在训练的时候,encoder是一样的、decoder不一样。
decoder在训练的时候是知道目标句子的。所以它的输入是用的真正的那个目标句子里面的输入。
所以这里翻译错了,也不影响,因为给下一个时刻的输入还是正确的输入。
这样就是做时序序列预测的时候,就不会说预测长了就很难。
而推理的时候,没有真正的句子,所以每一个时刻只能用上一个时刻的输出作为当前时刻的输入,然后不断的预测下去。
衡量生成序列的好坏的BLEU
现在要预测一个句子了,之前是预测一个词。现在预测的句子可能和真实的句子长度就不一样了,然后怎么样去衡量两个句子的好坏?用BLEU来衡量。
p
n
p_n
pn是预测中所有n-gram的精度
标签序列A B C D E F和预测序列A B B C D
有
p
1
p_1
p1=
4
5
\frac{4}{5}
54,
p
2
p_2
p2=
3
4
\frac{3}{4}
43 ,
p
3
p_3
p3=
1
3
\frac{1}{3}
31,
p
4
p_4
p4=0
解释:
p
1
p_1
p1为预测的总共有五个正确的为四个
解释:
p
2
p_2
p2为预测的总共有四个正确的为五个
两两配对
预测的AB找真实的AB,有一个
预测的BB找真实的,没有
预测的BC找真实的BC,有一个
预测的CD找真实的CD,有一个
总共三个。
解释:
p
3
p_3
p3为预测的总共有三个正确的为两个
三三配对
预测的ABB找真实的,没有
预测的BBC找真实的,没有
预测的BCD找真实的BCD,有一个
解释:
p
4
p_4
p4为预测的总共有零个正确的
四四配对
预测的ABCD找真实的,没有
预测的BBCD找真实的,没有
BLEU定义:
总结
Seq2seq从一个句子生成另一个句子
编码器和解码器都是RNN
将编码器最后时间隐状态来初始解码器隐状态来完成信息传递
常用BLEU来衡量生成序列的好坏
序列到序列学习
import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
import os
实现循环神经网络编码器
import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
import os
# 实现循环神经网络编码器
class Seq2SeqEncoder(d2l.Encoder):
"""用于序列到序列学习的循环神经网络编码器"""
# 词汇表大小、词嵌入向量维度大小、RNN隐藏层的单元数、RNN层数、丢弃率为0、接收任意数量的关键字参数
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 创建一个嵌入层,用于将输入的单词索引转换为词嵌入向量
self.embedding = nn.Embedding(vocab_size, embed_size)
# 创建一个GRU循环神经网络模型
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)
def forward(self, X, *args):
# X的形状(批量大小,序列长度),将输入序列进行词嵌入操作后,X的形状为(批量大小,序列长度,词嵌入向量维度大小)
# 在本文中X的形状为(4,7)经过词嵌入后X的形状为(4,7,8)
X = self.embedding(X)
# 将输入序列的维度进行转置,以适应RNN模型的输入格式要求
# PyTorch中的张量默认形状为(批量大小, 序列长度, 词嵌入向量维度大小)转为RNN期望的形状(序列长度, 批量大小, 词嵌入向量维度大小)
# 将形状从 (4, 7, 8) 转换为 (7, 4, 8)
X = X.permute(1, 0, 2)
# 将转置后的输入序列输入到RNN模型中,得到输出和最终的隐藏状态
# 输出 output 的形状为 (7, 4, 16),隐藏状态 state 的形状为 (2, 4, 16)。
# 输出的形状为(序列长度, 批量大小, 隐藏状态维度),隐藏状态的形状为(层数, 批量大小, 隐藏状态维度)
# 7 个时间步,每个时间步有 4 个样本,每个样本的隐藏状态维度为 16。2 层,每层有 4 个样本,每个样本的隐藏状态维度为 16
output, state = self.rnn(X)
# 返回输出和最终隐藏状态
return output, state
# 创建一个Seq2SeqEncoder对象,设置词汇表大小为10,嵌入维度为8,隐藏状态维度为16,层数为2
encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
# 将编码器设置为评估模式,这将影响一些层的行为,如dropout层
encoder.eval()
# 创建一个形状为(4, 7)的输入张量X,用于模拟4个样本,每个样本有7个单词
X = torch.zeros((4, 7), dtype=torch.long)
output, state = encoder(X)
print(output.shape)
# 打印最终隐藏状态的形状
print(state.shape)
解码器
import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
import os
# 实现循环神经网络编码器
class Seq2SeqEncoder(d2l.Encoder):
"""用于序列到序列学习的循环神经网络编码器"""
# 词汇表大小、词嵌入向量维度大小、RNN隐藏层的单元数、RNN层数、丢弃率为0、接收任意数量的关键字参数
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 创建一个嵌入层,用于将输入的单词索引转换为词嵌入向量
self.embedding = nn.Embedding(vocab_size, embed_size)
# 创建一个GRU循环神经网络模型
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)
def forward(self, X, *args):
# X的形状(批量大小,序列长度),将输入序列进行词嵌入操作后,X的形状为(批量大小,序列长度,词嵌入向量维度大小)
# 在本文中X的形状为(4,7)经过词嵌入后X的形状为(4,7,8)
X = self.embedding(X)
# 将输入序列的维度进行转置,以适应RNN模型的输入格式要求
# PyTorch中的张量默认形状为(批量大小, 序列长度, 词嵌入向量维度大小)转为RNN期望的形状(序列长度, 批量大小, 词嵌入向量维度大小)
# 将形状从 (4, 7, 8) 转换为 (7, 4, 8)
X = X.permute(1, 0, 2)
# 将转置后的输入序列输入到RNN模型中,得到输出和最终的隐藏状态
# 输出 output 的形状为 (7, 4, 16),隐藏状态 state 的形状为 (2, 4, 16)。
# 输出的形状为(序列长度, 批量大小, 隐藏状态维度),隐藏状态的形状为(层数, 批量大小, 隐藏状态维度)
# 7 个时间步,每个时间步有 4 个样本,每个样本的隐藏状态维度为 16。2 层,每层有 4 个样本,每个样本的隐藏状态维度为 16
output, state = self.rnn(X)
# 返回输出和最终隐藏状态
return output, state
# 解码器
class Seq2SeqDecoder(d2l.Decoder):
"""用于序列到序列学习的循环神经网络解码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
# 创建一个GRU循环神经网络模型
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout)
# 创建一个全连接层,用于将隐藏状态映射到词汇表大小的向量
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, *args):
# 返回编码器输出的最终隐藏状态作为解码器的初始隐藏状态
return enc_outputs[1]
def forward(self, X, state):
# 将输入序列进行词嵌入操作,并进行维度转置
# 输入的X的形状为(4,7)经过词嵌入层后,X的形状为(4,7,8)然后转置后形状为(7,4,8)
X = self.embedding(X).permute(1, 0, 2)
# 将编码器的最终隐藏状态进行复制,用于和每个解码器输入进行拼接
# state[-1]表示取最后一层的隐藏状态,X.shape[0] 表示输入张量 X 的第一个维度(序列长度),repeat 方法用于沿各个维度重复张量的值
# 第一维重复 X.shape[0] 次,即 7 次。
# 第二维重复 1 次。
# 第三维重复 1 次。
# [
# [ [4, 16] ],
# [ [4, 16] ],
# [ [4, 16] ],
# [ [4, 16] ],
# [ [4, 16] ],
# [ [4, 16] ],
# [ [4, 16] ]
# ]
# ⭐state[-1] 的形状为 (4, 16)复制后的形状为 (7,4,16)
context = state[-1].repeat(X.shape[0], 1, 1)
# 将词嵌入序列和编码器最终隐藏状态拼接起来作为解码器输入
# X的形状为(7,4,8),context的形状为(7,4,16),拼接后的形状为(7,4,24)
X_and_context = torch.cat((X, context), 2)
# 将拼接后的输入序列和初始隐藏状态输入到RNN模型中
# 输出 output 的形状为 (7, 4, 16)
# 隐藏状态 state 的形状为 (2, 4, 16)。
output, state = self.rnn(X_and_context, state)
# 将RNN模型的输出通过全连接层映射到词汇表大小的向量,并进行维度转置
# 经过全连接层后,输出的形状为(7,4,10),转置后的形状为(4,7,10)
output = self.dense(output).permute(1, 0, 2)
return output, state
# 创建一个Seq2SeqEncoder对象,设置词汇表大小为10,嵌入维度为8,隐藏状态维度为16,层数为2
encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
# 将编码器设置为评估模式,这将影响一些层的行为,如dropout层
encoder.eval()
# 创建一个形状为(4, 7)的输入张量X,用于模拟4个样本,每个样本有7个单词
X = torch.zeros((4, 7), dtype=torch.long)
output, state = encoder(X)
print(output.shape)
# 打印最终隐藏状态的形状
print(state.shape)
print('=========================')
# 实例化解码器
# 创建一个Seq2SeqDecoder对象,设置词汇表大小为10,嵌入维度为8,隐藏状态维度为16,层数为2
decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
decoder.eval()
# 使用编码器的输出来初始化解码器的隐藏状态
state = decoder.init_state(encoder(X))
# 将输入张量X和初始化的隐藏状态传递给解码器,得到输出张量output和更新后的隐藏状态state
output, state = decoder(X, state)
print(output.shape)
print(state.shape)
通过零值化屏蔽不相关的项
import torch
# 通过零值化屏蔽不相关的项
# X的形状为[批量大小,序列最大长度],valid_len为每个序列中实际有效的元素数量(长度),value用于填充不相关项的值,默认为0
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项"""
# 获取序列的最大长度(从X中的第二维获得)
maxlen = X.size(1)
# 创建一个掩码,标记不相关的项为False
# [None, :]通过增加一个维度,将其变为形状为[1, maxlen]的二维张量。
# valid_len[:, None]通过增加一个维度,将valid_len从形状[batch_size]变为[batch_size, 1]的二维张量。
# <操作符进行广播比较,生成一个形状为[batch_size, maxlen]的布尔型张量,表示每个位置是否在有效长度内。
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
# 将不相关的项零值化,即用指定的值进行填充
# ~mask对掩码进行逻辑非操作
X[~mask] = value
return X
# 创建一个输入张量X,用于演示
X = torch.tensor([[1, 2, 3], [4, 5, 6]])
# 调用sequence_mask函数,对输入张量X进行屏蔽操作,将填充的项标出来
# 表示第一个序列的有效长度为1,第二个序列的有效长度为2
print(sequence_mask(X, torch.tensor([1, 2])))
我们还可以使用此函数屏蔽最后几个轴上的所有项
# 创建一个全为1的输入张量X,用于演示
X = torch.ones(2,3,4)
# 调用sequence_mask函数,对输入张量X进行屏蔽操作,将最后几个轴上的所有项标出来,使用-1进行填充
sequence_mask(X, torch.tensor([1,2]),value=-1)
该部分总代码
import torch
# 通过零值化屏蔽不相关的项
# X的形状为[批量大小,序列最大长度],valid_len为每个序列中实际有效的元素数量(长度),value用于填充不相关项的值,默认为0
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项"""
# 获取序列的最大长度(从X中的第二维获得)
maxlen = X.size(1)
# 创建一个掩码,标记不相关的项为False
# [None, :]通过增加一个维度,将其变为形状为[1, maxlen]的二维张量。
# valid_len[:, None]通过增加一个维度,将valid_len从形状[batch_size]变为[batch_size, 1]的二维张量。
# <操作符进行广播比较,生成一个形状为[batch_size, maxlen]的布尔型张量,表示每个位置是否在有效长度内。
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
# 将不相关的项零值化,即用指定的值进行填充
# ~mask对掩码进行逻辑非操作
X[~mask] = value
return X
# 创建一个全为1的输入张量X,用于演示
X = torch.ones(2, 3, 4)
# 调用sequence_mask函数,对输入张量X进行屏蔽操作,将最后几个轴上的所有项标出来,使用-1进行填充
print(sequence_mask(X, torch.tensor([1, 2]), value=-1))
通过扩展softmax交叉熵损失函数来遮蔽不相关的预测
import torch
from torch import nn
# 通过零值化屏蔽不相关的项
# X的形状为[批量大小,序列最大长度],valid_len为每个序列中实际有效的元素数量(长度),value用于填充不相关项的值,默认为0
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项"""
# 获取序列的最大长度(从X中的第二维获得)
maxlen = X.size(1)
# 创建一个掩码,标记不相关的项为False
# [None, :]通过增加一个维度,将其变为形状为[1, maxlen]的二维张量。
# valid_len[:, None]通过增加一个维度,将valid_len从形状[batch_size]变为[batch_size, 1]的二维张量。
# <操作符进行广播比较,生成一个形状为[batch_size, maxlen]的布尔型张量,表示每个位置是否在有效长度内。
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
# 将不相关的项零值化,即用指定的值进行填充
# ~mask对掩码进行逻辑非操作
X[~mask] = value
return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""带遮蔽的softmax交叉熵损失函数"""
def forward(self, pred, label, valid_len):
weights = torch.ones_like(label)
# 这里的weights的形状是[3,4]
# tensor([[1, 1, 1, 1],
# [1, 1, 1, 1],
# [1, 1, 1, 1]])
# 然后经历过valid_len遮蔽后变成
# tensor([[1, 1, 1, 1],
# [1, 1, 0, 0],
# [0, 0, 0, 0]])
weights = sequence_mask(weights, valid_len)
# 设置损失函数的计算方式为不进行降维
self.reduction = 'none'
# 调用父类的forward方法计算未加权的交叉熵损失
# 把预测的维度放在中间,方便计算加权损失
# pred 的形状为 [3, 4, 10],label 的形状为 [3, 4],则 unweighted_loss 的形状为 [3, 4]。
# tensor([[2.3026, 2.3026, 2.3026, 2.3026],
# [2.3026, 2.3026, 2.3026, 2.3026],
# [2.3026, 2.3026, 2.3026, 2.3026]])
unweighted_loss = super().forward(pred.permute(0, 2, 1), label)
# 将未加权的损失乘以权重,然后在第1个维度上求均值,得到加权的损失
# weighted_loss的形状为[3]
# unweighted_loss * weights的结果(按元素相乘)
# tensor([[2.3026, 2.3026, 2.3026, 2.3026],
# [2.3026, 2.3026, 0.0000, 0.0000],
# [0.0000, 0.0000, 0.0000, 0.0000]])
weighted_loss = (unweighted_loss * weights).mean(dim=1) # 有效的留下来,没效的全部变为0
# 返回加权的损失张量
return weighted_loss
loss = MaskedSoftmaxCELoss()
# 调用损失函数对象的forward方法,计算损失
# 3是批量大小、4是时间长度、10是vacab_size
print(loss(torch.ones(3, 4, 10), # 预测
torch.ones((3, 4), dtype=torch.long), # 标签
torch.tensor([4, 2, 0]) #有效长度
)
)
训练
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
# 对于GRU层的每个参数
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs])
for epoch in range(num_epochs):
# 创建计时器对象,用于计算每个epoch的训练时间
timer = d2l.Timer()
# 创建累加器对象,用于累加损失和标记的数量
metric = d2l.Accumulator(2)
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
# 创建起始符号的张量bos,并移动到指定设备上
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
# 构造解码器的输入,将bos和去除最后一列的标签张量Y拼接起来
dec_input = torch.cat([bos, Y[:, :-1]], 1)
# 前向传播,得到预测结果Y_hat
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward()
# 对梯度进行裁剪,防止梯度爆炸
d2l.grad_clipping(net, 1)
# 计算标记的数量
num_tokens = Y_valid_len.sum()
# 更新模型参数
optimizer.step()
# 使用torch.no_grad()上下文管理器,关闭梯度计算,避免计算图的构建
with torch.no_grad():
# 累加损失和标记的数量
metric.add(l.sum(), num_tokens)
# 每10个epoch打印一次损失
if (epoch + 1) % 10 == 0:
# 绘制损失随训练epoch的变化情况
animator.add(epoch + 1, (metric[0] / metric[1],))
# 打印最终的损失值和训练速度
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
预测
# net:序列到序列模型,src_sentence:源序列(作为输入序列),src_vocab:源语言词汇表(用于将源句子中的单词转换为索引),
# tgt_vocab: 目标语言词汇表,用于将预测的输出索引转换回单词,num_steps:解码过程中的最大时间步数
# save_attention_weights:表示是否保存注意力权重。
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device, save_attention_weights=False):
"""序列到序列模型的预测"""
# 在预测时将net设置为评估模式
net.eval()
# 转为小写然后分割成单词列表,然后src_vocab[]将其转换为索引,并在末尾添加<eos>标记
# 例如:
# ['go', '.'] =>[9, 4, 3]
# ['i', 'lost', '.'] =>[6, 20, 4, 3]
# ["he's", 'calm', '.'] =>[58, 43, 4, 3]
# ["i'm", 'home', '.'] =>[7, 56, 4, 3]
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
# 计算源序列的实际长度,分别是3、4、4、4
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
# 对源序列进行截断或填充,使其长度等于 num_steps,num_steps=10
# [9, 4, 3, 1, 1, 1, 1, 1, 1, 1]
# [6, 20, 4, 3, 1, 1, 1, 1, 1, 1]
# [58, 43, 4, 3, 1, 1, 1, 1, 1, 1]
# [7, 56, 4, 3, 1, 1, 1, 1, 1, 1]
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# 添加批量轴,使输入数据符合模型的输入要求
# tensor([[9, 4, 3, 1, 1, 1, 1, 1, 1, 1]], device='cuda:0')
# tensor([[ 6, 20, 4, 3, 1, 1, 1, 1, 1, 1]], device='cuda:0')
# tensor([[58, 43, 4, 3, 1, 1, 1, 1, 1, 1]], device='cuda:0')
# tensor([[7, 56, 4, 3, 1, 1, 1, 1, 1, 1]], device='cuda:0')
enc_X = torch.unsqueeze(torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
# 使用模型的编码器对处理后的源序列进行编码
enc_outputs = net.encoder(enc_X, enc_valid_len)
# 初始化解码器的隐藏状态
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# 添加批量轴,使输入数据符合模型的输入要求
dec_X = torch.unsqueeze(torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
# 初始化输出序列和注意力权重序列
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
# 使用解码器net.decoder 和当前隐藏状态 dec_state 对当前输入 dec_X 进行解码,得到输出 Y 和更新后的隐藏状态 dec_state。
Y, dec_state = net.decoder(dec_X, dec_state)
# 我们使用具有预测最高可能性的词元,作为解码器在下一时间步的输入
dec_X = Y.argmax(dim=2)
# 得到预测的索引
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
# 保存注意力权重(稍后讨论)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 一旦序列结束词元被预测,输出序列的生成就完成了
if pred == tgt_vocab['<eos>']:
break
# 将当前预测的词元添加到输出序列output_seq
output_seq.append(pred)
# 使用 tgt_vocab.to_tokens 将输出序列的索引转换为单词,并连接成字符串
# tgt_vocab.to_tokens(output_seq)为预测的法语词,例如['va', '', '!']然后在连接成字符串va !
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
BLEU的代码实现
# BLEU的代码实现
# pred_seq:预测序列,label_seq:标签序列,k:用于计算BLEU的n-gram大小
def bleu(pred_seq, label_seq, k):
"""计算 BLEU"""
# 将预测序列和标签序列分割成词元列表
pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
# 计算预测序列和标签序列的长度
len_pred, len_label = len(pred_tokens), len(label_tokens)
# 计算长度惩罚因子,如果预测序列比标签序列长,则惩罚因子会小于1,反之则为1(因为没有超过标签序列长度,不惩罚)
score = math.exp(min(0, 1 - len_label / len_pred))
# 对每个n-gram进行计算,其中k为最大n-gram的大小
for n in range(1, k + 1):
# 初始化匹配次数和标签序列中的n-gram计数器
num_matches, label_subs = 0, collections.defaultdict(int)
# 遍历标签序列,计算标签序列中的n-gram出现次数,存储在label_subs字典中。
for i in range(len_label - n + 1):
# 更新标签序列中n-gram的计数
label_subs[''.join(label_tokens[i:i + n])] += 1
# 遍历预测序列,统计预测序列中与标签序列n-gram匹配的次数
for i in range(len_pred - n + 1):
# 如果预测序列中的n-gram在标签序列中出现,则增加匹配次数,并减少标签序列中该n-gram的计数
if label_subs[''.join(pred_tokens[i:i + n])] > 0:
num_matches += 1
label_subs[''.join(pred_tokens[i:i + n])] -= 1
# 根据匹配次数和预测序列的长度计算得分
score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
# 返回计算得到的BLEU得分
return score
该部分总代码
import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
import os
class Seq2SeqEncoder(d2l.Encoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)
def forward(self, X, *args):
X = self.embedding(X)
X = X.permute(1, 0, 2)
output, state = self.rnn(X)
return output, state
# 解码器
class Seq2SeqDecoder(d2l.Decoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, *args):
return enc_outputs[1]
def forward(self, X, state):
X = self.embedding(X).permute(1, 0, 2)
context = state[-1].repeat(X.shape[0], 1, 1)
X_and_context = torch.cat((X, context), 2)
output, state = self.rnn(X_and_context, state)
output = self.dense(output).permute(1, 0, 2)
return output, state
def read_data_nmt():
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
return f.read()
def preprocess_nmt(text):
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ''
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
out = [
' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
def tokenize_nmt(text, num_examples=None):
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
def truncate_pad(line, num_steps, padding_token):
if len(line) > num_steps:
return line[:num_steps]
return line + [padding_token] * (num_steps - len(line))
def build_array_nmt(lines, vocab, num_steps):
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = torch.tensor([truncate_pad(l, num_steps, vocab['<pad>']) for l in lines])
valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
return array, valid_len
def load_data_nmt(batch_size, num_steps, num_examples=600):
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
def sequence_mask(X, valid_len, value=0):
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
# 将不相关的项零值化,即用指定的值进行填充
# ~mask对掩码进行逻辑非操作
X[~mask] = value
return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
def forward(self, pred, label, valid_len):
weights = torch.ones_like(label)
weights = sequence_mask(weights, valid_len)
self.reduction = 'none'
unweighted_loss = super().forward(pred.permute(0, 2, 1), label)
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
# 对于GRU层的每个参数
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2)
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
dec_input = torch.cat([bos, Y[:, :-1]], 1)
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward()
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
# net:序列到序列模型,src_sentence:源序列(作为输入序列),src_vocab:源语言词汇表(用于将源句子中的单词转换为索引),
# tgt_vocab: 目标语言词汇表,用于将预测的输出索引转换回单词,num_steps:解码过程中的最大时间步数
# save_attention_weights:表示是否保存注意力权重。
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device, save_attention_weights=False):
"""序列到序列模型的预测"""
# 在预测时将net设置为评估模式
net.eval()
# 转为小写然后分割成单词列表,然后src_vocab[]将其转换为索引,并在末尾添加<eos>标记
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
# 计算源序列的实际长度
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
# 对源序列进行截断或填充,使其长度等于 num_steps
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# 添加批量轴,使输入数据符合模型的输入要求
enc_X = torch.unsqueeze(torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
# 使用模型的编码器对处理后的源序列进行编码
enc_outputs = net.encoder(enc_X, enc_valid_len)
# 初始化解码器的隐藏状态
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# 添加批量轴,使输入数据符合模型的输入要求
dec_X = torch.unsqueeze(torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
# 初始化输出序列和注意力权重序列
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
# 使用解码器net.decoder 和当前隐藏状态 dec_state 对当前输入 dec_X 进行解码,得到输出 Y 和更新后的隐藏状态 dec_state。
Y, dec_state = net.decoder(dec_X, dec_state)
# 我们使用具有预测最高可能性的词元,作为解码器在下一时间步的输入
dec_X = Y.argmax(dim=2)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
# 保存注意力权重(稍后讨论)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 一旦序列结束词元被预测,输出序列的生成就完成了
if pred == tgt_vocab['<eos>']:
break
# 将当前预测的词元添加到输出序列output_seq
output_seq.append(pred)
# 使用 tgt_vocab.to_tokens 将输出序列的索引转换为单词,并连接成字符串
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
# BLEU的代码实现
# pred_seq:预测序列,label_seq:标签序列,k:用于计算BLEU的n-gram大小
def bleu(pred_seq, label_seq, k):
"""计算 BLEU"""
# 将预测序列和标签序列分割成词元列表
pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
# 计算预测序列和标签序列的长度
len_pred, len_label = len(pred_tokens), len(label_tokens)
# 计算长度惩罚因子,如果预测序列比标签序列长,则惩罚因子会小于1,反之则为1(因为没有超过标签序列长度,不惩罚)
score = math.exp(min(0, 1 - len_label / len_pred))
# 对每个n-gram进行计算,其中k为最大n-gram的大小
for n in range(1, k + 1):
# 初始化匹配次数和标签序列中的n-gram计数器
num_matches, label_subs = 0, collections.defaultdict(int)
# 遍历标签序列,计算标签序列中的n-gram出现次数,存储在label_subs字典中。
for i in range(len_label - n + 1):
# 更新标签序列中n-gram的计数
label_subs[''.join(label_tokens[i:i + n])] += 1
# 遍历预测序列,统计预测序列中与标签序列n-gram匹配的次数
for i in range(len_pred - n + 1):
# 如果预测序列中的n-gram在标签序列中出现,则增加匹配次数,并减少标签序列中该n-gram的计数
if label_subs[''.join(pred_tokens[i:i + n])] > 0:
num_matches += 1
label_subs[''.join(pred_tokens[i:i + n])] -= 1
# 根据匹配次数和预测序列的长度计算得分
score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
# 返回计算得到的BLEU得分
return score
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
# 将几个英语句子翻译成法语
# 定义英语句子列表
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
# 定义法语句子列表
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
# 使用zip函数迭代英语句子和法语句子的对应元素
for eng, fra in zip(engs, fras):
# 调用predict_seq2seq函数进行翻译预测,并获取翻译结果和注意力权重序列
translation, attention_weight_seq = predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device)
# 调用bleu函数计算翻译结果的BLEU分数
# 打印英语句子、翻译结果和BLEU分数
print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')