Classifying MNIST Handwritten Digits with EfficientNetV2

Study notes for "100 Hands-On PyTorch Deep Learning Projects" (pytorch深度学习项目实战100例)


Free-compute champion: Google Colab



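EfficientNetV2 is a paper from the Google Research, Brain Team, published at ICML 2021. It combines NAS with scaling to jointly optimize training speed and parameter efficiency, and its search space includes new operations such as Fused-MBConv. EfficientNetV2 models train much faster than EfficientNetV1 models while being up to 6.8x smaller.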

Understanding and improving the training efficiency of EfficientNetV1

  1. Training with very large image sizes is slow.

  2. Depthwise convolutions are slow in early layers but effective in later stages.

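Structure of MBConv and Fused-MBConv

Fused-MBConv replaces the 1×1 expansion convolution and the 3×3 depthwise convolution of MBConv with a single regular 3×3 convolution. To compare the two, the paper gradually replaces the original MBConv blocks in EfficientNet-B4 with Fused-MBConv.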
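To make the trade-off between the two blocks concrete, the sketch below counts convolution weights and multiply-accumulates (MACs) for one MBConv block and one Fused-MBConv block. It is a back-of-the-envelope estimate under stated assumptions: BatchNorm, SE, and biases are ignored, the helper names are hypothetical, and the 24-channel, expansion-4, 112×112 setting is only illustrative.

```python
def conv_cost(c_in, c_out, k, h, w, groups=1):
    """Return (weights, MACs) of a k x k convolution producing an h x w output map."""
    weights = (c_in // groups) * c_out * k * k
    macs = weights * h * w
    return weights, macs


def mbconv_cost(c_in, c_out, expand, k, h, w):
    """1x1 expansion -> k x k depthwise -> 1x1 projection (stride 1, SE ignored)."""
    c_mid = c_in * expand
    parts = [
        conv_cost(c_in, c_mid, 1, h, w),                 # point-wise expansion
        conv_cost(c_mid, c_mid, k, h, w, groups=c_mid),  # depthwise convolution
        conv_cost(c_mid, c_out, 1, h, w),                # point-wise projection
    ]
    return tuple(sum(p[i] for p in parts) for i in range(2))


def fused_mbconv_cost(c_in, c_out, expand, k, h, w):
    """k x k regular convolution (fused expansion) -> 1x1 projection (stride 1)."""
    c_mid = c_in * expand
    parts = [
        conv_cost(c_in, c_mid, k, h, w),   # fused expansion convolution
        conv_cost(c_mid, c_out, 1, h, w),  # point-wise projection
    ]
    return tuple(sum(p[i] for p in parts) for i in range(2))


mb_weights, mb_macs = mbconv_cost(24, 24, expand=4, k=3, h=112, w=112)
fused_weights, fused_macs = fused_mbconv_cost(24, 24, expand=4, k=3, h=112, w=112)
print("MBConv       weights:", mb_weights, "MACs:", mb_macs)
print("Fused-MBConv weights:", fused_weights, "MACs:", fused_macs)
```

On paper the fused block has more weights and MACs, so any speed-up has to come from regular convolutions using the hardware better than depthwise ones. That is consistent with the observation above that depthwise convolutions are slow mainly in the early layers.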


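EfficientNetV2 differs from EfficientNetV1 in several major ways:

  • EfficientNetV2 makes extensive use of both MBConv and the newly added Fused-MBConv in the early layers.
  • EfficientNetV2 prefers smaller expansion ratios for MBConv, since smaller expansion ratios tend to have less memory access overhead.
  • EfficientNetV2 prefers smaller 3×3 kernels, but adds more layers to compensate for the reduced receptive field that comes with the smaller kernel size.
  • EfficientNetV2 completely removes the last stride-1 stage of the original EfficientNet, perhaps because of its large parameter size and memory access overhead.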
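The receptive-field argument in the third point can be checked with a few lines of arithmetic. The sketch below uses the standard recurrence for stacked convolutions; the function name and the layer lists are illustrative and not taken from the paper.

```python
def receptive_field(layers):
    """layers: list of (kernel_size, stride) pairs, ordered from input to output."""
    rf, jump = 1, 1  # jump = distance in input pixels between adjacent output positions
    for kernel, stride in layers:
        rf += (kernel - 1) * jump
        jump *= stride
    return rf


one_5x5 = receptive_field([(5, 1)])
two_3x3 = receptive_field([(3, 1), (3, 1)])
print(one_5x5, two_3x3)  # both are 5
```

Two stacked 3×3 layers see the same 5×5 window as a single 5×5 layer, which is why extra layers can make up for the smaller kernel.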

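When scaling the model up, the paper applies two extra optimizations: (1) the maximum inference image size is restricted to 480, because very large images often lead to expensive memory and training speed overhead; (2) as a heuristic, more layers are gradually added to the later stages (for example stages 5 and 6 in Table 4) to increase network capacity without adding much runtime overhead.

The model performs best with weak augmentation when the image size is small, while for larger images it performs better with stronger augmentation. Progressive learning therefore starts with small image sizes and weak regularization (epoch = 1), then gradually raises the learning difficulty with larger image sizes and stronger regularization: a larger dropout rate, RandAugment magnitude, and mixup ratio.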
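A minimal sketch of such a schedule, assuming linear interpolation between a starting and a final value across training stages. The function name, the number of stages, and the concrete ranges (image size 128 to 300, dropout 0.1 to 0.3, RandAugment magnitude 5 to 15) are illustrative assumptions rather than settings taken from these notes.

```python
def progressive_schedule(stage, num_stages, start, end):
    """Linearly interpolate a training setting from `start` (first stage) to `end` (last stage)."""
    if num_stages < 2:
        return end
    return start + (end - start) * stage / (num_stages - 1)


num_stages = 4
for stage in range(num_stages):
    image_size = int(progressive_schedule(stage, num_stages, 128, 300))
    dropout = progressive_schedule(stage, num_stages, 0.1, 0.3)
    randaug = progressive_schedule(stage, num_stages, 5, 15)
    print(f"stage {stage}: image_size={image_size}, dropout={dropout:.2f}, randaug_magnitude={randaug:.1f}")
```

Image size and regularization strength grow together, so the network never sees large images with weak regularization or small images with strong regularization.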



from collections import OrderedDict
from functools import partial

import mindspore.nn as nn
import mindspore.numpy as mnp
import mindspore.ops as ops

import argparse
import os

import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.vision.c_transforms as CV
from mindspore import Model
from mindspore import context
from mindspore import dtype as mstype
from mindspore.dataset.vision import Inter
from mindspore.nn import Accuracy
from mindspore.train.callback import LossMonitor
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig

def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Stochastic depth: randomly zero the residual branch for whole samples."""
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    # Sample from U[0, 1), as torch.rand does in the PyTorch original. A standard normal
    # sample would not give a Bernoulli(keep_prob) mask after flooring.
    uniform = ops.UniformReal(seed=1)
    random_tensor = keep_prob + uniform(shape)
    random_tensor = mnp.floor(random_tensor)  # binarize: 1 with probability keep_prob, else 0
    output = mnp.divide(x, keep_prob) * random_tensor
    return output

class DropPath(nn.Cell):
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def construct(self, x):
        return drop_path(x, self.drop_prob, self.training)

class ConvBNAct(nn.Cell):
    def __init__(self,
                 in_planes: int,
                 out_planes: int,
                 kernel_size: int = 3,
                 stride: int = 1,
                 groups: int = 1,
                 norm_layer=None,
                 activation_layer=None):
        super(ConvBNAct, self).__init__()

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if activation_layer is None:
            activation_layer = nn.ReLU  # the PyTorch original uses SiLU (Swish) here

        # The Conv2d arguments after in_channels were cut off in the source; they are
        # reconstructed from the signature. pad_mode='same' is the MindSpore default,
        # so no explicit padding is passed.
        self.conv = nn.Conv2d(in_channels=in_planes,
                              out_channels=out_planes,
                              kernel_size=kernel_size,
                              stride=stride,
                              pad_mode='same',
                              group=groups,
                              has_bias=False)

        self.bn = norm_layer(out_planes)
        self.act = activation_layer()

    def construct(self, x):
        result = self.conv(x)
        result = self.bn(result)
        #         if self.activation is not 'no':
        result = self.act(result)

        return result

class SqueezeExcite(nn.Cell):
    def __init__(self,
                 input_c: int,  # block input channel
                 expand_c: int,  # block expand channel
                 se_ratio: float = 0.25):
        super(SqueezeExcite, self).__init__()
        squeeze_c = int(input_c * se_ratio)
        self.conv_reduce = nn.Conv2d(expand_c, squeeze_c, 1, pad_mode='valid')
        self.act1 = nn.ReLU()  # alias Swish
        self.conv_expand = nn.Conv2d(squeeze_c, expand_c, 1, pad_mode='valid')
        self.act2 = nn.Sigmoid()

    def construct(self, x):
        scale = x.mean((2, 3), keep_dims=True)
        scale = self.conv_reduce(scale)
        scale = self.act1(scale)
        scale = self.conv_expand(scale)
        scale = self.act2(scale)
        return scale * x

class MBConv(nn.Cell):
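    def __init__(self,
                 kernel_size: int,
                 input_c: int,
                 out_c: int,
                 expand_ratio: int,
                 stride: int,
                 se_ratio: float,
                 drop_rate: float,
                 norm_layer=None):
        super(MBConv, self).__init__()

        if stride not in [1, 2]:
            raise ValueError("illegal stride value.")

        self.has_shortcut = (stride == 1 and input_c == out_c)

        activation_layer = nn.ReLU  # the PyTorch original uses SiLU (Swish)
        expanded_c = input_c * expand_ratio

        # MBConv in EfficientNetV2 never uses expand_ratio == 1, so the expansion conv always exists
        assert expand_ratio != 1
        # NOTE: the argument lists of the three ConvBNAct calls below were cut off in the
        # source and are reconstructed from the surrounding definitions.
        # Point-wise expansion
        self.expand_conv = ConvBNAct(input_c,
                                     expanded_c,
                                     kernel_size=1,
                                     norm_layer=norm_layer,
                                     activation_layer=activation_layer)

        # Depth-wise convolution
        self.dwconv = ConvBNAct(expanded_c,
                                expanded_c,
                                kernel_size=kernel_size,
                                stride=stride,
                                groups=expanded_c,
                                norm_layer=norm_layer,
                                activation_layer=activation_layer)

        self.se = SqueezeExcite(input_c, expanded_c, se_ratio) if se_ratio > 0 else None

        # Point-wise linear projection. The reference implementation passes an Identity
        # activation here; this port leaves it commented out, so the default ReLU applies.
        self.project_conv = ConvBNAct(expanded_c,
                                      out_c,
                                      kernel_size=1,
                                      norm_layer=norm_layer,
                                      #                                       activation_layer=ops.Identity
                                      )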

        self.out_channels = out_c

        # Stochastic depth is used only when the shortcut connection exists
        self.drop_rate = drop_rate
        if self.has_shortcut and drop_rate > 0:
            self.dropout = DropPath(drop_rate)

    def construct(self, x):
        result = self.expand_conv(x)
        result = self.dwconv(result)
        #         if self.se != 'no':
        if self.se is not None:
            result = self.se(result)
        result = self.project_conv(result)

        if self.has_shortcut:
            if self.drop_rate > 0:
                result = self.dropout(result)
            result += x

        return result

class FusedMBConv(nn.Cell):
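    def __init__(self,
                 kernel_size: int,
                 input_c: int,
                 out_c: int,
                 expand_ratio: int,
                 stride: int,
                 se_ratio: float,
                 drop_rate: float,
                 norm_layer=None):
        super(FusedMBConv, self).__init__()

        assert stride in [1, 2]
        assert se_ratio == 0

        self.has_shortcut = stride == 1 and input_c == out_c
        self.drop_rate = drop_rate

        self.has_expansion = expand_ratio != 1

        activation_layer = nn.ReLU  # the PyTorch original uses SiLU (Swish)
        expanded_c = input_c * expand_ratio

        # NOTE: the ConvBNAct argument lists and the `else:` line were cut off in the
        # source; they are reconstructed from the surrounding definitions.
        # The expansion conv exists only when expand_ratio != 1
        if self.has_expansion:
            # Expansion convolution
            self.expand_conv = ConvBNAct(input_c,
                                         expanded_c,
                                         kernel_size=kernel_size,
                                         stride=stride,
                                         norm_layer=norm_layer,
                                         activation_layer=activation_layer)

            # The reference implementation uses no activation here; in this port the
            # Identity argument is commented out, so the default ReLU applies.
            self.project_conv = ConvBNAct(expanded_c,
                                          out_c,
                                          kernel_size=1,
                                          norm_layer=norm_layer,
                                          #                                           activation_layer=ops.Identity
                                          )
        else:
            # Only project_conv exists in this case, and it keeps its activation
            self.project_conv = ConvBNAct(input_c,
                                          out_c,
                                          kernel_size=kernel_size,
                                          stride=stride,
                                          norm_layer=norm_layer,
                                          activation_layer=activation_layer)

        self.out_channels = out_c

        # Stochastic depth is used only when the shortcut connection exists
        if self.has_shortcut and drop_rate > 0:
            self.dropout = DropPath(drop_rate)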

    def construct(self, x):
        if self.has_expansion:
            result = self.expand_conv(x)
            result = self.project_conv(result)
        else:
            result = self.project_conv(x)

        if self.has_shortcut:
            if self.drop_rate > 0:
                result = self.dropout(result)
            result += x

        return result

class EfficientNetV2(nn.Cell):
    def __init__(self,
                 model_cnf: list,
                 num_classes: int = 10,
                 num_features: int = 128,
                 dropout_rate: float = 0.2,
                 drop_connect_rate: float = 0.2):
        super(EfficientNetV2, self).__init__()

        for cnf in model_cnf:
            assert len(cnf) == 8

        norm_layer = partial(nn.BatchNorm2d, eps=1e-3, momentum=0.1)

        stem_filter_num = model_cnf[0][4]

        # NOTE: the argument lists of the stem, block and head constructors were cut off in
        # the source; they are reconstructed from the config layout and the block signatures.
        self.stem = ConvBNAct(1,  # MNIST images have a single channel
                              stem_filter_num,
                              kernel_size=3,
                              stride=2,
                              norm_layer=norm_layer)  # default activation is ReLU in this port (SiLU in the original)

        total_blocks = sum([i[0] for i in model_cnf])
        block_id = 0
        blocks = []
        for cnf in model_cnf:
            repeats = cnf[0]
            op = FusedMBConv if cnf[-2] == 0 else MBConv
            for i in range(repeats):
                blocks.append(op(kernel_size=cnf[1],
                                 input_c=cnf[4] if i == 0 else cnf[5],
                                 out_c=cnf[5],
                                 expand_ratio=cnf[3],
                                 stride=cnf[2] if i == 0 else 1,
                                 se_ratio=cnf[-1],
                                 drop_rate=drop_connect_rate * block_id / total_blocks,
                                 norm_layer=norm_layer))
                block_id += 1
        self.blocks = nn.SequentialCell(*blocks)

        head_input_c = model_cnf[-1][-3]
        head = OrderedDict()

        head.update({"project_conv": ConvBNAct(head_input_c,
                                               num_features,
                                               kernel_size=1,
                                               norm_layer=norm_layer)})

        head.update({"flatten": nn.Flatten()})

        if dropout_rate > 0:
            # keep_prob is the probability of keeping a unit, so it is 1 - dropout_rate
            head.update({"dropout": nn.Dropout(keep_prob=1 - dropout_rate)})
        head.update({"classifier": nn.Dense(num_features, num_classes)})

        self.head = nn.SequentialCell(head)

        self.avgpool = nn.AvgPool2d(kernel_size=(10, 12), pad_mode='valid')

    def construct(self, x):
        x = self.stem(x)
        x = self.blocks(x)
        x = self.avgpool(x)
        #         x = self.adaptive(x)
        x = self.head(x)
        return x

def efficientnetv2_s(num_classes: int = 10):
    # Columns: repeats, kernel_size, stride, expand_ratio, input_c, out_c,
    # operator (0 = FusedMBConv, 1 = MBConv), se_ratio
    model_config = [[2, 3, 1, 1, 24, 24, 0, 0],
                    [4, 3, 2, 4, 24, 48, 0, 0],
                    [4, 3, 2, 4, 48, 64, 0, 0],
                    [6, 3, 2, 4, 64, 128, 1, 0.25],
                    [9, 3, 1, 6, 128, 160, 1, 0.25],
                    [15, 3, 2, 6, 160, 256, 1, 0.25]]

    model = EfficientNetV2(model_cnf=model_config,
                           num_classes=num_classes)
    return model

def create_dataset(data_path, batch_size=32, repeat_size=1,
                   num_parallel_workers=1):
    # Define the dataset
    mnist_ds = ds.MnistDataset(data_path)
    resize_height, resize_width = 300, 384

    # Define the map operations
    resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)
    hwc2chw_op = CV.HWC2CHW()
    type_cast_op = C.TypeCast(mstype.int32)
    type_cast_op_image = C.TypeCast(mstype.float32)

    # Apply the operations to the dataset with map
    mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(operations=type_cast_op_image, input_columns="image",
                            num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(operations=resize_op, input_columns="image", num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(operations=hwc2chw_op, input_columns="image", num_parallel_workers=num_parallel_workers)

    # Shuffle and batch
    buffer_size = 1000
    mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)
    mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)

    return mnist_ds

def train_net(args, model, batch_size, epoch_size, data_path, repeat_size, ckpoint_cb, sink_mode):
    ds_train = create_dataset(os.path.join(data_path, 'train'), batch_size, repeat_size)
    model.train(epoch_size, ds_train, callbacks=[ckpoint_cb, LossMonitor(10)], dataset_sink_mode=sink_mode)

def test_net(network, model, data_path):
    ds_eval = create_dataset(os.path.join(data_path, 'test'))
    acc = model.eval(ds_eval, dataset_sink_mode=False)
    print("Accuracy: {}".format(acc))  # report the metric instead of discarding it

net = efficientnetv2_s(num_classes=10)

parser = argparse.ArgumentParser(description='MindSpore Example')
parser.add_argument('--device_target', type=str, default="CPU", choices=['Ascend', 'GPU', 'CPU'])

args = parser.parse_known_args()[0]
context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)

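# Define the loss function
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

# Define the optimizer
net_opt = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.9)

# Configure checkpoint saving. 1875 steps is one epoch at batch size 32 (60000 / 32);
# with batch_size = 16 below, one epoch is 3750 steps.
config_ck = CheckpointConfig(save_checkpoint_steps=1875, keep_checkpoint_max=10)
# Apply the checkpoint configuration
ckpoint = ModelCheckpoint(prefix='checkpoint_efficientvs', config=config_ck)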

train_epoch = 10
# mnist_path = './datasets/cifar-10-batches-bin'
mnist_path = './datasets/MNIST_Data'
dataset_size = 1
batch_size = 16
model = Model(net, net_loss, net_opt, metrics={"Accuracy": Accuracy()})
train_net(args, model, batch_size, train_epoch, mnist_path, dataset_size, ckpoint, False)
test_net(net, model, mnist_path)
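The `kernel_size=(10, 12)` of the average pooling layer in the MindSpore model follows from the 300×384 input size chosen in `create_dataset`. The sketch below traces the spatial size through the network, assuming the stem convolution uses stride 2 and every convolution uses `pad_mode='same'`, where the output size is ceil(input / stride). The helper name is hypothetical.

```python
import math


def trace_feature_size(size, strides):
    """Return the spatial size after each stride under 'same' padding: out = ceil(in / stride)."""
    sizes = []
    for stride in strides:
        size = math.ceil(size / stride)
        sizes.append(size)
    return sizes


stem_stride = 2                      # assumption: stride-2 stem
stage_strides = [1, 2, 2, 2, 1, 2]   # third column of model_config in efficientnetv2_s
strides = [stem_stride] + stage_strides

heights = trace_feature_size(300, strides)
widths = trace_feature_size(384, strides)
print("height:", heights)
print("width: ", widths)
```

Under these assumptions the final feature map is 10×12, so a (10, 12) average pool reduces it to 1×1. Changing `resize_height` or `resize_width` would require changing the pooling kernel to match.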



import copy
from functools import partial
from collections import OrderedDict

import torch
from torch import nn

def get_efficientnet_v2_structure(model_name):
    if 'efficientnet_v2_s' in model_name:
        return [
            # e k  s  in  out xN  se   fused
            (1, 3, 1, 24, 24, 2, False, True),
            (4, 3, 2, 24, 48, 4, False, True),
            (4, 3, 2, 48, 64, 4, False, True),
            (4, 3, 2, 64, 128, 6, True, False),
            (6, 3, 1, 128, 160, 9, True, False),
            (6, 3, 2, 160, 256, 15, True, False),
        ]
    elif 'efficientnet_v2_m' in model_name:
        return [
            # e k  s  in  out xN  se   fused
            (1, 3, 1, 24, 24, 3, False, True),
            (4, 3, 2, 24, 48, 5, False, True),
            (4, 3, 2, 48, 80, 5, False, True),
            (4, 3, 2, 80, 160, 7, True, False),
            (6, 3, 1, 160, 176, 14, True, False),
            (6, 3, 2, 176, 304, 18, True, False),
            (6, 3, 1, 304, 512, 5, True, False),
        ]
    elif 'efficientnet_v2_l' in model_name:
        return [
            # e k  s  in  out xN  se   fused
            (1, 3, 1, 32, 32, 4, False, True),
            (4, 3, 2, 32, 64, 7, False, True),
            (4, 3, 2, 64, 96, 7, False, True),
            (4, 3, 2, 96, 192, 10, True, False),
            (6, 3, 1, 192, 224, 19, True, False),
            (6, 3, 2, 224, 384, 25, True, False),
            (6, 3, 1, 384, 640, 7, True, False),
        ]
    elif 'efficientnet_v2_xl' in model_name:
        return [
            # e k  s  in  out xN  se   fused
            (1, 3, 1, 32, 32, 4, False, True),
            (4, 3, 2, 32, 64, 8, False, True),
            (4, 3, 2, 64, 96, 8, False, True),
            (4, 3, 2, 96, 192, 16, True, False),
            (6, 3, 1, 192, 256, 24, True, False),
            (6, 3, 2, 256, 512, 32, True, False),
            (6, 3, 1, 512, 640, 8, True, False),
        ]

class ConvBNAct(nn.Sequential):
    """Convolution-Normalization-Activation Module"""
    def __init__(self, in_channel, out_channel, kernel_size, stride, groups, norm_layer, act, conv_layer=nn.Conv2d):
        super(ConvBNAct, self).__init__(
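            conv_layer(in_channel, out_channel, kernel_size, stride=stride, padding=(kernel_size-1)//2, groups=groups, bias=False),
            norm_layer(out_channel),  # normalization over the output channels
            act()  # activation; pass nn.Identity for a linear layer
        )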

class SEUnit(nn.Module):
    """Squeeze-Excitation Unit

    paper: https://openaccess.thecvf.com/content_cvpr_2018/html/Hu_Squeeze-and-Excitation_Networks_CVPR_2018_paper
    """
    def __init__(self, in_channel, reduction_ratio=4, act1=partial(nn.SiLU, inplace=True), act2=nn.Sigmoid):
        super(SEUnit, self).__init__()
        hidden_dim = in_channel // reduction_ratio
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Conv2d(in_channel, hidden_dim, (1, 1), bias=True)
        self.fc2 = nn.Conv2d(hidden_dim, in_channel, (1, 1), bias=True)
        self.act1 = act1()
        self.act2 = act2()

    def forward(self, x):
        return x * self.act2(self.fc2(self.act1(self.fc1(self.avg_pool(x)))))

class StochasticDepth(nn.Module):
    """Stochastic depth

    paper: https://link.springer.com/chapter/10.1007/978-3-319-46493-0_39

    :arg
        - prob: Probability of dying
        - mode: "row" or "all". "row" draws an independent survival mask for each sample in the batch
    """
    def __init__(self, prob, mode):
        super(StochasticDepth, self).__init__()
        self.prob = prob
        self.survival = 1.0 - prob
        self.mode = mode

    def forward(self, x):
        if self.prob == 0.0 or not self.training:
            return x
        else:
            shape = [x.size(0)] + [1] * (x.ndim - 1) if self.mode == 'row' else [1]
            return x * torch.empty(shape).bernoulli_(self.survival).div_(self.survival).to(x.device)

class MBConvConfig:
    """EfficientNet Building block configuration"""
    def __init__(self, expand_ratio: float, kernel: int, stride: int, in_ch: int, out_ch: int, layers: int,
                 use_se: bool, fused: bool, act=nn.SiLU, norm_layer=nn.BatchNorm2d):
        self.expand_ratio = expand_ratio
        self.kernel = kernel
        self.stride = stride
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.num_layers = layers
        self.act = act
        self.norm_layer = norm_layer
        self.use_se = use_se
        self.fused = fused

    @staticmethod
    def adjust_channels(channel, factor, divisible=8):
        new_channel = channel * factor
        divisible_channel = max(divisible, (int(new_channel + divisible / 2) // divisible) * divisible)
        divisible_channel += divisible if divisible_channel < 0.9 * new_channel else 0
        return divisible_channel

class MBConv(nn.Module):
    """EfficientNet main building blocks

    :arg
        - c: MBConvConfig instance
        - sd_prob: stochastic path probability
    """
    def __init__(self, c, sd_prob=0.0):
        super(MBConv, self).__init__()
        inter_channel = c.adjust_channels(c.in_ch, c.expand_ratio)
        block = []

        if c.expand_ratio == 1:
            block.append(('fused', ConvBNAct(c.in_ch, inter_channel, c.kernel, c.stride, 1, c.norm_layer, c.act)))
        elif c.fused:
            block.append(('fused', ConvBNAct(c.in_ch, inter_channel, c.kernel, c.stride, 1, c.norm_layer, c.act)))
            block.append(('fused_point_wise', ConvBNAct(inter_channel, c.out_ch, 1, 1, 1, c.norm_layer, nn.Identity)))
        else:
            block.append(('linear_bottleneck', ConvBNAct(c.in_ch, inter_channel, 1, 1, 1, c.norm_layer, c.act)))
            block.append(('depth_wise', ConvBNAct(inter_channel, inter_channel, c.kernel, c.stride, inter_channel, c.norm_layer, c.act)))
            block.append(('se', SEUnit(inter_channel, 4 * c.expand_ratio)))
            block.append(('point_wise', ConvBNAct(inter_channel, c.out_ch, 1, 1, 1, c.norm_layer, nn.Identity)))

        self.block = nn.Sequential(OrderedDict(block))
        self.use_skip_connection = c.stride == 1 and c.in_ch == c.out_ch
        self.stochastic_path = StochasticDepth(sd_prob, "row")

    def forward(self, x):
        out = self.block(x)
        if self.use_skip_connection:
            out = x + self.stochastic_path(out)
        return out

class EfficientNetV2(nn.Module):
    """Pytorch Implementation of EfficientNetV2

    paper: https://arxiv.org/abs/2104.00298

    - reference 1 (pytorch): https://github.com/d-li14/efficientnetv2.pytorch/blob/main/effnetv2.py
    - reference 2 (official): https://github.com/google/automl/blob/master/efficientnetv2/effnetv2_configs.py

    :arg
        - layer_infos: list of MBConvConfig
        - out_channels: bottleneck channel
        - nclass: number of classes
        - dropout: dropout probability before classifier layer
        - stochastic depth: stochastic depth probability
    """
    def __init__(self, layer_infos, out_channels=1280, nclass=0, dropout=0.2, stochastic_depth=0.0,
                 block=MBConv, act_layer=nn.SiLU, norm_layer=nn.BatchNorm2d):
        super(EfficientNetV2, self).__init__()
        self.layer_infos = layer_infos
        self.norm_layer = norm_layer
        self.act = act_layer

        self.in_channel = layer_infos[0].in_ch
        self.final_stage_channel = layer_infos[-1].out_ch
        self.out_channels = out_channels

        self.cur_block = 0
        self.num_block = sum(stage.num_layers for stage in layer_infos)
        self.stochastic_depth = stochastic_depth

        self.stem = ConvBNAct(3, self.in_channel, 3, 2, 1, self.norm_layer, self.act)
        self.blocks = nn.Sequential(*self.make_stages(layer_infos, block))
        self.head = nn.Sequential(OrderedDict([
            ('bottleneck', ConvBNAct(self.final_stage_channel, out_channels, 1, 1, 1, self.norm_layer, self.act)),
            ('avgpool', nn.AdaptiveAvgPool2d((1, 1))),
            ('flatten', nn.Flatten()),
            ('dropout', nn.Dropout(p=dropout, inplace=True)),
            ('classifier', nn.Linear(out_channels, nclass) if nclass else nn.Identity())
        ]))

    def make_stages(self, layer_infos, block):
        return [layer for layer_info in layer_infos for layer in self.make_layers(copy.copy(layer_info), block)]

    def make_layers(self, layer_info, block):
        layers = []
        for i in range(layer_info.num_layers):
            layers.append(block(layer_info, sd_prob=self.get_sd_prob()))
            layer_info.in_ch = layer_info.out_ch
            layer_info.stride = 1
        return layers

    def get_sd_prob(self):
        sd_prob = self.stochastic_depth * (self.cur_block / self.num_block)
        self.cur_block += 1
        return sd_prob

    def forward(self, x):
        return self.head(self.blocks(self.stem(x)))

    def change_dropout_rate(self, p):
        self.head[-2] = nn.Dropout(p=p, inplace=True)

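def efficientnet_v2_init(model):
    # The bodies of the bias and normalization branches were cut off in the source;
    # zeros for biases and ones for normalization weights are the usual defaults.
    for m in model.modules():
        if isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode='fan_out')
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
            nn.init.ones_(m.weight)
            nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, mean=0.0, std=0.01)
            nn.init.zeros_(m.bias)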

def get_efficientnet_v2(model_name, pretrained, nclass=0, dropout=0.1, stochastic_depth=0.2, **kwargs):
    residual_config = [MBConvConfig(*layer_config) for layer_config in get_efficientnet_v2_structure(model_name)]
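    model = EfficientNetV2(residual_config, 1280, nclass, dropout=dropout, stochastic_depth=stochastic_depth, block=MBConv, act_layer=nn.SiLU)
    efficientnet_v2_init(model)  # assumed call: the initializer above is otherwise never used
    # `pretrained` is accepted but unused here; no pretrained weights are loaded
    return model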


# Build EfficientNetV2-S from its structure configuration
model_name = 'efficientnet_v2_s'
pretrained = False  # Assuming custom implementation without pretrained weights
nclass = 10  # For MNIST
dropout = 0.1
stochastic_depth = 0.2

model = get_efficientnet_v2(model_name, pretrained, nclass, dropout, stochastic_depth)
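Both implementations scale the stochastic depth (drop path) probability linearly with block depth: `get_sd_prob` returns `stochastic_depth * cur_block / num_block`. The short sketch below reproduces that schedule in plain Python for the EfficientNetV2-S layer counts; the function name is hypothetical.

```python
def stochastic_depth_schedule(layers_per_stage, max_prob):
    """Drop probability of every block: max_prob * block_index / total_blocks."""
    total_blocks = sum(layers_per_stage)
    return [max_prob * block_id / total_blocks for block_id in range(total_blocks)]


layers_per_stage = [2, 4, 4, 6, 9, 15]  # the xN column of the efficientnet_v2_s structure
probs = stochastic_depth_schedule(layers_per_stage, max_prob=0.2)
print(len(probs), probs[0], probs[-1])
```

The first block is never dropped and the last block is dropped with a probability just under `max_prob`, so deeper blocks are regularized more strongly.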

Loading the MNIST dataset from torchvision

import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

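transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Upscale the 28x28 digits; 224x224 is the input size chosen for this script
    transforms.Grayscale(num_output_channels=3),  # Replicate the single channel to match the 3-channel stem
    transforms.ToTensor(),  # Convert the PIL image to a float tensor in [0, 1]; Normalize needs a tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize for 3 channels
])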

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


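device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)  # the model must live on the same device as the batches
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()
epochs = 100
# Training loop
for epoch in range(epochs):
    model.train()  # enable dropout, stochastic depth and batch-norm updates
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()  # clear gradients from the previous step
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()  # backpropagate
        optimizer.step()  # update the weights

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")  # loss of the last batch in the epoch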


def test(model, device, test_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    test_loss = 0
    correct = 0
    with torch.no_grad():  # No need to track gradients for validation
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            test_loss += criterion(outputs, labels).item()  # Sum up batch loss
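            pred = outputs.argmax(dim=1, keepdim=True)  # Get the index of the largest logit
            correct += pred.eq(labels.view_as(pred)).sum().item()

    test_loss /= len(test_loader)  # average of the per-batch mean losses
    accuracy = 100.0 * correct / len(test_loader.dataset)
    print(f"Test loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%")
    return test_loss, accuracy


test(model, device, test_loader, criterion)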





