【深度学习】使用Pytorch实现的用于时间序列预测的各种深度学习模型类

深度学习模型类

  • 简介
  • 按滑动时间窗口切割数据集
  • 模型类
    • CNN
    • GRU
    • LSTM
    • MLP
    • RNN
    • TCN
    • Transformer
    • Seq2Seq

简介

本文所定义模型类的输入数据的形状shape统一为 [batch_size, time_step,n_features],batch_size为批次大小,time_step为时间步长,n_features为特征数量。另外该模型类同时适用于单特征与多特征

本项目代码统一了训练方式,只需在models文件夹中加入下面模型类,即可使用该模型,而不需要重新写训练模型等的代码,减少了代码的冗余。

代码有注释,不加解释
声明:转载请标明出处
在这里插入图片描述
超参数只需通过字典定义传入即可,所有训练方式一样的模型通用
在这里插入图片描述

按滑动时间窗口切割数据集

import os

import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader


class TimeSeriesDataset(Dataset):
    """
    自定义的时间序列数据集类,用于处理时间序列数据的加载和预处理。目的是将时间序列数据准备成适合机器学习模型训练的格式(按滑动窗口划分)。

    Args:
        data (torch.Tensor): 包含时间序列数据的张量,形状为 [1, n_features, data_len]
        args.time_step (int): 输入数据的时间步。
        args.skip (int): 输入数据的跳跃步。

     Returns:
            tuple: 包含输入数据 x 和目标数据 y 的元组。
            X: 输入数据的批次,形状为 [time_step, n_features]
            Y: 目标数据的批次,形状为 [1]

    """

    def __init__(self, data, args):
        self.data = data
        self.time_step = args.time_step
        self.skip = args.skip

    def __len__(self):
        n = self.data.shape[-1] - self.time_step + 1 - self.skip
        return n

    def __getitem__(self, idx):
        x = self.data[:, :, idx:idx + self.time_step].permute(2, 1, 0).squeeze(-1)
        y = self.data[:, :1, idx + self.time_step + self.skip - 1].view(-1)
        return x, y


def Dataset_Custom(args, if_Batching=True):
    """
    创建自定义时间序列数据集

    Args:
        args (Namespace): 包含所有必要参数的命名空间。
        if_Batching: 是否批次化,类似XGBoost算法不需要

    Returns:
        Tuple[TimeSeriesDataset, TimeSeriesDataset, TimeSeriesDataset]: 训练、验证和测试数据集
    """

    # 读取数据
    data = pd.read_csv(os.path.join(args.root_path, args.data_path))

    # 检查是否存在 "date" 列,如果存在则删除
    if 'date' in data.columns:
        data = data.drop('date', axis=1)

    # 确保 "load" 列是第一列,如果不是,将其移到第一列
    if args.target in data.columns:
        data = data[[args.target] + [col for col in data.columns if col != args.target]]

    # 定义数据集划分比例(例如,70% 训练集,10% 验证集,20% 测试集)
    data_len = len(data)
    num_train = int(data_len * 0.7)
    num_test = int(data_len * 0.2)
    num_vali = data_len - num_train - num_test

    train_data = data[:num_train]
    vali_data = data[num_train:num_train + num_vali]
    test_data = data[num_train + num_vali:]

    # 归一化
    scaler = MinMaxScaler(feature_range=(0, 1))

    scaler.fit(train_data)
    train_data = scaler.transform(train_data)
    vali_data = scaler.transform(vali_data)
    test_data = scaler.transform(test_data)

    # 转换为张量并添加维度
    train_data = torch.from_numpy(train_data).float()
    vali_data = torch.from_numpy(vali_data).float()
    test_data = torch.from_numpy(test_data).float()

    # 将其变为[1, n_features, data_len]
    train_data = train_data.unsqueeze(0).permute(0, 2, 1)
    vali_data = vali_data.unsqueeze(0).permute(0, 2, 1)
    test_data = test_data.unsqueeze(0).permute(0, 2, 1)

    # 按滑动时间窗口转成机器学习的数据格式
    training_dataset = TimeSeriesDataset(train_data, args)
    valiing_dataset = TimeSeriesDataset(vali_data, args)
    testing_dataset = TimeSeriesDataset(test_data, args)
    print(f"train:{len(training_dataset)},vali:{len(valiing_dataset)},test:{len(testing_dataset)}")
    if if_Batching:
        # 创建数据加载器,用于批量加载数据
        train_loader = DataLoader(training_dataset, shuffle=True, drop_last=True, batch_size=args.batch_size)
        vali_loader = DataLoader(valiing_dataset, shuffle=True, drop_last=True, batch_size=args.batch_size)
        test_loader = DataLoader(testing_dataset, shuffle=False, drop_last=False, batch_size=len(testing_dataset))
        return train_loader, vali_loader, test_loader
    else:
        return training_dataset, valiing_dataset, testing_dataset

模型类

CNN

import torch
from torch import nn





class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.input_size = configs.input_size  # 输入特征的大小
        self.output_size = configs.output_size  # 预测结果的维度
        self.time_step = configs.time_step  # 时间步数
        self.kernel_size = configs.kernel_size  # 卷积核的大小

        self.relu = nn.ReLU(inplace=True)  # ReLU激活函数

        # 第一个卷积层
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=self.input_size, out_channels=64, kernel_size=self.kernel_size),
            # 输入特征维度为input_size,输出通道数为64,卷积核大小为kernel_size
            nn.ReLU(),  # ReLU激活函数
            nn.MaxPool1d(kernel_size=self.kernel_size, stride=1)  # 最大池化,池化窗口大小为kernel_size,步长为1
        )

        # 第二个卷积层
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=2),
            # 输入通道数为64,输出通道数为128,卷积核大小为2
            nn.ReLU(),  # ReLU激活函数
            nn.MaxPool1d(kernel_size=self.kernel_size, stride=1)  # 最大池化,池化窗口大小为kernel_size,步长为1
        )

        # 根据卷积操作后的数据格式和输出大小计算线性层的输入维度
        conv_output_size = self._calculate_conv_output_size()

        # 线性层1,输入维度为卷积层输出大小,输出维度为50
        self.linear1 = nn.Linear(conv_output_size, 50)

        # 线性层2,输入维度为50,输出维度为预测结果的维度
        self.linear2 = nn.Linear(50, self.output_size)

    def forward(self, x):
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = x.view(x.shape[0], -1)
        return x

    def _calculate_conv_output_size(self):
        """自动计算卷积层输出形状,此操作避免手算该参数 参数:input_size( 输入特征维度 )返回值: conv_output_size (卷积层输出的特征维度)"""
        input_tensor = torch.zeros(1, self.input_size, self.time_step)  # 创建输入零张量,维度为(1, input_size, time_step)
        conv1_output = self.conv1(input_tensor)
        conv2_output = self.conv2(conv1_output)
        conv_output_size = conv2_output.view(conv2_output.size(0), -1).size(1)
        return conv_output_size

GRU

import torch
from torch import nn
from torch.autograd import Variable


# GRU模型结构
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()

        # 初始化模型参数
        self.output_size = configs.output_size  # 输出类别的数量
        self.num_layers = configs.num_layers  # GRU层数
        self.input_size = configs.input_size  # 输入特征的维度
        self.hidden_size = configs.hidden_size  # 隐藏状态的维度
        self.dropout = configs.dropout  # 在非循环层之间应用的丢弃比例,默认为0.0(没有丢弃)。
        self.bidirectional = configs.bidirectional  # 是否使用双向GRU

        # 创建GRU层  batch_first=True:输入数据的维度顺序是 (batch_size, seq_len, input_size)
        self.gru = nn.GRU(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=self.num_layers,
                          dropout=self.dropout, bidirectional=self.bidirectional, batch_first=True)
        # 创建全连接层用于输出预测结果
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        # 初始化初始隐藏状态
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        # 通过GRU层进行前向传播
        out, h_0 = self.gru(x, h_0)
        # 取GRU的最后一个时间步的输出
        out = out[:, -1]
        # 通过全连接层进行分类预测
        out = self.fc(out)
        return out

LSTM

import torch
import torch.nn as nn
from torch.autograd import Variable


class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.input_size = configs.input_size  # 输入特征的大小。
        self.hidden_size = configs.hidden_size  # LSTM 隐藏状态的维度
        self.num_layers = configs.num_layers  # LSTM 层的堆叠层数
        self.output_size = configs.output_size  # 输出的大小(预测结果的维度)
        self.dropout = configs.dropout  # 在非循环层之间应用的丢弃比例,默认为0.0(没有丢弃)。
        self.bidirectional = configs.bidirectional  # 如果为True,LSTM将是双向的(包括前向和后向),默认为False。

        self.lstm = nn.LSTM(input_size=configs.input_size, hidden_size=configs.hidden_size,
                            num_layers=configs.num_layers, dropout=self.dropout, bidirectional=self.bidirectional,
                            batch_first=True)  # 定义 LSTM 层
        self.fc = nn.Linear(configs.hidden_size, configs.output_size)  # 定义线性层,将 LSTM 输出映射到预测结果的维度

    def forward(self, x):
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))  # 初始化 LSTM 的隐藏状态
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))  # 初始化 LSTM 的记忆状态
        ula, (h_out, _) = self.lstm(x, (h_0, c_0))  # 前向传播过程,返回 LSTM 层的输出序列和最后一个时间步的隐藏状态
        h_out = h_out[-1, :, :].view(-1, self.hidden_size)  # 提取最后一个时间步的隐藏状态并进行形状变换
        ''''使用LSTM的最后一个时间步的隐藏状态h_out作为线性层的输入,是因为模型将隐藏状态视为包含了序列信息的高层表示。
            在许多情况下,使用最后一个时间步的隐藏状态进行预测已经足够。'''
        out = self.fc(h_out)  # 将最后一个时间步的隐藏状态通过线性层进行预测
        return out

MLP

import torch.nn as nn


# MLP模型结构
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()

        self.input_size = configs.input_size  # 输入特征的大小,即输入层的维度。
        self.output_size = configs.output_size  # 输出的大小,即预测结果的维度。
        self.channel_sizes = [int(size) for size in configs.channel_sizes.split(',')]  # 因为输入的是字符串,转成列表

        # 一个整数列表,指定每个隐藏层的单元数量。列表的长度表示隐藏层的层数,每个元素表示对应隐藏层的单元数量。
        self.time_step = configs.time_step
        layers = []
        input_adjust_size = self.input_size * self.time_step  # 将数据展开后的维度为 特征数量*时间步长
        # 遍历channel_sizes列表
        for i in range(len(self.channel_sizes)):
            if i == 0:
                # 对于第一层,创建一个从input_size到channel_sizes[i]的线性层
                self.linear = nn.Linear(input_adjust_size, self.channel_sizes[i])
                self.init_weights()  # 初始化线性层的权重
                layers += [self.linear, nn.ReLU()]  # 将线性层和ReLU激活函数添加到layers列表中
            else:
                # 对于后续层,创建一个从channel_sizes[i-1]到channel_sizes[i]的线性层
                self.linear = nn.Linear(self.channel_sizes[i - 1], self.channel_sizes[i])
                self.init_weights()  # 初始化线性层的权重
                layers += [self.linear, nn.ReLU()]  # 将线性层和ReLU激活函数添加到layers列表中
        # 创建最后一个线性层,从channel_sizes[-1]到output_size
        self.linear = nn.Linear(self.channel_sizes[-1], self.output_size)
        self.init_weights()  # 初始化线性层的权重
        layers += [self.linear]  # 将最后一个线性层添加到layers列表中
        # 使用layers列表创建一个Sequential网络
        self.network = nn.Sequential(*layers)

    def init_weights(self):
        # 使用均值为0,标准差为0.01的正态分布初始化线性层的权重
        self.linear.weight.data.normal_(0, 0.01)

    def forward(self, x):
        # X输入的shape为 [batch_size, time_step,n_features]

        # 将输入数据的维度展平,然后传递给线性层。
        # 无论输入数据的特征数和时间步数如何,都能适应到模型中。这个修改应该可以适用于不同维度的输入数据。
        x = x.view(x.size(0), -1)
        return self.network(x)

RNN

import torch
import torch.nn as nn
from torch.autograd import Variable


# RNN模型结构
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.input_size = configs.input_size  # 输入特征的大小
        self.hidden_size = configs.hidden_size  # 隐藏状态的维度
        self.num_layers = configs.num_layers  # RNN的层数
        self.output_size = configs.output_size  # 输出的大小,即预测结果的维度
        self.dropout = configs.dropout  # 在非循环层之间应用的丢弃比例,默认为0.0(没有丢弃)。
        self.bidirectional = configs.bidirectional  # 如果为True,LSTM将是双向的(包括前向和后向),默认为False。
        # 定义RNN结构,输入特征大小、隐藏状态维度、层数等参数
        self.rnn = nn.RNN(input_size=self.input_size, hidden_size=self.hidden_size, dropout=self.dropout,
                          bidirectional=self.bidirectional, num_layers=self.num_layers, batch_first=True)
        # 将RNN的输出压缩到与输出大小相同的维度
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        # 创建初始隐藏状态h_0,维度为(num_layers, batch_size, hidden_size)
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        # 通过RNN传播输入数据,获取输出out和最终隐藏状态h_0
        out, h_0 = self.rnn(x, h_0)
        # 取RNN最后一个时间步的输出,将其输入到全连接层进行预测
        out = self.fc(out[:, -1, :])
        return out

TCN

import torch.nn as nn
from torch.nn.utils import weight_norm

# TCN模型结构
'''
Chomp1d模块:用于从卷积层的输出中移除无效的时间步。
'''


class Chomp1d(nn.Module):
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        # Chomp1d模块的作用是从卷积层的输出中移除无效的时间步,即通过切片操作去掉最后的self.chomp_size个时间步。
        # 由于切片操作可能导致存储不连续,因此在返回结果之前,需要使用contiguous()方法确保存储连续性。
        return x[:, :, :-self.chomp_size].contiguous()


'''
TemporalBlock模块:包含两个卷积层和相应的正则化、激活函数和dropout操作。
第一个卷积层使用权重归一化,通过Chomp1d组件移除无效的时间步,然后经过ReLU激活函数和dropout操作。
第二个卷积层也经过相同的处理流程。通过残差连接将第二个卷积层的输出和输入进行相加,并通过ReLU激活函数得到最终的输出。
'''


class TemporalBlock(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        # 第一次卷积
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, 3096, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        # 随机失活(dropout)。随机失活是一种常用的正则化技术,用于减少过拟合
        self.dropout1 = nn.Dropout(dropout)

        # 第二次卷积
        self.conv2 = weight_norm(nn.Conv1d(3096, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        # 将两次卷积层按顺序组合成一个网络
        self.net = nn.Sequential(
            self.conv1, self.chomp1, self.relu1, self.dropout1,
            self.conv2, self.chomp2, self.relu2, self.dropout2,
        )
        # 如果输入通道数和输出通道数不相同,则需要进行下采样
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        # 初始化权重
        self.init_weights()

    def init_weights(self):
        # 使用均值为0,标准差为0.01的正态分布初始化权重
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        # 前向传播
        out = self.net(x)  # 通过两个卷积层
        res = x if self.downsample is None else self.downsample(x)  # 下采样
        return self.relu(out + res)  # 残差连接
    # 残差连接(Residual connection)是一种在神经网络中引入跨层连接的技术。它的目的是解决深层神经网络训练中的梯度消失或梯度爆炸问题,并促进信息在网络中的流动。
    # 在TCN模型中,残差连接被用于将每个TemporalBlock的输出与输入进行相加。这种设计使得信息可以直接通过跨层连接流动,有助于梯度的传播和模型的训练。
    # 具体地,在TemporalBlock的forward方法中,首先通过两个卷积层进行特征提取和建模,然后将第二个卷积层的输出和输入进行相加。这个相加的操作实现了残差连接。最终,通过ReLU激活函数对相加的结果进行非线性变换。
    # 残差连接的好处是,即使在网络较深的情况下,梯度可以通过跨层连接直接传播到前面的层次,减少了梯度消失的问题。同时,它也提供了一种捕捉输入与输出之间的细微差异和变化的机制,有助于提高模型的性能。


'''
TemporalConvNet模块是 TCN 模型的核心,由多个TemporalBlock组成的网络层次结构。
根据num_channels列表的长度,确定网络层次的深度。
每个层次上的TemporalBlock的参数根据当前层次的位置和前一层的输出通道数进行确定。
通过层次化的结构,模型可以捕捉序列中的长期依赖关系。
'''


class TemporalConvNet(nn.Module):
    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super(TemporalConvNet, self).__init__()
        layers = []
        self.relu = nn.ReLU()
        num_levels = len(num_channels)
        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_inputs if i == 0 else num_channels[i - 1]
            out_channels = num_channels[i]
            # 每个层次添加一个TemporalBlock
            layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
                                     padding=(kernel_size - 1) * dilation_size, dropout=dropout)]
        # 将所有的TemporalBlock按顺序组合成一个网络
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.relu(self.network(x) + x[:, 0, :].unsqueeze(1))


'''
TCN模块:TCN模型的主体部分。
它包括一个TemporalConvNet,一个线性层和一个下采样层。
输入数据首先经过TemporalConvNet进行序列建模和特征提取,然后通过ReLU激活函数和残差连接进行处理。
最后,通过线性层进行预测,并通过下采样层将输入数据的通道数降低到与TemporalBlock的输出通道数相同,以便在残差连接中使用。
'''


class Model(nn.Module):
    def __init__(self,configs):  # input_size 输入的不同的时间序列数目
        super(Model, self).__init__()
        self.input_size = configs.input_size  # 输入特征的大小
        self.output_size = configs.output_size  # 预测结果的维度
        self.num_channels = [configs.nhid] * configs.levels  # 卷积层通道数的列表,用于定义TemporalConvNet的深度
        self.kernel_size = configs.kernel_size  # 卷积核的大小
        self.dropout = configs.dropout  # 随机丢弃率

        # TemporalConvNet层
        self.tcn = TemporalConvNet(self.input_size, self.num_channels, kernel_size=self.kernel_size, dropout=self.dropout)
        # 线性层,用于预测
        self.linear = nn.Linear(self.num_channels[-1], self.output_size)
        # 下采样层,用于通道数降低
        self.downsample = nn.Conv1d(self.input_size, self.num_channels[0], 1)
        self.relu = nn.ReLU()
        # 初始化权重
        self.init_weights()

    def init_weights(self):
        self.linear.weight.data.normal_(0, 0.01)
        self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        # 前向传播
        x = x.transpose(1, 2)
        # 通过TemporalConvNet进行序列建模和特征提取
        y1 = self.relu(self.tcn(x) + x[:, 0, :].unsqueeze(1))
        # 线性层进行预测
        return self.linear(y1[:, :, -1])


Transformer

import torch
import torch.nn as nn

from Models.layers.Transformer.decoder import Decoder
from Models.layers.Transformer.encoder import Encoder
from Models.layers.Transformer.utils import generate_original_PE, generate_regular_PE


class Model(nn.Module):
    """基于Attention is All You Need的Transformer模型。

    适用于顺序数据的经典Transformer模型。
    嵌入(Embedding)已被替换为全连接层,
    最后一层softmax函数替换为sigmoid函数。

    属性
    ----------
    layers_encoding: :py:class:`list` of :class:`Encoder.Encoder`
        编码器层的堆叠。
    layers_decoding: :py:class:`list` of :class:`Decoder.Decoder`
        解码器层的堆叠。

    参数
    ----------
    d_input:
        模型输入的维度。
    d_model:
        输入向量的维度。
    d_output:
        模型输出的维度。
    q:
        查询和键的维度。
    v:
        值的维度。
    h:
        头数。
    N:
        要堆叠的编码器和解码器层数量。
    attention_size:
        应用注意力机制的反向元素数量。
        如果为 ``None``,则不激活。默认为 ``None``。
    dropout:
        每个多头自注意力(MHA)或前馈全连接(PFF)块之后的dropout概率。
        默认为 ``0.3``。
    chunk_mode:
        切块模式,可以是 ``'chunk'``、``'window'`` 或 ``None`` 之一。默认为 ``'chunk'``。
    pe:
        要添加的位置编码类型,可以是 ``'original'``、``'regular'`` 或 ``None`` 之一。默认为 ``None``。
    pe_period:
        如果使用 ``'regular'`` 位置编码,则可以定义周期。默认为 ``None``。
    """

    def __init__(self, configs):
        """根据Encoder和Decoder块创建Transformer结构。"""
        super(Model, self).__init__()
        d_input = configs.d_input
        d_model = configs.d_model
        d_output = configs.d_output
        q = configs.q
        v = configs.v
        h = configs.h
        N = configs.N
        attention_size = configs.attention_size
        dropout = configs.dropout
        chunk_mode = configs.chunk_mode
        pe = configs.pe
        pe_period = configs.pe_period

        self._d_model = d_model

        self.layers_encoding = nn.ModuleList([Encoder(d_model,
                                                      q,
                                                      v,
                                                      h,
                                                      attention_size=attention_size,
                                                      dropout=dropout,
                                                      chunk_mode=chunk_mode) for _ in range(N)])
        self.layers_decoding = nn.ModuleList([Decoder(d_model,
                                                      q,
                                                      v,
                                                      h,
                                                      attention_size=attention_size,
                                                      dropout=dropout,
                                                      chunk_mode=chunk_mode) for _ in range(N)])

        self._embedding = nn.Linear(d_input, d_model)
        self._linear = nn.Linear(d_model, d_output)

        pe_functions = {
            'original': generate_original_PE,
            'regular': generate_regular_PE,
        }

        if pe in pe_functions.keys():
            self._generate_PE = pe_functions[pe]
            self._pe_period = pe_period
        elif pe is None:
            self._generate_PE = None
        else:
            raise NameError(
                f'未知的位置编码(PE)"{pe}"。必须为 {", ".join(pe_functions.keys())} 或 None。')

        self.name = 'transformer'

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """通过Transformer进行输入传播。

        通过嵌入模块、编码器和解码器堆叠以及输出模块进行输入传播。

        参数
        ----------
        x:
            形状为 (batch_size, K, d_input) 的 torch.Tensor。

        返回
        -------
            形状为 (batch_size, K, d_output) 的输出张量。
        """
        K = x.shape[1]

        # 嵌入模块
        encoding = self._embedding(x)

        # 添加位置编码
        if self._generate_PE is not None:
            pe_params = {'period': self._pe_period} if self._pe_period else {}
            positional_encoding = self._generate_PE(K, self._d_model, **pe_params)
            positional_encoding = positional_encoding.to(encoding.device)
            encoding.add_(positional_encoding)

        # 编码器堆叠
        for layer in self.layers_encoding:
            encoding = layer(encoding)

        # 解码器堆叠
        decoding = encoding

        # 添加位置编码
        if self._generate_PE is not None:
            positional_encoding = self._generate_PE(K, self._d_model)
            positional_encoding = positional_encoding.to(decoding.device)
            decoding.add_(positional_encoding)

        for layer in self.layers_decoding:
            decoding = layer(decoding, encoding)

        # 输出模块
        output = self._linear(decoding)
        output = torch.sigmoid(output)
        return output[:, -1, :]

Seq2Seq

import torch
import torch.nn as nn

# Seq2Seq 主类
class Model(nn.Module):
    def __init__(self,configs ):
        super().__init__()
        self.input_size = configs.input_size  # 输入特征的大小
        self.output_size = configs.output_size  # 输出的大小(预测结果的维度)
        self.Encoder = Encoder(configs.input_size, configs.hidden_size, configs.num_layers, configs.batch_size)
        self.Decoder = Decoder(configs.input_size, configs.hidden_size, configs.num_layers, configs.output_size, configs.batch_size)

    def forward(self, input_seq):
        target_len = self.output_size  # 预测步长
        batch_size, seq_len, _ = input_seq.shape[0], input_seq.shape[1], input_seq.shape[2]
        h, c = self.Encoder(input_seq)
        outputs = torch.zeros(batch_size, self.input_size, self.output_size)
        decoder_input = input_seq[:, -1, :]  # 获取解码器的初始输入
        for t in range(target_len):
            decoder_output, h, c = self.Decoder(decoder_input, h, c)  # 解码器的前向传播
            outputs[:, :, t] = decoder_output
            decoder_input = decoder_output  # 将解码器的输出作为下一个时间步的输入

        return outputs[:, 0, :]  # 返回预测输出

class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, batch_size):
        super().__init__()
        self.input_size = input_size  # 输入特征的大小
        self.hidden_size = hidden_size  # LSTM 隐藏状态的维度
        self.num_layers = num_layers  # LSTM 层的堆叠层数
        self.num_directions = 1
        self.batch_size = batch_size
        # 定义编码器的LSTM层,接受输入序列
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True, bidirectional=False)

    def forward(self, input_seq):
        batch_size, seq_len = input_seq.shape[0], input_seq.shape[1]
        h_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size)  # 初始化 LSTM 的隐藏状态
        c_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size)  # 初始化 LSTM 的记忆状态
        output, (h, c) = self.lstm(input_seq, (h_0, c_0))  # 前向传播过程,返回 LSTM 层的输出序列和最后一个时间步的隐藏状态

        return h, c


class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_size = input_size  # 输入特征的大小
        self.hidden_size = hidden_size  # LSTM 隐藏状态的维度
        self.num_layers = num_layers  # LSTM 层的堆叠层数
        self.output_size = output_size  # 输出的大小(预测结果的维度)
        self.num_directions = 1
        self.batch_size = batch_size
        # 定义解码器的LSTM层和线性层
        self.lstm = nn.LSTM(input_size, self.hidden_size, self.num_layers, batch_first=True, bidirectional=False)
        self.linear = nn.Linear(self.hidden_size, self.input_size)  # 线性层,将 LSTM 输出映射到预测结果的维度

    def forward(self, input_seq, h, c):
        # input_seq(batch_size, input_size)
        input_seq = input_seq.unsqueeze(1)  # 在输入序列中添加一个时间步的维度
        output, (h, c) = self.lstm(input_seq, (h, c))  # 前向传播过程,返回 LSTM 层的输出序列和最后一个时间步的隐藏状态
        pred = self.linear(output.squeeze(1))  # 使用线性层进行预测,pred(batch_size, 1, output_size)

        return pred, h, c


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/106653.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Project Costs

/*** 初始化象棋的棋子,正常情况加载双方所有棋子,残局演示加载剩余棋子,按坐标位置摆放* * 【费用】* 因甲方要求产生工作量计算费用;新增、修改、删除需求* 因乙方生产缺陷工作量不计费用;缺陷、延误* * 来说个一个栗…

电商时代,VR全景如何解决实体店难做没流量?

近日,电商和实体经济的对立成为了热门话题,尽管电商的兴起确实对线下实体店造成了一定的冲击,但实体店也不是没有办法挽救。VR全景助力线下实体店打造线上店铺,打通流量全域布局,还能实现打开产品、查看产品内部细节等…

机器学习实验一:KNN算法,手写数字数据集(使用汉明距离)

KNN-手写数字数据集: 使用sklearn中的KNN算法工具包( KNeighborsClassifier)替换实现分类器的构建,注意使用的是汉明距离; 分段解释代码: import os import pandas as pd from Levenshtein import hamming导入所需的库,包括os用于文件操作,pandas用于数据处理,以及hamm…

P7473 重力球

P7473 重力球 Solution 考虑 Brute Force:对于每一次询问,通过 BFS 处理出最近的交汇点,输出答案。 很显然,会 TLE \colorbox{navy}{\color{white}{TLE}} TLE​。 故,考虑 优化: 观察发现障碍物数量非…

服务器数据恢复-服务器系统损坏启动蓝屏的数据恢复案例

服务器故障&分析: 某公司一台华为机架式服务器,运行过程中突然蓝屏。管理员将服务器进行了重启,但是服务器操作系统仍然进入蓝屏状态。 导致服务器蓝屏的原因非常多,比较常见的有:显卡/内存/cpu或者其他板卡接触不…

高品质工地建筑模板,防水耐用,易脱模

欢迎选购我们的产品:高品质工地建筑模板。作为一家专业厂家,我们提供适用于高层建筑的建筑模板,具有出色的防水耐用性能,且不易开胶。1. 高品质工地建筑模板:我们的建筑模板经过精心设计和制作,以确保其高品…

Unity地面交互效果——1、局部UV采样和混合轨迹

大家好,我是阿赵。   这期开始,打算介绍一下地面交互的一些做法。 比如: Unity引擎制作沙地实时凹陷网格的脚印效果 或者: Unity引擎制作雪地效果 这些效果的实现,需要基于一些基础的知识。所以这一篇先介绍一下简单…

QQ云端机器人登录系统php源码开心版

可能很多人不知道这源码有什么用,这款源码主要是针对群机器人爱好者的, 这是一个通过对接挂机宝里面机器人框架的一个网页站点, 用户通过网页登录 QQ 账号至挂机宝里面框架(可扫码登录、账密登录、跳转 QQ 快捷登录)…

抖音上怎么挂小程序?制作小程序挂载抖音视频

公司企业商家现在已经把抖音作为营销的渠道之一,目前抖音支持短视频挂载小程序,可方便做营销。以下给大家分享这一操作流程。 一、申请自主挂载能力 首先需要在抖音开放平台官网注册一个抖音小程序账号,然后申请短视频自主挂载能力。 二、搭…

论文阅读——BERT

ArXiv:https://arxiv.org/abs/1810.04805 github:GitHub - google-research/bert: TensorFlow code and pre-trained models for BERT 一、模型及特点: 1、模型: 深层双向transformer encoder结构 BERT-BASE:(L12, H…

某大型车企:加强汽车应用安全防护,开创智能网联汽车新篇章

​某车企是安徽省最大的整车制造企业,致力于为全球消费者带来高品质汽车产品和服务体验,是国内最早突破百万销量的汽车自主品牌。该车企利用数字技术推动供应链网络的新型互动,加快数字化转型,持续进行场景创新、生态创新&#xf…

JAVA毕业设计106—基于Java+Springboot的外卖系统(源码+数据库)

基于JavaSpringboot的外卖系统(源码数据库)106 一、系统介绍 本系统分为用户端和管理端角色 前台用户功能: 登录、菜品浏览,口味选择,加入购物车,地址管理,提交订单。 管理后台: 登录,员工管…

【Linux】CentOS8.4 安装docker

🦄 🎐个人主页 🎐✨🍁 🪁🍁🪁🍁🪁🍁 感谢点赞和关注 ,每天进步一点点!加油!🪁🍁🪁&…

整个自动驾驶小车001:概述

材料: 1,树梅派4b,作为主控,这个东西有linux系统,方便 2,HC-S104超声波模块,我有多个,不少于4个,我可以前后左右四个方向都搞一个 3,l298n模块,…

Spring概述

Spring概述 Spring 是最受欢迎的企业级 Java 应用程序开发框架,数以百万的来自世界各地的开发人员使用 Spring 框架来创建性能好、易于测试、可重用的代码。 Spring 框架是一个开源的 Java 平台,它最初是由 Rod Johnson 编写的,并且于 2003 …

LVS+keepalived高可用集群

1、定义 keepalived为lvs应运而生的高可用服务。lvs的调度器无法做高可用,keepalived实现的是调度器的高可用,但keepalived不只为lvs集群服务的,也可以做其他代理服务器的高可用,比如nginxkeepalived也可实现高可用(重…

【C语言】memmove()函数(拷贝重叠内存块函数详解)

🦄个人主页:修修修也 🎏所属专栏:C语言 ⚙️操作环境:Visual Studio 2022 目录 一.memmove()函数简介 1.函数功能 2.函数参数 1>.void * destination 2>.onst void * source 3>.size_t num 3.函数返回值 4.函数头文件 二.memmove()函数…

基于nodejs+vue人脸识别考勤管理系统的设计与实现

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性:…

【Linux】部署单体项目以及前后端分离项目(项目部署)

一、简介 以下就是Linux部署单机项目和前后端分离项目的优缺点,希望对你有所帮助。 1、Linux部署单机项目: 优点: 1.简化了系统管理:由于所有服务都在同一台机器上运行,因此可以简化系统管理和维护。 2.提高了性能&a…

【面试经典150 | 栈】最小栈

文章目录 Tag题目来源题目解读解题思路方法一:辅助栈方法二:一个栈方法三:栈中存放差值 其他语言python3 写在最后 Tag 【设计类】【栈】 题目来源 155. 最小栈 题目解读 本题是一个设计类的题目,设计一个最小栈类 MinStack() …