频率增强通道注意力机制(FECAM)学习总结

本文提出了一种新的频率增强通道注意力机制(FECAM),旨在解决时间序列预测中傅里叶变换因吉布斯现象导致的高频噪声问题。FECAM基于离散余弦变换,能自适应地模拟信道间的频率依赖性,有效避免预测误差。实验显示,该机制可提升多种主流模型的预测性能,如LSTM、Reformer、Informer和Autoformer等。


时间序列预测是一个长期存在的挑战,因为真实世界的信息处于各种场景(例如,能源,天气,交通,经济,地震预警)。然而,一些主流的预测模型预测结果与实际情况大相径庭。我们认为,这是模型缺乏捕获真实世界数据集中丰富包含的频率信息的能力的原因。目前主流的频率信息提取方法都是基于傅里叶变换(FT)的。然而,由于吉布斯现象,FT的使用是有问题的。如果序列两侧的值差异很大,则在两侧周围观察到振荡近似,并且会引入高频噪声。因此,我们提出了一种新的频率增强信道注意力,它基于离散余弦变换自适应地模拟信道之间的频率相互依赖性,这将从本质上避免傅里叶变换过程中有问题的周期引起的高频噪声,这被定义为吉布斯现象。我们证明了该网络在六个真实数据集中的泛化非常有效,并实现了最先进的性能,我们进一步证明了频率增强信道注意力机制模块可以灵活地应用于不同的网络。该模块可以提高现有主流网络的预测能力,只需几行代码,即可在LSTM上降低35.99%的MSE,在Reformer上降低10.01%,在Informer上降低8.71%,在Autoformer上降低8.29%,在Transformer上降低8.06%等,计算成本很小。

这篇论文提出了一种基于离散余弦变换的频率增强通道注意机制(FECAM),通过捕捉时间序列中的频率信息来提升主流深度学习模型的预测能力,有效避免傅里叶变换引入的高频噪声,显著提升了多种时间序列预测任务的性能。

SENET(channel attention)

FECAM(Frequency Enhanced Channel Attention Mechanism)

As a module to enhance the frequency domain modeling capability of transformers and LSTM

 下面对其部分源码进行解读:

SlefAttention_Family.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt

import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
# from masking import TriangularCausalMask, ProbMask#for parameter count
import os


class FullAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def forward(self, queries, keys, values, attn_mask):
        B, L, H, E = queries.shape
        _, S, _, D = values.shape
        scale = self.scale or 1. / sqrt(E)

        scores = torch.einsum("blhe,bshe->bhls", queries, keys)

        if self.mask_flag:
            if attn_mask is None:
                attn_mask = TriangularCausalMask(B, L, device=queries.device)

            scores.masked_fill_(attn_mask.mask, -np.inf)

        A = self.dropout(torch.softmax(scale * scores, dim=-1))
        V = torch.einsum("bhls,bshd->blhd", A, values)

        if self.output_attention:
            return (V.contiguous(), A)
        else:
            return (V.contiguous(), None)


class ProbAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(ProbAttention, self).__init__()
        self.factor = factor
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def _prob_QK(self, Q, K, sample_k, n_top):  # n_top: c*ln(L_q)
        # Q [B, H, L, D]
        B, H, L_K, E = K.shape
        _, _, L_Q, _ = Q.shape

        # calculate the sampled Q_K
        K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
        index_sample = torch.randint(L_K, (L_Q, sample_k))  # real U = U_part(factor*ln(L_k))*L_q
        K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
        Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()

        # find the Top_k query with sparisty measurement
        M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
        M_top = M.topk(n_top, sorted=False)[1]

        # use the reduced Q to calculate Q_K
        Q_reduce = Q[torch.arange(B)[:, None, None],
                   torch.arange(H)[None, :, None],
                   M_top, :]  # factor*ln(L_q)
        Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))  # factor*ln(L_q)*L_k

        return Q_K, M_top

    def _get_initial_context(self, V, L_Q):
        B, H, L_V, D = V.shape
        if not self.mask_flag:
            # V_sum = V.sum(dim=-2)
            V_sum = V.mean(dim=-2)
            contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
        else:  # use mask
            assert (L_Q == L_V)  # requires that L_Q == L_V, i.e. for self-attention only
            contex = V.cumsum(dim=-2)
        return contex

    def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
        B, H, L_V, D = V.shape

        if self.mask_flag:
            attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
            scores.masked_fill_(attn_mask.mask, -np.inf)

        attn = torch.softmax(scores, dim=-1)  # nn.Softmax(dim=-1)(scores)

        context_in[torch.arange(B)[:, None, None],
        torch.arange(H)[None, :, None],
        index, :] = torch.matmul(attn, V).type_as(context_in)
        if self.output_attention:
            attns = (torch.ones([B, H, L_V, L_V]) / L_V).type_as(attn).to(attn.device)
            attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
            return (context_in, attns)
        else:
            return (context_in, None)

    def forward(self, queries, keys, values, attn_mask):
        B, L_Q, H, D = queries.shape
        _, L_K, _, _ = keys.shape

        queries = queries.transpose(2, 1)
        keys = keys.transpose(2, 1)
        values = values.transpose(2, 1)

        U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item()  # c*ln(L_k)
        u = self.factor * np.ceil(np.log(L_Q)).astype('int').item()  # c*ln(L_q)

        U_part = U_part if U_part < L_K else L_K
        u = u if u < L_Q else L_Q

        scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)

        # add scale factor
        scale = self.scale or 1. / sqrt(D)
        if scale is not None:
            scores_top = scores_top * scale
        # get the context
        context = self._get_initial_context(values, L_Q)
        # update the context with selected top_k queries
        context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)

        return context.contiguous(), attn


class AttentionLayer(nn.Module):
    def __init__(self, attention, d_model, n_heads, d_keys=None,
                 d_values=None):
        super(AttentionLayer, self).__init__()

        d_keys = d_keys or (d_model // n_heads)
        d_values = d_values or (d_model // n_heads)

        self.inner_attention = attention
        self.query_projection = nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = nn.Linear(d_model, d_values * n_heads)
        self.out_projection = nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads

    def forward(self, queries, keys, values, attn_mask):
        B, L, _ = queries.shape
        _, S, _ = keys.shape
        H = self.n_heads

        queries = self.query_projection(queries).view(B, L, H, -1)
        keys = self.key_projection(keys).view(B, S, H, -1)
        values = self.value_projection(values).view(B, S, H, -1)

        out, attn = self.inner_attention(
            queries,
            keys,
            values,
            attn_mask
        )
        out = out.view(B, L, -1)

        return self.out_projection(out), attn

这个代码实现了一个注意力机制模块,主要包括了两种注意力机制(全局注意力 FullAttention 和 概率注意力 ProbAttention),以及一个常规的 AttentionLayer 用于处理多头注意力。以下是详细的解释:

1. 基础导入

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
  • torchtorch.nntorch.nn.functional 是 PyTorch 的核心模块,用于定义和操作神经网络结构。
  • matplotlib.pyplotnumpy 是常用的 Python 库,用于绘图和数值计算。
  • TriangularCausalMaskProbMask 用于为注意力机制提供掩码(masking),以确保特定位置的注意力权重被正确计算。

2. FullAttention 类(全局注意力)

class FullAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)
  • FullAttention 实现了全局自注意力机制。它通过 querieskeys 计算注意力权重。
  • mask_flag:决定是否应用掩码。
  • scale:对注意力分数进行缩放。
  • attention_dropout:为防止过拟合,应用 Dropout。

forward 函数:

def forward(self, queries, keys, values, attn_mask):
    B, L, H, E = queries.shape
    _, S, _, D = values.shape
    scale = self.scale or 1. / sqrt(E)

    scores = torch.einsum("blhe,bshe->bhls", queries, keys)
    if self.mask_flag:
        if attn_mask is None:
            attn_mask = TriangularCausalMask(B, L, device=queries.device)
        scores.masked_fill_(attn_mask.mask, -np.inf)
    A = self.dropout(torch.softmax(scale * scores, dim=-1))
    V = torch.einsum("bhls,bshd->blhd", A, values)
    return (V.contiguous(), A if self.output_attention else None)
  • torch.einsum:使用爱因斯坦求和约定来进行矩阵乘法,计算 querieskeys 的相似度。
  • mask_flag:当使用掩码时,将填充 -np.inf 来忽略特定位置的注意力分数。
  • softmax:用于将分数转换为概率分布。
  • V:是使用注意力分数和 values 计算出的加权和。

3. ProbAttention 类(概率注意力)

class ProbAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(ProbAttention, self).__init__()
        self.factor = factor
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)
  • ProbAttention 实现了一种基于稀疏注意力的机制。通过概率的方法减少计算量,特别适用于长序列的处理。
  • factor:控制采样和稀疏度。
  • scaleattention_dropout 类似于 FullAttention 中的作用。

_prob_QK 函数:

def _prob_QK(self, Q, K, sample_k, n_top):
    K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
    index_sample = torch.randint(L_K, (L_Q, sample_k))
    K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
    Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()

    M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
    M_top = M.topk(n_top, sorted=False)[1]

    Q_reduce = Q[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], M_top, :]
    Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))
    return Q_K, M_top
  • 目的:通过抽样计算 QK 的相似度,从而减少全局计算开销。
  • 采样:通过从 K 中随机选择一部分,并与 Q 进行计算,得出最具代表性的 Top_k 查询,减少计算负担。

4. AttentionLayer 类

class AttentionLayer(nn.Module):
    def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None):
        super(AttentionLayer, self).__init__()
        self.inner_attention = attention
        self.query_projection = nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = nn.Linear(d_model, d_values * n_heads)
        self.out_projection = nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads
  • AttentionLayer 是多头注意力层,将输入的 querieskeysvalues 进行线性变换,然后通过注意力机制处理。
  • d_model:输入特征的维度。
  • n_heads:多头注意力的头数。
  • inner_attention:可以是 FullAttentionProbAttention
forward 函数:
def forward(self, queries, keys, values, attn_mask):
    B, L, _ = queries.shape
    H = self.n_heads

    queries = self.query_projection(queries).view(B, L, H, -1)
    keys = self.key_projection(keys).view(B, S, H, -1)
    values = self.value_projection(values).view(B, S, H, -1)

    out, attn = self.inner_attention(
        queries,
        keys,
        values,
        attn_mask
    )
    out = out.view(B, L, -1)
    return self.out_projection(out), attn
  • 线性变换querieskeysvalues 都通过线性层进行投影,转换为适应多头注意力的形状。
  • 多头注意力:将 querieskeysvalues 分成多个头来计算注意力。
  • 输出投影:最后通过 out_projection 投影到原始维度。

总结

  • 全局注意力FullAttention)计算每对 querieskeys 的相似度,用于标准的 Transformer 模型。
  • 概率注意力ProbAttention)通过稀疏采样减少计算复杂度,特别适用于长序列输入。
  • AttentionLayer 实现了一个多头注意力机制,可以选择全局注意力或概率注意力来处理输入。

这段代码的核心是实现不同的注意力机制,主要用于序列到序列的建模任务,例如时间序列预测或自然语言处理任务。


在 PyTorch 中,forward 函数是每个自定义神经网络模块(nn.Module)的核心方法,它定义了模型的前向传播过程。前向传播是指输入数据如何经过网络层,逐步计算出输出结果的过程。

forward 函数的主要作用:

  1. 定义模型的计算逻辑forward 函数负责具体的计算过程。每当你将输入传入模型时,模型会自动调用 forward 函数来执行从输入到输出的所有计算。

    forward 函数中,你将定义:

    • 数据如何流经每一层(如卷积层、全连接层、注意力机制等)。
    • 应用哪些激活函数、正则化(如 BatchNormDropout)等操作。
    • 最终如何产生输出。
  2. 将输入映射到输出forward 函数的核心任务是将输入数据(张量)传递到各层并生成输出。例如,在卷积神经网络中,输入张量会经过卷积层、池化层、激活函数等模块,最后输出结果。

  3. 自动执行反向传播: 虽然 forward 仅定义前向传播过程,但它与自动微分(autograd)紧密相连。当你调用 loss.backward() 时,PyTorch 会自动根据 forward 中的操作来计算梯度,进而进行反向传播和参数更新。


Transformer_EncDec.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.dctnet import dct_channel_block, dct
# from dctnet import dct_channel_block, dct

class ConvLayer(nn.Module):
    def __init__(self, c_in):
        super(ConvLayer, self).__init__()
        self.downConv = nn.Conv1d(in_channels=c_in,
                                  out_channels=c_in,
                                  kernel_size=3,
                                  padding=2,
                                  padding_mode='circular')
        self.norm = nn.BatchNorm1d(c_in)
        self.activation = nn.ELU()
        self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        x = self.downConv(x.permute(0, 2, 1))
        x = self.norm(x)
        x = self.activation(x)
        x = self.maxPool(x)
        x = x.transpose(1, 2)
        return x


class EncoderLayer(nn.Module):
    def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        self.dct_layer=dct_channel_block(512)
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)


    def forward(self, x, attn_mask=None):
        # print(x.shape)
        
        #torch.Size([32, 96, 512])
        #torch.Size([32, 49, 512])
        '''
        before self-attention
        '''
        # mid  = self.dct_layer(x)#
        # x = x+mid
        new_x, attn = self.attention(
            x, x, x,
            attn_mask=attn_mask
        )
        x = x + self.dropout(new_x)
        
        y = x = self.norm1(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        
        # print("y.shape:",y.shape)#y.shape: torch.Size([32, 96, 512])#为了减少计算量,不然要做512次L=96的dct
        #加入到module
        result = self.norm2(x + y)
        # mid  = self.dct_layer(result)
        # result = result+mid
        # result = self.dct_norm(result) #norm 144


        return result, attn
        # return self.norm2(x + y), attn


class Encoder(nn.Module):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(Encoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer

    def forward(self, x, attn_mask=None):
        # x [B, L, D]
        attns = []
        if self.conv_layers is not None:
            for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
                x, attn = attn_layer(x, attn_mask=attn_mask)
                x = conv_layer(x)
                attns.append(attn)
            x, attn = self.attn_layers[-1](x)
            attns.append(attn)
        else:
            for attn_layer in self.attn_layers:
                x, attn = attn_layer(x, attn_mask=attn_mask)
                attns.append(attn)

        if self.norm is not None:
            x = self.norm(x)

        return x, attns


class DecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
                 dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        # self.dct_layer=dct_channel_block(512)
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        x = x + self.dropout(self.self_attention(
            x, x, x,
            attn_mask=x_mask
        )[0])
        x = self.norm1(x)

        x = x + self.dropout(self.cross_attention(
            x, cross, cross,
            attn_mask=cross_mask
        )[0])

        y = x = self.norm2(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        
        # result = self.norm3(x + y)
        
        # mid  = self.dct_layer(result)
        # result = result+mid
        # result = self.dct_norm(result) #norm 144
        # return result
        # return self.norm3(x + y)
        return self.norm3(x + y)


class Decoder(nn.Module):
    def __init__(self, layers, norm_layer=None, projection=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        for layer in self.layers:
            x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)

        if self.norm is not None:
            x = self.norm(x)

        if self.projection is not None:
            x = self.projection(x)
        return x

这段代码定义了一个类似 Transformer 的神经网络架构,包含卷积层、编码器层和解码器层。它可以用于处理序列数据(如时间序列、自然语言处理等)。具体包括以下组件:

1. ConvLayer(卷积层):

class ConvLayer(nn.Module):
    def __init__(self, c_in):
        super(ConvLayer, self).__init__()
        self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=2, padding_mode='circular')
        self.norm = nn.BatchNorm1d(c_in)
        self.activation = nn.ELU()
        self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
  • 这个卷积层首先对输入进行 1D 卷积,然后进行批归一化(BatchNorm)和 ELU 激活,再通过最大池化层减少特征维度。
  • 作用:将高维的输入进行降采样,同时提取局部特征。

2. EncoderLayer(编码器层)

class EncoderLayer(nn.Module):
    def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        self.dct_layer = dct_channel_block(512)  # DCT增强
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • 作用:这个层结合了注意力机制、自注意力(self-attention)、卷积层(用于前馈网络),并且通过 LayerNorm 和 Dropout 进行正则化。它还引入了 DCT(离散余弦变换)来增强输入的特征处理。
  • 自注意力机制:通过 attention 来捕获不同位置之间的相关性。
  • 卷积网络:通过卷积进一步对特征进行变换和学习。

3. Encoder(编码器)

class Encoder(nn.Module):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(Encoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer
  • 作用:这个类是编码器的整体结构,由多个 EncoderLayer 组成。如果有卷积层(conv_layers),会在每次注意力计算后对输入进行卷积操作。
  • 流程:输入经过每一层的注意力层,最后通过 LayerNorm 进行归一化。

4. DecoderLayer(解码器层)

class DecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
  • 作用:解码器层包括了自注意力(self_attention)和交叉注意力(cross_attention)。自注意力用于捕捉解码器内部的依赖关系,而交叉注意力用于将编码器的输出与解码器的输入结合。
  • 卷积层和归一化:使用卷积和归一化对注意力层的输出进一步处理。

5. Decoder(解码器)

class Decoder(nn.Module):
    def __init__(self, layers, norm_layer=None, projection=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection
  • 作用:解码器由多个 DecoderLayer 组成。输入首先通过每一层的解码器,结合来自编码器的输出,最后通过归一化和可选的投影层生成最终的输出。

6. 前向传播 (forward) 函数

每个模块都有一个 forward 函数,定义了数据在该模块中的流动过程。

ConvLayer.forward
def forward(self, x):
    x = self.downConv(x.permute(0, 2, 1))
    x = self.norm(x)
    x = self.activation(x)
    x = self.maxPool(x)
    x = x.transpose(1, 2)
    return x
  • 操作顺序
    1. 输入经过一维卷积处理。
    2. 批归一化。
    3. 激活函数 ELU。
    4. 最大池化。
    5. 变换维度并返回输出。
EncoderLayer.forward
def forward(self, x, attn_mask=None):
    new_x, attn = self.attention(x, x, x, attn_mask=attn_mask)
    x = x + self.dropout(new_x)
    
    y = x = self.norm1(x)
    y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
    y = self.dropout(self.conv2(y).transpose(-1, 1))
    
    result = self.norm2(x + y)
    return result, attn
  • 操作顺序
    1. 输入通过注意力层进行自注意力计算。
    2. 经过残差连接和 Dropout。
    3. 进行 LayerNorm 归一化。
    4. 输入通过前馈卷积网络。
    5. 最后输出经过归一化的结果。
DecoderLayer.forward
def forward(self, x, cross, x_mask=None, cross_mask=None):
    x = x + self.dropout(self.self_attention(x, x, x, attn_mask=x_mask)[0])
    x = self.norm1(x)
    x = x + self.dropout(self.cross_attention(x, cross, cross, attn_mask=cross_mask)[0])

    y = x = self.norm2(x)
    y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
    y = self.dropout(self.conv2(y).transpose(-1, 1))

    return self.norm3(x + y)
  • 操作顺序
    1. 输入通过自注意力计算。
    2. 进行 LayerNorm 归一化。
    3. 再通过交叉注意力与编码器的输出结合。
    4. 经过前馈卷积层,最后输出结果。

总结

  • 卷积层 用于特征提取和降采样。
  • 编码器解码器 使用注意力机制来捕捉不同时间步之间的依赖关系,并通过卷积层进行进一步的特征处理。
  • 前向传播 函数定义了每个模块内部的数据流动和处理逻辑,结合了卷积、注意力机制和正则化。

Embed.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
import math


class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]


class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x


class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = torch.zeros(c_in, d_model).float()
        w.require_grad = False

        position = torch.arange(0, c_in).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)

        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()


class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        self.hour_embed = Embed(hour_size, d_model)
        self.weekday_embed = Embed(weekday_size, d_model)
        self.day_embed = Embed(day_size, d_model)
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()

        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        return hour_x + weekday_x + day_x + month_x + minute_x


class TimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h': 4, 't': 5, 's': 6, 'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        d_inp = freq_map[freq]
        self.embed = nn.Linear(d_inp, d_model, bias=False)

    def forward(self, x):
        return self.embed(x)


class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
        return self.dropout(x)


class DataEmbedding_wo_pos(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.temporal_embedding(x_mark)
        return self.dropout(x)

class DataEmbedding_wo_pos_temp(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos_temp, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x)
        return self.dropout(x)

class DataEmbedding_wo_temp(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_temp, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.position_embedding(x)
        return self.dropout(x)

这段代码实现了一系列嵌入层(Embedding Layers),主要用于将输入数据(如时间序列数据)转换为适合模型处理的向量表示。这种嵌入是序列建模中非常重要的一部分,特别是在处理时间序列数据时。

主要模块解释

  1. PositionalEmbedding(位置嵌入)

    • 作用:为每个序列位置编码一个向量,使得模型能够感知序列中每个位置的顺序。常用于 Transformer 模型中,因为这种模型本身不具备位置信息的感知能力。
    • 实现细节
      • 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用 sincos
      • pe 是一个预计算的二维张量,包含最大序列长度(max_len)和每个位置的嵌入(d_model 的维度)。
  2. TokenEmbedding(令牌嵌入)

    • 作用:将输入的数据通过一维卷积进行嵌入。可以理解为将输入的多维数据映射到一个高维的特征空间。
    • 实现细节
      • 使用 1D 卷积将输入通道数(c_in)转换为模型维度(d_model)。
      • 初始化权重时使用 Kaiming 初始化方法,适合 ReLU 和 LeakyReLU 激活函数。
  3. FixedEmbedding(固定嵌入)

    • 作用:通过固定的正弦和余弦函数生成嵌入,类似于位置嵌入,但它是用于固定长度的输入数据。
    • 实现细节
      • 每个位置的奇数位使用 sin,偶数位使用 cos
      • 权重矩阵 w 是固定的,不参与训练。
  4. TemporalEmbedding(时间嵌入)

    • 作用:为时间特征(如小时、天、月份等)生成嵌入,特别适用于处理时间序列数据。
    • 实现细节
      • 支持嵌入不同的时间粒度(如分钟、小时、周几、天、月份等)。
      • 可以选择使用固定嵌入(FixedEmbedding)或可学习嵌入(nn.Embedding)。
  5. TimeFeatureEmbedding(时间特征嵌入)

    • 作用:为特定时间粒度(如小时、天、周等)的数值进行线性嵌入。
    • 实现细节
      • 根据 freq_map 映射不同的时间粒度,并使用线性变换嵌入。
  6. DataEmbedding(数据嵌入)

    • 作用:组合了 TokenEmbeddingPositionalEmbeddingTemporalEmbedding,生成完整的嵌入向量。
    • 实现细节
      • 先将输入数据通过 TokenEmbedding 进行嵌入。
      • 然后结合 TemporalEmbedding 提取时间相关特征。
      • 还会加入位置嵌入(PositionalEmbedding)来提供序列位置信息。
      • 最后通过 Dropout 防止过拟合。
  7. DataEmbedding_wo_pos(无位置嵌入的数据嵌入)

    • 作用:与 DataEmbedding 类似,但不包含位置嵌入。
    • 实现细节
      • 只包含 TokenEmbeddingTemporalEmbedding,省略位置嵌入部分。
  8. DataEmbedding_wo_pos_temp(无位置和时间嵌入的数据嵌入)

    • 作用:只进行 TokenEmbedding,不包括时间和位置嵌入。
    • 实现细节
      • 仅使用 TokenEmbedding 来处理输入数据。
  9. DataEmbedding_wo_temp(无时间嵌入的数据嵌入)

    • 作用:不包括时间嵌入,但仍然使用位置嵌入。
    • 实现细节
      • 包含 TokenEmbeddingPositionalEmbedding,省略了 TemporalEmbedding

嵌入层的整体作用

这些嵌入层的主要作用是将输入的时间序列数据转换成向量表示,这样可以更好地被神经网络模型(如 Transformer 或 CNN)处理。时间序列数据通常包含时间信息(如天、小时等),嵌入层通过提取这些时间信息来增强模型对时间依赖性的感知。通过组合 TokenEmbedding(提取特征)、PositionalEmbedding(位置信息)和 TemporalEmbedding(时间信息),模型可以捕捉到输入数据的复杂时空关系。

dctnet.py代码:

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
try:
    from torch import irfft
    from torch import rfft
except ImportError:
    def rfft(x, d):
        t = torch.fft.fft(x, dim = (-d))
        r = torch.stack((t.real, t.imag), -1)
        return r
    def irfft(x, d):
        t = torch.fft.ifft(torch.complex(x[:,:,0], x[:,:,1]), dim = (-d))
        return t.real

def dct(x, norm=None):
    """
    Discrete Cosine Transform, Type II (a.k.a. the DCT)

    For the meaning of the parameter `norm`, see:
    https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.fftpack.dct.html

    :param x: the input signal
    :param norm: the normalization, None or 'ortho'
    :return: the DCT-II of the signal over the last dimension
    """
    x_shape = x.shape
    N = x_shape[-1]
    x = x.contiguous().view(-1, N)

    v = torch.cat([x[:, ::2], x[:, 1::2].flip([1])], dim=1)

    # Vc = torch.fft.rfft(v, 1, onesided=False)
    Vc = rfft(v, 1)

    k = - torch.arange(N, dtype=x.dtype, device=x.device)[None, :] * np.pi / (2 * N)
    W_r = torch.cos(k)
    W_i = torch.sin(k)

    V = Vc[:, :, 0] * W_r - Vc[:, :, 1] * W_i

    if norm == 'ortho':
        V[:, 0] /= np.sqrt(N) * 2
        V[:, 1:] /= np.sqrt(N / 2) * 2

    V = 2 * V.view(*x_shape)
    

    return V


# class senet_block(nn.Module):
#     def __init__(self, channel=512, ratio=1):
#         super(dct_channel_block, self).__init__()
#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(channel, channel // 4, bias=False),
#                 nn.ReLU(inplace=True),
#                 nn.Linear(channel //4, channel, bias=False),
#                 nn.Sigmoid()
#         )

#     def forward(self, x):
#         # b, c, l = x.size() # (B,C,L)
#         # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
#         # print("y",y.shape)
#         x = x.permute(0,2,1)
#         b, c, l = x.size() 
#         y = self.avg_pool(x).view(b, c) # (B,C,L) ->(B,C,1)
#         # print("y",y.shape)
#         # y = self.fc(y).view(b, c, 96)

#         y = self.fc(y).view(b,c,1)
#         # print("y",y.shape)
#         # return x * y
#         return (x*y).permute(0,2,1)
class dct_channel_block(nn.Module):
    def __init__(self, channel):
        super(dct_channel_block, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.fc = nn.Sequential(
                nn.Linear(channel, channel*2, bias=False),
                nn.Dropout(p=0.1),
                nn.ReLU(inplace=True),
                nn.Linear( channel*2, channel, bias=False),
                nn.Sigmoid()
        )
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  
        self.dct_norm = nn.LayerNorm([96], eps=1e-6)#for lstm on length-wise
        # self.dct_norm = nn.LayerNorm([36], eps=1e-6)#for lstm on length-wise on ill with input =36


    def forward(self, x):
        b, c, l = x.size() # (B,C,L) (32,96,512)
        # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
        
        # y = self.avg_pool(x).view(b, c) # (B,C,L) -> (B,C,1)
        # print("y",y.shape
        # y = self.fc(y).view(b, c, 96)
        list = []
        for i in range(c):
            freq=dct(x[:,i,:])     
            # print("freq-shape:",freq.shape)
            list.append(freq)
         
        

        stack_dct=torch.stack(list,dim=1)
        stack_dct = torch.tensor(stack_dct)
        '''
        for traffic mission:f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic datasets
        '''
        
        lr_weight = self.dct_norm(stack_dct) 
        lr_weight = self.fc(stack_dct)
        lr_weight = self.dct_norm(lr_weight) 
        
        # print("lr_weight",lr_weight.shape)
        return x *lr_weight #result


if __name__ == '__main__':
    
    tensor = torch.rand(8,7,96)
    dct_model = dct_channel_block()
    result = dct_model.forward(tensor) 
    print("result.shape:",result.shape)


这段代码实现了基于离散余弦变换(DCT)的通道块,用于增强神经网络中的特征表示。以下是代码的详细解释:

1. 导入库和兼容性处理

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch

导入了 PyTorch 的 nn 模块用于神经网络构建,mathnumpy 用于数学运算。

try:
    from torch import irfft
    from torch import rfft
except ImportError:
    def rfft(x, d):
        t = torch.fft.fft(x, dim=(-d))
        r = torch.stack((t.real, t.imag), -1)
        return r

    def irfft(x, d):
        t = torch.fft.ifft(torch.complex(x[:, :, 0], x[:, :, 1]), dim=(-d))
        return t.real

这一段代码用于兼容旧版本的 PyTorch,定义了 rfftirfft 函数。在较新的 PyTorch 版本中,rfftirfft 函数可能已被替换,因此这里使用自定义实现确保代码的兼容性。

2. DCT(离散余弦变换)

def dct(x, norm=None):
    ...

这个函数实现了 离散余弦变换 (DCT),具体是 DCT-II 类型。DCT 是一种频域转换,通常用于信号处理和特征提取,它可以将时域信号转换为频域表示。

主要步骤:
  • 重新排列数据:将输入张量中的偶数和奇数索引部分组合在一起。
  • 进行 FFT(快速傅里叶变换):通过 rfft 函数进行快速傅里叶变换,将信号转换到频域。
  • 正弦和余弦加权:通过 cossin 函数对 FFT 的结果进行加权,得到离散余弦变换的结果。
  • 归一化处理:如果指定 norm='ortho',进行正交归一化。

3. DCT 通道块

class dct_channel_block(nn.Module):
    def __init__(self, channel):
        super(dct_channel_block, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(channel, channel * 2, bias=False),
            nn.Dropout(p=0.1),
            nn.ReLU(inplace=True),
            nn.Linear(channel * 2, channel, bias=False),
            nn.Sigmoid()
        )
        self.dct_norm = nn.LayerNorm([96], eps=1e-6)

这个类实现了基于 DCT 的通道块,主要用于对输入张量进行变换,增强特征的表达能力。

主要组件:
  • 全连接层(fc:使用了两层全连接层,结合 ReLU 激活函数和 Dropout,来对 DCT 变换后的频域特征进行进一步处理。
  • LayerNorm(dct_norm:用于标准化处理,以稳定训练过程。
前向传播 (forward):
def forward(self, x):
    b, c, l = x.size() # (B,C,L) (32,96,512)
    list = []
    for i in range(c):
        freq = dct(x[:, i, :])
        list.append(freq)

    stack_dct = torch.stack(list, dim=1)
    stack_dct = torch.tensor(stack_dct)
    
    lr_weight = self.dct_norm(stack_dct)
    lr_weight = self.fc(stack_dct)
    lr_weight = self.dct_norm(lr_weight)
    
    return x * lr_weight
  • 输入x 的形状为 (B, C, L),其中 B 是批量大小,C 是通道数,L 是序列长度。
  • DCT 变换:对于每个通道,计算 DCT,得到频域表示。
  • 处理 DCT 结果:将 DCT 结果通过全连接层和标准化层处理,得到特征权重。
  • 输出:将原始输入与 DCT 处理后的权重相乘,得到最终的输出。

4. 主函数

if __name__ == '__main__':
    tensor = torch.rand(8, 7, 96)
    dct_model = dct_channel_block(96)
    result = dct_model.forward(tensor)
    print("result.shape:", result.shape)

这部分代码用于测试 dct_channel_block 类:

  • 输入:随机生成一个形状为 (8, 7, 96) 的张量,代表批量大小为 8,通道数为 7,序列长度为 96。
  • 模型实例化:实例化 dct_channel_block,并使用前向传播方法对输入进行处理。
  • 输出:打印处理后的输出张量的形状。

总结

  • DCT 的作用:DCT 可以将时域信号转换为频域信号,在信号处理和特征提取中非常有用。通过这种变换,可以捕捉输入数据中的频率模式,从而增强模型的表达能力。
  • 通道块的作用dct_channel_block 结合 DCT 和全连接层来处理输入特征,并生成频域权重以调整输入特征。
  • 代码的目的:实现了一种结合 DCT 的特征增强机制,旨在通过频域处理提升神经网络在时序数据上的表现。

LSTM.py代码:

from pip import main
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
# from dctnet import dct_channel_block,dct #for parameters calc
import argparse
from fvcore.nn import FlopCountAnalysis,parameter_count_table  

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通
    def __init__(self,configs,input_size=None,hidden_size=16, output_size=None,batch_size=None,num_layers=2):#input_size与output_size是通道数
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.input_size = configs.enc_in #channel
        self.hidden_size = hidden_size #输出维度 也就是输出通道
        self.num_layers = num_layers
        self.output_size = configs.enc_in #channel #输出个数
        self.num_directions = 1 # 单向LSTM
        self.batch_size = configs.batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)#通道数对齐层
        self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)#输出长度对齐层
        # self.linear = nn.Linear()
        #add dce-block
        self.dct_layer=dct_channel_block(configs.seq_len)
        self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)#作为模块一般normal channel效果好点
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)#作为模块一般normal channel效果好点
    def forward(self, x):
        # x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL
        # b, c, l = x.size() # (B,C,L)

        # batch_size, seq_len = x.shape[0], x.shape[1]
        # h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
        # c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
        h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        # output(batch_size, seq_len, num_directions * hidden_size)
        output, _ = self.lstm(x, (h_0, c_0)) # output(5, 30, 64)
        # print("output.shape:",output.shape)#result.shape: torch.Size([8, 96, 8])
        result = self.linear(output)  # (B,L,C)
        # output.shape: torch.Size([8, 96, 16])#16是hidden_size
        # result.shape: torch.Size([8, 96, 8])#8是为了符合通道数对齐,exchange_rate有8个变量
        '''
        dct
        '''
        result = self.dct_layer(result.permute(0,2,1))#加入dct模块,mse降低0.12个点
        result_len =  self.linear_out_len(result)#为了输出长度对齐 (8,8,96)
        '''
        dct
        '''
        # result_len =  self.linear_out_len(result.permute(0,2,1))#为了输出长度对齐 (8,8,96)
            

        # print("result.shape:",result.shape)#result.shape: torch.Size([8, 96, 8])
        # result = result.permute(0,2,1)#(B,L,C)=》(B,C,L)
        # result = self.dct_layer(result_len)#加入dct模块,mse降低0.12个点
        # result = result.permute(0,2,1)#(B,C,L)=》(B,L,C)

        return  result_len.permute(0,2,1)
# lstm = LSTM(input_size=7,hidden_size=64, output_size=7,batch_size=8,num_layers=5).to(device)
# tensor = torch.rand(8, 96, 7).to(device)
# result = lstm(tensor)
# print("result.shape:",result.shape)
if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
    # forecasting task
    parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
    parser.add_argument('--label_len', type=int, default=48, help='start token length')
    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
    parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')
    parser.add_argument('--enc_in', type=int, default=7, help='encoder input size') # DLinear with --individual, use this hyperparameter as the number of channels
    parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
    parser.add_argument('--c_out', type=int, default=7, help='output size')
    parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
    parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
    parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
    parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')

    # optimization
    parser.add_argument('--num_workers', type=int, default=10, help='data loader num workers')
    parser.add_argument('--itr', type=int, default=2, help='experiments times')
    parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
    parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
    parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
    parser.add_argument('--des', type=str, default='test', help='exp description')
    parser.add_argument('--loss', type=str, default='mse', help='loss function')
    parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
    parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)

    # GPU
    parser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')
    parser.add_argument('--gpu', type=int, default=0, help='gpu')
    parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
    parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')
    parser.add_argument('--test_flop', action='store_true', default=False, help='See utils/tools for usage')

    args = parser.parse_args()
    model = Model(args)
    print(parameter_count_table(model))

这段代码实现了一个基于 LSTM(长短期记忆网络)和 DCT(离散余弦变换)的时间序列预测模型,并且可以处理不同的模型配置和超参数。这段代码主要用于时间序列数据的预测任务,具体解释如下:

1. 模型类 Model

class Model(nn.Module):
    def __init__(self, configs, input_size=None, hidden_size=16, output_size=None, batch_size=None, num_layers=2):
        super(Model, self).__init__()
        ...
  • configs:通过命令行传递的配置参数(如序列长度、批量大小、隐藏层大小等)。
  • input_size:输入的特征数量(通道数),从 configs.enc_in 中获取。
  • hidden_size:LSTM 隐藏层的大小,默认是 16,即 LSTM 输出的特征维度。
  • num_layers:LSTM 网络的层数,默认为 2
  • output_size:模型的输出特征数,从 configs.enc_in 中获取。

该模型的结构包含:

  • 一个 LSTM 层,用于处理输入的时间序列数据。
  • 两个 线性层linearlinear_out_len),用于通道数和序列长度的对齐。
  • 一个基于 DCT 的增强模块 dct_channel_block,用于提取频域特征。

2. LSTM 和线性层

self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size)  # 通道数对齐
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)  # 输出长度对齐
  • LSTM 层:输入维度为 input_size,输出维度为 hidden_size,LSTM 层数为 num_layers
  • 线性层 linear:用于将 LSTM 的输出从 hidden_size 映射回输入的特征数量(通道数)。
  • 线性层 linear_out_len:用于将序列长度从 seq_len 映射到预测的序列长度 pred_len

3. DCT 通道块

self.dct_layer = dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)
  • dct_channel_block:通过 DCT(离散余弦变换)提取频域信息,并对输入特征进行处理。它有助于改善模型对时间序列中的周期性或频域特征的捕捉能力。
  • dct_norm:标准化层,用于将经过 DCT 处理的特征进行归一化。

4. 前向传播函数 forward

def forward(self, x):
    ...
    h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
    c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
    
    output, _ = self.lstm(x, (h_0, c_0))  # LSTM 前向传播
    result = self.linear(output)  # 通道数对齐

    result = self.dct_layer(result.permute(0, 2, 1))  # DCT 模块
    result_len = self.linear_out_len(result)  # 长度对齐

    return result_len.permute(0, 2, 1)
  • 初始化隐藏状态和细胞状态h_0c_0 是 LSTM 的初始隐藏状态和细胞状态。
  • LSTM 前向传播:输入 x 经过 LSTM 层,得到 output,并进行通道数对齐。
  • DCT 处理:经过 LSTM 输出后的结果通过 DCT 模块处理,并对频域特征进行操作。
  • 长度对齐:通过 linear_out_len 线性层调整预测序列的长度为 pred_len
  • 输出:返回最终的预测结果,并调整形状。

5. 命令行参数配置

parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
...
args = parser.parse_args()
model = Model(args)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • enc_in:输入特征的通道数。
  • batch_size:批量大小。

通过 argparse 模块,可以通过命令行传入这些参数来控制模型的配置。

6. 模型参数和 FLOPs 计算

from fvcore.nn import FlopCountAnalysis, parameter_count_table  
print(parameter_count_table(model))
  • parameter_count_table:用于计算模型的参数数量。
  • FlopCountAnalysis:用于分析模型的浮点操作(FLOPs)。

这段代码通过 fvcore 库计算模型的参数量,并输出这些信息以便进行模型复杂度的评估。

总结

  • 该模型结合了 LSTMDCT,用于时间序列数据的预测。
  • 通过 DCT 增强模型对频域特征的捕捉能力,有助于提高模型的预测性能。
  • 通过命令行参数,模型的配置可以灵活调整,并且支持多 GPU 训练和 FLOPs 计算。

oneCNN.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
        
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通#CNN
    def __init__(self,configs):
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.conv1 = nn.Conv1d(in_channels=self.channel_num,out_channels=self.channel_num,kernel_size=5,stride=1,padding=2) #输入通道数和输出通道数应该一样
        # input = torch.randn(32,7,96)
        # batch_size x text_len x embedding_size -> batch_size x embedding_size x text_len
        self.dct_layer=dct_channel_block(self.seq_len)
        self.linear = nn.Linear(self.seq_len,self.pred_len)
        # self.dct_norm = nn.LayerNorm([7], eps=1e-6)#作为模块一般normal channel效果好点



    def forward(self, x):
        # print("x.shape:",x.shape)
        x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL and 1-d conv
        # b, c, l = x.size() # (B,C,L)
        
        out = self.conv1(x) #b,c,l
        out = self.linear(x)#b,c,l
        # print(out.size())
        # out  = self.dct_layer(out)#加入dct模块,mse降低0.12个点
        # out = self.linear(x)#b,c,l
        # x = x+mid
        # x = x.permute(0,2,1) 
        # x = self.dct_norm(x) #norm 144
        # x = x.permute(0,2,1) 
        return  (out).permute(0,2,1)#b,l,c

这段代码定义了一个基于 CNN(卷积神经网络)DCT(离散余弦变换) 的简单时间序列预测模型。模型通过 1D 卷积和 DCT 增强特征提取,结合线性层对输出进行预测。

代码详解

1. 模型初始化
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        
        self.conv1 = nn.Conv1d(in_channels=self.channel_num, out_channels=self.channel_num, kernel_size=5, stride=1, padding=2)
        self.dct_layer = dct_channel_block(self.seq_len)
        self.linear = nn.Linear(self.seq_len, self.pred_len)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • channel_num:输入的通道数,代表时间序列的特征数量(例如 7 维度的输入表示有 7 个不同的特征)。
网络组件:
  • conv1:一维卷积层,输入和输出通道数保持一致,卷积核大小为 5,步幅为 1,并使用 padding=2 保证输入和输出序列的长度保持不变。

  • dct_layer:通过 dct_channel_block 提取频域特征,用离散余弦变换增强特征。

  • linear:线性层用于将序列长度从 seq_len 映射到预测长度 pred_len,在输出前对时间序列进行长度调整。

2. 前向传播函数 forward
def forward(self, x):
    x = x.permute(0, 2, 1)  # 转置输入,调整维度顺序 (B, L, C) -> (B, C, L)
    
    out = self.conv1(x)  # 应用一维卷积,提取特征 (B, C, L)
    out = self.linear(x)  # 应用线性层进行序列长度变换
    
    return out.permute(0, 2, 1)  # 再次转置,调整维度回到 (B, L, C)
  • 输入:输入 x 的形状为 (B, L, C),即 B 表示批量大小,L 表示序列长度,C 表示输入的特征维度(通道数)。

  • 转置:在进行 1D 卷积前,先通过 x.permute(0, 2, 1) 将输入从形状 (B, L, C) 转为 (B, C, L),使得输入符合 1D 卷积的要求。

  • 卷积:通过 conv1 提取特征,卷积后维度仍然是 (B, C, L)

  • 线性层:将经过卷积后的特征通过线性层 linear,将序列长度从 seq_len 转换为 pred_len

  • 输出:最后将输出再转置回 (B, L, C),作为最终的预测结果。

可选模块(未激活的部分)

# out  = self.dct_layer(out)  # DCT 层:可选的 DCT 增强模块,用于通过离散余弦变换进一步处理特征。
# out = self.linear(x)  # 可选的线性层,可以用于对卷积后输出再进行进一步处理。
# x = self.dct_norm(x)  # 标准化层,用于标准化 DCT 后的特征。

这些部分暂时被注释掉了,表示 DCT 层和标准化层可能是可选的改进点。在某些情况下,通过 DCT 提取频域特征可以提高模型的性能。

3. 总结

  • 卷积操作:1D 卷积用于捕捉输入时间序列的局部特征。
  • DCT 操作:可选的 DCT 层能够将时域信号转换为频域信号,捕捉输入数据中的周期性或频率特征,从而进一步增强模型的表现。
  • 线性映射:线性层负责调整序列的长度,保证输入和输出长度匹配。

这个模型设计相对简单,但由于使用了卷积操作和 DCT 层,因此在处理具有时空依赖性的数据(如时间序列预测)时有一定优势。


Transformer.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Transformer_EncDec import Decoder, DecoderLayer, Encoder, EncoderLayer, ConvLayer
from layers.SelfAttention_Family import FullAttention, AttentionLayer
from layers.Embed import DataEmbedding,DataEmbedding_wo_pos,DataEmbedding_wo_temp,DataEmbedding_wo_pos_temp
import numpy as np
from layers.dctnet import dct_channel_block,dct

class Model(nn.Module):
    """
    Vanilla Transformer with O(L^2) complexity
    """
    def __init__(self, configs):
        super(Model, self).__init__()
        self.pred_len = configs.pred_len
        self.output_attention = configs.output_attention

        # Embedding
        if configs.embed_type == 0:
            self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                            configs.dropout)
            self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
                                           configs.dropout)
        elif configs.embed_type == 1:
            self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
            self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
        elif configs.embed_type == 2:
            self.enc_embedding = DataEmbedding_wo_pos(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
            self.dec_embedding = DataEmbedding_wo_pos(configs.dec_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)

        elif configs.embed_type == 3:
            self.enc_embedding = DataEmbedding_wo_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
            self.dec_embedding = DataEmbedding_wo_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
        elif configs.embed_type == 4:
            self.enc_embedding = DataEmbedding_wo_pos_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
            self.dec_embedding = DataEmbedding_wo_pos_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
                                                    configs.dropout)
        # Encoder
        self.encoder = Encoder(
            [
                EncoderLayer(
                    AttentionLayer(
                        FullAttention(False, configs.factor, attention_dropout=configs.dropout,
                                      output_attention=configs.output_attention), configs.d_model, configs.n_heads),
                    configs.d_model,
                    configs.d_ff,
                    dropout=configs.dropout,
                    activation=configs.activation
                ) for l in range(configs.e_layers)
            ],
            norm_layer=torch.nn.LayerNorm(configs.d_model)
        )
        # Decoder
        self.decoder = Decoder(
            [
                DecoderLayer(
                    AttentionLayer(
                        FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                        configs.d_model, configs.n_heads),
                    AttentionLayer(
                        FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                        configs.d_model, configs.n_heads),
                    configs.d_model,
                    configs.d_ff,
                    dropout=configs.dropout,
                    activation=configs.activation,
                )
                for l in range(configs.d_layers)
            ],
            norm_layer=torch.nn.LayerNorm(configs.d_model),
            projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
        )
        self.dct_layer=dct_channel_block(512)
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):

        enc_out = self.enc_embedding(x_enc, x_mark_enc)
        enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
        # print("enc_out.shape:",enc_out.shape)#enc_out.shape: torch.Size([32, 96, 512])
        #加入dct模块
        # mid  = self.dct_layer(enc_out)
        # enc_out = enc_out+mid
        # enc_out = self.dct_norm(enc_out) #norm 144

        dec_out = self.dec_embedding(x_dec, x_mark_dec)
        dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)

        if self.output_attention:
            return dec_out[:, -self.pred_len:, :], attns
        else:
            return dec_out[:, -self.pred_len:, :]  # [B, L, D]

这段代码实现了一个结合 TransformerDCT(离散余弦变换) 的神经网络模型,主要用于时间序列预测任务。模型结构基于 Transformer 编码器-解码器框架,支持多种嵌入方式,并集成了 DCT 通道块用于增强特征处理。

1. 模型简介

该模型的核心是标准的 Transformer 编码器-解码器结构,具有以下主要模块:

  • 嵌入模块:通过 DataEmbedding 和其变种为输入数据生成嵌入表示。
  • 编码器(Encoder):基于多层自注意力机制的编码器,用于提取输入序列的高维特征。
  • 解码器(Decoder):结合自注意力和交叉注意力的解码器,用于生成预测序列。
  • DCT 模块:用于对编码器输出进行频域增强处理(当前注释掉,作为可选模块)。

2. 嵌入模块

if configs.embed_type == 0:
    self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
    self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
  • DataEmbedding:为输入数据生成嵌入。嵌入方式由 configs.embed_type 控制,支持五种不同类型的嵌入:
    • 0:默认的嵌入(值、时间、位置嵌入的组合)。
    • 1:与 embed_type=0 相同。
    • 2:没有位置嵌入。
    • 3:没有时间嵌入。
    • 4:没有时间和位置嵌入。

3. 编码器(Encoder)

self.encoder = Encoder(
    [
        EncoderLayer(
            AttentionLayer(
                FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=configs.output_attention),
                configs.d_model, configs.n_heads),
            configs.d_model,
            configs.d_ff,
            dropout=configs.dropout,
            activation=configs.activation
        ) for l in range(configs.e_layers)
    ],
    norm_layer=torch.nn.LayerNorm(configs.d_model)
)
  • Encoder:由多个 EncoderLayer 组成,每层包括自注意力机制和前馈神经网络。
  • FullAttention:注意力机制,False 表示不使用掩码(即标准自注意力)。
  • LayerNorm:标准化层,用于稳定训练。

4. 解码器(Decoder)

self.decoder = Decoder(
    [
        DecoderLayer(
            AttentionLayer(
                FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                configs.d_model, configs.n_heads),
            AttentionLayer(
                FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                configs.d_model, configs.n_heads),
            configs.d_model,
            configs.d_ff,
            dropout=configs.dropout,
            activation=configs.activation,
        )
        for l in range(configs.d_layers)
    ],
    norm_layer=torch.nn.LayerNorm(configs.d_model),
    projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
)
  • Decoder:由多个 DecoderLayer 组成,每层结合了自注意力和交叉注意力机制。
  • projection:最后一层是线性层,用于将解码器的输出投影到目标输出维度(configs.c_out)。

5. DCT 通道块

self.dct_layer = dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • dct_channel_block:DCT 模块用于对编码器的输出进行频域变换,但该部分目前被注释掉。
  • dct_norm:LayerNorm 层用于对经过 DCT 的特征进行标准化处理。

6. 前向传播函数

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
            enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):

    enc_out = self.enc_embedding(x_enc, x_mark_enc)
    enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
    
    # DCT 模块(可选)
    # mid  = self.dct_layer(enc_out)
    # enc_out = enc_out + mid
    # enc_out = self.dct_norm(enc_out)

    dec_out = self.dec_embedding(x_dec, x_mark_dec)
    dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)

    if self.output_attention:
        return dec_out[:, -self.pred_len:, :], attns
    else:
        return dec_out[:, -self.pred_len:, :]
  • 输入

    • x_encx_mark_enc 是编码器的输入和时间标记。
    • x_decx_mark_dec 是解码器的输入和时间标记。
    • enc_self_maskdec_self_maskdec_enc_mask 是可选的注意力掩码。
  • 编码器:通过 enc_embeddingencoder 模块处理编码器输入。

  • DCT:注释掉的 DCT 模块可以对编码器输出进行频域处理。

  • 解码器:通过 dec_embeddingdecoder 模块处理解码器输入,并结合编码器的输出进行预测。

  • 输出

    • 如果 output_attentionTrue,返回解码器的输出和注意力矩阵。
    • 否则,只返回解码器的最终预测。

7. 总结

  • 该模型是一个结合 Transformer 和 DCT 的时间序列预测模型。
  • Transformer 编码器-解码器架构使模型能够捕捉复杂的时间依赖关系。
  • DCT 模块作为可选部分,能够增强模型对频域特征的捕捉。
  • 模型支持多种嵌入方式,通过调整 configs.embed_type 来选择不同的嵌入策略。

Linear.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct

import numpy as np
import math
# class Model(nn.Module):SENET for ETTmx

#     """
#     Just one Linear layer
#     """
#     def __init__(self,configs,channel=7,ratio=1):
#         super(Model, self).__init__()

#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(7,14, bias=False),
#                 nn.Dropout(p=0.1),
#                 nn.ReLU(inplace=True) ,
#                 nn.Linear(14,7, bias=False),
#                 nn.Sigmoid()
#         )
#         self.seq_len = configs.seq_len
#         self.pred_len = configs.pred_len

#         self.Linear_More_1 = nn.Linear(self.seq_len,self.pred_len * 2)
#         self.Linear_More_2 = nn.Linear(self.pred_len*2,self.pred_len)
#         self.relu = nn.ReLU()
#         self.gelu = nn.GELU()    

#         self.drop = nn.Dropout(p=0.1)
#         # Use this line if you want to visualize the weights
#        
#     def forward(self, x):
#         # x: [Batch, Input length, Channel]
#   
#         x = x.permute(0,2,1) # (B,L,C)->(B,C,L)
#         b, c, l = x.size() # (B,C,L)
#         y = self.avg_pool(x).view(b, c) # (B,C,L) 
        
        
#         # np.save('f_weight.npy', f_weight_np)
# #         # np.save('%d f_weight.npy' %epoch, f_weight_np)
#         # print("y",y.shape)
#         # return (x * y).permute(0,2,1)
#         return (z).permute(0,2,1)

class my_Layernorm(nn.Module):
    """
    Special designed layernorm for the seasonal part
    """
    def __init__(self, channels):
        super(my_Layernorm, self).__init__()
        self.layernorm = nn.LayerNorm(channels)

    def forward(self, x):
        x_hat = self.layernorm(x)
        bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
        return x_hat - bias
class Model(nn.Module):
        
    def __init__(self,configs,channel=96,ratio=1):
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.fc = nn.Sequential(
                nn.Linear(channel, channel*2, bias=False),
                nn.Dropout(p=0.1),
                nn.ReLU(inplace=True),
                nn.Linear( channel*2, channel, bias=False),
                nn.Sigmoid()
        )
        self.fc_inverse = nn.Sequential(
            nn.Linear(channel, channel//2, bias=False),
            nn.Dropout(p=0.1),
            nn.ReLU(inplace=True),
            nn.Linear( channel//2, channel, bias=False),
            nn.Sigmoid()
        )
        # self.fc_plot = nn.Linear(channel, channel, bias=False)
        self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)

        self.Linear = nn.Linear(self.seq_len, self.pred_len)
        self.Linear_1 = nn.Linear(self.seq_len, self.pred_len)
        # self.dct_norm = nn.LayerNorm([self.channel_num], eps=1e-6)
        self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
        # self.my_layer_norm = nn.LayerNorm([96], eps=1e-6)
    def forward(self, x):
        x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL

        
       
        b, c, l = x.size() # (B,C,L)
        list = []
        
        for i in range(c):#i represent channel 
            freq=dct.dct(x[:,i,:])     #dct
            # print("freq-shape:",freq.shape)
            list.append(freq)
         
        
        stack_dct=torch.stack(list,dim=1) 
        stack_dct = torch.tensor(stack_dct)#(B,L,C)
        
        stack_dct = self.dct_norm(stack_dct)#matters for traffic
        f_weight = self.fc(stack_dct)
        f_weight = self.dct_norm(f_weight)#matters for traffic
        


        #visualization for fecam tensor
        f_weight_cpu = f_weight
        
        f_weight_np = f_weight_cpu.cpu().detach().numpy()
        
        np.save('f_weight_weather_wf.npy', f_weight_np)
        # np.save('%d f_weight.npy' %epoch, f_weight_np)


        




        # f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic
        # result = self.Linear(x)#forL
        
        # f_weight_np = result.cpu().detach().numpy()
        
        # np.save('f_weight.npy', f_weight_np)
        # x = x.permute(0,2,1)
        # result = self.Linear((x *(f_weight_inverse)))#forL 
        result = self.Linear((x *(f_weight)))#forL
        return  result.permute(0,2,1)
        
        


这段代码实现了一个时间序列预测模型,模型结合了 DCT(离散余弦变换)线性层 来对输入的序列进行处理,并输出预测结果。模型在数据处理中引入了频域变换,并对频域特征进行了加权处理。以下是详细的解释:

1. my_Layernorm

class my_Layernorm(nn.Module):
    """
    Special designed layernorm for the seasonal part
    """
    def __init__(self, channels):
        super(my_Layernorm, self).__init__()
        self.layernorm = nn.LayerNorm(channels)

    def forward(self, x):
        x_hat = self.layernorm(x)
        bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
        return x_hat - bias
  • 作用:实现了一种特殊的 LayerNorm 层,它对每个样本进行标准化,并减去每个样本的均值,目的是更好地处理序列中具有季节性或周期性的特征。

2. 模型 Model 初始化

class Model(nn.Module):
    def __init__(self, configs, channel=96, ratio=1):
        super(Model, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.fc = nn.Sequential(
            nn.Linear(channel, channel*2, bias=False),
            nn.Dropout(p=0.1),
            nn.ReLU(inplace=True),
            nn.Linear(channel*2, channel, bias=False),
            nn.Sigmoid()
        )
        self.fc_inverse = nn.Sequential(
            nn.Linear(channel, channel//2, bias=False),
            nn.Dropout(p=0.1),
            nn.ReLU(inplace=True),
            nn.Linear(channel//2, channel, bias=False),
            nn.Sigmoid()
        )
        self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)
        self.Linear = nn.Linear(self.seq_len, self.pred_len)
        self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
  • fcfc_inverse:两个线性层用于调整输入通道的权重。fc 是增大通道维度的网络,fc_inverse 是缩小通道维度的网络,主要用于特征的重整和处理。
  • mid_LinearLinear:分别用于序列长度的映射。mid_Linear 用于中间的特征处理,Linear 将输入的序列长度从 seq_len 映射到 pred_len
  • dct_norm:用于对经过 DCT 处理的频域特征进行标准化,以确保数据分布稳定,减少模型训练的波动。

3. 前向传播 forward

def forward(self, x):
    x = x.permute(0, 2, 1)  # 调整输入维度 (B, L, C) -> (B, C, L)

    b, c, l = x.size()  # 获取输入数据的维度 (Batch, Channel, Length)
    list = []

    # 对每个通道的输入数据进行 DCT
    for i in range(c):
        freq = dct.dct(x[:, i, :])  # 对第 i 个通道应用 DCT
        list.append(freq)
    
    # 将各通道的 DCT 结果堆叠在一起
    stack_dct = torch.stack(list, dim=1)
    stack_dct = torch.tensor(stack_dct)

    # 对 DCT 结果进行 LayerNorm 归一化
    stack_dct = self.dct_norm(stack_dct)
    
    # 通过全连接层对 DCT 结果进行处理
    f_weight = self.fc(stack_dct)
    f_weight = self.dct_norm(f_weight)

    # 保存处理后的权重用于可视化
    f_weight_cpu = f_weight
    f_weight_np = f_weight_cpu.cpu().detach().numpy()
    np.save('f_weight_weather_wf.npy', f_weight_np)

    # 将原始输入和经过处理的 DCT 权重相乘,并通过线性层进行序列预测
    result = self.Linear(x * f_weight)  # 结合权重后的输入数据进行预测
    return result.permute(0, 2, 1)  # 调整输出维度回 (B, L, C)
前向传播步骤:
  1. 维度调整:通过 permute 将输入的维度从 (B, L, C) 转换为 (B, C, L),方便后续进行 1D 操作。
  2. DCT 处理:对每个通道的数据应用 DCT(离散余弦变换),捕捉数据的频域特征。
  3. 堆叠 DCT 结果:将每个通道的 DCT 结果堆叠在一起,形成一个新的张量。
  4. 标准化:通过 LayerNorm 对 DCT 结果进行标准化,确保特征的稳定性。
  5. 全连接层处理:通过全连接层调整 DCT 结果的权重,提取出更高层次的频域特征。
  6. 可视化保存:将处理后的权重保存到 .npy 文件中,用于后续的可视化分析。
  7. 预测结果生成:将输入数据和处理后的权重相乘,然后通过线性层输出预测结果。
  8. 维度恢复:最后将输出的维度恢复为 (B, L, C),与输入数据的形状保持一致。

4. 总结

  • DCT 应用:该模型在每个通道上应用了 DCT,能够提取出输入数据中的频域特征,这对于具有周期性或频率特征的时间序列数据非常有效。
  • 全连接层权重调整:通过全连接层对 DCT 特征进行加权处理,有助于模型学习不同频率成分对预测的影响。
  • 标准化:使用 LayerNorm 保证频域特征在不同通道之间的平衡,避免某些通道的特征在模型中过度放大或缩小。

这个模型结合了 DCT 和神经网络的优势,通过频域特征处理提高了对复杂时间序列数据的预测能力,特别适合具有周期性变化的时间序列任务。


 

run_longExp.py代码:

这段代码是一个基于命令行参数的实验框架,用于时间序列预测任务,使用了 Transformer 系列模型(如 Autoformer、Informer、Transformer)。通过命令行参数,可以灵活调整模型、数据集、超参数等配置,执行训练、测试和预测流程。


参考:

用于时间序列预测的频率增强信道注意力机制(Python代码实现)_fecam-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/882564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

博睿谷IT认证-订阅试学习

在这个信息爆炸的时代&#xff0c;拥有一张IT认证证书&#xff0c;就像拿到了职场晋升的通行证。博睿谷&#xff0c;作为IT认证培训的佼佼者&#xff0c;帮你轻松拿下华为、Oracle等热门认证。下面&#xff0c;让我们一起看看博睿谷如何助你一臂之力。 学习时间&#xff0c;你说…

巨潮股票爬虫逆向

目标网站 aHR0cDovL3dlYmFwaS5jbmluZm8uY29tLmNuLyMvSVBPTGlzdD9tYXJrZXQ9c3o 一、抓包分析 请求头参数加密 二、逆向分析 下xhr断点 参数生成位置 发现是AES加密&#xff0c;不过是混淆的&#xff0c;但并不影响咱们扣代码 文章仅提供技术交流学习&#xff0c;不可对目标服…

脱离枯燥的CRUD,灵活使用Mybatis,根据mybatis动态的xml片段和接口规范动态生成代理类,轻松应付简单业务场景。

需求 需求是这样的&#xff0c;我们有一个数据服务平台的产品&#xff0c;用户先将数据源信息保存到平台上&#xff0c;一个数据源可以提供多个接口服务&#xff0c;而每个接口服务在数据库中存一个具有mybatis语法的sql片段。这样的话&#xff0c;对于一些简单的业务只需要编…

Linux 文件权限详解与管理

文章目录 前言一、文件权限概述1. 权限表示格式2. 权限组合值 二、查看文件权限三、修改文件所有者与所属组1. 使用 chown 修改文件所有者2. 使用 chgrp 修改文件所属组3. 添加所有者 四、修改文件权限1. 符号方式2. 八进制方式3. 实际修改 总结 前言 在 Linux 系统中&#xf…

香港科技大学广州|金融科技学域博士招生宣讲会——武汉大学、华中科技大学

&#x1f514;&#x1f514;&#x1f514;明日宣讲&#x1f514;&#x1f514;&#x1f514; &#x1f490;香港科技大学广州&#xff5c;金融科技学域博士招生宣讲会 &#x1f4cd;武汉大学专场 &#x1f559;时间&#xff1a;2024年9月24日&#xff08;星期二&#xff09;1…

Java项目实战II基于Java+Spring Boot+MySQL的洗衣店订单管理系统(开发文档+源码+数据库)

目录 一、前言 二、技术介绍 三、系统实现 四、论文参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者 一、前言 随着生活节奏的加快&#xff0c;现代人对便捷、高效服务的需求日益增长&#xff0c;洗衣店作为日常生…

11 - TCPClient实验

在上一个章节的UDP通信测试中&#xff0c;尽管通信的实现过程相对简洁&#xff0c;但出现了通信数据丢包的问题。因此&#xff0c;本章节将基于之前建立的WIFI网络连接&#xff0c;构建一个基础的TCPClient连接机制。我们利用网络调试助手工具来发送数据&#xff0c;测试网络通…

【图虫创意-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

力扣 困难 154.寻找旋转排序数组中的最小值 II

文章目录 题目介绍题解 题目介绍 题解 题源&#xff1a; 153.寻找旋转排序数组中的最小值 在此基础上&#xff0c;进行二分之前&#xff0c;单独处理一下左指针和最后一个数相同的情况就好了。 class Solution {public int findMin(int[] nums) {int left 0, right nums.le…

conda 虚拟环境安装GDAL

一. 背景 换了新电脑&#xff0c;要重新安装GDAL。从前是下了GDAL的.wheel文件用pip安装&#xff0c;但平时下轮子的网站现在都打不开&#xff0c;比如https://www.lfd.uci.edu/~gohlke/pythonlibs/#gdal&#xff0c;不晓得为什么。 后面看了这篇教程解决了问题&#xff08;h…

Codeforces Round 973 (Div. 2) - D题

传送门&#xff1a;Problem - D - Codeforces 题目大意&#xff1a; 思路&#xff1a; 尽量要 最大值变小&#xff0c;最小值变大 即求 最大值的最小 和 最小值的最大 -> 二分答案 AC代码&#xff1a; 代码有注释 #include<bits/stdc.h> using namespace std; #…

C++模拟实现list:list、list类的初始化和尾插、list的迭代器的基本实现、list的完整实现、测试、整个list类等的介绍

文章目录 前言一、list二、list类的初始化和尾插三、list的迭代器的基本实现四、list的完整实现五、测试六、整个list类总结 前言 C模拟实现list&#xff1a;list、list类的初始化和尾插、list的迭代器的基本实现、list的完整实现、测试、整个list类等的介绍 一、list list本…

计算机网络34——Windows内存管理

1、计算机体系结构 2、内存管理 分为连续分配管理和非连续分配管理 在块内存在的未使用空间叫内部碎片&#xff0c;在块外存在的未使用空间叫外部碎片 固定分区分配可能出现内部碎片&#xff0c;动态分区分配可能出现外部碎片 3、逻辑地址和实际地址的互相转换 4、缺页中断 …

k8s中,pod生命周期,初始化容器,容器探针,事件处理函数,理解其设计思路及作用

k8s中&#xff0c;为什么要设计pod 平台直接管理容器不是挺好的吗 为什么要以pod为单位进行管理&#xff0c; 然后把容器放在pod里面 那么有pod和没pod的区别是什么 也就是pod提供了什么作用 这个可以考虑从pod生命周期管理的角度去思考 如图&#xff0c;pod主容器在运行…

JAVA并发编程系列(10)Condition条件队列-并发协作者

一线大厂面试真题&#xff0c;模拟消费者-生产者场景。 同样今天的分享&#xff0c;我们不纸上谈兵&#xff0c;也不空谈八股文。以实际面经、工作实战经验进行开题&#xff0c;然后再剖析核心源码原理。 按常见面经要求&#xff0c;生产者生产完指定数量产品后&#xff0c;才能…

文档矫正算法:DocTr++

文档弯曲矫正&#xff08;Document Image Rectification&#xff09;的主要作用是在图像处理领域中&#xff0c;对由于拍摄、扫描或打印过程中产生的弯曲、扭曲文档进行校正&#xff0c;使其恢复为平整、易读的形态。 一. 论文和代码 论文地址&#xff1a;https://arxiv.org/…

LDRA Testbed(TBrun)软件单元测试_常见问题及处理

系列文章目录 LDRA Testbed软件静态分析_操作指南 LDRA Testbed软件静态分析_自动提取静态分析数据生成文档 LDRA Testbed软件静态分析_Jenkins持续集成&#xff08;自动静态分析并用邮件自动发送分析结果&#xff09; LDRA Testbed软件静态分析_软件质量度量 LDRA Testbed软件…

POI操作EXCEL增加下拉框

文章目录 POI操作EXCEL增加下拉框 POI操作EXCEL增加下拉框 有时候通过excel将数据批量导入到系统&#xff0c;而业务操作人员对于一些列不想手动输入&#xff0c;而是采用下拉框的方式来进行选择 采用隐藏sheet页的方式来进行操作 String sheetName "supplier_hidden_s…

Python记录

1.冒泡排序 时间复杂度O&#xff08;n^2) 选择、插入都是 def bubble(data, reverse):for i in range(len(data)-1):for j in range(len(data)-i-1):if data[j] > data[j1]:data[j], data[j1] data[j1], data[j]if reverse:data.reverse()return data 2.快速排序 时间…

基于深度学习的文本情感原因提取研究综述——论文阅读

前言 既然要学习情感分析&#xff0c;那么肯定还要了解情感原因对抽取的发展历程&#xff0c;所以我又搜了一篇研究综述&#xff0c;虽然是2023年发表的&#xff0c;但是里面提及到的历程仅停留到2022年。这篇综述发布在TASLP期刊&#xff0c;是音频、声学、语言信号处理的顶级…