目录
一、 原代码
二、 代码学习(修改后并加上详细注释)
1. 控制器
2. NASModel
3. 初始化及训练过程
3.1 主要参数的初始化
3.2 数据集的准备与加载
3.3 搜索空间
3.4 训练、参数更新
4. 对搜索空间、搜索策略、性能评估策略的认识
4.1 搜索空间(Search Space)
4.2 搜索策略(Search Strategy)
4.3 性能评估策略(Performance Evaluation Strategy)
5. 修改后代码
一、 原代码
(原代码不是本人编写)
代码地址如下:
https://github.com/Longcodedao/NAS-With-RL
文件格式为.ipynb,需要使用jupyter notebook,把它整理出来(粘到一起,并加上解析)如下:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
class Params:
NUM_EPOCHS = 50
ALPHA = 0.005
BATCH_SIZE = 64
HIDDEN_SIZE = 64 # Number of Hidden Units in Controller
BETA = 0.1 # The entropy bonus multiplier
INPUT_SIZE = 3
ACTION_SPACE = 2
NUM_STEPS = 4
GAMMA = 0.99
# 设置数据转换函数
# Compose()将多个transforms合并,参数是由多个transform对象组合成的列表
# ToTensor()转化为tensor格式图片。输入参数,自动调用__call__方法,把图片变成tensor格式
# Normalize()实例化,输入了均值mean和方差std两个参数,``output[channel] = (input[channel] - mean[channel]) / std[channel]``
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))]
)
# 准备数据集
# 参数train为TRUE则返回训练集,为FALSE则返回测试集,download设置为TRUE则自动从网上下载
trainset = torchvision.datasets.MNIST(root='./data', train=True,
download=True, transform=transform)
testset = torchvision.datasets.MNIST(root='./data', train=False,
download=True, transform=transform)
# 用dataloader加载数据集
# 参数:dataset即为数据集;batch_size为多少个为一组;shuffle为TRUE时每次加载顺序不同,为FALSE时每次加载数据集的顺序相同
# drop_last默认为false,为TRUE时最后一组数据不满batch_size时舍去,为FALSE时不舍
# num_workers使用多少个子进程来加载数据。 0表示数据将在主进程中加载。 (默认值:0)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64,
shuffle=True, num_workers=2)
testlaoder = torch.utils.data.DataLoader(testset, batch_size=64,
shuffle=False, num_workers=2)
# 定义控制器,LSTM网络
class Controller(nn.Module):
def __init__(self, search_space,
hidden_size=64, max_layer=4, device=''):
super(Controller, self).__init__()
self.search_space = search_space # 包括隐藏单元的数量和激活函数的种类
self.DEVICE = device
self.hidden_size = hidden_size
self.length_search = len(search_space) # 搜索长度为搜索空间的大小:2 # num_steps = max_layer * length_search_space= 4 * 2
self.list_length = [len(space) for space in search_space.values()] # 搜索空间中每个类别的列表值:[隐藏单元数量, 激活函数种类]:[4, 3]
self.max_layer = max_layer
# 生成1到4之间的随机整数,不包括4,维度为1*1, 类型为tensor,然后用item读取tensor值,最终返回一个int
self.total_layer = torch.randint(1, self.max_layer, (1,)).item() # --------------添加这一行
self.lstm = nn.ModuleList() # 在列表中保存子模块
self.fc = nn.ModuleList()
# 添加LSTM子模块,input_size = 输入大小(输入x中预期特征的数量,这里是激活函数种类3个),hidden_size = 隐藏特征数量(隐藏状态h的特征数量)
self.lstm.append(nn.LSTMCell(self.list_length[-1], self.hidden_size).to(self.DEVICE))
# 继续添加LSTM子模块,最终的lstm模块包含两层LSTM,第一层输入特征为激活函数类别个数,第二层为隐藏单元个数,对输入序列应用LSTM RNN
for i in range(1, self.length_search):
self.lstm.append(nn.LSTMCell(self.list_length[i - 1], self.hidden_size).to(self.DEVICE))
# 添加self.length_search层(2层)全连接层,
for i in range(0, self.length_search):
# linear参数:输入特征大小,输出特征大小。作用:对传入数据应用线性变换
self.fc.append(nn.Linear(self.hidden_size, self.list_length[i]).to(self.DEVICE))
# 这个方法是用来初始化隐藏状态(h_t)和单元状态(c_t)的。
# 这些状态是 LSTM 的内部状态,用来记录和处理序列信息。得到的状态矩阵的形状是 (1, self.hidden_size),并且他们都被初始化为全0.
def init_hidden(self):
h_t = torch.zeros(1, self.hidden_size, dtype=torch.float, device=self.DEVICE)
c_t = torch.zeros(1, self.hidden_size, dtype=torch.float, device=self.DEVICE)
return h_t, c_t
# 前向传播
def forward(self, input):
# self.total_layer = torch.randint(1, self.max_layer, (1,)).item()
# outputs用来存放每个搜索空间中每个关键字的输出。这个字典的每一个键值对表示搜索空间中的一个元素经过LSTM及全连接层处理后的输出序列。
outputs = {}
# 对self.length_search个隐藏层状态进行初始化。
self.hidden = [self.init_hidden() for _ in range(self.length_search)]
# 对于每一层来说
for num_layer in range(self.max_layer):
# 先按顺序遍历self.search_space的每个元素,这个元素在这里被命名为(key, val)。然后,取出与i索引相对应的隐藏状态(h_t, c_t),计算LSTM层的输出。
for i, (key, val) in enumerate(self.search_space.items()):
h_t, c_t = self.hidden[i]
# lstm的输入是输入数据input和当前的隐藏状态 (h_t, c_t)。输出是新的隐藏状态和单元状态 (h_t, c_t),再赋值给与i索引对应的隐藏状态。
h_t, c_t = self.lstm[i](input, (h_t, c_t))
self.hidden[i] = (h_t, c_t)
# 新的隐藏状态h_t经过全连接层(self.fc[i])计算,得到output。这个output又被作为下一个LSTM层的输入。
output = self.fc[i](h_t)
# print(output)
input = output
# 全连接层的输出被添加到outputs字典的对应键key下。如果key在outputs中不存在,那么就创建一个新的键值对;否则,就在已有的键值对后添加新的输出。
if key not in outputs.keys():
outputs[key] = [output]
else:
outputs[key].extend([output])
# print(outputs)`
# for _ in range(self.length_search):
# h_t, c_t = self.hidden[i]
# h_t.detach_()
# c_t.detach_()
# self.hidden[i] = (h_t, c_t)
# 整理outputs,将每一个 key 对应的一系列 tensor 堆叠起来,并压缩掉所有不必要的维度,让每个 key 对应一个形状更加整洁、方便处理的 tensor。
for i, (key, val) in enumerate(outputs.items()):
outputs[key] = torch.stack(outputs[key]).squeeze(1)
return outputs
# 0: nn.ReLU, 1: nn.Tanh, 2: nn.Sigmoid
# 设置搜索空间,包括隐藏单元的数量和激活函数的种类
search_space = {
"hidden_units": [8, 16, 32, 64],
"activation": [0, 1, 2]
}
# 使用gpu还是cpu
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 控制器的实例化
controller = Controller(search_space, max_layer=4, device=device)
print(f"Total Layer: {controller.total_layer}")
print(f"List Length: {controller.list_length}")
print(controller)
# 控制器的输入
input = torch.tensor([[1.0, 2.0, 3.0]]).to(device)
outputs = controller(input)
# print(outputs)
# 神经网络架构搜索部分,输出一个搜索完毕的DNN模型model
class NASModel(nn.Module):
def __init__(self, architectures, input_size, output_size):
super(NASModel, self).__init__()
self.architectures = architectures # 一个字典,搜索空间。
self.length_layers = len(self.architectures['hidden_units']) # 隐藏层单元可能的结果,4
self.output_size = output_size # 输入大小784,输出大小10
layers = []
# 对每个隐藏层而言,将每个隐藏层的单元个数和激活函数赋给每层
for layer in range(self.length_layers):
hidden_units = self.architectures['hidden_units'][layer].item()
activation = self.architectures['activation'][layer].item()
# print(activation)
if activation == 0:
activation = nn.ReLU()
elif activation == 1:
activation = nn.Tanh()
elif activation == 2:
activation = nn.Sigmoid()
# 对于第一层,输入为输入大小,输出为第一层隐藏单元数量
if layer == 0:
layers.append(nn.Linear(input_size, hidden_units))
layers.append(activation)
# 对于其他各层,输入为上一层隐藏单元数量,输出为当前隐藏层数量
else:
layers.append(nn.Linear(self.architectures['hidden_units'][layer - 1].item(),
hidden_units))
layers.append(activation)
# 最后一个线性层,输入为最后一层单元数量,输出为输出大小
layers.append(nn.Linear(self.architectures['hidden_units'][self.length_layers - 1].item(), self.output_size))
# 加上softmax层,Softmax函数是将多分类输出值转换为概率分布的函数,它可以将输出值范围映射到 [0, 1],并且约束输出值的和为1
layers.append(nn.Softmax(dim=1))
# print(layers)
# 用sequential将layers构建到一起
self.model = nn.Sequential(*layers)
def forward(self, x):
# 输入经过创建好的模型的前向传播,返回输出
return self.model(x)
from torch.distributions import Categorical
from torch.nn.functional import one_hot, log_softmax, softmax, normalize
# 根据控制器和搜索空间,采样一系列操作来创建一个网络架构,并计算这些操作的对数概率。
# 其中“网络架构”表示为一个architecture字典,键为操作,值为操作的结果。
# 同时,每一个操作的对数概率都被记录在episode_total_log_probs字典中。
architecture = {}
episode_total_log_probs = {}
# 创建控制器
controller = Controller(search_space, max_layer=4, device=device)
# 控制器的输出,是一个字典,这个字典的每一个键值对表示搜索空间中的一个元素经过LSTM及全连接层处理后的输出序列,是一个概率。
# 通过输入数据input计算得到历次操作的概率分布
episode_logits = controller(input)
# episode_logits: {'hidden_units': tensor([[ 0.0253, -0.0500, -0.0533, -0.0923],
# [-0.0050, -0.0620, -0.0510, -0.0730],
# [-0.0128, -0.0644, -0.0466, -0.0669],
# [-0.0160, -0.0668, -0.0416, -0.0650]], grad_fn=<SqueezeBackward1>),
# 'activation': tensor([[-0.0373, 0.0469, -0.0961],
# [-0.0274, 0.0363, -0.0952],
# [-0.0214, 0.0308, -0.0930],
# [-0.0175, 0.0282, -0.0909]], grad_fn=<SqueezeBackward1>)}
print(f"Number of layers is: {controller.total_layer}") # 1-3之间的正整数
# 对于搜索空间的每一个元素
# 隐藏单元的数量, 激活函数的类别
for key, space in search_space.items():
logits = episode_logits[key]
# 从对应概率中通过采样取出一个动作,创建由probs或logits参数化的分类分布(但不能同时使用两者)。
action_index = Categorical(logits=logits).sample().unsqueeze(0) # ------------unsqueeze的1改为0
# print(action_index)
# 为每一个动作创建一个动作空间,其中的值为搜索空间的键对应的候选值
actions_space = torch.tensor([space] * controller.total_layer).to(device)
# 根据action_index从动作空间中取出对应的动作。第一次输出隐藏单元数量的action,第二次输出激活函数类别的tensor
action = torch.gather(actions_space, 1, action_index).to(device)
# print(action)
# 循环一次tensor([[32, 64, 64, 8]], device='cuda:0')
# 循环两次tensor([[1, 2, 0, 1]], device='cuda:0')
# 每一个动作都被添加到architecture字典
architecture[key] = action.squeeze(0) # squeeze的1改为0
# print(action_index.int().squeeze(1))
# 代码使用了one_hot函数和log_softmax函数来计算每个操作的one-hot向量和对数概率。
mask = one_hot(action_index, num_classes=len(space))
episode_log_probs = torch.sum(mask.float() * log_softmax(logits, dim=1), dim=1)
# 每一个动作的对数概率都被计算并添加到episode_total_log_probs字典中。
episode_total_log_probs[key] = episode_log_probs
print(architecture)
# {'hidden_units': tensor([32, 64, 64, 8], device='cuda:0'), 'activation': tensor([1, 2, 0, 1], device='cuda:0')}
print(episode_total_log_probs)
# 每一个episode的总的概率
# {'hidden_units': tensor([[-1.2527, 0.0000, -1.4808, -2.7254]], device='cuda:0', grad_fn=<SumBackward1>),
# 'activation': tensor([[-1.1186, -2.0931, -1.1331]], device='cuda:0', grad_fn=<SumBackward1>)}
model = NASModel(architecture, 784, 10)
print(model)
# NASModel(
# (model): Sequential(
# (0): Linear(in_features=784, out_features=32, bias=True)
# (1): Tanh()
# (2): Linear(in_features=32, out_features=64, bias=True)
# (3): Sigmoid()
# (4): Linear(in_features=64, out_features=64, bias=True)
# (5): ReLU()
# (6): Linear(in_features=64, out_features=8, bias=True)
# (7): Tanh()
# (8): Linear(in_features=8, out_features=10, bias=True)
# (9): Softmax(dim=1)
# )
# )
from torch.distributions import Categorical
from torch.nn.functional import one_hot, log_softmax, softmax, normalize
import torch.optim as optim
import tqdm
# 强化学习部分,定义“关卡”
def play_episode(controller):
architecture = {} # 搜索空间中类别采用的动作,如{'hidden_units': tensor([64, 8, 8, 64]), 'activation': tensor([0, 0, 1, 1])}
episode_total_log_probs = {}
input = torch.tensor([[1.0, 2.0, 3.0]]).to(device)
# print(controller)
episode_logits = controller(input)
for key, space in search_space.items():
logits = episode_logits[key] # 每个搜索类别的概率
action_index = Categorical(logits=logits).sample().unsqueeze(0) # unsqueeze的1改为0
actions_space = torch.tensor([space] * controller.total_layer).to(device)
action = torch.gather(actions_space, 1, action_index).to(device)
architecture[key] = action.squeeze(0) # squeeze的1改为0
# print(action_index.int().squeeze(1))
mask = one_hot(action_index, num_classes=len(space))
episode_log_probs = torch.sum(mask.float() * log_softmax(logits, dim=1), dim=1)
episode_total_log_probs[key] = episode_log_probs
model = NASModel(architecture, 784, 10).to(device)
print(f'{model}\n')
criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9) # 优化器
for epoch in range(10):
model.train()
running_loss = 0.0
for i, data in enumerate(trainloader):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
inputs = inputs.view(-1, 784)
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
running_loss /= len(trainloader)
print(f"Epoch {epoch + 1}: Loss = {running_loss}")
# 将模型设置成evaluation模式, 仅仅当模型中有Dropout和BatchNorm是才会有影响。
model.eval()
correct = 0
total = 0
with torch.no_grad():
for data in testlaoder:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = model(images.view(-1, 784))
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
print('Accuracy of the network on the 10000 test images: {}'.format(acc))
# compute the reward
reward = acc
reward = torch.tensor(reward, device=device).detach()
sum_weighted_log_probs = {}
sum_weighted_log_probs['hidden_units'] = torch.sum(-episode_total_log_probs['hidden_units'] * reward).unsqueeze(0)
sum_weighted_log_probs['activation'] = torch.sum(-episode_total_log_probs['activation'] * reward).unsqueeze(0)
sum_weighted_loss = sum_weighted_log_probs['hidden_units'] + \
sum_weighted_log_probs['activation']
return sum_weighted_loss, episode_total_log_probs, reward
# 开始执行
controller = Controller(search_space, max_layer=4, device=device)
print(controller)
# 使用optim时,必须构造一个优化器对象,它将保存当前状态,并将根据计算的梯度更新参数。
# 要构造一个Optimizer,你必须给它一个可迭代对象,其中包含要优化的参数(都应该是变量s)。然后,您可以指定特定于优化器的选项,如学习率、权重衰减等。
# 这里使用Adam优化算法
optimizer = optim.Adam(controller.parameters(), lr=0.001)
total_rewards = []
# 将module设置为 training mode,这行代码不是必须加上,仅仅当模型中有Dropout和BatchNorm是才会有影响。
controller.train()
for epoch in range(10):
optimizer.zero_grad() # 在每次更新权重前,先清零所有被优化变量(通常是模型的参数)的梯度
epoch_log_probs = torch.empty((0,), device=device) # 概率值
for i in range(3):
(sum_weighted_loss, episode_logits, reward) = play_episode(controller)
print(sum_weighted_loss)
epoch_log_probs = torch.cat((epoch_log_probs, sum_weighted_loss))
loss = torch.mean(epoch_log_probs)
loss.backward() # 计算梯度
optimizer.step() # 使用优化器更新参数
# for name, param in controller.named_parameters():
# print(name, param.grad)
print(f"Loss in {epoch} is: {loss}")
# 后面是进行DNN的训练,进行10个epoch
'''
###########
input = torch.tensor([[1.0, 2.0, 3.0]]).to(device)
controller(input)
episode_logits = controller(input)
architecture = {}
episode_total_log_probs = {}
for key, space in search_space.items():
logits = episode_logits[key]
action_index = Categorical(logits = logits).sample().unsqueeze(0)
actions_space = torch.tensor([space] * controller.total_layer).to(device)
action = torch.gather(actions_space, 1, action_index).to(device)
architecture[key] = action.squeeze(0)
# print(action_index.int().squeeze(1))
mask = one_hot(action_index, num_classes = len(space))
episode_log_probs = torch.sum(mask.float() * log_softmax(logits, dim = 1), dim = 1)
episode_total_log_probs[key] = episode_log_probs
##############
model = NASModel(architecture, 784, 10).to(device)
print(f'{model}\n')
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr = 0.005, momentum = 0.9)
for epoch in range(10):
model.train()
running_loss = 0.0
for i, data in enumerate(trainloader):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
inputs = inputs.view(-1, 784)
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
running_loss /= len(trainloader)
print(f"Epoch {epoch + 1}: Loss = {running_loss}")
model.eval()
correct = 0
total = 0
with torch.no_grad():
for data in testlaoder:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = model(images.view(-1, 784))
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
print('Accuracy of the network on the 10000 test images: {}'.format(acc))
'''
二、 代码学习(修改后并加上详细注释)
仔细看了一下,这个代码的思路总结如下:
1. 控制器
在其中构建了一个循环神经网络RNN,使用的是两层LSTM加上两层Linear:
Controller(
(lstm): ModuleList(
(0): LSTMCell(3, 64)
(1): LSTMCell(4, 64)
)
(fc): ModuleList(
(0): Linear(in_features=64, out_features=4, bias=True)
(1): Linear(in_features=64, out_features=3, bias=True)
)
)
这个控制器的输入是一个tensor向量,输出是搜索空间中每个类别内容的概率分布,这里是四层隐藏层中每层的单元数和激活函数种类的概率值。
outputs长这样:
{'hidden_units': tensor([[-0.0952, -0.1259, 0.1534, 0.0106],
[-0.0963, -0.0973, 0.1138, 0.0093],
[-0.1015, -0.0844, 0.1024, 0.0050],
[-0.1061, -0.0790, 0.0976, 0.0015]], device='cuda:0',
grad_fn=<SqueezeBackward1>), 'activation': tensor([[-0.0934, 0.0654, -0.1024],
[-0.0983, 0.0586, -0.0943],
[-0.1009, 0.0556, -0.0908],
[-0.1022, 0.0543, -0.0895]], device='cuda:0',
grad_fn=<SqueezeBackward1>)}
(控制器的作用是在搜索空间中生成神经网络架构。控制器通过输入当前的状态,产生下一步的操作(控制器的输出)。然后这个操作会影响下一步的状态。过程持续进行,直到得到一个完整的神经网络架构。这个代码中好像是把它分成两部分编写了)
2. NASModel
输入由三部分组成,(controller输出的概率生成的architectures,输入大小,输出大小)。
architecture长这样:
{'hidden_units': tensor([64, 8, 16, 16], device='cuda:0'),
'activation': tensor([0, 0, 1, 2], device='cuda:0')}
输出是一个DNN,长这样:
NASModel(
(model): Sequential(
(0): Linear(in_features=784, out_features=8, bias=True)
(1): Tanh()
(2): Linear(in_features=8, out_features=8, bias=True)
(3): Sigmoid()
(4): Linear(in_features=8, out_features=8, bias=True)
(5): Sigmoid()
(6): Linear(in_features=8, out_features=16, bias=True)
(7): Sigmoid()
(8): Linear(in_features=16, out_features=10, bias=True)
(9): Softmax(dim=1)
)
)
3. 初始化及训练过程
3.1 主要参数的初始化
class Params:
NUM_EPOCHS = 50
ALPHA = 0.005
BATCH_SIZE = 64
HIDDEN_SIZE = 64 # Number of Hidden Units in Controller
BETA = 0.1 # The entropy bonus multiplier
INPUT_SIZE = 3
ACTION_SPACE = 2
NUM_STEPS = 4
GAMMA = 0.99
3.2 数据集的准备与加载
这部分数据集用来训练DNN。
# 定义转换函数
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))]
)
# 准备数据集
train_set = torchvision.datasets.MNIST(root='./data', train=True,
download=True, transform=transform)
test_set = torchvision.datasets.MNIST(root='./data', train=False,
download=True, transform=transform)
# 加载数据集
train_loader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=64, shuffle=False)
注意:加载数据集时根据电脑硬件配置进行参数设置,这里batch_size设为64,没有使用num_workers(原代码设置为2)。
3.3 搜索空间
包括两部分内容:一个是隐藏单元数量(有8,16,32,64四种),另一个是激活函数的种类(有ReLU,Tanh,Sigmoid四种)。
# 0: nn.ReLU, 1: nn.Tanh, 2: nn.Sigmoid
search_space = {
"hidden_units": [8, 16, 32, 64],
"activation": [0, 1, 2]
}
搜索空间的所有可能结果有:,当神经网络的层数为4时,这个搜索空间就有20736种可能,还是相当大的。
3.4 训练、参数更新
控制器实例化:
controller = Controller(search_space, max_layer=4, device=device)
控制器的输入是一个随机向量,并不是用正态分布或者随机分布抽取的,而是用更复杂的方法,暂时不需要搞懂。
进入训练步骤(10个大epoch):
在一个大epoch中有两个梯度:
1. 在play_episode函数中使用SGD优化器更新DNN,损失函数为CrossEntropyLoss,训练10个小epoch并进入验证集验证。
2. 完成10个小epoch的训练后,使用Adam优化器更新controller的网络,损失函数为mean。
也就是一共训练100个epoch。
在play_episode函数中,通过控制器输出网络参数的概率,用NASModel根据控制器的输出生成DNN网络,用MNIST数据集训练DNN,输出reward。
感觉代码还是有一些问题,reward和更新控制器那块儿。。。。。。。
4. 对搜索空间、搜索策略、性能评估策略的认识
在强化学习的神经网络架构搜索(Neural Architecture Search,NAS)中,搜索空间、搜索策略和性能评估策略都是非常重要的组成部分,对于确定最优的网络架构具有决定性的影响。下面我将分别解释这三个概念:
4.1 搜索空间(Search Space)
搜索空间定义了所有可能的神经网络架构的集合。在搜索空间中,每一个神经网络架构可以看作是一种可能的解决方案。搜索空间可以包括各种不同的网络类型(例如卷积神经网络、递归神经网络等)、各种不同的层(例如卷积层、全连接层等)、各种不同的连接方式等等。这份代码中是隐藏单元数量和激活函数种类。
4.2 搜索策略(Search Strategy)
搜索策略定义了如何在搜索空间中寻找最优的神经网络架构。常见的搜索策略有基于强化学习的搜索策略(RNN作为策略函数)、基于遗传算法的搜索策略、基于贝叶斯优化的搜索策略等等。在基于强化学习的搜索策略中,神经网络架构被看作是一个环境,每一步的选择(例如选择添加哪一种类型的层,选择如何连接各个层)对应于一个动作,而神经网络在验证集上的性能被看作是回报。通过不断地与环境进行交互,强化学习算法将学习到一个策略,这个策略可以在搜索空间中找到最优的神经网络架构。
4.3 性能评估策略(Performance Evaluation Strategy)
性能评估策略定义了如何评价每一个神经网络架构的性能。常见的性能评估策略有基于验证集的性能评估、基于模型复杂度的性能评估等等。性能评估策略通常需要在评估性能和评估速度之间取得平衡。如果我们对每一个神经网络架构都进行完整的训练和验证,那么性能评估将会非常准确,但是这通常需要消耗大量的计算资源。因此,实际中常常会使用一种折中的策略,例如只训练和验证部分的数据,或者只训练和验证部分的epoch,然后基于这些结果来评估神经网络架构的性能。
5. 修改后代码
# 日期: 2024/1/23 21:34
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
from torch.distributions import Categorical
from torch.nn.functional import one_hot, log_softmax, softmax, normalize
import torch.optim as optim
import tqdm
class Params:
NUM_EPOCHS = 50
ALPHA = 0.005
BATCH_SIZE = 64
HIDDEN_SIZE = 64 # Number of Hidden Units in Controller
BETA = 0.1 # The entropy bonus multiplier
INPUT_SIZE = 3
ACTION_SPACE = 2
NUM_STEPS = 4
GAMMA = 0.99
# 定义控制器,LSTM网络
class Controller(nn.Module):
def __init__(self, search_space,
hidden_size=64, max_layer=4, device=''):
super(Controller, self).__init__()
self.search_space = search_space # 包括隐藏单元的数量和激活函数的种类
self.DEVICE = device # 指定是在gpu还是cpu上
self.hidden_size = hidden_size # 控制器中隐藏单元的数量
# 搜索长度为搜索空间的大小:2 # num_steps = max_layer * length_search_space= 4 * 2
self.length_search = len(search_space) # num_steps = max_layer * length_search_space
self.list_length = [len(space) for space in search_space.values()] # 搜索空间中每个类别的列表长度:[隐藏单元数量的种类, 激活函数种类]:[4, 3]
self.max_layer = max_layer # 最大层数:4
# 总层数:生成1到4之间的随机整数,不包括4,维度为1*1, 类型为tensor,然后用item读取tensor值,最终返回一个int
self.total_layer = torch.randint(1, self.max_layer, (1,)).item()
# 在列表中保存子模块
self.lstm = nn.ModuleList()
self.fc = nn.ModuleList()
# 添加LSTM子模块,input_size = 输入大小(输入x中预期特征的数量,这里是激活函数种类3个),hidden_size = 隐藏特征数量(隐藏状态h的特征数量)
self.lstm.append(nn.LSTMCell(self.list_length[-1], self.hidden_size).to(self.DEVICE))
# 继续添加LSTM子模块,最终的lstm模块包含两层LSTM,第一层输入特征为激活函数类别个数,第二层为隐藏单元个数,对输入序列应用LSTM RNN
for i in range(1, self.length_search):
self.lstm.append(nn.LSTMCell(self.list_length[i - 1], self.hidden_size).to(self.DEVICE))
# 添加self.length_search层(2层)全连接层
for i in range(0, self.length_search):
# linear参数:输入特征大小,输出特征大小。作用:对传入数据应用线性变换
self.fc.append(nn.Linear(self.hidden_size, self.list_length[i]).to(self.DEVICE))
# 这个方法是用来初始化隐藏状态(h_t)和单元状态(c_t)的。
# 这些状态是 LSTM 的内部状态,用来记录和处理序列信息。得到的状态矩阵的形状是 (1, self.hidden_size),并且他们都被初始化为全0.
def init_hidden(self):
h_t = torch.zeros(1, self.hidden_size, dtype=torch.float, device=self.DEVICE)
c_t = torch.zeros(1, self.hidden_size, dtype=torch.float, device=self.DEVICE)
return h_t, c_t
# 前向传播
def forward(self, input):
# outputs用来存放搜索空间中每个类别内容的概率分布。这个字典的每一个键值对表示搜索空间中的一个元素经过LSTM及全连接层处理后的输出序列。
outputs = {}
# 对self.length_search个隐藏层状态进行初始化。
self.hidden = [self.init_hidden() for _ in range(self.length_search)]
# 对于每一层来说
for num_layer in range(self.max_layer):
# 先按顺序遍历self.search_space的每个元素,这个元素在这里被命名为(key, val)。然后,取出与i索引相对应的隐藏状态(h_t, c_t),计算LSTM层的输出。
for i, (key, val) in enumerate(self.search_space.items()):
h_t, c_t = self.hidden[i]
# lstm的输入是输入数据input和当前的隐藏状态 (h_t, c_t)。输出是新的隐藏状态和单元状态 (h_t, c_t),再赋值给与i索引对应的隐藏状态。
h_t, c_t = self.lstm[i](input, (h_t, c_t))
self.hidden[i] = (h_t, c_t)
# 新的隐藏状态h_t经过全连接层(self.fc[i])计算,得到output。这个output又被作为下一个LSTM层的输入。
output = self.fc[i](h_t)
# print(output)
input = output
# 全连接层的输出被添加到outputs字典的对应键key下。如果key在outputs中不存在,那么就创建一个新的键值对;否则,就在已有的键值对后添加新的输出。
if key not in outputs.keys():
outputs[key] = [output]
else:
outputs[key].extend([output])
# print(outputs)`
# for _ in range(self.length_search):
# h_t, c_t = self.hidden[i]
# h_t.detach_()
# c_t.detach_()
# self.hidden[i] = (h_t, c_t)
# 整理outputs,将每一个 key 对应的一系列 tensor 堆叠起来,并压缩掉所有不必要的维度,让每个 key 对应一个形状更加整洁、方便处理的 tensor。
for i, (key, val) in enumerate(outputs.items()):
outputs[key] = torch.stack(outputs[key]).squeeze(1)
return outputs
# 神经网络架构搜索部分,输出一个搜索完毕的DNN模型model
class NASModel(nn.Module):
def __init__(self, architectures, input_size, output_size):
super(NASModel, self).__init__()
self.architectures = architectures # 搜索空间的类别及其采取的动作
# {'hidden_units': tensor([32, 64, 64, 8], device='cuda:0'),
# 'activation': tensor([1, 2, 0, 1], device='cuda:0')}
self.length_layers = len(self.architectures['hidden_units']) # 隐藏层单元的长度,及隐藏层数量
self.output_size = output_size # 输入大小784,输出大小10
layers = []
# 对每个隐藏层而言,将每个隐藏层的单元个数和激活函数赋给每层
for layer in range(self.length_layers):
hidden_units = self.architectures['hidden_units'][layer].item()
activation = self.architectures['activation'][layer].item()
# print(activation)
if activation == 0:
activation = nn.ReLU()
elif activation == 1:
activation = nn.Tanh()
elif activation == 2:
activation = nn.Sigmoid()
# 对于第一层,输入为输入大小,输出为第一层隐藏单元数量
if layer == 0:
layers.append(nn.Linear(input_size, hidden_units))
layers.append(activation)
# 对于其他各层,输入为上一层隐藏单元数量,输出为当前隐藏层数量
else:
layers.append(nn.Linear(self.architectures['hidden_units'][layer - 1].item(),
hidden_units))
layers.append(activation)
# 最后一个线性层,输入为最后一层单元数量,输出为输出大小
layers.append(nn.Linear(self.architectures['hidden_units'][self.length_layers - 1].item(), self.output_size))
# 加上softmax层,Softmax函数是将多分类输出值转换为概率分布的函数,它可以将输出值范围映射到 [0, 1],并且约束输出值的和为1
layers.append(nn.Softmax(dim=1))
# print(layers)
# 用sequential将layers构建到一起
self.model = nn.Sequential(*layers)
def forward(self, x):
# 返回创建好的模型
return self.model(x)
def play_episode(controller):
# 定义字典,用来储存搜索的结果,隐藏单元数和激活函数种类
architecture = {} # 搜索空间中类别采用的动作,如{'hidden_units': tensor([64, 8, 8, 64]), 'activation': tensor([0, 0, 1, 1])}
# 每一个动作的概率都被计算并添加到episode_total_log_probs字典中。
episode_total_log_probs = {}
# {'hidden_units': tensor([[-2.5390, 0.0000, 0.0000, -3.0257]], grad_fn=<SumBackward1>),
# 'activation': tensor([[-2.1069, -2.2470, 0.0000]], grad_fn=<SumBackward1>)}
input = torch.tensor([[1.0, 2.0, 3.0]]).to(device)
# 控制器的输出,是一个字典,这个字典的每一个键值对表示搜索空间中的一个元素经过LSTM及全连接层处理后的输出序列,是一个概率。
# 通过输入数据input计算得到历次操作的概率分布
episode_logits = controller(input)
# episode_logits: {'hidden_units': tensor([[ 0.0253, -0.0500, -0.0533, -0.0923],
# [-0.0050, -0.0620, -0.0510, -0.0730],
# [-0.0128, -0.0644, -0.0466, -0.0669],
# [-0.0160, -0.0668, -0.0416, -0.0650]], grad_fn=<SqueezeBackward1>),
# 'activation': tensor([[-0.0373, 0.0469, -0.0961],
# [-0.0274, 0.0363, -0.0952],
# [-0.0214, 0.0308, -0.0930],
# [-0.0175, 0.0282, -0.0909]], grad_fn=<SqueezeBackward1>)}
# 对于搜索空间的每一个元素
# 隐藏单元的数量, 激活函数的类别
for key, space in search_space.items():
logits = episode_logits[key]
# (4,4)tensor,(4,3)tensor
# 从对应概率中通过采样取出一个动作,创建由probs或logits参数化的分类分布(但不能同时使用两者)。
action_index = Categorical(logits=logits).sample().unsqueeze(0)
# action_index: tensor([[0, 0, 3, 0]])
# 为每一个动作创建一个动作空间,其中的值为搜索空间的键对应的候选值
actions_space = torch.tensor([space] * controller.total_layer).to(device)
# actions_space: tensor([[ 8, 16, 32, 64],
# [ 8, 16, 32, 64],
# [ 8, 16, 32, 64]])
# 根据action_index从动作空间中取出对应的动作。第一次输出隐藏单元数量的action,第二次输出激活函数类别的tensor
action = torch.gather(actions_space, 1, action_index).to(device)
# action:循环的第一次输出为tensor([[ 8, 8, 64, 8]]),第二次为tensor([[0, 0, 0, 2]])
# 每一个动作都被添加到architecture字典
architecture[key] = action.squeeze(0)
# print(action_index.int().squeeze(1))
# 代码使用了one_hot函数和log_softmax函数来计算每个操作的one-hot向量和对数概率。
mask = one_hot(action_index, num_classes=len(space))
# [[1, 0, 0, 0],
# [1, 0, 0, 0],
# [0, 0, 0, 1],
# [1, 0, 0, 0]]
episode_log_probs = torch.sum(mask.float() * log_softmax(logits, dim=1), dim=1)
# episode_log_probs: tensor([[-4.1869, 0.0000, 0.0000, -1.3777]], grad_fn=<SumBackward1>)
# episode_log_probs: tensor([[-3.3254, 0.0000, -1.1439]], grad_fn=<SumBackward1>)
# 每一个动作的对数概率都被计算并添加到episode_total_log_probs字典中。这个结果的反向传播可以导致我们的策略朝向更高回报的方向移动。
episode_total_log_probs[key] = episode_log_probs
# {'hidden_units': tensor([[-4.1869, 0.0000, 0.0000, -1.3777]], grad_fn=<SumBackward1>),
# 'activation': tensor([[-3.3254, 0.0000, -1.1439]], grad_fn=<SumBackward1>)}
# 生成一个DNN网络
model = NASModel(architecture, 784, 10).to(device)
print(f'{model}\n')
# 定义损失函数CrossEntropyLoss
criterion = nn.CrossEntropyLoss()
# 使用optim时,必须构造一个优化器对象,它将保存当前状态,并将根据计算的梯度更新参数。
# 要构造一个Optimizer,你必须给它一个可迭代对象,其中包含要优化的参数(都应该是变量s)。然后,您可以指定特定于优化器的选项,如学习率、权重衰减等。
# 定义优化器SGD
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)
# 训练DNN, 10个回合
for epoch in range(10):
model.train()
running_loss = 0.0
for i, data in enumerate(train_loader):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# 在每次更新权重前,先清零所有被优化变量(通常是模型的参数)的梯度
optimizer.zero_grad()
# forward + backward + optimize
inputs = inputs.view(-1, 784)
outputs = model(inputs)
loss = criterion(outputs, labels) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 用优化器优化参数
# print statistics
running_loss += loss.item()
running_loss /= len(train_loader)
print(f"Epoch {epoch + 1}: Loss = {running_loss}")
model.eval() # 设置模式为评估,开始测试步骤
# 以测试集上的损失或者正确率来判断模型是否训练的好
# 验证集与测试集不一样的,验证集是在训练中用的,反正模型过拟合,测试集是在模型完全训练好后使用的
# 验证集用来调整超参数,相当于真题,测试集是考试
correct = 0 # 验证集准确率
total = 0 # 验证集数量
# 只需要进行测试,不需要对梯度进行调整,所以设置下面这行
with torch.no_grad():
for data in test_loader:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = model(images.view(-1, 784))
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
print('Accuracy of the network on the 10000 test images: {}'.format(acc))
# compute the reward,在当前策略下,采取某一动作后,环境返回的回报或者奖励。
reward = acc
# 这一行是将变量 reward 转换为一个 PyTorch 的张量,并且确保它在正确的设备上(比如 CPU 或者 GPU)。
# detach 方法用于将该张量从计算图中分离出来,这样在之后的运算中不会对它求导数。
reward = torch.tensor(reward, device=device).detach()
sum_weighted_log_probs = {}
sum_weighted_log_probs['hidden_units'] = torch.sum(-episode_total_log_probs['hidden_units'] * reward).unsqueeze(0)
sum_weighted_log_probs['activation'] = torch.sum(-episode_total_log_probs['activation'] * reward).unsqueeze(0)
# 预测的准确率越高,这个值越高
sum_weighted_loss = sum_weighted_log_probs['hidden_units'] + \
sum_weighted_log_probs['activation']
return sum_weighted_loss, episode_total_log_probs, reward
# 设置数据转换函数
# Compose()将多个transforms合并,参数是由多个transform对象组合成的列表
# ToTensor()转化为tensor格式图片。输入参数,自动调用__call__方法,把图片变成tensor格式
# Normalize()实例化,输入了均值mean和方差std两个参数,``output[channel] = (input[channel] - mean[channel]) / std[channel]``
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))]
)
# 准备数据集
# 参数train为TRUE则返回训练集,为FALSE则返回测试集,download设置为TRUE则自动从网上下载
train_set = torchvision.datasets.MNIST(root='./data', train=True,
download=True, transform=transform)
test_set = torchvision.datasets.MNIST(root='./data', train=False,
download=True, transform=transform)
# 用dataloader加载数据集
# 参数:dataset即为数据集;batch_size为多少个为一组;shuffle为TRUE时每次加载顺序不同,为FALSE时每次加载数据集的顺序相同
# drop_last默认为false,为TRUE时最后一组数据不满batch_size时舍去,为FALSE时不舍
# num_workers使用多少个子进程来加载数据。 0表示数据将在主进程中加载。 (默认值:0)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=64, shuffle=False)
# 0: nn.ReLU, 1: nn.Tanh, 2: nn.Sigmoid
search_space = {
"hidden_units": [8, 16, 32, 64],
"activation": [0, 1, 2]
}
device = 'cuda' if torch.cuda.is_available() else 'cpu'
controller = Controller(search_space, max_layer=4, device=device)
optimizer = optim.Adam(controller.parameters(), lr=0.001)
total_rewards = []
controller.train()
for epoch in range(10):
optimizer.zero_grad()
# 返回一个用标量值0填充的张量,其形状由变量参数size定义。
epoch_log_probs = torch.empty((0,), device=device)
# 这里这个循环好像没什么用,因为每次训练DNN都是从0开始训练的,这里没有什么梯度累计之类的。
for i in range(3):
(sum_weighted_loss, episode_total_log_probs,
reward) = play_episode(controller)
print(sum_weighted_loss)
# tensor([677.8463], device='cuda:0', grad_fn=<AddBackward0>)
print(episode_total_log_probs)
# episode_log_probs: tensor([[-4.1869, 0.0000, 0.0000, -1.3777]], grad_fn=<SumBackward1>)
# episode_log_probs: tensor([[-3.3254, 0.0000, -1.1439]], grad_fn=<SumBackward1>)
# 在给定维数中连接给定序列的seq张量。所有张量要么具有相同的形状(连接维度除外),要么为空, 第二个参数指定拼接的维数,0表示第一维,1表示在第二维上拼接。
epoch_log_probs = torch.cat((epoch_log_probs, sum_weighted_loss))
loss = torch.mean(epoch_log_probs) # 返回平均值
loss.backward()
optimizer.step()
# for name, param in controller.named_parameters():
# print(name, param.grad)
print(f"Loss in {epoch} is: {loss}")