一、前言
模型权值平均是一种用于改善深度神经网络泛化性能的技术。通过对训练过程中不同时间步的模型权值进行平均,可以得到更宽的极值点(optima)并提高模型的泛化能力。 在PyTorch中,官方提供了实现模型权值平均的方法。
这里我们首先介绍指数移动平均(EMA)方法,它使用一个衰减系数来平衡当前权值和先前平均权值。其次,介绍了随机加权平均(SWA)方法,它通过将当前权值与先前平均权值进行加权平均来更新权值。最后,介绍了Tanh自适应指数移动EMA算法(T_ADEMA),它使用Tanh函数来调整衰减系数,以更好地适应训练过程中的不同阶段。
为了方便使用这些权值平均方法,我将官方的代码写成了一个基类AveragingBaseModel,以此引出EMAModel、SWAModel和T_ADEMAModel等方法。这些类可以用于包装原始模型,并在训练过程中更新平均权值。 为了验证这些权值平均方法的效果,我还在ResNet18模型上进行了简单的实验。实验结果表明,使用权值平均方法可以提高模型的准确率,尤其是在训练后期。
但请注意,博客中所提供的代码示例仅用于演示权值平均的原理和PyTorch的实现方式,并不能保证在所有情况下都能取得理想的效果。实际应用中,还需要根据具体任务和数据集来选择适合的权值平均方法和参数设置。
二、算法介绍
基类实现
这里我们的基类完全是参照于torch源码部分,仅仅进行了一点细微的修改。
它首先通过de_parallel函数将原始模型转换为单个GPU模型。de_parallel函数用于处理并行模型,将其转换为单个GPU模型。然后,它将转换后的模型复制到适当的设备(CPU或GPU)上(这一步很重要,问题大多数就是因为计算不匹配),并注册一个名为n_averaged的缓冲区,用于跟踪已平均的次数。
在forward方法中,它简单地将调用传递给转换后的模型。update方法首先获取当前模型和新模型的参数,并将它们转换为可迭代对象,用于更新平均权值。它接受一个新的模型作为参数,并将其与当前模型(已平均的权值)进行比较。
from copy import deepcopy
from pyzjr.core.general import is_parallel
import itertools
from torch.nn import Module
def de_parallel(model):
"""
将并行模型(DataParallel 或 DistributedDataParallel)转换为单 GPU 模型。
"""
return model.module if is_parallel(model) else model
class AveragingBaseModel(Module):
def __init__(self, model, cuda=False, avg_fn=None, use_buffers=False):
super(AveragingBaseModel, self).__init__()
device = 'cuda' if cuda and torch.cuda.is_available() else 'cpu'
self.module = deepcopy(de_parallel(model))
self.module = self.module.to(device)
self.register_buffer('n_averaged',
torch.tensor(0, dtype=torch.long, device=device))
self.avg_fn = avg_fn
self.use_buffers = use_buffers
def forward(self, *args, **kwargs):
return self.module(*args, **kwargs)
def update(self, model):
self_param = itertools.chain(self.module.parameters(), self.module.buffers() if self.use_buffers else [])
model_param = itertools.chain(model.parameters(), model.buffers() if self.use_buffers else [])
self_param_detached = [p.detach() for p in self_param]
model_param_detached = [p.detach().to(p_averaged.device) for p, p_averaged in zip(model_param, self_param_detached)]
if self.n_averaged == 0:
for p_averaged, p_model in zip(self_param_detached, model_param_detached):
p_averaged.copy_(p_model)
if self.n_averaged > 0:
for p_averaged, p_model in zip(self_param_detached, model_param_detached):
n_averaged = self.n_averaged.to(p_averaged.device)
p_averaged.copy_(self.avg_fn(p_averaged, p_model, n_averaged))
if not self.use_buffers:
for b_swa, b_model in zip(self.module.buffers(), model.buffers()):
b_swa.copy_(b_model.to(b_swa.device).detach())
self.n_averaged += 1
若当前模型尚未进行过平均(即n_averaged为0),则直接将新模型的参数复制到当前模型中。若当前模型已经进行过平均,则通过avg_fn函数计算当前模型和新模型的加权平均,并将结果复制到当前模型中。如果use_buffers为True,则会将缓冲区从新模型复制到当前模型。最后,n_averaged增加1,表示已进行一次平均。
指数移动平均(EMA)
EMA被用于根据当前参数和之前的平均参数来更新平均参数。其计算公式如下所示:
这里的EMA param是当前的平均参数,current param是当前的参数,decay是一个介于0和1之间的衰减因子,它用于控制当前参数对平均参数的贡献程度。decay越接近1,平均参数对当前参数的影响就越小,反之亦是。
def get_ema_avg_fn(decay=0.999):
@torch.no_grad()
def ema_update(ema_param, current_param, num_averaged):
return decay * ema_param + (1 - decay) * current_param
return ema_update
class EMAModel(AveragingBaseModel):
def __init__(self, model, cuda = False, decay=0.9, use_buffers=False):
super().__init__(model=model, cuda=cuda, avg_fn=get_ema_avg_fn(decay), use_buffers=use_buffers)
随机加权平均(SWA)
SWA通过对神经网络的权重进行平均来改善模型的泛化能力。其计算公式如下所示:
SWA param是新的平均参数,averaged param是之前的平均参数,current param是当前的参数,num avg是已经平均的参数数量。
def get_swa_avg_fn():
@torch.no_grad()
def swa_update(averaged_param, current_param, num_averaged):
return averaged_param + (current_param - averaged_param) / (num_averaged + 1)
return swa_update
class SWAModel(AveragingBaseModel):
def __init__(self, model, cuda = False,use_buffers=False):
super().__init__(model=model, cuda=cuda, avg_fn=get_swa_avg_fn(), use_buffers=use_buffers)
Tanh自适应指数移动EMA算法(T_ADEMA)
这一个是在查询资料的时候,找到的一篇论文描述的,是否有效,还得经过实验才对。
全文阅读--XML全文阅读--中国知网 (cnki.net)
论文表示是为了在神经网络训练过程中根据不同的训练阶段更有效地过滤噪声,所提出的公式:
T_ADEMA param是新的平均参数,avg param是之前的平均参数,current param是当前的参数,num avg是已经平均的参数数量。alpha是一个控制衰减速率的超参数。通过将参数数量作为输入传递给切线函数的参数,动态地计算衰减因子。切线函数(tanh)的输出范围为[-1, 1],随着参数数量的增加,衰减因子会逐渐趋近于1。由于切线函数的特性,当参数数量较小时,衰减因子接近于0;当参数数量较大时,衰减因子接近于1。
def get_t_adema(alpha=0.9):
num_averaged = [0] # 使用列表包装可变对象,以在闭包中引用
@torch.no_grad()
def t_adema_update(averaged_param, current_param, num_averageds):
num_averaged[0] += 1
decay = alpha * torch.tanh(torch.tensor(num_averaged[0], dtype=torch.float32))
tadea_update = decay * averaged_param + (1 - decay) * current_param
return tadea_update
return t_adema_update
class T_ADEMAModel(AveragingBaseModel):
def __init__(self, model, cuda=False, alpha=0.9, use_buffers=False):
super().__init__(model=model, cuda=cuda, avg_fn=get_t_adema(alpha), use_buffers=use_buffers)
三、构建一个简单的实验测试
这一部分我正在做实验,下面是调用了一个简单的resnet18网络,看看逻辑上面是否有错。
if __name__=="__main__":
# 创建 ResNet18 模型
import torch
import torchvision.models as models
from torch.utils.data import DataLoader
from tqdm import tqdm
from torch.optim.swa_utils import AveragedModel
class RandomDataset(torch.utils.data.Dataset):
def __init__(self, size=224):
self.data = torch.randn(size, 3, 224, 224)
self.labels = torch.randint(0, 2, (size,))
def __getitem__(self, index):
return self.data[index], self.labels[index]
def __len__(self):
return len(self.data)
model = models.resnet18(pretrained=False)
# model = model.to('cuda')
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()
# 创建数据加载器
train_dataset = RandomDataset()
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# 定义权重平均模型
swa_model = SWAModel(model, cuda=True)
ema_model = EMAModel(model, cuda=True)
t_adema_model = T_ADEMAModel(model, cuda=True)
for epoch in range(5):
for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{5}"):
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# 更新权重平均模型
ema_model.update(model)
swa_model.update(model)
t_adema_model.update(model)
# 测试模型
test_dataset = RandomDataset(size=100)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
def evaluate(model):
model.eval() # 切换到评估模式
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to('cuda'), labels.to('cuda')
outputs = model(inputs)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
accuracy = correct / total
print(f"模型准确率:{accuracy * 100:.2f}%")
# 原模型测试
print("Model Evaluation:")
evaluate(model.to('cuda')) #
# 测试权重平均模型
print("SWAModel Evaluation:")
evaluate(swa_model.to('cuda'))
print("EMAModel Evaluation:")
evaluate(ema_model.to('cuda'))
print("T-ADEMAModel Evaluation:")
evaluate(t_adema_model.to('cuda'))
运行效果:
Model Evaluation: 模型准确率:46.00% SWAModel Evaluation: 模型准确率:54.00% EMAModel Evaluation: 模型准确率:58.00% T - ADEMAModel Evaluation: 模型准确率:58.00%
仅仅是测试是否能够跑通,过程中也有比原模型要低的时候,而且权值平均主要是用于训练中后期,所以有没有效果应该需要自己去做实验。
当前你可以下载pip install pyzjr==1.2.9,调用from pyzjr.nn import EMAModel运行。