🌞Welcome to the world of machine learning
🌈Blog homepage: 卿云阁💌 You are welcome to follow🎉, like👍, save⭐️, and leave a comment📝
🌟This article is an original work by 卿云阁!
📆First published: 🌹February 17, 2024🌹
✉️I hope we can make progress together!
🙏The author's skill is limited; if you find any mistakes, please leave a comment! Many thanks!
Table of Contents
Overview
Dataset
Download (Baidu Netdisk)
Dataset description
LeNet
AlexNet
VGG
ResNet
GoogleNet
Hands-on project: building the UI page
res.py
UI.py
Approach 2
res.py
UI.py
Overview
This article introduces several classic convolutional neural networks, applies each of them to water-quality classification, and finally builds a UI page for the classifier.
Dataset
Download (Baidu Netdisk)
Link: https://pan.baidu.com/s/1DSDl5uKF0qaoyVs3f-L7iQ?pwd=wy46
Extraction code: wy46
Dataset description
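The training scripts below load the images with torchvision's ImageFolder, which infers the class labels from the names of the sub-directories. A minimal sketch of how the labels come out, assuming the dataset root is D:/dataset and the class folder names are illustrative (the actual names in the Baidu Netdisk archive may differ):

from torchvision.datasets import ImageFolder

# ImageFolder expects one sub-directory per class, e.g.
#   D:/dataset/class_1/xxx.jpg   -> label 0
#   D:/dataset/class_2/yyy.jpg   -> label 1
# (folder names here are assumptions for illustration)
dataset = ImageFolder('D:/dataset')
print(dataset.classes)        # class names, in alphabetical order
print(dataset.class_to_idx)   # mapping from class name to integer label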
LeNet
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


class LeNet(torch.nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.convs = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5), nn.ReLU(), nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=5), nn.ReLU(), nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=4), nn.ReLU(), nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1), nn.ReLU(),
        )
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Flatten(),
            nn.Linear(64, 100), nn.ReLU(),
            nn.Linear(100, 60), nn.ReLU(),
            nn.Linear(60, 2)
        )

    def forward(self, x):
        x = self.convs(x)
        x = self.fc(x)
        return x


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the LeNet model
model = LeNet()

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')
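Note that the scripts in this post point both train_dataset and test_dataset at the same folder, so the reported accuracy is measured on images the model has already seen. A minimal sketch of holding out a proper test split with torch.utils.data.random_split (the 80/20 ratio and seed are illustrative assumptions, not part of the original code):

import torch
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import ImageFolder

transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])

full_dataset = ImageFolder('D:/dataset', transform=transform)

# Hold out 20% of the images for testing (illustrative split ratio)
test_size = int(0.2 * len(full_dataset))
train_size = len(full_dataset) - test_size
train_dataset, test_dataset = random_split(
    full_dataset, [train_size, test_size],
    generator=torch.Generator().manual_seed(42)  # reproducible split
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)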
AlexNet
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


class AlexNet(nn.Module):
    def __init__(self, num_classes=2):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(96, 256, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(256, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), 256 * 6 * 6)
        x = self.classifier(x)
        return x


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the AlexNet model
model = AlexNet()

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')
VGG
- Smaller convolution kernels: 3×3 (see the parameter-count sketch below)
- Deeper networks: 16 to 19 layers
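A quick sketch of why the 3×3 choice matters: two stacked 3×3 convolutions cover the same 5×5 receptive field as a single 5×5 convolution while using fewer parameters and adding an extra non-linearity. The channel count 64 below is only an illustrative assumption:

import torch
import torch.nn as nn

c = 64  # illustrative channel count
stacked_3x3 = nn.Sequential(
    nn.Conv2d(c, c, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(c, c, kernel_size=3, padding=1), nn.ReLU(),
)
single_5x5 = nn.Conv2d(c, c, kernel_size=5, padding=2)

count = lambda m: sum(p.numel() for p in m.parameters())
print(count(stacked_3x3), count(single_5x5))      # 73856 vs 102464 parameters

x = torch.randn(1, c, 32, 32)
print(stacked_3x3(x).shape, single_5x5(x).shape)  # same spatial output size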
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


# Define the CNN model
class VGG16(torch.nn.Module):
    def __init__(self):
        super(VGG16, self).__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(64, 64, 3, 1, 1), nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(128, 128, 3, 1, 1), nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            nn.Conv2d(128, 256, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(256, 256, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(256, 256, 3, 1, 1), nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block4 = nn.Sequential(
            nn.Conv2d(256, 512, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(512, 512, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(512, 512, 3, 1, 1), nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block5 = nn.Sequential(
            nn.Conv2d(512, 512, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(512, 512, 3, 1, 1), nn.ReLU(),
            nn.Conv2d(512, 512, 3, 1, 1), nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.fnn = nn.Sequential(
            nn.Linear(512, 512), nn.ReLU(),
            nn.Linear(512, 512), nn.ReLU(),
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, 2)
        )

    def forward(self, x):
        batch_size = x.size(0)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = x.view(batch_size, -1)
        x = self.fnn(x)
        return x


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the VGG16 model
model = VGG16()

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')
ResNet
Original network structure
The two-layer residual block from the original paper, with its identity shortcut connection, is used in the original ResNet-18 and ResNet-34.
ResNet-50 uses a three-layer residual block called the Bottleneck: the middle layer is a 3×3 kernel, and the two outer layers are 1×1 kernels. The 1×1 kernels are mainly used to change the number of channels (in ResNet the final 1×1 convolution expands the channels by a factor of 4).
Several Bottlenecks are grouped into a Block. ResNet-50 has 4 such Blocks, containing 3, 4, 6, and 3 Bottlenecks respectively. In the first Bottleneck of each Block, the input and output are combined through a shortcut connection; this is where the residual comes in (a sketch follows).
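For reference, here is a minimal sketch of the three-layer Bottleneck block described above (1×1 → 3×3 → 1×1, with the output channels expanded 4×). It is only an illustration of the idea; the training script that follows uses the simpler two-layer BasicBlock, i.e. a ResNet-18-style network:

import torch.nn as nn

class Bottleneck(nn.Module):
    """Three-layer residual block used by ResNet-50 and deeper variants (illustrative sketch)."""
    expansion = 4  # the last 1x1 conv multiplies the channel count by 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)   # 1x1: reduce channels
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)                          # 3x3 in the middle
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion,
                               kernel_size=1, bias=False)                      # 1x1: expand channels
        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes * self.expansion:
            # project the input so that it can be added to the block output
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes * self.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * self.expansion),
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out = out + self.shortcut(x)   # the residual (shortcut) connection
        return self.relu(out)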
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


# A simplified ResNet (ResNet-18 style)
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = nn.ReLU()(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = nn.AdaptiveAvgPool2d((1, 1))(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the ResNet model
model = ResNet(BasicBlock, [2, 2, 2, 2])

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')
GoogleNet
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


class BasicConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, **kwargs):
        super(BasicConv2d, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv(x)
        x = self.relu(x)
        return x


class Inception(nn.Module):
    def __init__(self, in_channels, ch1x1, ch3x3red, ch3x3, ch5x5red, ch5x5, pool_proj):
        super(Inception, self).__init__()
        self.branch1 = BasicConv2d(in_channels, ch1x1, kernel_size=1)
        self.branch2 = nn.Sequential(
            BasicConv2d(in_channels, ch3x3red, kernel_size=1),
            BasicConv2d(ch3x3red, ch3x3, kernel_size=3, padding=1)
        )
        self.branch3 = nn.Sequential(
            BasicConv2d(in_channels, ch5x5red, kernel_size=1),
            BasicConv2d(ch5x5red, ch5x5, kernel_size=5, padding=2)
        )
        self.branch4 = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            BasicConv2d(in_channels, pool_proj, kernel_size=1)
        )

    def forward(self, x):
        branch1 = self.branch1(x)
        branch2 = self.branch2(x)
        branch3 = self.branch3(x)
        branch4 = self.branch4(x)
        outputs = [branch1, branch2, branch3, branch4]
        return torch.cat(outputs, 1)


class GoogleNet(nn.Module):
    def __init__(self):
        super(GoogleNet, self).__init__()
        self.inputs = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=17),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.Conv2d(64, 64, kernel_size=1),
            nn.Conv2d(64, 192, kernel_size=3, padding=1),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.block1 = nn.Sequential(
            Inception(192, 64, 96, 128, 16, 32, 32),
            Inception(256, 128, 128, 192, 32, 96, 64),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.block2 = nn.Sequential(
            Inception(480, 192, 96, 208, 16, 48, 64),
            Inception(512, 160, 112, 224, 24, 64, 64),
            Inception(512, 128, 128, 256, 24, 64, 64),
            Inception(512, 112, 144, 288, 32, 64, 64),
            Inception(528, 256, 160, 320, 32, 128, 128),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.block3 = nn.Sequential(
            Inception(832, 256, 160, 320, 32, 128, 128),
            Inception(832, 384, 192, 384, 48, 128, 128),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.outputs = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(1024, 2)
        )

    def forward(self, x):
        batch_size = x.size(0)
        x = self.inputs(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.avgpool(x)
        x = x.view(batch_size, -1)
        x = self.outputs(x)
        return x


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the GoogleNet model
model = GoogleNet()

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')
Hands-on project: building the UI page
res.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import os


# A simplified ResNet (ResNet-18 style)
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = nn.ReLU()(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = nn.AdaptiveAvgPool2d((1, 1))(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


# Water-quality dataset: a thin wrapper around ImageFolder
class WaterQualityDataset(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = ImageFolder(root, transform=transform)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)


# Preprocessing and data loading
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

train_dataset = WaterQualityDataset('D:/dataset', transform=transform)
test_dataset = WaterQualityDataset('D:/dataset', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Initialize the ResNet model
model = ResNet(BasicBlock, [2, 2, 2, 2])

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Training Loss: {running_loss / len(train_loader)}')

# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy}')

# Create the save directory
save_dir = 'D:\\models'
os.makedirs(save_dir, exist_ok=True)

# Save the trained model
model_path = os.path.join(save_dir, 'my_model.pth')
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")
UI.py
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton, QFileDialog
from PyQt5.QtGui import QPixmap
from torchvision import transforms
from PIL import Image
import torch
import torch.nn as nn


# The trained model is loaded below; the architecture must match the one used in res.py.
# A simplified ResNet (ResNet-18 style)
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = nn.ReLU()(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = nn.AdaptiveAvgPool2d((1, 1))(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


class WaterQualityApp(QWidget):
    def __init__(self, model_path):
        super(WaterQualityApp, self).__init__()
        self.model = self.load_model(model_path)
        self.init_ui()

    def init_ui(self):
        self.layout = QVBoxLayout()

        self.image_label = QLabel(self)
        self.layout.addWidget(self.image_label)

        self.choose_button = QPushButton('Choose Image', self)
        self.choose_button.clicked.connect(self.choose_image)
        self.layout.addWidget(self.choose_button)

        self.result_label = QLabel(self)
        self.layout.addWidget(self.result_label)

        self.setLayout(self.layout)
        self.setWindowTitle('Water Quality Classifier')

    def load_model(self, model_path):
        model = ResNet(BasicBlock, [2, 2, 2, 2])
        # map_location='cpu' lets the checkpoint load even if it was saved from a GPU run
        model.load_state_dict(torch.load(model_path, map_location='cpu'))
        model.eval()
        return model

    def choose_image(self):
        options = QFileDialog.Options()
        options |= QFileDialog.ReadOnly
        file_path, _ = QFileDialog.getOpenFileName(
            self, "Choose Image", "",
            "Image Files (*.png *.jpg *.jpeg);;All Files (*)", options=options)
        if file_path:
            self.process_image(file_path)

    def process_image(self, file_path):
        image = Image.open(file_path).convert('RGB')
        image = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
        ])(image)
        image = image.unsqueeze(0)  # Add batch dimension

        # Model inference
        with torch.no_grad():
            output = self.model(image)
            _, predicted = torch.max(output.data, 1)

        # Display image and result
        pixmap = QPixmap(file_path)
        self.image_label.setPixmap(pixmap)
        self.image_label.setScaledContents(True)

        if predicted.item() == 0:
            result_text = "Water Quality: Class 1"
        else:
            result_text = "Water Quality: Class 2"
        self.result_label.setText(result_text)


if __name__ == '__main__':
    app = QApplication(sys.argv)
    model_path = 'D:\\models\\my_model.pth'
    window = WaterQualityApp(model_path)
    window.show()
    sys.exit(app.exec_())
Approach 2
- Train with torchvision's built-in ResNet (see the note on the weights API below)
- Polish the UI page
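One note before the code: res.py below calls models.resnet18(pretrained=True), which works but is deprecated in recent torchvision releases in favour of the weights argument. A hedged equivalent, assuming torchvision ≥ 0.13, would be:

from torchvision import models
from torchvision.models import ResNet18_Weights
import torch.nn as nn

# Load an ImageNet-pretrained ResNet-18 and replace its classification head
model = models.resnet18(weights=ResNet18_Weights.DEFAULT)
model.fc = nn.Linear(model.fc.in_features, 2)  # two water-quality classes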
res.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import os

# Set a random seed for reproducibility
torch.manual_seed(42)

# Dataset path
data_dir = 'D:\\dataset'

# Image preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load the dataset
dataset = datasets.ImageFolder(data_dir, transform=transform)

# Split into training and test sets
train_set, test_set = train_test_split(dataset, test_size=0.2, random_state=42)

# Create the DataLoaders
train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
test_loader = DataLoader(test_set, batch_size=32, shuffle=False)

# Define the ResNet model
model = models.resnet18(pretrained=True)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 2)  # two classes

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")

# Evaluate the model on the test set
model.eval()
all_labels = []
all_predictions = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predictions = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predictions.cpu().numpy())

# Compute the accuracy
accuracy = accuracy_score(all_labels, all_predictions)
print(f"Accuracy on test set: {accuracy}")

# Create the save directory
save_dir = 'D:\\models'
os.makedirs(save_dir, exist_ok=True)

# Save the model
model_path = os.path.join(save_dir, 'my_model.pth')
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")
UI.py
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QVBoxLayout, QFileDialog
from PyQt5.QtGui import QPixmap, QFont
from PyQt5.QtCore import Qt
from PIL import Image
import torch
from torchvision import transforms, models


class WaterQualityDetectionApp(QWidget):
    def __init__(self):
        super().__init__()

        # Initialize the PyTorch model (must match the architecture trained in res.py)
        self.model = models.resnet18(pretrained=True)
        num_features = self.model.fc.in_features
        self.model.fc = torch.nn.Linear(num_features, 2)  # two classes
        # map_location='cpu' lets the checkpoint load even if it was saved from a GPU run
        self.model.load_state_dict(torch.load('D:\\models\\my_model.pth', map_location='cpu'))
        self.model.eval()

        # Create the UI widgets
        self.image_label = QLabel()
        self.image_label.setAlignment(Qt.AlignCenter)
        self.image_label.setStyleSheet("border: 2px solid gray; border-radius: 5px;")

        self.upload_button = QPushButton("Upload Water Quality Image")
        self.upload_button.clicked.connect(self.load_image)
        self.upload_button.setStyleSheet(
            "background-color: #4CAF50; color: white; font-size: 16px; padding: 10px; border-radius: 5px;")

        self.detect_button = QPushButton("Detect Water Quality")
        self.detect_button.clicked.connect(self.detect_water_quality)
        self.detect_button.setStyleSheet(
            "background-color: #008CBA; color: white; font-size: 16px; padding: 10px; border-radius: 5px;")

        self.result_label = QLabel()
        self.result_label.setAlignment(Qt.AlignCenter)
        self.result_label.setStyleSheet("font-size: 18px; margin-top: 20px;")

        # Lay out the widgets
        layout = QVBoxLayout()
        layout.addWidget(self.image_label, 1)
        layout.addWidget(self.upload_button)
        layout.addWidget(self.detect_button)
        layout.addWidget(self.result_label)
        self.setLayout(layout)

        self.setWindowTitle("Water Quality Detection")
        self.setGeometry(100, 100, 800, 600)  # Window size

    def load_image(self):
        # Open a file dialog to choose a water-quality image
        file_dialog = QFileDialog()
        image_path, _ = file_dialog.getOpenFileName(
            self, "Open Water Quality Image File", "", "Image Files (*.png *.jpg *.bmp)")
        if image_path:
            # Display the selected image
            pixmap = QPixmap(image_path)
            self.image_label.setPixmap(pixmap.scaledToWidth(600))  # Scale the image to fit the window
            self.image_label.setScaledContents(True)
            self.image_path = image_path

    def detect_water_quality(self):
        # Run detection on the loaded image
        if hasattr(self, 'image_path'):
            try:
                image = Image.open(self.image_path).convert("RGB")
                transform = transforms.Compose([
                    transforms.Resize((224, 224)),
                    transforms.ToTensor(),
                ])
                image = transform(image)
                image = image.unsqueeze(0)  # Add batch dimension

                with torch.no_grad():
                    prediction = self.model(image)
                    _, predicted_class = torch.max(prediction, 1)

                result_text = "Class 1 water" if predicted_class.item() == 0 else "Class 2 water"
                self.result_label.setText(f"Detected Water Quality: {result_text}")  # Update the result label
            except Exception as e:
                print(f"Error during water quality detection: {e}")
                self.result_label.setText("Error during water quality detection. Please check the image.")


if __name__ == "__main__":
    app = QApplication(sys.argv)
    water_quality_app = WaterQualityDetectionApp()
    water_quality_app.show()
    sys.exit(app.exec_())