文章目录
- 深入解析 Transformer
- 1 理解 Transformer
- 1.1 理解自注意力机制 (Self-Attention)
- 1.2 理解位置编码 (Positional Encoding)
- 1.2.1 整数编码
- 1.2.2 正弦编码
- 1.3 理解编码器和解码器模块
- 1.3.1 编码器
- 1.4 最终线性层和 Softmax 层
- 2 编写 Transformer 的代码
- 2.1 摘要和引言
- 2.2 背景
- 2.3 模型架构
- 2.4 代码
- 2.4.1 多头注意力机制MHA
- 2.4.2 前馈网络
- 2.4.3 位置编码
- 2.4.4 编码器层
- 2.4.5 解码器层
- 2.4.6 编码器
- 2.4.7 解码器
- 2.4.8 整个Transformer
- 2.4.9 训练Transformers
- 3 其它及进一步阅读
深入解析 Transformer
https://goyalpramod.github.io/blogs/Transformers_laid_out/
提醒:阅读本文大约需要40分钟或者更多。
我发现现在关于 Transformer 的博客、视频和教程主要分为以下三类:
- 解释 Transformer 工作原理的: 其中 Jay Alammar 的博客是佼佼者。
- 解读 “Attention is All You Need” 论文的: 例如 The Annotated Transformer。
- 用 PyTorch 实现 Transformer 的: 例如 “Coding a ChatGPT Like Transformer From Scratch in PyTorch”。
这些资料都以出色的方式,从多个角度帮助我们理解单个概念。(本博客深受上述作品影响)
本文旨在:
- 直观地解释 Transformer 的工作原理。
- 详细阐述论文的每个部分,以及如何理解和实现它们。
- 从初学者的角度,用 PyTorch 编写代码实现。
- 将所有内容整合在一起,一站式学习。
如何使用本博客
我将首先简要概述 Transformer 的工作原理,以及它最初被开发的原因。
在建立基本背景之后,我们将深入代码本身。
我将引用论文中的章节,并说明我们将要编码的 Transformer 部分,同时提供代码示例,并附带提示和文档链接,例如:
class TransformerLRScheduler:
def __init__(self, optimizer, d_model, warmup_steps):
"""
Args:
optimizer: Optimizer to adjust learning rate for
d_model: Model dimensionality
warmup_steps: Number of warmup steps
"""
# YOUR CODE HERE
def step(self, step_num):
"""
Update learning rate based on step number
"""
# lrate = d_model^(-0.5) * min(step_num^(-0.5), step_num * warmup_steps^(-1.5))
# YOUR CODE HERE - implement the formula
我将在代码块后添加有用的链接,但我建议你首先自己进行研究。这是成为一名优秀的工程师的第一步。
我建议你复制这些代码块,并尝试自己实现它们。
为了简化过程,在开始编码之前,我将详细解释每个部分。如果你仍然无法解决,请回来查看我的实现。
1 理解 Transformer
最初的 Transformer 是为机器翻译任务而设计的,这也是我们将要做的。我们将尝试将英语句子“I like Pizza”翻译成印地语。
不过,在此之前,让我们先简单了解一下 Transformer 这个黑盒。我们可以看到它由编码器(Encoder)和解码器(Decoder)组成。
在传递给编码器之前,句子“I like Pizza”会被分解成各个单词*,并使用嵌入(embedding)矩阵(与 Transformer 一起训练)进行嵌入。
然后,将位置信息添加到这些嵌入中。
之后,这些嵌入被传递到编码器模块,它主要完成两件事:
- 应用自注意力机制(Self-Attention)来理解各个单词彼此之间的关系。
- 将自注意力分数输出到前馈网络(Feed Forward Network)。
解码器模块接收来自编码器的输出,对其进行自身处理,产生一个输出,并将该输出发送回自身以生成下一个单词。
你可以这样理解:编码器理解你的语言,我们称之为 X,以及另一种语言,我们称之为 Y。解码器理解 Y 和你尝试翻译的语言 X,我们称之为 Z。
因此,Y 充当编码器和解码器交流的通用语言,以产生最终输出。
*为了便于理解,我们使用了单词,但大多数现代大型语言模型 (LLM) 并不使用单词,而是使用“标记(Token)”。
1.1 理解自注意力机制 (Self-Attention)
我们都听说过著名的三元组“查询(Query)、键(Key)和值(Value)”。我曾一度迷失在思考这些术语背后的由来。Q、K、Y 与字典(或传统 CS 中的映射)有关吗?它是受到之前论文的启发吗?如果是,它们是如何出现的?
让我们首先建立对这个想法的直观理解。
句子 (S): “Pramod loves pizza”(Pramod 喜欢披萨)
问题:
"who loves pizza?"谁喜欢披萨?
你可以为这个句子提出任意多的问题(查询)。现在,对于每个查询,你都会有一个特定的信息片段(键),它将为你提供想要的答案(值)。
查询:
- Q ->who loves pizza 谁喜欢披萨?
- K -> pizza披萨、Pramod、loves喜欢(实际上所有单词都会有不同程度的重要性)
- V -> pizza披萨(值并非直接的答案,而是类似于答案的矩阵表示)
这实际上是一种过于简化的说法,但它可以帮助我们理解,查询、键和值都只能通过句子本身创建。
让我们先了解如何应用自注意力机制,然后了解为什么要这样做。此外,在接下来的解释中,将 Q、K、V 视为纯矩阵,而不是其他任何东西。
首先,将单词 “delicious pizza 美味的披萨” 转换为嵌入。然后,将它们乘以权重 W_Q、W_K 和 W_V 以生成 Q、K 和 V 向量。
这些权重 W_Q、W_K 和 W_V 与 Transformer 一起训练。注意,向量 Q、K 和 V 的大小小于 x1 和 x2 的大小。即,x1 和 x2 是大小为 512 的向量,而 Q、K 和 V 的大小为 64。这是一种架构选择,目的是使计算更小、更快。
总的来看,就是x1要变成q1,k1,v1 ,x2要变成q2,k2,v2,如果有更多的输入,那么是同理;上图是x是1x4,q,k,v则是1x3, 如果有多个输入,x=Nx4 W_Q=4x3 得到 q=Nx3.
现在,使用这些 Q、K 和 V 向量计算注意力分数。
为了计算第一个单词“delicious美味的”的注意力分数,我们取该单词的查询 (q1) 和键 (k1),并将它们进行点积运算。(点积非常适合查找事物之间的相似性)。
然后,我们将其除以键向量维度的平方根。这样做是为了稳定训练。
对第一个单词的查询 (q1) 和不同单词的所有键(在本例中为 k1 和 k2)执行相同的操作。
最后,使用所有值,我们对每个值进行 softmax 运算。
然后,将这些值乘以每个单词的值 (v1, v2)。直观地讲,是为了得到每个单词相对于所选单词的重要性。不太重要的单词会被乘以 0.001 等较小的值而淹没。
最后,将所有内容加起来得到 Z 向量。
Transformer 的亮点在于可以并行计算。因此,我们不处理向量,而是处理矩阵。
实现过程保持不变。
首先,计算 Q、K 和 V 矩阵。
其次,计算注意力分数。
K_t 是K的转置
第三,对每个注意力头重复这些步骤。
这是每个注意力头的输出结果。
最后,将所有注意力头的输出连接起来,并乘以矩阵 WO(与模型一起训练)以获得最终的注意力分数。
以下是所有过程的总结:
上图展示了从输入句子到输出自注意力结果的完整步骤。它主要分为以下几个阶段:
-
输入与嵌入 (1, 2):
- 输入句子: 开始时,我们有一个句子,例如“Delicious Pizza”。
- 词嵌入: 句中的每个词都被转化为一个向量表示,也就是词嵌入。图中使用“X”表示词嵌入后的矩阵。
- 注意: 对于第一个编码器层(encoder_1),我们直接使用词嵌入“X”。而在后续的编码器层(encoder_2, encoder_3,…),我们则使用前一个编码器层的输出,用“R”表示。
-
多头处理 (3):
- 权重矩阵: 将嵌入后的矩阵“X”或“R” 分别与三个权重矩阵 (W_Q, W_K, W_V) 相乘。 这些权重矩阵是模型训练过程中学习到的参数,每个头都有自己独立的W_Q, W_K, W_V 。
- Q (Query) 查询向量: W_Q * X (或 R)
- K (Key) 键向量: W_K * X (或 R)
- V (Value) 值向量: W_V * X (或 R)
- 多头 (8 heads): 这里,输入数据被“分割”成了 8 个“头”,每个头都有一组不同的W_Q, W_K, W_V 矩阵,分别计算得到 Q1,K1,V1,Q2,K2,V2… Q8,K8,V8。 这允许模型从不同的角度关注输入信息,从而捕捉更丰富的信息。 图中,“**8” 是一个架构选择,可以改为其他数字,比如64。
- 权重矩阵: 将嵌入后的矩阵“X”或“R” 分别与三个权重矩阵 (W_Q, W_K, W_V) 相乘。 这些权重矩阵是模型训练过程中学习到的参数,每个头都有自己独立的W_Q, W_K, W_V 。
-
计算注意力得分 (4):
- 每个头分别计算 Q,K,V 的点积,得到注意力分数。 这是一个关键步骤,它决定了在输入句子中哪些词需要被赋予更大的关注权重。 每个头输出的注意力结果表示为 QK/sqrt(d_k) 之后再经过 softmax 归一化 (图中省略了这一步)。
- d_k 为每个头 K向量的维度
-
注意力加权 (4):
- 根据上一步计算得到的注意力分数,对 V (Value) 向量进行加权求和。 这将产生每个头关注加权后的输出。图中以Z1, Z2,… Z8表示。
-
合并与最终输出 (5):
- 拼接: 每个头的注意力结果 (Z1, Z2,… Z8) 被拼接在一起,形成一个更大的矩阵。
- 投影: 将拼接后的矩阵与一个额外的权重矩阵 WO (W 输出) 相乘,得到最终的自注意力输出“Z”。 这个WO 矩阵也是模型训练过程中学习到的参数。
总结:
这个图的核心思想是:
- 多头机制: 通过将输入数据分割成多个“头”进行处理,使模型能够从多个不同的角度捕捉输入信息,并提高模型的表达能力。
- 自注意力机制: 通过计算 Q,K,V 的点积和加权,让模型在处理一个词的时候,同时考虑到句子中的其他词。
- 线性变换 (W_Q, W_K, W_V, WO): 通过与这些权重矩阵的乘法,将输入数据进行转换和整合,最终产生模型的输出。
关键点:
- Encoder vs. 其他层: 图中标注说明,对于第一个encoder层,输入是词嵌入X,而后续encoder层则使用前一层encoder的输出R作为输入。
- 架构选择: 图中标注了“8”头是一种架构选择,可以使用其他数字。
现在,让我们来理解为什么它会起作用:
忘记多头注意力、注意力模块和所有术语。
想象一下,你在 A 点,想去大城市中的 B 点。
你认为只有一条路可以到达那里吗?当然不是,有成千上万种方法可以到达那个点。
但是,在你尝试过很多方法之前,你永远不会知道最佳路径。越多越好。
因此,单个矩阵乘法无法获得查询和键的最佳表示。
可以进行多个查询,可以为每个查询完成多个键。
这就是为什么我们要进行如此多的矩阵乘法,以尝试获得与用户提出的问题相关的查询的最佳键。
为了可视化自注意力如何创建不同的表示。让我们看一下单词“apple苹果”、“market市场”和“cellphone手机”的三种不同表示。
哪种表示最能回答以下问题?
- What does the company apple make苹果公司生产什么?
表示 2 是回答此问题的最佳选择,它给出的答案是“手机”,因为这是与它最接近的答案。
那下一个问题呢?
- Where can I get a new iphone我在哪里可以买到新的 iPhone?
在这种情况下,表示 3 将是最佳选择,我们将得到答案“市场”。
(上边三种不同表示这些是线性变换,可以应用于任何矩阵,第三个称为剪切运算)
1.2 理解位置编码 (Positional Encoding)
为了理解什么是位置编码以及为什么需要它,让我们想象一下没有位置编码的情况。
首先,输入句子,例如“Pramod likes to eat pizza with his friends Pramod 喜欢和他的朋友一起吃披萨”。将被分解成各自的单词*。
“Pramod”、“likes喜欢”、“eat吃”、“pizza披萨”、“ with和”、“his他的”、“friends朋友”
其次,每个单词都将被转换为给定维度的嵌入。
现在,如果没有位置编码。模型没有关于单词相对位置的信息。(因为所有内容都是并行处理的)
因此,这个句子与“Pramod likes to eat eat friends with hiss pizza 喜欢和他的披萨朋友一起吃”或任何其他单词排列没有什么不同。
因此,我们需要 PE(位置编码)的原因是为了告诉模型不同单词彼此之间的相对位置。
现在,这种 PE 的首选特性是什么:
- 每个位置的唯一编码: 否则,它将不断地为不同长度的句子改变。对于 10 个单词的句子,位置 2 将不同于 100 个单词的句子中同样的2。这将妨碍训练,因为没有可以遵循的可预测模式。
- 两个编码位置之间的线性关系: 如果我知道一个单词的位置 p,那么应该很容易计算另一个单词的位置 p+k。这将使模型更容易学习模式。
- 扩展到为比训练中遇到的更长的序列: 如果模型受到训练中使用句子长度的限制,它将永远无法在现实世界中工作。
- 由模型可以学习的确定性过程生成: 它应该是一个简单的公式或容易计算的算法。为了帮助我们的模型更好地泛化。
- 可扩展到多个维度: 不同的场景可能有不同的维度,我们希望它在所有情况下都能工作。
1.2.1 整数编码
阅读上述条件,任何人的第一想法都会是。“为什么不直接添加单词的位置?”这种简单的方法适用于短句子。但是对于较长的句子,例如一篇包含 2000 个单词的文章,添加位置 2000 可能会导致梯度爆炸或消失。
还有其他替代方法,例如规范化整数编码、二进制编码。但每种方法都有其自身的问题。要详细了解更多信息,请参阅此处You could have designed state of the art positional encoding。
1.2.2 正弦编码
满足我们所有条件的一种编码方法是使用正弦函数。如论文中所述。
但是,如果正弦满足所有条件,为什么还要交替使用余弦呢?
好吧,正弦不能满足所有条件,而只能满足大多数条件。正弦不能满足我们对线性关系的需求,因此我们也需要余弦。在此,我将提供一个简单的证明,该证明取自此处Linear Relationships in the Transformer’s Positional Encoding。
考虑一系列正弦和余弦对,每个都与一个频率wi相关联。我们的目标是找到一个线性变换矩阵
M,该矩阵可以将这些正弦函数移动一个固定的偏移量k:
频率wi遵循一个几何级数,该几何级数随着维数索引i的减小而减小,定义为:
为了找到这个变换矩阵,我们可以把它表示为一个具有未知系数的通用 2×2 矩阵:
通过将三角加法定理和差化积应用于右侧,我们可以将其展开为:
通过匹配系数,此展开式为我们提供了一个由两个方程组成的系统:
通过比较两边的项,我们可以求解未知的系数:
这些解为我们提供了最终的变换矩阵:
现在我们了解了什么是 PE 以及为什么使用正弦和余弦。让我们了解它是如何工作的。
pos = 单词在句子中的位置(“Pramod likes pizza 喜欢披萨”,Pramod 在位置 0,likes在 1,依此类推)
i = 嵌入的第 i 个和 (i+1) 个索引的值,偶数列号的正弦,奇数列号的余弦(“Pramod”被转换为嵌入的向量。它具有不同的索引)
d_model = 模型的维度(在我们的例子中为 512)
10,000 (n) = 这是一个实验确定的常数
正如你所看到的,使用这个公式,我们可以计算每个位置的 PE 值以及该位置的所有索引。这是一个简单的图示,显示了它是如何完成的。
现在,扩展上面的内容,这是它的函数形式:
更多可视化请参考.
这是 n = 10,000、d_model = 10,000 和序列长度 = 100 时原始数据的样子。此处提供生成代码:
将它想象成这样,y 轴上的每个索引代表一个单词,而 x 轴上对应于该索引的所有内容都是它的位置编码。
1.3 理解编码器和解码器模块
如果到目前为止一切都说得通,那么这对你来说将是轻而易举的。因为这正是我们将所有内容组合在一起的地方。
单个 Transformer 可以有多个编码器以及解码器模块。
1.3.1 编码器
让我们首先从编码器部分开始。
它由多个编码器组成,每个编码器模块包含以下部分:
- 多头注意力机制
- 残差连接
- 层归一化
- 前馈网络
残差连接
我们已经详细讨论了多头注意力机制,所以让我们来谈谈剩下的三个。
残差连接,也称为跳跃连接,正如其名称所暗示的那样。它们获取输入,跳过一个模块,并将其传递到下一个模块。
层归一化
层归一化是批归一化之后的一种发展。在我们谈论这两者中的任何一个之前,我们必须了解什么是归一化。
归一化是一种将不同特征置于同一尺度的方法,这样做是为了稳定训练。因为当模型尝试从尺度差异很大的特征中学习时,它会减慢训练速度并导致梯度爆炸。在此处。
批归一化是一种方法,其中从未来层减去整个批次的均值和标准差。
上图来自这里
在层归一化中,不是关注整个批次,而是关注单个实例的所有特征。
你可以这样理解,我们从一个句子中取每个单词,并规范化该单词。
为了更好地理解,请阅读此博客。
前馈网络
添加前馈网络 (FFN) 是为了向模型引入非线性和复杂性。虽然注意力机制非常擅长捕获序列中不同位置之间的关系,但它本质上仍然是一种线性运算(如前所述)。
FFN 通过其激活函数(通常为 ReLU)添加非线性,使模型能够学习更复杂的模式和变换,而纯注意力机制无法单独捕获这些模式和变换。
你可以这样理解:如果注意力机制就像进行对话,每个人都可以与其他人交谈(全局交互),那么 FFN 就像给每个人时间来深入思考他们所听到的内容并独立处理它(局部处理)。两者对于有效理解和转换输入都是必要的。如果没有 FFN,Transformer 学习复杂函数的能力将受到严重限制,并且本质上将仅限于通过注意力机制进行的加权平均运算。
解码器模块
编码器的输出作为数据处理中的键和值矩阵馈送到每个解码器模块。解码器模块是自回归的。这意味着它一个接一个地输出,并将其自身的输出作为输入。
- 解码器模块从编码器获取键和值,并从之前的输出中创建自己的查询。
- 使用第一步的输出,它移动到步骤 2,其中从先前的解码器模块的输出作为查询,键,值从编码器中获取。
- 此过程会重复进行,直到我们从解码器获得输出,然后将其作为创建下一个标记的输入
- 重复此过程,直到我们到达标记
解码器模块中也有一个小的变化,即我们应用一个掩码,以使自注意力机制仅关注输出序列中较早的位置。
这就是你能够编写自己的 Transformer 所需要的所有高级理解。现在让我们看一下论文以及代码。
1.4 最终线性层和 Softmax 层
解码器输出数字向量(通常是浮点数),该向量将发送到线性层。
线性层输出词汇表中每个单词的分数(训练数据集中唯一单词的数量)。
然后,将其发送到 softmax 层,该层将这些分数转换为概率。并给出具有最高概率的单词。(通常是这种情况,有时我们可以将其设置为获取第二个最有可能的单词,或第三个最有可能的单词等等)
2 编写 Transformer 的代码
对于以下部分,我建议你打开 3 个选项卡。本博客、Jupyter 笔记本和原始论文。
2.1 摘要和引言
本节将向你介绍论文的内容以及最初的编写原因。
有一些概念可以帮助你学习新东西,例如 RNN、卷积神经网络CNN和关于 BLEU。
重要的是要知道,Transformer 最初是为文本到文本翻译而创建的。即从一种语言到另一种语言。
因此,它们具有编码器部分和解码器部分。它们传递信息,这被称为交叉注意力(稍后会详细介绍自注意力和交叉注意力之间的区别)。
2.2 背景
本节通常会讨论该领域先前完成的工作、已知问题以及人们用来修复它们的方法。我们需要记住一件非常重要的事情。
“跟踪遥远的信息(长文本)”。Transformer 之所以令人惊叹,有很多原因,但关键的一点是它们可以记住遥远的关系。
随着句子变长,RNN 和 LSTM 等解决方案会丢失上下文含义。但是 Transformer 不会遇到这样的问题。(不过,当您阅读时希望不存在的问题。这里和长度是上下文窗口长度,这决定了 Transformer 可以看到多少信息)
2.3 模型架构
我们一直在等待的部分。我将在这里稍微偏离论文。因为我发现更容易遵循数据的处理过程。此外,如果你阅读该论文,它的每个单词都应该对你来说是有意义的。
我们将首先从多头注意力机制开始,然后是前馈网络,最后是位置编码,使用这些我们将完成编码器层,随后我们将转到解码器层,之后我们将编写编码器和解码器模块,最后以编写整个 Transformer 在真实世界数据上的训练循环结束。
完整的笔记本可以在这里找到
2.4 代码
必要的导入
import math
import torch
import torch.nn as nn
from torch.nn.functional import softmax
2.4.1 多头注意力机制MHA
到目前为止,你应该对注意力机制的工作原理有很好的了解,因此让我们首先从编写缩放点积注意力机制(scaled dot-product attention)的代码开始(因为 MHA 基本上是将多个缩放点积堆叠在一起)。参考第 3.2.1 节缩放点积注意力机制
张量大小、矩阵乘法、掩码填充
# 我的实现
def scaled_dot_product_attention(query, key, value, mask=None):
"""计算缩放点积注意力
参数说明:
query: (batch_size, num_heads, seq_len_q, d_k) 查询张量
key: (batch_size, num_heads, seq_len_k, d_k) 键张量
value: (batch_size, num_heads, seq_len_v, d_v) 值张量
mask: 可选的掩码张量,用于防止注意力访问某些位置
"""
# 检查输入张量的维度
assert query.dim() == 4, f"Query应该是4维张量,但得到了{query.dim()}维"
assert key.size(-1) == query.size(-1), "Key和Query的深度维度必须相等"
assert key.size(-2) == value.size(-2), "Key和Value的序列长度必须相等"
# 获取深度维度大小
d_k = query.size(-1)
# 计算注意力分数
# 将query和key的转置相乘,然后除以sqrt(d_k)进行缩放
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# 如果提供了掩码,将掩码位置的注意力分数设为负无穷
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
# 对分数使用softmax得到注意力权重
attention_weights = softmax(scores, dim=-1)
# 将注意力权重与value相乘得到输出
return torch.matmul(attention_weights, value)
使用这个,让我们完成 MHA
class MultiHeadAttention(nn.Module):
#让我只为这个类编写初始化程序,以便你了解它的完成方式
def __init__(self, d_model, num_heads):
super().__init__()
assert d_model % num_heads == 0, "d_model must be divisible by num_heads" #思考为什么?
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads # 注意:使用整数除法 //
# 创建可学习的投影矩阵
self.W_q = nn.Linear(d_model, d_model) #思考为什么我们从 d_model -> d_model
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
@staticmethod
def scaled_dot_product_attention(query, key, value, mask=None):
# YOUR IMPLEMENTATION HERE
def forward(self, query, key, value, mask=None):
# get batch_size and sequence length
# YOUR CODE HERE
# 1. Linear projections
# YOUR CODE HERE
# 2. Split into heads
# YOUR CODE HERE
# 3. Apply attention
# YOUR CODE HERE
# 4. Concatenate heads
# YOUR CODE HERE
# 5. Final projection
# YOUR CODE HERE
- 我很难理解 view 和 transpose 之间的区别。以下 2 个链接应该可以帮助你:何时使用 view、transpose 和 permute 以及 view 和 transpose 之间的差异
- Contiguous 和 view,我仍然不明白。直到我阅读了这些:Pytorch Internals 和 Contiguous & Non-Contiguous Tensor
- Linear
- 我还有一篇关于张量内部内存管理如何工作的文章,如果你有兴趣,请阅读此文章。
#我的实现
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
"""多头注意力层初始化
Args:
d_model: 模型的维度
num_heads: 注意力头的数量
"""
super().__init__()
assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
self.d_model = d_model # 模型维度
self.num_heads = num_heads # 注意力头数量
self.d_k = d_model // num_heads # 每个头的维度
# 创建可学习的投影矩阵
# 每个矩阵的形状都是 (d_model, d_model)
self.W_q = nn.Linear(d_model, d_model) # Query的变换矩阵
self.W_k = nn.Linear(d_model, d_model) # Key的变换矩阵
self.W_v = nn.Linear(d_model, d_model) # Value的变换矩阵
self.W_o = nn.Linear(d_model, d_model) # 输出的变换矩阵
@staticmethod
def scaled_dot_product_attention(query, key, value, mask=None):
"""计算缩放点积注意力
Args:
query: shape (batch_size, num_heads, seq_len_q, d_k)
key: shape (batch_size, num_heads, seq_len_k, d_k)
value: shape (batch_size, num_heads, seq_len_v, d_v)
mask: 可选的掩码张量
Returns:
注意力输出: shape (batch_size, num_heads, seq_len_q, d_v)
"""
# Shape checks
assert query.dim() == 4, f"Query should be 4-dim but got {query.dim()}-dim"
assert key.size(-1) == query.size(-1), "Key and query depth must be equal"
assert key.size(-2) == value.size(-2), "Key and value sequence length must be equal"
d_k = query.size(-1)
# Attention scores
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
attention_weights = softmax(scores, dim=-1)
return torch.matmul(attention_weights, value)
def forward(self, query, key, value, mask=None):
"""前向传播
Args:
query: shape (batch_size, seq_len, d_model)
key: shape (batch_size, seq_len, d_model)
value: shape (batch_size, seq_len, d_model)
mask: 可选的掩码张量
Returns:
输出: shape (batch_size, seq_len, d_model)
"""
batch_size = query.size(0)
seq_len = query.size(1)
# 1. 线性变换
# shape: (batch_size, seq_len, d_model)
Q = self.W_q(query)
K = self.W_k(key)
V = self.W_v(value)
# 2. 将张量分割成多个注意力头
# 重塑形状: (batch_size, seq_len, num_heads, d_k)
# 转置后: (batch_size, num_heads, seq_len, d_k)
Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
K = K.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
V = V.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
# 3. 应用注意力机制
# output shape: (batch_size, num_heads, seq_len, d_k)
output = self.scaled_dot_product_attention(Q, K, V, mask)
# 4. 合并多头
# 转置回来: (batch_size, seq_len, num_heads, d_k)
# 重塑形状: (batch_size, seq_len, d_model)
output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
# 5. 最终的线性变换
# shape: (batch_size, seq_len, d_model)
return self.W_o(output)
2.4.2 前馈网络
描述它的另一种方式是使用内核大小为 1 的两个卷积。输入和输出的维度为 dmodel = 512,内部层的维度为 df f = 2048
class FeedForwardNetwork(nn.Module):
"""Position-wise Feed-Forward Network
Args:
d_model: input/output dimension
d_ff: hidden dimension
dropout: dropout rate (default=0.1)
"""
def __init__(self, d_model, d_ff, dropout=0.1):
super().__init__()
#create a sequential ff model as mentioned in section 3.3
#YOUR CODE HERE
def forward(self, x):
"""
Args:
x: Input tensor of shape (batch_size, seq_len, d_model)
Returns:
Output tensor of shape (batch_size, seq_len, d_model)
"""
#YOUR CODE HERE
- Dropout
- 在哪里放置 Dropout
- ReLU
#我的实现
class FeedForwardNetwork(nn.Module):
"""Position-wise Feed-Forward Network
Args:
d_model: input/output dimension
d_ff: hidden dimension
dropout: dropout rate (default=0.1)
"""
def __init__(self, d_model, d_ff, dropout=0.1):
super().__init__()
self.model = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_ff, d_model),
nn.Dropout(dropout)
)
def forward(self, x):
"""
Args:
x: Input tensor of shape (batch_size, seq_len, d_model)
Returns:
Output tensor of shape (batch_size, seq_len, d_model)
"""
return self.model(x)
2.4.3 位置编码
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length=5000):
super().__init__()
# Create matrix of shape (max_seq_length, d_model)
#YOUR CODE HERE
# Create position vector
#YOUR CODE HERE
# Create division term
#YOUR CODE HERE
# Compute positional encodings
#YOUR CODE HERE
# Register buffer
#YOUR CODE HERE
def forward(self, x):
"""
Args:
x: Tensor shape (batch_size, seq_len, d_model)
"""
# YOUR CODE HERE
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length=5000):
super().__init__()
# Create matrix of shape (max_seq_length, d_model)
pe = torch.zeros(max_seq_length, d_model)
# Create position vector
position = torch.arange(0, max_seq_length).unsqueeze(1) # Shape: (max_seq_length, 1)
# Create division term
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
# Compute positional encodings
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# Register buffer
self.register_buffer('pe', pe.unsqueeze(0)) # Shape: (1, max_seq_length, d_model)
def forward(self, x):
"""
Args:
x: Tensor shape (batch_size, seq_len, d_model)
"""
return x + self.pe[:, :x.size(1)] # Add positional encoding up to sequence length
2.4.4 编码器层
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
# 1. Multi-head attention
self.mha = MultiHeadAttention(d_model,num_heads)
# 2. Layer normalization
self.layer_norm_1 = nn.LayerNorm(d_model)
# 3. Feed forward
self.ff = FeedForwardNetwork(d_model,d_ff)
# 4. Another layer normalization
self.layer_norm_2 = nn.LayerNorm(d_model)
# 5. Dropout
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
"""
Args:
x: Input tensor of shape (batch_size, seq_len, d_model)
mask: Optional mask for padding
Returns:
x: Output tensor of shape (batch_size, seq_len, d_model)
"""
# 1. Multi-head attention with residual connection and layer norm
# att_output = self.attention(...)
# x = x + att_output # residual connection
# x = self.norm1(x) # layer normalization
att_output = self.mha(x, x, x, mask)
x = self.dropout(x + att_output) # Apply dropout after residual
x = self.layer_norm_1(x)
ff_output = self.ff(x)
x = self.dropout(x + ff_output) # Apply dropout after residual
x = self.layer_norm_2(x)
# 2. Feed forward with residual connection and layer norm
return x
2.4.5 解码器层
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
# 1. Masked Multi-head attention
self.mha_1 = MultiHeadAttention(d_model,num_heads)
# 2. Layer norm for first sub-layer
self.layer_norm_1 = nn.LayerNorm(d_model)
# 3. Multi-head attention for cross attention with encoder output
# This will take encoder output as key and value
self.mha_2 = MultiHeadAttention(d_model,num_heads)
# 4. Layer norm for second sub-layer
self.layer_norm_2 = nn.LayerNorm(d_model)
# 5. Feed forward network
self.ff = FeedForwardNetwork(d_model,d_ff)
# 6. Layer norm for third sub-layer
self.layer_norm_3 = nn.LayerNorm(d_model)
# 7. Dropout
self.dropout = nn.Dropout(dropout)
def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
"""
Args:
x: Target sequence embedding (batch_size, target_seq_len, d_model)
encoder_output: Output from encoder (batch_size, source_seq_len, d_model)
src_mask: Mask for source padding
tgt_mask: Mask for target padding and future positions
"""
# 1. Masked self-attention
# Remember: In decoder self-attention, query, key, value are all x
att_output = self.mha_1(x,x,x,tgt_mask)
x = self.dropout(x + att_output)
x = self.layer_norm_1(x)
att_output_2 = self.mha_2(x, encoder_output,encoder_output, src_mask)
x = self.dropout(x + att_output_2)
x = self.layer_norm_2(x)
ff_output = self.ff(x)
x = self.dropout(x + ff_output)
x = self.layer_norm_3(x)
return x
2.4.6 编码器
class Encoder(nn.Module):
def __init__(self,
vocab_size,
d_model,
num_layers=6,
num_heads=8,
d_ff=2048,
dropout=0.1,
max_seq_length=5000):
super().__init__()
# 1. Input embedding
self.embeddings = nn.Embedding(vocab_size, d_model)
self.scale = math.sqrt(d_model)
# 2. Positional encoding
self.pe = PositionalEncoding(d_model, max_seq_length)
# 3. Dropout
self.dropout = nn.Dropout(dropout)
# 4. Stack of N encoder layers
self.encoder_layers = nn.ModuleList([
EncoderLayer(d_model, num_heads, d_ff, dropout)
for _ in range(num_layers)
])
def forward(self, x, mask=None):
"""
Args:
x: Input tokens (batch_size, seq_len)
mask: Mask for padding positions
Returns:
encoder_output: (batch_size, seq_len, d_model)
"""
# 1. Pass through embedding layer and scale
x = self.embeddings(x) * self.scale
# 2. Add positional encoding and apply dropout
x = self.dropout(self.pe(x))
# 3. Pass through each encoder layer
for layer in self.encoder_layers:
x = layer(x, mask)
return x
2.4.7 解码器
class Decoder(nn.Module):
def __init__(self,
vocab_size,
d_model,
num_layers=6,
num_heads=8,
d_ff=2048,
dropout=0.1,
max_seq_length=5000):
super().__init__()
# 1. Output embedding
self.embeddings = nn.Embedding(vocab_size, d_model)
self.scale = math.sqrt(d_model)
# 2. Positional encoding
self.pe = PositionalEncoding(d_model, max_seq_length)
# 3. Dropout
self.dropout = nn.Dropout(dropout)
# 4. Stack of N decoder layers
self.decoder_layers = nn.ModuleList([
DecoderLayer(d_model, num_heads, d_ff, dropout)
for _ in range(num_layers)
])
def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
"""
Args:
x: Target tokens (batch_size, target_seq_len)
encoder_output: Output from encoder (batch_size, source_seq_len, d_model)
src_mask: Mask for source padding
tgt_mask: Mask for target padding and future positions
Returns:
decoder_output: (batch_size, target_seq_len, d_model)
"""
# 1. Pass through embedding layer and scale
x = self.embeddings(x) * self.scale
# 2. Add positional encoding and dropout
x = self.dropout(self.pe(x))
# 3. Pass through each decoder layer
for layer in self.decoder_layers:
x = layer(x, encoder_output, src_mask, tgt_mask)
return x
实用代码
def create_padding_mask(seq):
"""
Create mask for padding tokens (0s)
Args:
seq: Input sequence tensor (batch_size, seq_len)
Returns:
mask: Padding mask (batch_size, 1, 1, seq_len)
"""
batch_size, seq_len = seq.shape
output = torch.eq(seq, 0).float()
return output.view(batch_size, 1, 1, seq_len)
def create_future_mask(size):
"""
Create mask to prevent attention to future positions
Args:
size: Size of square mask (target_seq_len)
Returns:
mask: Future mask (1, 1, size, size)
"""
# Create upper triangular matrix and invert it
mask = torch.triu(torch.ones((1, 1, size, size)), diagonal=1) == 0
return mask
def create_masks(src, tgt):
"""
Create all masks needed for training
Args:
src: Source sequence (batch_size, src_len)
tgt: Target sequence (batch_size, tgt_len)
Returns:
src_mask: Padding mask for encoder
tgt_mask: Combined padding and future mask for decoder
"""
# 1. Create padding masks
src_padding_mask = create_padding_mask(src)
tgt_padding_mask = create_padding_mask(tgt)
# 2. Create future mask
tgt_len = tgt.size(1)
tgt_future_mask = create_future_mask(tgt_len)
# 3. Combine padding and future mask for target
# Both masks should be True for allowed positions
tgt_mask = tgt_padding_mask & tgt_future_mask
return src_padding_mask, tgt_mask
2.4.8 整个Transformer
class Transformer(nn.Module):
def __init__(self,
src_vocab_size,
tgt_vocab_size,
d_model,
num_layers=6,
num_heads=8,
d_ff=2048,
dropout=0.1,
max_seq_length=5000):
super().__init__()
# Pass all necessary parameters to Encoder and Decoder
self.encoder = Encoder(
src_vocab_size,
d_model,
num_layers,
num_heads,
d_ff,
dropout,
max_seq_length
)
self.decoder = Decoder(
tgt_vocab_size,
d_model,
num_layers,
num_heads,
d_ff,
dropout,
max_seq_length
)
# The final linear layer should project from d_model to tgt_vocab_size
self.final_layer = nn.Linear(d_model, tgt_vocab_size)
def forward(self, src, tgt):
# Create masks for source and target
src_mask, tgt_mask = create_masks(src, tgt)
# Pass through encoder
encoder_output = self.encoder(src, src_mask)
# Pass through decoder
decoder_output = self.decoder(tgt, encoder_output, src_mask, tgt_mask)
# Project to vocabulary size
output = self.final_layer(decoder_output)
# Note: Usually don't apply softmax here if using CrossEntropyLoss
# as it applies log_softmax internally
return output
Transformer实用代码
class TransformerLRScheduler:
def __init__(self, optimizer, d_model, warmup_steps):
"""
Args:
optimizer: Optimizer to adjust learning rate for
d_model: Model dimensionality
warmup_steps: Number of warmup steps
"""
# Your code here
# lrate = d_model^(-0.5) * min(step_num^(-0.5), step_num * warmup_steps^(-1.5))
self.optimizer = optimizer
self.d_model = d_model
self.warmup_steps = warmup_steps
def step(self, step_num):
"""
Update learning rate based on step number
"""
# Your code here - implement the formula
lrate = torch.pow(self.d_model,-0.5)*torch.min(torch.pow(step_num,-0.5), torch.tensor(step_num) * torch.pow(self.warmup_steps,-1.5))
class LabelSmoothing(nn.Module):
def __init__(self, smoothing=0.1):
super().__init__()
self.smoothing = smoothing
self.confidence = 1.0 - smoothing
def forward(self, logits, target):
"""
Args:
logits: Model predictions (batch_size, vocab_size) #each row of vocab_size contains probability score of each label
target: True labels (batch_size) #each row of batch size contains the index to the correct label
"""
vocab_size = logits.size(-1)
with torch.no_grad():
# Create a soft target distribution
true_dist = torch.zeros_like(logits) #create the zeros [0,0,...]
true_dist.fill_(self.smoothing / (vocab_size - 1)) #fill with calculated value [0.000125..,0.000125...] (this is an arbitarary value for example purposes)
true_dist.scatter_(1, target.unsqueeze(1), self.confidence) #add 1 to the correct index (read more on docs of pytorch)
return torch.mean(torch.sum(-true_dist * torch.log_softmax(logits, dim=-1), dim=-1)) #return cross entropy loss
2.4.9 训练Transformers
def train_transformer(model, train_dataloader, criterion, optimizer, scheduler, num_epochs, device='cuda'):
"""
Training loop for transformer
Args:
model: Transformer model
train_dataloader: DataLoader for training data
criterion: Loss function (with label smoothing)
optimizer: Optimizer
scheduler: Learning rate scheduler
num_epochs: Number of training epochs
"""
# 1. Setup
model = model.to(device)
model.train()
# For tracking training progress
total_loss = 0
all_losses = []
# 2. Training loop
for epoch in range(num_epochs):
print(f"Epoch {epoch + 1}/{num_epochs}")
epoch_loss = 0
for batch_idx, batch in enumerate(train_dataloader):
# Get source and target batches
src = batch['src'].to(device)
tgt = batch['tgt'].to(device)
# Create masks
src_mask, tgt_mask = create_masks(src, tgt)
# Prepare target for input and output
# Remove last token from target for input
tgt_input = tgt[:, :-1]
# Remove first token from target for output
tgt_output = tgt[:, 1:]
# Zero gradients
optimizer.zero_grad()
# Forward pass
outputs = model(src, tgt_input, src_mask, tgt_mask)
# Reshape outputs and target for loss calculation
outputs = outputs.view(-1, outputs.size(-1))
tgt_output = tgt_output.view(-1)
# Calculate loss
loss = criterion(outputs, tgt_output)
# Backward pass
loss.backward()
# Clip gradients
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# Update weights
optimizer.step()
scheduler.step()
# Update loss tracking
epoch_loss += loss.item()
# Print progress every N batches
if batch_idx % 100 == 0:
print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")
# Calculate average loss for epoch
avg_epoch_loss = epoch_loss / len(train_dataloader)
all_losses.append(avg_epoch_loss)
print(f"Epoch {epoch + 1} Loss: {avg_epoch_loss:.4f}")
# Save checkpoint
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': avg_epoch_loss,
}, f'checkpoint_epoch_{epoch+1}.pt')
return all_losses
设置Dataset和DataLoader
import os
import torch
import spacy
import urllib.request
import zipfile
from torch.utils.data import Dataset, DataLoader
def download_multi30k():
"""Download Multi30k dataset if not present"""
# Create data directory
if not os.path.exists('data'):
os.makedirs('data')
# Download files if they don't exist
base_url = "https://raw.githubusercontent.com/multi30k/dataset/master/data/task1/raw/"
files = {
"train.de": "train.de.gz",
"train.en": "train.en.gz",
"val.de": "val.de.gz",
"val.en": "val.en.gz",
"test.de": "test_2016_flickr.de.gz",
"test.en": "test_2016_flickr.en.gz"
}
for local_name, remote_name in files.items():
filepath = f'data/{local_name}'
if not os.path.exists(filepath):
url = base_url + remote_name
urllib.request.urlretrieve(url, filepath + '.gz')
os.system(f'gunzip -f {filepath}.gz')
def load_data(filename):
"""Load data from file"""
with open(filename, 'r', encoding='utf-8') as f:
return [line.strip() for line in f]
def create_dataset():
"""Create dataset from files"""
# Download data if needed
download_multi30k()
# Load data
train_de = load_data('data/train.de')
train_en = load_data('data/train.en')
val_de = load_data('data/val.de')
val_en = load_data('data/val.en')
return (train_de, train_en), (val_de, val_en)
class TranslationDataset(Dataset):
def __init__(self, src_texts, tgt_texts, src_vocab, tgt_vocab, src_tokenizer, tgt_tokenizer):
self.src_texts = src_texts
self.tgt_texts = tgt_texts
self.src_vocab = src_vocab
self.tgt_vocab = tgt_vocab
self.src_tokenizer = src_tokenizer
self.tgt_tokenizer = tgt_tokenizer
def __len__(self):
return len(self.src_texts)
def __getitem__(self, idx):
src_text = self.src_texts[idx]
tgt_text = self.tgt_texts[idx]
# Tokenize
src_tokens = [tok.text for tok in self.src_tokenizer(src_text)]
tgt_tokens = [tok.text for tok in self.tgt_tokenizer(tgt_text)]
# Convert to indices
src_indices = [self.src_vocab["<s>"]] + [self.src_vocab[token] for token in src_tokens] + [self.src_vocab["</s>"]]
tgt_indices = [self.tgt_vocab["<s>"]] + [self.tgt_vocab[token] for token in tgt_tokens] + [self.tgt_vocab["</s>"]]
return {
'src': torch.tensor(src_indices),
'tgt': torch.tensor(tgt_indices)
}
def build_vocab_from_texts(texts, tokenizer, min_freq=2):
"""Build vocabulary from texts"""
counter = {}
for text in texts:
for token in [tok.text for tok in tokenizer(text)]:
counter[token] = counter.get(token, 0) + 1
# Create vocabulary
vocab = {"<s>": 0, "</s>": 1, "<blank>": 2, "<unk>": 3}
idx = 4
for word, freq in counter.items():
if freq >= min_freq:
vocab[word] = idx
idx += 1
return vocab
def create_dataloaders(batch_size=32):
# Load tokenizers
spacy_de = spacy.load("de_core_news_sm")
spacy_en = spacy.load("en_core_web_sm")
# Get data
(train_de, train_en), (val_de, val_en) = create_dataset()
# Build vocabularies
vocab_src = build_vocab_from_texts(train_de, spacy_de)
vocab_tgt = build_vocab_from_texts(train_en, spacy_en)
# Create datasets
train_dataset = TranslationDataset(
train_de, train_en,
vocab_src, vocab_tgt,
spacy_de, spacy_en
)
val_dataset = TranslationDataset(
val_de, val_en,
vocab_src, vocab_tgt,
spacy_de, spacy_en
)
# Create dataloaders
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
collate_fn=collate_batch
)
val_dataloader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
collate_fn=collate_batch
)
return train_dataloader, val_dataloader, vocab_src, vocab_tgt
def collate_batch(batch):
src_tensors = [item['src'] for item in batch]
tgt_tensors = [item['tgt'] for item in batch]
# Pad sequences
src_padded = torch.nn.utils.rnn.pad_sequence(src_tensors, batch_first=True, padding_value=2)
tgt_padded = torch.nn.utils.rnn.pad_sequence(tgt_tensors, batch_first=True, padding_value=2)
return {
'src': src_padded,
'tgt': tgt_padded
}
开始训练
# Initialize your transformer with the vocabulary sizes
model = Transformer(
src_vocab_size=len(vocab_src),
tgt_vocab_size=len(vocab_tgt),
d_model=512,
num_layers=6,
num_heads=8,
d_ff=2048,
dropout=0.1
)
criterion = LabelSmoothing(smoothing=0.1).to(device)
# Now you can use your training loop
losses = train_transformer(
model=model,
train_dataloader=train_dataloader,
criterion=criterion,
optimizer=optimizer,
scheduler=scheduler,
num_epochs=10
)
3 其它及进一步阅读
以下是一些资源和更多信息,可以帮助你在学习过程中提供帮助:
什么是 torch.nn 真正的含义?
3Blue1Brown 的神经网络教程
恭喜你完成了这个教程/课程/博客,不管你如何理解它。根据人类的好奇心,你现在可能有一些问题。随时在 GitHub 上创建问题,我会将那些我认为初学者最常有的问题添加到这里。