目录
- 学习目标
- 学习内容
- 联邦学习基础:why, what, how
- why?
- what?
- how?
- 联邦学习的例子——CIFAR-10数据集(分类问题)
- 1、import libararies
- 2、hyper-parameters
- 3、加载并且划分数据
- 4、创建神经网络模型
- 5、helper functions
- (1)client_update
- (2)server_aggregate
- (3)test
- 6、实例化模型
- 7、训练模型
- 7、整体流程
- 查看不同类型数据的方法
- 1、dataframe
- 2、ndarray
- 3、Dictionary
- 4、list
- 5、tuple
学习目标
- 完成集中学习的代码部分
- 对联邦学习进行了解
- 对学习过程中遇到的问题进行总结
学习内容
联邦学习基础:why, what, how
why?
深度学习对于数据的需求是贪得无厌的(insatiable),越多的数据训练的效果越好。ALphaGo学习了大约30万场的比赛模式才在2016年打败了人类玩家。如果能够不受限制的访问几大洲的所有医院数十亿的医疗记录,那么预测各类疾病的概率将会非常的精确。但是有数据保护法的管控,使用超级大量的数据来进行训练模型是不可能的。
高质量的数据像是一个个孤岛存储在世界各地的边缘设备上。在不违反隐私法的前提下把他们整合到一起得到他们的预测能力是非常困难的(herculean)的任务。联邦学习就是解决这一困境的!
what?
联邦学习提供了一个聪明的方式,连接机器学习模型和能够有效训练模型所需的数据。
联邦学习工作可以比喻成:殖民地(colonies)和领土(territories)是如何组成共和国(republic)或联邦(federation)的。
分布的边缘设备使用自己的数据训练自己的local model,然后组合在一起创造一个global model(听起来像是分布式学习💦),联邦学习就是分布学习的一种形式,但是它和传统的HPC(high performance computing)不同,HPC的目的是减少训练的时间,因为你也知道经历45天的训练,想要记得上一次调整的超参数是多么困难😰。但是FL的目标是无论数据在哪里,都要获取数据,并将其用于模型训练。在HPC中,训练数据首先被收集在一起并随机化,然后作为碎片跨多个计算节点共享。这些过程产生了独立且同分布(IID)的数据,从而提高了随机梯度下降的性能。但是FL学习是不能生成IID数据的,FL数据大多是非IID的,并且系统必须具有能够承受这种现象的架构。
how?
传统的FL学习结构由中心的管理员(curator)或者服务器(server)协调训练的。客户端(clients)大多数是边缘设备,数量可能多达几百万,这些设备在每次训练的过程中至少与服务器通信两次。
- 首先,客户端都从服务器接收当前的全局模型权重(global model weights)
- 然后,在每个本地数据上训练它以生成更新的参数
- 将这些参数上传到服务器进行聚合(aggregation)
联邦学习的例子——CIFAR-10数据集(分类问题)
1、import libararies
###############################
##### importing libraries #####
###############################
import os
import random
from tqdm import tqdm
import numpy as np
import torch, torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data.dataset import Dataset
torch.backends.cudnn.benchmark=True
2、hyper-parameters
##### Hyperparameters for federated learning #########
num_clients = 20
num_selected = 6
num_rounds = 150
epochs = 5
batch_size = 32
- num_clients: 客户端的数量。将全部数据平均分给每个client
- num_selected: 在num_clients中随机选择num-clients个客户端进行训练(每个communication round)。通常是30%
- num_rounds: 需要运行的communication 轮数。在每一个communication round中,从num_clients中随机抽出num_selected个客户端进行原理,然后聚合各自的模型参数成为一个global model
- epoch: 每一个被选择的客户端需要训练的轮数
- batch_size: 批量的加载数据
3、加载并且划分数据
本教程使用CIFAR10数据集。它由10个类别的6万张32x32像素的彩色图像组成。有5万张训练图像和1万张测试图像。在训练批次中,每个班级有5000张图像,总共有50000张。在PyTorch中,CIFAR 10可以在torchvision模块的帮助下使用。
在本教程中,图像被平均地划分为客户机,因此表示平衡(IID)情况。
- 加载图像,并对图像进行预处理
# Image augmentation
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# Loading CIFAR10 using torchvision.datasets
traindata = datasets.CIFAR10('./data', train=True, download=True,
transform= transform_train)
- 将训练数据分给num_clients个客户端
# Dividing the training data into num_clients, with each client having equal number of images
traindata_split = torch.utils.data.random_split(traindata, [int(traindata.data.shape[0] / num_clients) for _ in range(num_clients)])
- 将训练样本转化成深度学习的格式
# Creating a pytorch loader for a Deep Learning model
train_loader = [torch.utils.data.DataLoader(x, batch_size=batch_size, shuffle=True) for x in traindata_split]
- 对测试集进行预处理以及转成深度学习格式
# Normalizing the test images
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# Loading the test iamges and thus converting them into a test_loader
test_loader = torch.utils.data.DataLoader(
datasets.CIFAR10('./data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))])
), batch_size=batch_size, shuffle=True)
4、创建神经网络模型
VGG19(16个卷积层,3个完全连接层,5个MaxPool层和1个SoftMax层)在本教程中使用。还有VGG11、VGG13和VGG16等VGG的其他变体。
#################################
##### Neural Network model #####
#################################
cfg = {
'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}
class VGG(nn.Module):
def __init__(self, vgg_name):
super(VGG, self).__init__()
self.features = self._make_layers(cfg[vgg_name])
self.classifier = nn.Sequential(
nn.Linear(512, 512),
nn.ReLU(True),
nn.Linear(512, 512),
nn.ReLU(True),
nn.Linear(512, 10)
)
def forward(self, x):
out = self.features(x)
out = out.view(out.size(0), -1)
out = self.classifier(out)
output = F.log_softmax(out, dim=1)
return output
def _make_layers(self, cfg):
layers = []
in_channels = 3
for x in cfg:
if x == 'M':
layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
else:
layers += [nn.Conv2d(in_channels, x, kernel_size=3, padding=1),
nn.BatchNorm2d(x),
nn.ReLU(inplace=True)]
in_channels = x
layers += [nn.AvgPool2d(kernel_size=1, stride=1)]
return nn.Sequential(*layers)
我们定义了一个名为VGG的类,它继承了nn.Module。这个类有两个主要方法:__init__和forward。__init__方法用于初始化网络结构,包括定义卷积层和全连接层。forward方法用于前向传播输入数据通过网络,并返回输出结果。
在__init__方法中,我们首先调用父类的__init__方法,然后定义了一个名为features的成员变量,它包含了VGG网络的卷积层。接下来,我们定义了一个名为classifier的成员变量,它包含了VGG网络的全连接层。最后,我们在forward方法中定义了如何处理输入数据,并返回输出结果。
_make_layers方法用于构建VGG网络的卷积层。它首先定义了一个名为layers的空列表,用于存储网络结构中的层。然后,我们遍历cfg列表,其中cfg是一个包含VGG网络结构配置的列表。如果x等于’M’,表示这是一个最大池化层,我们添加一个nn.MaxPool2d层;否则,表示这是一个卷积层,我们添加一个nn.Conv2d层、一个nn.BatchNorm2d层和一个nn.ReLU激活层。最后,我们添加一个平均池化层,并返回nn.Sequential(*layers),即网络结构中的所有层。
5、helper functions
(1)client_update
client_update函数使用privent client data训练client模型。这是在num_selected clients中进行的本地训练
def client_update(client_model, optimizer, train_loader, epoch=5):
"""
This function updates/trains client model on client data
"""
model.train()
for e in range(epoch):
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = client_model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
return loss.item()
- 首先,我们使用client_model.train()将客户端模型设置为训练模式。
- 然后,我们使用一个for循环遍历训练轮数。在每一轮中,我们使用另一个for循环遍历训练数据加载器中的数据。
- 对于每个数据batch,我们将数据和目标标签从CPU转移到GPU上,并使用optimizer.zero_grad()将梯度清零。
- 接下来,我们使用客户端模型对数据进行前向传播,并计算损失。output = client_model(data)
- 然后,我们使用loss.backward()计算梯度。
- 最后,我们使用optimizer.step()更新客户端模型的参数。
(2)server_aggregate
server_aggregate函数聚合从每个客户机接收到的模型权重,并用更新后的权重更新全局模型。在本教程中,采用权重的平均值并将其聚合为全局权重。
def server_aggregate(global_model, client_models):
"""
This function has aggregation method 'mean'
"""
### This will take simple mean of the weights of models ###
global_dict = global_model.state_dict()
for k in global_dict.keys():
global_dict[k] = torch.stack([client_models[i].state_dict()[k].float() for i in range(len(client_models))], 0).mean(0)
global_model.load_state_dict(global_dict)
for model in client_models:
model.load_state_dict(global_model.state_dict())
- 首先,我们使用global_model.state_dict()获取全局模型的参数字典。
- 然后,我们使用一个for循环遍历全局模型的参数字典中的每个键(参数名称)。
- 对于每个参数,我们使用torch.stack()将所有客户端模型的相应参数堆叠在一起,并使用float()将其转换为浮点数类型。
- 接下来,我们使用mean()函数计算参数的平均值。
- 最后,我们使用global_model.load_state_dict()将计算出的平均值加载到全局模型的参数字典中。
- 接下来,我们使用另一个for循环遍历客户端模型列表,并使用model.load_state_dict()将全局模型的参数字典加载到每个客户端模型中,以实现全局模型在每个客户端模型的平均值。
(3)test
test函数输入global模型和test loader,返回test loss和accuracy
def test(global_model, test_loader):
"""This function test the global model on test data and returns test loss and test accuracy """
global_model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.cuda(), target.cuda()
output = global_model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
acc = correct / len(test_loader.dataset)
return test_loss, acc
- 首先,我们使用global_model.eval()将全局模型设置为评估模式,以便在测试过程中关闭dropout等正则化技术。
- 然后,我们初始化测试损失为0,正确分类的样本数为0
- 使用torch.no_grad()上下文,用于在代码块中禁用梯度计算。
- 我们遍历test_loader中的每个数据样本。
- 对于每个数据样本data,target,我们将数据和目标标签从CPU加载到GPU上
- 并使用全局模型对其进行前向传播。output = global_model(data)
- 我们使用F.nll_loss()计算输出与目标标签之间的交叉熵损失,并使用reduction='sum’将其转换为单个数值。
- 接下来,我们使用test_loss += …将每个样本的损失累加到测试损失中。
- 最后,我们使用pred = output.argmax(dim=1, keepdim=True)计算输出中最大概率的索引,并将其与目标标签进行比较。
- 使用pred.eq(target.view_as(pred))比较预测索引和实际索引,并将它们转换为布尔值。
- 使用sum().item()计算布尔值的总和,并将其除以测试数据加载器中样本的数量以获得测试准确性。
- 最后,我们返回测试损失和测试准确性。
6、实例化模型
############################################
#### Initializing models and optimizer ####
############################################
#### global model ##########
global_model = VGG('VGG19').cuda()
############## client models ##############
client_models = [ VGG('VGG19').cuda() for _ in range(num_selected)]
for model in client_models:
model.load_state_dict(global_model.state_dict()) ### initial synchronizing with global model
############### optimizers ################
opt = [optim.SGD(model.parameters(), lr=0.1) for model in client_models]
7、训练模型
###### List containing info about learning #########
losses_train = []
losses_test = []
acc_train = []
acc_test = []
# Runnining FL
for r in range(num_rounds):
# select random clients
client_idx = np.random.permutation(num_clients)[:num_selected]
# client update
loss = 0
for i in tqdm(range(num_selected)):
loss += client_update(client_models[i], opt[i], train_loader[client_idx[i]], epoch=epochs)
losses_train.append(loss)
# server aggregate
server_aggregate(global_model, client_models)
test_loss, acc = test(global_model, test_loader)
losses_test.append(test_loss)
acc_test.append(acc)
print('%d-th round' % r)
print('average train loss %0.3g | test loss %0.3g | test acc: %0.3f' % (loss / num_selected, test_loss, acc))
7、整体流程
查看不同类型数据的方法
首先要查看变量的数据类型:type(object)
1、dataframe
使用万能函数
def basic_eda(df):
print("-------------------------------TOP 5 RECORDS-----------------------------")
print(df.head(5))
print("-------------------------------INFO--------------------------------------")
print(df.info())
print("-------------------------------Describe----------------------------------")
print(df.describe())
print("-------------------------------Columns-----------------------------------")
print(df.columns)
print("-------------------------------Data Types--------------------------------")
print(df.dtypes)
print("----------------------------Missing Values-------------------------------")
print(df.isnull().sum())
print("----------------------------NULL values----------------------------------")
print(df.isna().sum())
print("--------------------------Shape Of Data---------------------------------")
print(df.shape)
print("============================================================================ \n")
- df.head():查看前几行数据,默认是5
- df.info:打印dataframe的简要摘要,包括索引的数据类型dtype和列的数据类型dtype,非空值的数量和内存使用情况。
- df.describe:describe()函数用于生成描述性统计信息。 描述性统计数据:数值类型的包括均值,标准差,最大值,最小值,分位数等;类别的包括个数,类别的数目,最高数量的类别及出现次数等;输出将根据提供的内容而有所不同
- df.colunms:查看列
- df.dtypes:查看元素的数据类型
- df.shape:查看dataframe的形状
2、ndarray
- ndarray.type:查看元素类型
- ndarray.shape:查看数组的形状
- ndarray.ndim:查看数组维度
- ndarry.size:查看数组的全部元素个数
- len(ndarray):计算的是数组的行数,相当于ndarray.shape[0]
3、Dictionary
- dict.keys():返回字典全部的key
- dict.size❌‘dict’ object has no attribute ‘size’
- numpy.size(dict)❌无法获得字典大小
- len(dict):返回字典key-value对的个数
4、list
- list.size❌‘list’ object has no attribute ‘size’
- numpy.size(list):查看列表全部元素的个数
- len(list):同numpy.size(list)一样
5、tuple
- tuple.size❌‘tuple’ object has no attribute ‘size’
- numpy.size(tuple):查看元组全部元素的个数
- len(tuple):同numpy.size(tuple)一样
okkksleeeeep!