文章目录
- 昇思MindSpore应用实践
- 基于MindSpore的Vision Transformer
- 1、Vision Transformer(ViT)简介
- 网络结构
- 2、Attention模块
- Encoder部分用到的功能函数:
- 整体构建ViT模型
- 3、模型训练
- 4、模型验证
- Reference
昇思MindSpore应用实践
本系列文章主要用于记录昇思25天学习打卡营的学习心得。
基于MindSpore的Vision Transformer
1、Vision Transformer(ViT)简介
自从2017年Google发表"Attention is ALL You Need"之后,基于自注意力机制结构的网络模型开始在各领域发展,特别是Transformer网络,不仅在NLP领域取得了成效,CV领域也开始引入Transformer网络,Vision Transformer(ViT)则是自然语言处理和计算机视觉两个领域的融合结晶。在不依赖卷积操作的情况下,通过自注意力机制捕捉图像向量序列中的一维序列的上下文关联信息,依然可以在图像分类任务上达到很好的效果。
网络结构
ViT模型的主体结构是基于Transformer模型的Encoder部分(部分结构顺序有调整,如:Normalization的位置与标准Transformer不同),其结构图如下:
- 数据集的原图像被划分为多个patch(图像块)后,将二维patch(不考虑channel)转换为一维向量,再加上类别向量与位置向量作为模型输入。
- 模型主体的Block结构是基于Transformer的Encoder结构,但是调整了Normalization的位置,其中,最主要的结构依然是Multi-head Attention结构。
- 模型在Blocks堆叠后接全连接层,接受类别向量的输出作为输入并用于分类。通常情况下,我们将最后的全连接层称为Head,Transformer Encoder部分为backbone。
# ViT的PatchEmbedding
class PatchEmbedding(nn.Cell):
MIN_NUM_PATCHES = 4
# 一幅输入224 x 224的图像,首先经过卷积处理得到16 x 16个patch,那么每一个patch的大小就是14 x 14
# 再将每一个patch的矩阵拉伸成为一个一维向量,得到的14 x 14的patch就转换为长度为196的向量
# 从而获得了近似词向量堆叠的效果,从而像NLP中的Transformer一样处理图像数据
def __init__(self,
image_size: int = 224,
patch_size: int = 16,
embed_dim: int = 768,
input_channels: int = 3):
super(PatchEmbedding, self).__init__()
self.image_size = image_size
self.patch_size = patch_size
self.num_patches = (image_size // patch_size) ** 2
self.conv = nn.Conv2d(input_channels, embed_dim, kernel_size=patch_size, stride=patch_size, has_bias=True)
def construct(self, x):
"""Path Embedding construct."""
x = self.conv(x)
b, c, h, w = x.shape
x = ops.reshape(x, (b, c, h * w))
x = ops.transpose(x, (0, 2, 1))
return x
2、Attention模块
由于ViT模型中用到了Transformer中的多头自注意力,Multi-head Attention结构,该结构基于自注意力机制,由多个Self-Attention并行组成。所以,理解了Self-Attention就抓住了Transformer的核心。
Self-Attention的核心内容是为输入向量的每个元素学习一个权重。通过给定一个任务相关的查询向量Query向量,计算Query和各个Key的相似性或者相关性得到注意力分布,即QKV机制,得到每个Key对应Value的权重系数,然后对Value进行加权求和得到最终的Attention数值。
{
q
i
=
W
q
⋅
x
i
k
i
=
W
k
⋅
x
i
,
i
=
1
,
2
,
3
…
v
i
=
W
v
⋅
x
i
\begin{cases} q_i = W_q \cdot x_i & \\ k_i = W_k \cdot x_i,\hspace{1em} &i = 1,2,3 \ldots \\ v_i = W_v \cdot x_i & \end{cases}
⎩
⎨
⎧qi=Wq⋅xiki=Wk⋅xi,vi=Wv⋅xii=1,2,3…
-
最初的输入向量首先会经过Embedding层映射成Q(Query),K(Key),V(Value)三个向量( d i m ∗ 3 dim * 3 dim∗3),如果输入向量为一个向量序列( x 1 x_1 x1, x 2 x_2 x2, x 3 x_3 x3),其中的 x 1 x_1 x1, x 2 x_2 x2, x 3 x_3 x3都是一维向量,那么每一个一维向量都会经过Embedding层映射出Q,K,V三个向量,只是Embedding矩阵不同,矩阵参数也是通过学习得到的(nn.Dense)。Q,K,V三个矩阵是发现向量之间关联信息的一种手段,需要经过学习得到(nn.Dense),至于为什么是Q,K,V三个,主要是因为需要两个向量点乘以获得权重,又需要另一个向量来承载权重向加的结果,所以,最少需要3个矩阵。
-
自注意力机制的自注意主要体现在它的Q,K,V都来源于其自身,也就是该过程是在提取输入的不同顺序的向量的联系与特征,最终通过Q与K乘积经过Softmax的结果来表现不同顺序向量之间的关联紧密性。Q,K,V得到后就需要获取向量间权重,需要对Q和K进行点乘并除以维度的平方根。
{ a 1 , 1 = q 1 ⋅ k 1 / d a 1 , 2 = q 1 ⋅ k 2 / d a 1 , 3 = q 1 ⋅ k 3 / d \begin{cases} a_{1,1} = q_1 \cdot k_1 / \sqrt d \\ a_{1,2} = q_1 \cdot k_2 / \sqrt d \\ a_{1,3} = q_1 \cdot k_3 / \sqrt d \end{cases} ⎩ ⎨ ⎧a1,1=q1⋅k1/da1,2=q1⋅k2/da1,3=q1⋅k3/d
对所有向量的结果进行Softmax处理,就获得了向量之间的关系权重:
S o f t m a x : a ^ 1 , i = e x p ( a 1 , i ) / ∑ j e x p ( a 1 , j ) , j = 1 , 2 , 3 … Softmax: \hat a_{1,i} = exp(a_{1,i}) / \sum_j exp(a_{1,j}),\hspace{1em} j = 1,2,3 \ldots Softmax:a^1,i=exp(a1,i)/j∑exp(a1,j),j=1,2,3…
3. 其最终输出则是通过V这个映射后的向量与Q,K经过Softmax结果进行weight sum获得,这个过程可以理解为在全局上进行自注意表示。每一组Q,K,V最后都有一个V输出,这是Self-Attention得到的最终结果,是当前向量在结合了它与其他向量关联权重后得到的结果。
b 1 = ∑ i a ^ 1 , i v i , i = 1 , 2 , 3... b_1 = \sum_i \hat a_{1,i}v_i,\hspace{1em} i = 1,2,3... b1=i∑a^1,ivi,i=1,2,3...
多头注意力机制就是将原本self-Attention处理的向量分割为多个Heads(dim // num_heads)进行处理,这一点也可以从代码中体现,这也是attention结构可以进行并行加速(普通的RNN却没有这一特性)的一个方面。
总结来说,多头注意力机制在保持参数总量不变的情况下,将同样的query, key和value映射到原来的高维空间(Q,K,V)的不同子空间(Q_0,K_0,V_0)中进行自注意力的计算,最后再合并不同子空间中的注意力信息。
所以,对于同一个输入向量,多个注意力机制可以同时对其进行处理,即利用并行计算加速处理过程,又在处理的时候更充分的分析和利用了向量特征。下图展示了多头注意力机制,其并行能力的主要体现在下图中的
a
1
a_1
a1和
a
2
a_2
a2是同一个向量进行分割获得的。
以下是MindSpore给出的基于Multi-Head Attention的代码:
from mindspore import nn, ops
class Attention(nn.Cell):
def __init__(self,
dim: int,
num_heads: int = 8,
keep_prob: float = 1.0, # 指定输出dropout操作的保留概率
attention_keep_prob: float = 1.0): # 指定注意力权重的dropout操作的保留概率
super(Attention, self).__init__()
self.num_heads = num_heads # 头的数量
head_dim = dim // num_heads # 每个头处理的维度,计算方法是总维度除以头数
self.scale = ms.Tensor(head_dim ** -0.5) # 缩放因子,用于调节注意力得分的尺度
self.qkv = nn.Dense(dim, dim * 3) # 用于生成查询(Q)、键(K)、值(V)矩阵
self.attn_drop = nn.Dropout(p=1.0-attention_keep_prob) # Dense层一般要加上Dropout,用于输出结果中较小的参数通过一定概率随机丢弃
self.out = nn.Dense(dim, dim)
self.out_drop = nn.Dropout(p=1.0-keep_prob)
self.attn_matmul_v = ops.BatchMatMul()
self.q_matmul_k = ops.BatchMatMul(transpose_b=True)
self.softmax = nn.Softmax(axis=-1)
def construct(self, x):
"""Attention construct."""
b, n, c = x.shape
qkv = self.qkv(x) # 给维度为dim的输入向量分配dim*3个qkv的合并表示
# 提取批量大小(b),序列长度(n),特征维度(dim=c),注意力头数,每个头的维度
qkv = ops.reshape(qkv, (b, n, 3, self.num_heads, c // self.num_heads))
qkv = ops.transpose(qkv, (2, 0, 3, 1, 4)) # 调整维度以便分离 Q, K, V
q, k, v = ops.unstack(qkv, axis=0) # 将QKV拆解出 Q, K, V
attn = self.q_matmul_k(q, k) # 计算 Q 和 K 的点积。
attn = ops.mul(attn, self.scale) # 应用缩放因子
attn = self.softmax(attn) # 应用 softmax 函数标准化分数
attn = self.attn_drop(attn) # 对注意力分数使用 dropout
out = self.attn_matmul_v(attn, v) # 使用注意力分数加权 V
out = ops.transpose(out, (0, 2, 1, 3))
out = ops.reshape(out, (b, n, c)) # 将多头输出合并回原始特征维度
out = self.out(out)
out = self.out_drop(out)
return out
在了解了Self-Attention结构之后,通过与Feed Forward
,Residual Connection
等结构的拼接就可以形成Transformer Encoder的基础结构。
Encoder部分用到的功能函数:
from typing import Optional, Dict
class FeedForward(nn.Cell):
def __init__(self,
in_features: int,
hidden_features: Optional[int] = None,
out_features: Optional[int] = None,
activation: nn.Cell = nn.GELU,
keep_prob: float = 1.0):
super(FeedForward, self).__init__()
out_features = out_features or in_features
hidden_features = hidden_features or in_features
self.dense1 = nn.Dense(in_features, hidden_features)
self.activation = activation()
self.dense2 = nn.Dense(hidden_features, out_features)
self.dropout = nn.Dropout(p=1.0-keep_prob)
def construct(self, x):
"""Feed Forward construct."""
x = self.dense1(x)
x = self.activation(x)
x = self.dropout(x)
x = self.dense2(x)
x = self.dropout(x)
return x
class ResidualCell(nn.Cell):
def __init__(self, cell):
super(ResidualCell, self).__init__()
self.cell = cell
def construct(self, x):
"""ResidualCell construct."""
return self.cell(x) + x
class TransformerEncoder(nn.Cell):
def __init__(self,
dim: int,
num_layers: int,
num_heads: int,
mlp_dim: int,
keep_prob: float = 1.,
attention_keep_prob: float = 1.0,
drop_path_keep_prob: float = 1.0,
activation: nn.Cell = nn.GELU,
norm: nn.Cell = nn.LayerNorm):
super(TransformerEncoder, self).__init__()
layers = []
for _ in range(num_layers):
normalization1 = norm((dim,))
normalization2 = norm((dim,))
attention = Attention(dim=dim,
num_heads=num_heads,
keep_prob=keep_prob,
attention_keep_prob=attention_keep_prob)
feedforward = FeedForward(in_features=dim,
hidden_features=mlp_dim,
activation=activation,
keep_prob=keep_prob)
layers.append(
nn.SequentialCell([
ResidualCell(nn.SequentialCell([normalization1, attention])),
ResidualCell(nn.SequentialCell([normalization2, feedforward]))
])
)
self.layers = nn.SequentialCell(layers)
def construct(self, x):
"""Transformer construct."""
return self.layers(x)
-
ViT模型中的基础结构与标准Transformer有所不同,主要在于Normalization的位置是放在Self-Attention和Feed Forward之前,其他结构如Residual Connection,Feed Forward,Normalization都如Transformer中所设计的一致。
-
从Transformer结构的图片可以发现,多个子encoder的堆叠就完成了模型编码器的构建,在ViT模型中,依然沿用这个思路,通过配置超参数num_layers,就可以确定堆叠层数。
-
Residual Connection,Normalization的结构可以保证模型有很强的扩展性,防止随着层数加深,产生梯度消失或梯度爆炸(保证信息经过深层处理不会出现退化的现象,这是残差连接Residual Connection的作用,设计原理依旧是源于ResNet),Normalization和dropout的应用可以增强模型泛化能力。
从以下源码中就可以清晰看到Transformer的结构。将TransformerEncoder结构和一个多层感知器(MLP)结合,就构成了ViT模型的backbone
部分:
class TransformerEncoder(nn.Cell):
def __init__(self,
dim: int,
num_layers: int,
num_heads: int,
mlp_dim: int,
keep_prob: float = 1.,
attention_keep_prob: float = 1.0,
drop_path_keep_prob: float = 1.0,
activation: nn.Cell = nn.GELU,
norm: nn.Cell = nn.LayerNorm):
super(TransformerEncoder, self).__init__()
layers = []
for _ in range(num_layers):
normalization1 = norm((dim,))
normalization2 = norm((dim,))
attention = Attention(dim=dim,
num_heads=num_heads,
keep_prob=keep_prob,
attention_keep_prob=attention_keep_prob)
feedforward = FeedForward(in_features=dim,
hidden_features=mlp_dim,
activation=activation,
keep_prob=keep_prob)
layers.append(
nn.SequentialCell([
ResidualCell(nn.SequentialCell([normalization1, attention])),
ResidualCell(nn.SequentialCell([normalization2, feedforward]))
])
)
self.layers = nn.SequentialCell(layers)
def construct(self, x):
"""Transformer construct."""
return self.layers(x)
整体构建ViT模型
from mindspore.common.initializer import Normal
from mindspore.common.initializer import initializer
from mindspore import Parameter
def init(init_type, shape, dtype, name, requires_grad):
"""Init."""
initial = initializer(init_type, shape, dtype).init_data()
return Parameter(initial, name=name, requires_grad=requires_grad)
class ViT(nn.Cell):
def __init__(self,
image_size: int = 224,
input_channels: int = 3,
patch_size: int = 16,
embed_dim: int = 768,
num_layers: int = 12,
num_heads: int = 12,
mlp_dim: int = 3072,
keep_prob: float = 1.0,
attention_keep_prob: float = 1.0,
drop_path_keep_prob: float = 1.0,
activation: nn.Cell = nn.GELU,
norm: Optional[nn.Cell] = nn.LayerNorm,
pool: str = 'cls') -> None:
super(ViT, self).__init__()
self.patch_embedding = PatchEmbedding(image_size=image_size, # 通过Patch_Embedding将输入图像向量化
patch_size=patch_size,
embed_dim=embed_dim,
input_channels=input_channels)
num_patches = self.patch_embedding.num_patches
self.cls_token = init(init_type=Normal(sigma=1.0),
shape=(1, 1, embed_dim),
dtype=ms.float32,
name='cls',
requires_grad=True)
self.pos_embedding = init(init_type=Normal(sigma=1.0),
shape=(1, num_patches + 1, embed_dim),
dtype=ms.float32,
name='pos_embedding',
requires_grad=True) # 位置编码参与training的梯度计算
self.pool = pool
self.pos_dropout = nn.Dropout(p=1.0-keep_prob)
self.norm = norm((embed_dim,))
self.transformer = TransformerEncoder(dim=embed_dim, # TransformerEncoder部分
num_layers=num_layers,
num_heads=num_heads,
mlp_dim=mlp_dim,
keep_prob=keep_prob,
attention_keep_prob=attention_keep_prob,
drop_path_keep_prob=drop_path_keep_prob,
activation=activation,
norm=norm)
self.dropout = nn.Dropout(p=1.0-keep_prob)
self.dense = nn.Dense(embed_dim, num_classes)
def construct(self, x): # ViT模型完整的前向传播过程
"""ViT construct."""
x = self.patch_embedding(x)
cls_tokens = ops.tile(self.cls_token.astype(x.dtype), (x.shape[0], 1, 1))
x = ops.concat((cls_tokens, x), axis=1) # 加入cls_token
x += self.pos_embedding # 加入位置编码
x = self.pos_dropout(x)
x = self.transformer(x) # 送入ViT计算
x = self.norm(x)
x = x[:, 0]
if self.training:
x = self.dropout(x)
x = self.dense(x) # MLP分类器
return x
再梳理一遍ViT处理图像数据实现分类任务的完整流程:
1、输入原始图像,经过图像缩放,统一为224x224的大小
2、224x224大小的图像经过卷积或手动分割处理(手动分割Patch不是不行,但是造轮子太麻烦了),变为16x16个大小为14x14的Patch
3、每个Patch先拉伸为1x196的向量,再加上class_embedding变为1x197的向量,再与1x197的pos_embedding向量相加,引入位置编码
4、将加入了pos_embedding的向量输入TransformerEncoder网络进行训练
5、分类任务中,输出结果取所有Pacth向量的class_embedding数值输入后续的MLP进行计算
3、模型训练
from mindspore.nn import LossBase
from mindspore.train import LossMonitor, TimeMonitor, CheckpointConfig, ModelCheckpoint
from mindspore import train
# define super parameter
epoch_size = 10
momentum = 0.9
num_classes = 1000
resize = 224
step_size = dataset_train.get_dataset_size()
# construct model
network = ViT()
# load ckpt
vit_url = "https://download.mindspore.cn/vision/classification/vit_b_16_224.ckpt"
path = "./ckpt/vit_b_16_224.ckpt"
vit_path = download(vit_url, path, replace=True)
param_dict = ms.load_checkpoint(vit_path)
ms.load_param_into_net(network, param_dict)
# define learning rate
lr = nn.cosine_decay_lr(min_lr=float(0),
max_lr=0.00005,
total_step=epoch_size * step_size,
step_per_epoch=step_size,
decay_epoch=10)
# define optimizer
network_opt = nn.Adam(network.trainable_params(), lr, momentum)
# define loss function
class CrossEntropySmooth(LossBase):
"""CrossEntropy."""
def __init__(self, sparse=True, reduction='mean', smooth_factor=0., num_classes=1000):
super(CrossEntropySmooth, self).__init__()
self.onehot = ops.OneHot()
self.sparse = sparse
self.on_value = ms.Tensor(1.0 - smooth_factor, ms.float32)
self.off_value = ms.Tensor(1.0 * smooth_factor / (num_classes - 1), ms.float32)
self.ce = nn.SoftmaxCrossEntropyWithLogits(reduction=reduction)
def construct(self, logit, label):
if self.sparse:
label = self.onehot(label, ops.shape(logit)[1], self.on_value, self.off_value)
loss = self.ce(logit, label)
return loss
network_loss = CrossEntropySmooth(sparse=True,
reduction="mean",
smooth_factor=0.1,
num_classes=num_classes)
# set checkpoint
ckpt_config = CheckpointConfig(save_checkpoint_steps=step_size, keep_checkpoint_max=100)
ckpt_callback = ModelCheckpoint(prefix='vit_b_16', directory='./ViT', config=ckpt_config)
# initialize model
# "Ascend + mixed precision" can improve performance
ascend_target = (ms.get_context("device_target") == "Ascend")
if ascend_target:
model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O2")
else:
model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O0")
# train model
model.train(epoch_size,
dataset_train,
callbacks=[ckpt_callback, LossMonitor(125), TimeMonitor(125)],
dataset_sink_mode=False,)
4、模型验证
dataset_val = ImageFolderDataset(os.path.join(data_path, "val"), shuffle=True)
trans_val = [
transforms.Decode(),
transforms.Resize(224 + 32),
transforms.CenterCrop(224),
transforms.Normalize(mean=mean, std=std),
transforms.HWC2CHW()
]
dataset_val = dataset_val.map(operations=trans_val, input_columns=["image"])
dataset_val = dataset_val.batch(batch_size=16, drop_remainder=True)
# construct model
network = ViT()
# load ckpt
param_dict = ms.load_checkpoint(vit_path)
ms.load_param_into_net(network, param_dict)
network_loss = CrossEntropySmooth(sparse=True,
reduction="mean",
smooth_factor=0.1,
num_classes=num_classes)
# define metric
eval_metrics = {'Top_1_Accuracy': train.Top1CategoricalAccuracy(), #MindSpore中集成了包含Top1与Top5ACC的接口
'Top_5_Accuracy': train.Top5CategoricalAccuracy()}
if ascend_target:
model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O2")
else:
model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O0")
# evaluate model
result = model.eval(dataset_val)
print(result)
# print_log
{'Top_1_Accuracy': 0.75, 'Top_5_Accuracy': 0.928}
模型验证过程主要应用了ImageFolderDataset,CrossEntropySmooth和Model等接口。
ImageFolderDataset主要用于读取数据集;
CrossEntropySmooth是损失函数实例化接口;
Model用于编译模型。
与训练过程相似,首先进行数据增强,定义ViT网络结构,加载预训练模型参数。随后设置损失函数,评价指标等,编译模型后进行验证。本案例采用了业界通用的评价标准Top_1_Accuracy
和Top_5_Accuracy
评价指标来评价模型表现。
在本案例中,这两个指标代表了在输出的1000维向量中,以最大值或前5
的输出值所代表的类别为预测结果时,模型预测的准确率。这两个指标的值越大,代表模型准确率越高。
Reference
昇思官方文档-Vision Transformer图像分类
昇思大模型平台