Leaf Classification with PyTorch: From Data Preparation to Model Training and Testing
1. Introduction
Image classification is a classic task in computer vision. This article walks through a leaf classification task implemented in PyTorch: we start with data preparation, then build and train a model, run predictions on the test set, and finally generate a submission file.
2. Environment Setup
First, make sure the following Python libraries are installed:
pip install torch torchvision pandas d2l
- torch: the core PyTorch library.
- torchvision: computer-vision models, transforms, and datasets.
- pandas: for reading and writing CSV files.
- d2l: the Dive into Deep Learning utility library, providing helper functions such as the Timer and Animator used below.
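As an optional sanity check, you can confirm the installation and see whether a GPU is visible before going further:

import torch
import torchvision

print(torch.__version__, torchvision.__version__)
print("CUDA available:", torch.cuda.is_available())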
3. Data Preparation
Competition link: https://www.kaggle.com/competitions/classify-leaves/leaderboard?tab=public
3.1 Dataset Structure
Assume the dataset lives in the classify-leaves directory, with the following layout:
classify-leaves/
├── train.csv
├── test.csv
└── images/
    ├── image1.jpg
    ├── image2.jpg
    ...
- train.csv: the paths and labels of the training images (inspected in the snippet below).
- test.csv: the paths of the test images.
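The training CSV is expected to contain an image column (a relative path such as images/xxx.jpg) and a label column (the class name). A quick, optional inspection:

import pandas as pd

trainlist = pd.read_csv("classify-leaves/train.csv")
print(trainlist.head())                      # columns: image, label
print(trainlist["label"].nunique(), "classes")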
3.2 Data Loading and Preprocessing
import os
import pandas as pd
import random

imgpath = "classify-leaves"
trainlist = pd.read_csv(f"{imgpath}/train.csv")

# Collect all class names, then shuffle them so the class-to-index mapping is arbitrary
num2name = list(trainlist["label"].value_counts().index)
random.shuffle(num2name)

# Map each class name to an integer index
name2num = {}
for i in range(len(num2name)):
    name2num[num2name[i]] = i
- num2name: the list of all class names (taken from value_counts() and then randomly shuffled); the position of a name in this list is its integer index.
- name2num: the reverse mapping from class name to integer index (see the sketch below).
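Because the mapping comes from a random shuffle, it changes on every run. As an optional safeguard, you can build it with a dict comprehension and persist the shuffled list so a reloaded model keeps consistent labels (the file name leaf_classes.json is just an example, not part of the original code):

import json

# Equivalent, more idiomatic construction of the name -> index mapping
name2num = {name: i for i, name in enumerate(num2name)}

# Save the index -> name list so predictions from a reloaded model stay consistent
with open("leaf_classes.json", "w") as f:
    json.dump(num2name, f)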
4. Custom Dataset Class
To load the data, we define a custom dataset class Leaf_data:
from torch.utils.data import Dataset
from d2l import torch as d2l

class Leaf_data(Dataset):
    def __init__(self, path, train, transform=lambda x: x):
        super().__init__()
        self.path = path
        self.transform = transform
        self.train = train
        # The training CSV has "image" and "label" columns; the test CSV only has "image"
        if train:
            self.datalist = pd.read_csv(f"{path}/train.csv")
        else:
            self.datalist = pd.read_csv(f"{path}/test.csv")

    def __getitem__(self, index):
        res = ()
        tmplist = self.datalist.iloc[index, :]
        for i in tmplist.index:
            if i == "image":
                # "image" holds a relative path such as images/xxx.jpg
                res += (self.transform(d2l.Image.open(f"{self.path}/{tmplist[i]}")),)
            else:
                # "label" holds the class name; convert it to an integer index
                res += (name2num[tmplist[i]],)
        if len(res) < 2:
            # Test samples have no label, so return the image path as the second element
            res += (tmplist[i],)
        return res

    def __len__(self):
        return len(self.datalist)
- __getitem__: returns one sample by index. For the training split this is (image, label index); for the test split, which has no labels, it is (image, image path).
- __len__: returns the number of samples in the dataset.
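As an optional sanity check (assuming the classify-leaves directory described above is in place), you can instantiate the dataset and look at one sample; with the default identity transform the image comes back as a PIL image:

ds = Leaf_data(imgpath, train=True)
img, label = ds[0]                     # img is a PIL image, label is an integer index
print(len(ds), img.size, label, num2name[label])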
5. Model Definition and Initialization
We use a pretrained ResNet34 and replace its last layer to fit our classification task:
import torch
import torchvision
from torch import nn

def init_weight(m):
    if type(m) in [nn.Linear, nn.Conv2d]:
        nn.init.xavier_normal_(m.weight)

# Load ResNet34 with ImageNet pretrained weights
net = torchvision.models.resnet34(weights=torchvision.models.ResNet34_Weights.IMAGENET1K_V1)
# Replace the final fully connected layer with one output per leaf class
net.fc = nn.Linear(in_features=512, out_features=len(name2num), bias=True)
net.fc.apply(init_weight)
net.to(try_gpu())  # try_gpu() is a small helper defined in the complete code below
- init_weight: initializes Linear and Conv2d weights with Xavier initialization; here it is applied only to the new fully connected layer.
- net: the pretrained ResNet34 with its final fully connected layer replaced by one sized to the number of leaf classes.
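As a quick, optional check that only the head was replaced, you can print the new layer and the total parameter count:

print(net.fc)   # Linear(in_features=512, out_features=<number of classes>, bias=True)
num_params = sum(p.numel() for p in net.parameters())
print(f"total parameters: {num_params / 1e6:.1f}M")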
6. Training
6.1 Optimizer and Loss Function
lr = 1e-4
# All parameters except the new fc layer keep the base learning rate
parames = [parame for name, parame in net.named_parameters() if name not in ["fc.weight", "fc.bias"]]
# The freshly initialized fc layer gets a 10x larger learning rate
trainer = torch.optim.Adam([{"params": parames}, {"params": net.fc.parameters(), "lr": lr * 10}], lr=lr)
LR_con = torch.optim.lr_scheduler.CosineAnnealingLR(trainer, 1, 0)
loss = nn.CrossEntropyLoss(reduction='none')
- trainer: an Adam optimizer with two parameter groups; the pretrained backbone uses the base learning rate and the new fully connected layer uses a 10x higher one (verified in the snippet below).
- LR_con: a cosine-annealing learning-rate scheduler. Note that it is created here but never stepped in the training loop below, so as written it has no effect.
- loss: cross-entropy loss with reduction='none'; the per-sample losses are averaged explicitly in train_batch.
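You can confirm the two learning rates by inspecting the optimizer's parameter groups:

for i, group in enumerate(trainer.param_groups):
    print(f"group {i}: lr={group['lr']}")   # group 0: 1e-4 (backbone), group 1: 1e-3 (fc)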
6.2 Training Functions
def train_batch(features, labels, net, loss, trainer, device):
    # Move the batch to the target device (e.g. GPU)
    features, labels = features.to(device), labels.to(device)
    # Forward pass
    outputs = net(features)
    l = loss(outputs, labels).mean()  # average loss over the batch
    # Backward pass and parameter update
    trainer.zero_grad()
    l.backward()
    trainer.step()
    # Batch accuracy
    acc = (outputs.argmax(dim=1) == labels).float().mean()
    return l.item(), acc.item()

def train(train_data, test_data, net, loss, trainer, num_epochs, device=try_gpu()):
    best_acc = 0
    timer = d2l.Timer()
    plot = d2l.Animator(xlabel="epoch", xlim=[1, num_epochs],
                        legend=['train loss', 'train acc', 'test acc'], ylim=[0, 1])
    for epoch in range(num_epochs):
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_data):
            timer.start()
            l, acc = train_batch(features, labels, net, loss, trainer, device)
            # Weight the batch averages by batch size so the epoch metrics are per-example
            metric.add(l * labels.shape[0], acc * labels.shape[0], labels.shape[0], labels.numel())
            timer.stop()
        test_acc = d2l.evaluate_accuracy_gpu(net, test_data, device=device)
        # Keep the checkpoint with the best validation accuracy
        if test_acc > best_acc:
            save_model(net)
            best_acc = test_acc
        plot.add(epoch + 1, (metric[0] / metric[2], metric[1] / metric[3], test_acc))
        print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on {str(device)}')
    print(f"best acc {best_acc}")
    return metric[0] / metric[2], metric[1] / metric[3], test_acc
- train: runs the training loop, accumulates loss and accuracy, evaluates on the validation set after every epoch, and saves the checkpoint with the best validation accuracy (usage sketch below).
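With the DataLoaders built in the complete code below (an 80/20 random split of the labelled data), training is a single call; this is just a usage sketch of the function above:

# train_dataloader / valid_dataloader wrap the 80/20 split of Leaf_data (built in the complete code below)
train(train_dataloader, valid_dataloader, net, loss, trainer, num_epochs=10)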
7. Testing and Saving Results
We predict on the test set and save the results to a CSV file:
from torch.utils import data as Data

net.load_state_dict(torch.load(model_path))  # model_path is defined in the complete code below
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.ToTensor(), norm   # norm is the ImageNet normalization defined below
])
test_data = Leaf_data(imgpath, False, augs)
test_dataloader = Data.DataLoader(test_data, batch_size=64, shuffle=False)
res = pd.DataFrame(columns=["image", "label"], index=range(len(test_data)))
net = net.cpu()
count = 0
for X, y in test_dataloader:
    # For the test split, __getitem__ returns (image, image path), so y is a batch of paths
    preds = net(X).detach().argmax(dim=-1).numpy()
    res.iloc[count:count + len(y), 0] = list(y)                       # image paths
    res.iloc[count:count + len(y), 1] = [num2name[p] for p in preds]  # predicted class names
    count += len(y)
    print(f"predicted {count}/{len(test_data)} images")
res.to_csv('./submission.csv', index=False)
- test_dataloader: iterates over the test set in order (shuffle=False), so predictions stay aligned with the rows of res.
- res: collects each image path and its predicted class name, then is written to submission.csv.
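After the loop finishes, a quick optional look at the output file confirms the expected two-column format:

print(pd.read_csv('./submission.csv').head())
# expected columns: image (path such as images/xxx.jpg) and label (predicted class name)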
8. Summary
This article has walked through a complete leaf classification task in PyTorch: data preparation, model definition, training, validation, and testing. Along the way you have seen how to:
- Implement a custom dataset class.
- Use a pretrained model for transfer learning.
- Train the model and save the best checkpoint.
- Predict on the test set and generate a submission file.
I hope this article is helpful! If you have any questions, feel free to leave a comment. 😊
Complete Code
import os
import torch
from torch.utils import data as Data
import torchvision
from torch import nn
from d2l import torch as d2l
import pandas as pd
import random
# Data preparation
imgpath = "classify-leaves"
trainlist = pd.read_csv(f"{imgpath}/train.csv")
# Collect all class names and shuffle them; name2num maps each class name to an integer index
num2name = list(trainlist["label"].value_counts().index)
random.shuffle(num2name)
name2num = {}
for i in range(len(num2name)):
    name2num[num2name[i]] = i
# GPU check: return the GPU if one is available, otherwise the CPU
def try_gpu():
    if torch.cuda.device_count() > 0:
        return torch.device('cuda')
    return torch.device('cpu')
# Model checkpoint path
model_dir = './models'
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model_path = os.path.join(model_dir, 'pre_res_model.ckpt')

def save_model(net):
    torch.save(net.state_dict(), model_path)
# Custom dataset class
class Leaf_data(Data.Dataset):
    def __init__(self, path, train, transform=lambda x: x):
        super().__init__()
        self.path = path
        self.transform = transform
        self.train = train
        # The training CSV has "image" and "label" columns; the test CSV only has "image"
        if train:
            self.datalist = pd.read_csv(f"{path}/train.csv")
        else:
            self.datalist = pd.read_csv(f"{path}/test.csv")

    def __getitem__(self, index):
        res = ()
        tmplist = self.datalist.iloc[index, :]
        for i in tmplist.index:
            if i == "image":
                res += (self.transform(d2l.Image.open(f"{self.path}/{tmplist[i]}")),)
            else:
                res += (name2num[tmplist[i]],)
        if len(res) < 2:
            # Test samples have no label, so return the image path as the second element
            res += (tmplist[i],)
        return res

    def __len__(self):
        return len(self.datalist)
def train_batch(features, labels, net, loss, trainer, device):
    # Move the batch to the target device (e.g. GPU)
    features, labels = features.to(device), labels.to(device)
    # Forward pass
    outputs = net(features)
    l = loss(outputs, labels).mean()  # average loss over the batch
    # Backward pass and parameter update
    trainer.zero_grad()
    l.backward()
    trainer.step()
    # Batch accuracy
    acc = (outputs.argmax(dim=1) == labels).float().mean()
    return l.item(), acc.item()

# Training function
def train(train_data, test_data, net, loss, trainer, num_epochs, device=try_gpu()):
    best_acc = 0
    timer = d2l.Timer()
    plot = d2l.Animator(xlabel="epoch", xlim=[1, num_epochs],
                        legend=['train loss', 'train acc', 'test acc'], ylim=[0, 1])
    for epoch in range(num_epochs):
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_data):
            timer.start()
            l, acc = train_batch(features, labels, net, loss, trainer, device)
            # Weight the batch averages by batch size so the epoch metrics are per-example
            metric.add(l * labels.shape[0], acc * labels.shape[0], labels.shape[0], labels.numel())
            timer.stop()
        test_acc = d2l.evaluate_accuracy_gpu(net, test_data, device=device)
        # Keep the checkpoint with the best validation accuracy
        if test_acc > best_acc:
            save_model(net)
            best_acc = test_acc
        plot.add(epoch + 1, (metric[0] / metric[2], metric[1] / metric[3], test_acc))
        print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on {str(device)}')
    print(f"best acc {best_acc}")
    return metric[0] / metric[2], metric[1] / metric[3], test_acc
# Model initialization
def init_weight(m):
    if type(m) in [nn.Linear, nn.Conv2d]:
        nn.init.xavier_normal_(m.weight)

net = torchvision.models.resnet34(weights=torchvision.models.ResNet34_Weights.IMAGENET1K_V1)
net.fc = nn.Linear(in_features=512, out_features=len(name2num), bias=True)
net.fc.apply(init_weight)
net.to(try_gpu())
# Optimizer and loss function: the backbone uses the base lr, the new fc layer uses 10x lr
lr = 1e-4
parames = [parame for name, parame in net.named_parameters() if name not in ["fc.weight", "fc.bias"]]
trainer = torch.optim.Adam([{"params": parames}, {"params": net.fc.parameters(), "lr": lr * 10}], lr=lr)
LR_con = torch.optim.lr_scheduler.CosineAnnealingLR(trainer, 1, 0)
loss = nn.CrossEntropyLoss(reduction='none')
# Data augmentation and data loading
batch = 64
num_epochs = 10
norm = torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.RandomHorizontalFlip(p=0.5),
    torchvision.transforms.ToTensor(), norm
])
# 80/20 random split of the labelled data into training and validation sets
train_data, valid_data = Data.random_split(
    dataset=Leaf_data(imgpath, True, augs),
    lengths=[0.8, 0.2]
)
train_dataloader = Data.DataLoader(train_data, batch, True)
valid_dataloader = Data.DataLoader(valid_data, batch, True)

# Train the model
train(train_dataloader, valid_dataloader, net, loss, trainer, num_epochs)
# Test the model and build the submission file
net.load_state_dict(torch.load(model_path))
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.ToTensor(), norm
])
test_data = Leaf_data(imgpath, False, augs)
test_dataloader = Data.DataLoader(test_data, batch_size=64, shuffle=False)
res = pd.DataFrame(columns=["image", "label"], index=range(len(test_data)))
net = net.cpu()
count = 0
for X, y in test_dataloader:
    # For the test split, __getitem__ returns (image, image path), so y is a batch of paths
    preds = net(X).detach().argmax(dim=-1).numpy()
    res.iloc[count:count + len(y), 0] = list(y)                       # image paths
    res.iloc[count:count + len(y), 1] = [num2name[p] for p in preds]  # predicted class names
    count += len(y)
    print(f"predicted {count}/{len(test_data)} images")
res.to_csv('./submission.csv', index=False)
References:
- PyTorch documentation
- torchvision documentation
- d2l (Dive into Deep Learning) library