本文提出了一种新的频率增强通道注意力机制(FECAM),旨在解决时间序列预测中傅里叶变换因吉布斯现象导致的高频噪声问题。FECAM基于离散余弦变换,能自适应地模拟信道间的频率依赖性,有效避免预测误差。实验显示,该机制可提升多种主流模型的预测性能,如LSTM、Reformer、Informer和Autoformer等。
时间序列预测是一个长期存在的挑战,因为真实世界的信息处于各种场景(例如,能源,天气,交通,经济,地震预警)。然而,一些主流的预测模型预测结果与实际情况大相径庭。我们认为,这是模型缺乏捕获真实世界数据集中丰富包含的频率信息的能力的原因。目前主流的频率信息提取方法都是基于傅里叶变换(FT)的。然而,由于吉布斯现象,FT的使用是有问题的。如果序列两侧的值差异很大,则在两侧周围观察到振荡近似,并且会引入高频噪声。因此,我们提出了一种新的频率增强信道注意力,它基于离散余弦变换自适应地模拟信道之间的频率相互依赖性,这将从本质上避免傅里叶变换过程中有问题的周期引起的高频噪声,这被定义为吉布斯现象。我们证明了该网络在六个真实数据集中的泛化非常有效,并实现了最先进的性能,我们进一步证明了频率增强信道注意力机制模块可以灵活地应用于不同的网络。该模块可以提高现有主流网络的预测能力,只需几行代码,即可在LSTM上降低35.99%的MSE,在Reformer上降低10.01%,在Informer上降低8.71%,在Autoformer上降低8.29%,在Transformer上降低8.06%等,计算成本很小。
这篇论文提出了一种基于离散余弦变换的频率增强通道注意机制(FECAM),通过捕捉时间序列中的频率信息来提升主流深度学习模型的预测能力,有效避免傅里叶变换引入的高频噪声,显著提升了多种时间序列预测任务的性能。
SENET(channel attention)
FECAM(Frequency Enhanced Channel Attention Mechanism)
As a module to enhance the frequency domain modeling capability of transformers and LSTM
下面对其部分源码进行解读:
SlefAttention_Family.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
# from masking import TriangularCausalMask, ProbMask#for parameter count
import os
class FullAttention(nn.Module):
def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
super(FullAttention, self).__init__()
self.scale = scale
self.mask_flag = mask_flag
self.output_attention = output_attention
self.dropout = nn.Dropout(attention_dropout)
def forward(self, queries, keys, values, attn_mask):
B, L, H, E = queries.shape
_, S, _, D = values.shape
scale = self.scale or 1. / sqrt(E)
scores = torch.einsum("blhe,bshe->bhls", queries, keys)
if self.mask_flag:
if attn_mask is None:
attn_mask = TriangularCausalMask(B, L, device=queries.device)
scores.masked_fill_(attn_mask.mask, -np.inf)
A = self.dropout(torch.softmax(scale * scores, dim=-1))
V = torch.einsum("bhls,bshd->blhd", A, values)
if self.output_attention:
return (V.contiguous(), A)
else:
return (V.contiguous(), None)
class ProbAttention(nn.Module):
def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
super(ProbAttention, self).__init__()
self.factor = factor
self.scale = scale
self.mask_flag = mask_flag
self.output_attention = output_attention
self.dropout = nn.Dropout(attention_dropout)
def _prob_QK(self, Q, K, sample_k, n_top): # n_top: c*ln(L_q)
# Q [B, H, L, D]
B, H, L_K, E = K.shape
_, _, L_Q, _ = Q.shape
# calculate the sampled Q_K
K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
index_sample = torch.randint(L_K, (L_Q, sample_k)) # real U = U_part(factor*ln(L_k))*L_q
K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()
# find the Top_k query with sparisty measurement
M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
M_top = M.topk(n_top, sorted=False)[1]
# use the reduced Q to calculate Q_K
Q_reduce = Q[torch.arange(B)[:, None, None],
torch.arange(H)[None, :, None],
M_top, :] # factor*ln(L_q)
Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1)) # factor*ln(L_q)*L_k
return Q_K, M_top
def _get_initial_context(self, V, L_Q):
B, H, L_V, D = V.shape
if not self.mask_flag:
# V_sum = V.sum(dim=-2)
V_sum = V.mean(dim=-2)
contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
else: # use mask
assert (L_Q == L_V) # requires that L_Q == L_V, i.e. for self-attention only
contex = V.cumsum(dim=-2)
return contex
def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
B, H, L_V, D = V.shape
if self.mask_flag:
attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
scores.masked_fill_(attn_mask.mask, -np.inf)
attn = torch.softmax(scores, dim=-1) # nn.Softmax(dim=-1)(scores)
context_in[torch.arange(B)[:, None, None],
torch.arange(H)[None, :, None],
index, :] = torch.matmul(attn, V).type_as(context_in)
if self.output_attention:
attns = (torch.ones([B, H, L_V, L_V]) / L_V).type_as(attn).to(attn.device)
attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
return (context_in, attns)
else:
return (context_in, None)
def forward(self, queries, keys, values, attn_mask):
B, L_Q, H, D = queries.shape
_, L_K, _, _ = keys.shape
queries = queries.transpose(2, 1)
keys = keys.transpose(2, 1)
values = values.transpose(2, 1)
U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k)
u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q)
U_part = U_part if U_part < L_K else L_K
u = u if u < L_Q else L_Q
scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)
# add scale factor
scale = self.scale or 1. / sqrt(D)
if scale is not None:
scores_top = scores_top * scale
# get the context
context = self._get_initial_context(values, L_Q)
# update the context with selected top_k queries
context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
return context.contiguous(), attn
class AttentionLayer(nn.Module):
def __init__(self, attention, d_model, n_heads, d_keys=None,
d_values=None):
super(AttentionLayer, self).__init__()
d_keys = d_keys or (d_model // n_heads)
d_values = d_values or (d_model // n_heads)
self.inner_attention = attention
self.query_projection = nn.Linear(d_model, d_keys * n_heads)
self.key_projection = nn.Linear(d_model, d_keys * n_heads)
self.value_projection = nn.Linear(d_model, d_values * n_heads)
self.out_projection = nn.Linear(d_values * n_heads, d_model)
self.n_heads = n_heads
def forward(self, queries, keys, values, attn_mask):
B, L, _ = queries.shape
_, S, _ = keys.shape
H = self.n_heads
queries = self.query_projection(queries).view(B, L, H, -1)
keys = self.key_projection(keys).view(B, S, H, -1)
values = self.value_projection(values).view(B, S, H, -1)
out, attn = self.inner_attention(
queries,
keys,
values,
attn_mask
)
out = out.view(B, L, -1)
return self.out_projection(out), attn
这个代码实现了一个注意力机制模块,主要包括了两种注意力机制(全局注意力 FullAttention 和 概率注意力 ProbAttention),以及一个常规的 AttentionLayer 用于处理多头注意力。以下是详细的解释:
1. 基础导入
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
torch
、torch.nn
和torch.nn.functional
是 PyTorch 的核心模块,用于定义和操作神经网络结构。matplotlib.pyplot
和numpy
是常用的 Python 库,用于绘图和数值计算。TriangularCausalMask
和ProbMask
用于为注意力机制提供掩码(masking),以确保特定位置的注意力权重被正确计算。
2. FullAttention 类(全局注意力)
class FullAttention(nn.Module):
def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
super(FullAttention, self).__init__()
self.scale = scale
self.mask_flag = mask_flag
self.output_attention = output_attention
self.dropout = nn.Dropout(attention_dropout)
FullAttention
实现了全局自注意力机制。它通过queries
和keys
计算注意力权重。mask_flag
:决定是否应用掩码。scale
:对注意力分数进行缩放。attention_dropout
:为防止过拟合,应用 Dropout。
forward
函数:
def forward(self, queries, keys, values, attn_mask):
B, L, H, E = queries.shape
_, S, _, D = values.shape
scale = self.scale or 1. / sqrt(E)
scores = torch.einsum("blhe,bshe->bhls", queries, keys)
if self.mask_flag:
if attn_mask is None:
attn_mask = TriangularCausalMask(B, L, device=queries.device)
scores.masked_fill_(attn_mask.mask, -np.inf)
A = self.dropout(torch.softmax(scale * scores, dim=-1))
V = torch.einsum("bhls,bshd->blhd", A, values)
return (V.contiguous(), A if self.output_attention else None)
torch.einsum
:使用爱因斯坦求和约定来进行矩阵乘法,计算queries
和keys
的相似度。mask_flag
:当使用掩码时,将填充-np.inf
来忽略特定位置的注意力分数。softmax
:用于将分数转换为概率分布。V
:是使用注意力分数和values
计算出的加权和。
3. ProbAttention 类(概率注意力)
class ProbAttention(nn.Module):
def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
super(ProbAttention, self).__init__()
self.factor = factor
self.scale = scale
self.mask_flag = mask_flag
self.output_attention = output_attention
self.dropout = nn.Dropout(attention_dropout)
ProbAttention
实现了一种基于稀疏注意力的机制。通过概率的方法减少计算量,特别适用于长序列的处理。factor
:控制采样和稀疏度。scale
和attention_dropout
类似于FullAttention
中的作用。
_prob_QK
函数:
def _prob_QK(self, Q, K, sample_k, n_top):
K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
index_sample = torch.randint(L_K, (L_Q, sample_k))
K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()
M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
M_top = M.topk(n_top, sorted=False)[1]
Q_reduce = Q[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], M_top, :]
Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))
return Q_K, M_top
- 目的:通过抽样计算
Q
和K
的相似度,从而减少全局计算开销。 - 采样:通过从
K
中随机选择一部分,并与Q
进行计算,得出最具代表性的Top_k
查询,减少计算负担。
4. AttentionLayer 类
class AttentionLayer(nn.Module):
def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None):
super(AttentionLayer, self).__init__()
self.inner_attention = attention
self.query_projection = nn.Linear(d_model, d_keys * n_heads)
self.key_projection = nn.Linear(d_model, d_keys * n_heads)
self.value_projection = nn.Linear(d_model, d_values * n_heads)
self.out_projection = nn.Linear(d_values * n_heads, d_model)
self.n_heads = n_heads
AttentionLayer
是多头注意力层,将输入的queries
、keys
和values
进行线性变换,然后通过注意力机制处理。d_model
:输入特征的维度。n_heads
:多头注意力的头数。inner_attention
:可以是FullAttention
或ProbAttention
。
forward
函数:
def forward(self, queries, keys, values, attn_mask):
B, L, _ = queries.shape
H = self.n_heads
queries = self.query_projection(queries).view(B, L, H, -1)
keys = self.key_projection(keys).view(B, S, H, -1)
values = self.value_projection(values).view(B, S, H, -1)
out, attn = self.inner_attention(
queries,
keys,
values,
attn_mask
)
out = out.view(B, L, -1)
return self.out_projection(out), attn
- 线性变换:
queries
、keys
和values
都通过线性层进行投影,转换为适应多头注意力的形状。 - 多头注意力:将
queries
、keys
和values
分成多个头来计算注意力。 - 输出投影:最后通过
out_projection
投影到原始维度。
总结
- 全局注意力(
FullAttention
)计算每对queries
和keys
的相似度,用于标准的 Transformer 模型。 - 概率注意力(
ProbAttention
)通过稀疏采样减少计算复杂度,特别适用于长序列输入。 AttentionLayer
实现了一个多头注意力机制,可以选择全局注意力或概率注意力来处理输入。
这段代码的核心是实现不同的注意力机制,主要用于序列到序列的建模任务,例如时间序列预测或自然语言处理任务。
在 PyTorch 中,
forward
函数是每个自定义神经网络模块(nn.Module
)的核心方法,它定义了模型的前向传播过程。前向传播是指输入数据如何经过网络层,逐步计算出输出结果的过程。
forward
函数的主要作用:
定义模型的计算逻辑:
forward
函数负责具体的计算过程。每当你将输入传入模型时,模型会自动调用forward
函数来执行从输入到输出的所有计算。在
forward
函数中,你将定义:
- 数据如何流经每一层(如卷积层、全连接层、注意力机制等)。
- 应用哪些激活函数、正则化(如
BatchNorm
、Dropout
)等操作。- 最终如何产生输出。
将输入映射到输出:
forward
函数的核心任务是将输入数据(张量)传递到各层并生成输出。例如,在卷积神经网络中,输入张量会经过卷积层、池化层、激活函数等模块,最后输出结果。自动执行反向传播: 虽然
forward
仅定义前向传播过程,但它与自动微分(autograd
)紧密相连。当你调用loss.backward()
时,PyTorch 会自动根据forward
中的操作来计算梯度,进而进行反向传播和参数更新。
Transformer_EncDec.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.dctnet import dct_channel_block, dct
# from dctnet import dct_channel_block, dct
class ConvLayer(nn.Module):
def __init__(self, c_in):
super(ConvLayer, self).__init__()
self.downConv = nn.Conv1d(in_channels=c_in,
out_channels=c_in,
kernel_size=3,
padding=2,
padding_mode='circular')
self.norm = nn.BatchNorm1d(c_in)
self.activation = nn.ELU()
self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
def forward(self, x):
x = self.downConv(x.permute(0, 2, 1))
x = self.norm(x)
x = self.activation(x)
x = self.maxPool(x)
x = x.transpose(1, 2)
return x
class EncoderLayer(nn.Module):
def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
super(EncoderLayer, self).__init__()
d_ff = d_ff or 4 * d_model
self.attention = attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
self.dct_layer=dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
def forward(self, x, attn_mask=None):
# print(x.shape)
#torch.Size([32, 96, 512])
#torch.Size([32, 49, 512])
'''
before self-attention
'''
# mid = self.dct_layer(x)#
# x = x+mid
new_x, attn = self.attention(
x, x, x,
attn_mask=attn_mask
)
x = x + self.dropout(new_x)
y = x = self.norm1(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
# print("y.shape:",y.shape)#y.shape: torch.Size([32, 96, 512])#为了减少计算量,不然要做512次L=96的dct
#加入到module
result = self.norm2(x + y)
# mid = self.dct_layer(result)
# result = result+mid
# result = self.dct_norm(result) #norm 144
return result, attn
# return self.norm2(x + y), attn
class Encoder(nn.Module):
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(Encoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
def forward(self, x, attn_mask=None):
# x [B, L, D]
attns = []
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x)
attns.append(attn)
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
if self.norm is not None:
x = self.norm(x)
return x, attns
class DecoderLayer(nn.Module):
def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
dropout=0.1, activation="relu"):
super(DecoderLayer, self).__init__()
d_ff = d_ff or 4 * d_model
self.self_attention = self_attention
self.cross_attention = cross_attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
# self.dct_layer=dct_channel_block(512)
# self.dct_norm = nn.LayerNorm([512], eps=1e-6)
def forward(self, x, cross, x_mask=None, cross_mask=None):
x = x + self.dropout(self.self_attention(
x, x, x,
attn_mask=x_mask
)[0])
x = self.norm1(x)
x = x + self.dropout(self.cross_attention(
x, cross, cross,
attn_mask=cross_mask
)[0])
y = x = self.norm2(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
# result = self.norm3(x + y)
# mid = self.dct_layer(result)
# result = result+mid
# result = self.dct_norm(result) #norm 144
# return result
# return self.norm3(x + y)
return self.norm3(x + y)
class Decoder(nn.Module):
def __init__(self, layers, norm_layer=None, projection=None):
super(Decoder, self).__init__()
self.layers = nn.ModuleList(layers)
self.norm = norm_layer
self.projection = projection
def forward(self, x, cross, x_mask=None, cross_mask=None):
for layer in self.layers:
x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)
if self.norm is not None:
x = self.norm(x)
if self.projection is not None:
x = self.projection(x)
return x
这段代码定义了一个类似 Transformer 的神经网络架构,包含卷积层、编码器层和解码器层。它可以用于处理序列数据(如时间序列、自然语言处理等)。具体包括以下组件:
1. ConvLayer(卷积层):
class ConvLayer(nn.Module):
def __init__(self, c_in):
super(ConvLayer, self).__init__()
self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=2, padding_mode='circular')
self.norm = nn.BatchNorm1d(c_in)
self.activation = nn.ELU()
self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
- 这个卷积层首先对输入进行 1D 卷积,然后进行批归一化(BatchNorm)和 ELU 激活,再通过最大池化层减少特征维度。
- 作用:将高维的输入进行降采样,同时提取局部特征。
2. EncoderLayer(编码器层)
class EncoderLayer(nn.Module):
def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
super(EncoderLayer, self).__init__()
d_ff = d_ff or 4 * d_model
self.attention = attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
self.dct_layer = dct_channel_block(512) # DCT增强
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
- 作用:这个层结合了注意力机制、自注意力(self-attention)、卷积层(用于前馈网络),并且通过
LayerNorm
和 Dropout 进行正则化。它还引入了DCT
(离散余弦变换)来增强输入的特征处理。 - 自注意力机制:通过
attention
来捕获不同位置之间的相关性。 - 卷积网络:通过卷积进一步对特征进行变换和学习。
3. Encoder(编码器)
class Encoder(nn.Module):
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(Encoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
- 作用:这个类是编码器的整体结构,由多个
EncoderLayer
组成。如果有卷积层(conv_layers
),会在每次注意力计算后对输入进行卷积操作。 - 流程:输入经过每一层的注意力层,最后通过
LayerNorm
进行归一化。
4. DecoderLayer(解码器层)
class DecoderLayer(nn.Module):
def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
super(DecoderLayer, self).__init__()
d_ff = d_ff or 4 * d_model
self.self_attention = self_attention
self.cross_attention = cross_attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
- 作用:解码器层包括了自注意力(
self_attention
)和交叉注意力(cross_attention
)。自注意力用于捕捉解码器内部的依赖关系,而交叉注意力用于将编码器的输出与解码器的输入结合。 - 卷积层和归一化:使用卷积和归一化对注意力层的输出进一步处理。
5. Decoder(解码器)
class Decoder(nn.Module):
def __init__(self, layers, norm_layer=None, projection=None):
super(Decoder, self).__init__()
self.layers = nn.ModuleList(layers)
self.norm = norm_layer
self.projection = projection
- 作用:解码器由多个
DecoderLayer
组成。输入首先通过每一层的解码器,结合来自编码器的输出,最后通过归一化和可选的投影层生成最终的输出。
6. 前向传播 (forward) 函数
每个模块都有一个 forward
函数,定义了数据在该模块中的流动过程。
ConvLayer.forward
def forward(self, x):
x = self.downConv(x.permute(0, 2, 1))
x = self.norm(x)
x = self.activation(x)
x = self.maxPool(x)
x = x.transpose(1, 2)
return x
- 操作顺序:
- 输入经过一维卷积处理。
- 批归一化。
- 激活函数 ELU。
- 最大池化。
- 变换维度并返回输出。
EncoderLayer.forward
def forward(self, x, attn_mask=None):
new_x, attn = self.attention(x, x, x, attn_mask=attn_mask)
x = x + self.dropout(new_x)
y = x = self.norm1(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
result = self.norm2(x + y)
return result, attn
- 操作顺序:
- 输入通过注意力层进行自注意力计算。
- 经过残差连接和 Dropout。
- 进行 LayerNorm 归一化。
- 输入通过前馈卷积网络。
- 最后输出经过归一化的结果。
DecoderLayer.forward
def forward(self, x, cross, x_mask=None, cross_mask=None):
x = x + self.dropout(self.self_attention(x, x, x, attn_mask=x_mask)[0])
x = self.norm1(x)
x = x + self.dropout(self.cross_attention(x, cross, cross, attn_mask=cross_mask)[0])
y = x = self.norm2(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
return self.norm3(x + y)
- 操作顺序:
- 输入通过自注意力计算。
- 进行 LayerNorm 归一化。
- 再通过交叉注意力与编码器的输出结合。
- 经过前馈卷积层,最后输出结果。
总结
- 卷积层 用于特征提取和降采样。
- 编码器 和 解码器 使用注意力机制来捕捉不同时间步之间的依赖关系,并通过卷积层进行进一步的特征处理。
- 前向传播 函数定义了每个模块内部的数据流动和处理逻辑,结合了卷积、注意力机制和正则化。
Embed.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
import math
class PositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEmbedding, self).__init__()
# Compute the positional encodings once in log space.
pe = torch.zeros(max_len, d_model).float()
pe.require_grad = False
position = torch.arange(0, max_len).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return self.pe[:, :x.size(1)]
class TokenEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(TokenEmbedding, self).__init__()
padding = 1 if torch.__version__ >= '1.5.0' else 2
self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
kernel_size=3, padding=padding, padding_mode='circular', bias=False)
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')
def forward(self, x):
x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
return x
class FixedEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(FixedEmbedding, self).__init__()
w = torch.zeros(c_in, d_model).float()
w.require_grad = False
position = torch.arange(0, c_in).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
w[:, 0::2] = torch.sin(position * div_term)
w[:, 1::2] = torch.cos(position * div_term)
self.emb = nn.Embedding(c_in, d_model)
self.emb.weight = nn.Parameter(w, requires_grad=False)
def forward(self, x):
return self.emb(x).detach()
class TemporalEmbedding(nn.Module):
def __init__(self, d_model, embed_type='fixed', freq='h'):
super(TemporalEmbedding, self).__init__()
minute_size = 4
hour_size = 24
weekday_size = 7
day_size = 32
month_size = 13
Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
if freq == 't':
self.minute_embed = Embed(minute_size, d_model)
self.hour_embed = Embed(hour_size, d_model)
self.weekday_embed = Embed(weekday_size, d_model)
self.day_embed = Embed(day_size, d_model)
self.month_embed = Embed(month_size, d_model)
def forward(self, x):
x = x.long()
minute_x = self.minute_embed(x[:, :, 4]) if hasattr(self, 'minute_embed') else 0.
hour_x = self.hour_embed(x[:, :, 3])
weekday_x = self.weekday_embed(x[:, :, 2])
day_x = self.day_embed(x[:, :, 1])
month_x = self.month_embed(x[:, :, 0])
return hour_x + weekday_x + day_x + month_x + minute_x
class TimeFeatureEmbedding(nn.Module):
def __init__(self, d_model, embed_type='timeF', freq='h'):
super(TimeFeatureEmbedding, self).__init__()
freq_map = {'h': 4, 't': 5, 's': 6, 'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
d_inp = freq_map[freq]
self.embed = nn.Linear(d_inp, d_model, bias=False)
def forward(self, x):
return self.embed(x)
class DataEmbedding(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
return self.dropout(x)
class DataEmbedding_wo_pos(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding_wo_pos, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
x = self.value_embedding(x) + self.temporal_embedding(x_mark)
return self.dropout(x)
class DataEmbedding_wo_pos_temp(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding_wo_pos_temp, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
x = self.value_embedding(x)
return self.dropout(x)
class DataEmbedding_wo_temp(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding_wo_temp, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
x = self.value_embedding(x) + self.position_embedding(x)
return self.dropout(x)
这段代码实现了一系列嵌入层(Embedding Layers),主要用于将输入数据(如时间序列数据)转换为适合模型处理的向量表示。这种嵌入是序列建模中非常重要的一部分,特别是在处理时间序列数据时。
主要模块解释
-
PositionalEmbedding(位置嵌入)
- 作用:为每个序列位置编码一个向量,使得模型能够感知序列中每个位置的顺序。常用于 Transformer 模型中,因为这种模型本身不具备位置信息的感知能力。
- 实现细节:
- 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用
sin
和cos
。 pe
是一个预计算的二维张量,包含最大序列长度(max_len
)和每个位置的嵌入(d_model
的维度)。
- 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用
-
TokenEmbedding(令牌嵌入)
- 作用:将输入的数据通过一维卷积进行嵌入。可以理解为将输入的多维数据映射到一个高维的特征空间。
- 实现细节:
- 使用 1D 卷积将输入通道数(
c_in
)转换为模型维度(d_model
)。 - 初始化权重时使用
Kaiming
初始化方法,适合 ReLU 和 LeakyReLU 激活函数。
- 使用 1D 卷积将输入通道数(
-
FixedEmbedding(固定嵌入)
- 作用:通过固定的正弦和余弦函数生成嵌入,类似于位置嵌入,但它是用于固定长度的输入数据。
- 实现细节:
- 每个位置的奇数位使用
sin
,偶数位使用cos
。 - 权重矩阵
w
是固定的,不参与训练。
- 每个位置的奇数位使用
-
TemporalEmbedding(时间嵌入)
- 作用:为时间特征(如小时、天、月份等)生成嵌入,特别适用于处理时间序列数据。
- 实现细节:
- 支持嵌入不同的时间粒度(如分钟、小时、周几、天、月份等)。
- 可以选择使用固定嵌入(
FixedEmbedding
)或可学习嵌入(nn.Embedding
)。
-
TimeFeatureEmbedding(时间特征嵌入)
- 作用:为特定时间粒度(如小时、天、周等)的数值进行线性嵌入。
- 实现细节:
- 根据
freq_map
映射不同的时间粒度,并使用线性变换嵌入。
- 根据
-
DataEmbedding(数据嵌入)
- 作用:组合了
TokenEmbedding
、PositionalEmbedding
和TemporalEmbedding
,生成完整的嵌入向量。 - 实现细节:
- 先将输入数据通过
TokenEmbedding
进行嵌入。 - 然后结合
TemporalEmbedding
提取时间相关特征。 - 还会加入位置嵌入(
PositionalEmbedding
)来提供序列位置信息。 - 最后通过
Dropout
防止过拟合。
- 先将输入数据通过
- 作用:组合了
-
DataEmbedding_wo_pos(无位置嵌入的数据嵌入)
- 作用:与
DataEmbedding
类似,但不包含位置嵌入。 - 实现细节:
- 只包含
TokenEmbedding
和TemporalEmbedding
,省略位置嵌入部分。
- 只包含
- 作用:与
-
DataEmbedding_wo_pos_temp(无位置和时间嵌入的数据嵌入)
- 作用:只进行
TokenEmbedding
,不包括时间和位置嵌入。 - 实现细节:
- 仅使用
TokenEmbedding
来处理输入数据。
- 仅使用
- 作用:只进行
-
DataEmbedding_wo_temp(无时间嵌入的数据嵌入)
- 作用:不包括时间嵌入,但仍然使用位置嵌入。
- 实现细节:
- 包含
TokenEmbedding
和PositionalEmbedding
,省略了TemporalEmbedding
。
- 包含
嵌入层的整体作用
这些嵌入层的主要作用是将输入的时间序列数据转换成向量表示,这样可以更好地被神经网络模型(如 Transformer 或 CNN)处理。时间序列数据通常包含时间信息(如天、小时等),嵌入层通过提取这些时间信息来增强模型对时间依赖性的感知。通过组合 TokenEmbedding
(提取特征)、PositionalEmbedding
(位置信息)和 TemporalEmbedding
(时间信息),模型可以捕捉到输入数据的复杂时空关系。
dctnet.py代码:
from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
try:
from torch import irfft
from torch import rfft
except ImportError:
def rfft(x, d):
t = torch.fft.fft(x, dim = (-d))
r = torch.stack((t.real, t.imag), -1)
return r
def irfft(x, d):
t = torch.fft.ifft(torch.complex(x[:,:,0], x[:,:,1]), dim = (-d))
return t.real
def dct(x, norm=None):
"""
Discrete Cosine Transform, Type II (a.k.a. the DCT)
For the meaning of the parameter `norm`, see:
https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.fftpack.dct.html
:param x: the input signal
:param norm: the normalization, None or 'ortho'
:return: the DCT-II of the signal over the last dimension
"""
x_shape = x.shape
N = x_shape[-1]
x = x.contiguous().view(-1, N)
v = torch.cat([x[:, ::2], x[:, 1::2].flip([1])], dim=1)
# Vc = torch.fft.rfft(v, 1, onesided=False)
Vc = rfft(v, 1)
k = - torch.arange(N, dtype=x.dtype, device=x.device)[None, :] * np.pi / (2 * N)
W_r = torch.cos(k)
W_i = torch.sin(k)
V = Vc[:, :, 0] * W_r - Vc[:, :, 1] * W_i
if norm == 'ortho':
V[:, 0] /= np.sqrt(N) * 2
V[:, 1:] /= np.sqrt(N / 2) * 2
V = 2 * V.view(*x_shape)
return V
# class senet_block(nn.Module):
# def __init__(self, channel=512, ratio=1):
# super(dct_channel_block, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
# self.fc = nn.Sequential(
# nn.Linear(channel, channel // 4, bias=False),
# nn.ReLU(inplace=True),
# nn.Linear(channel //4, channel, bias=False),
# nn.Sigmoid()
# )
# def forward(self, x):
# # b, c, l = x.size() # (B,C,L)
# # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
# # print("y",y.shape)
# x = x.permute(0,2,1)
# b, c, l = x.size()
# y = self.avg_pool(x).view(b, c) # (B,C,L) ->(B,C,1)
# # print("y",y.shape)
# # y = self.fc(y).view(b, c, 96)
# y = self.fc(y).view(b,c,1)
# # print("y",y.shape)
# # return x * y
# return (x*y).permute(0,2,1)
class dct_channel_block(nn.Module):
def __init__(self, channel):
super(dct_channel_block, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
self.fc = nn.Sequential(
nn.Linear(channel, channel*2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear( channel*2, channel, bias=False),
nn.Sigmoid()
)
# self.dct_norm = nn.LayerNorm([512], eps=1e-6)
self.dct_norm = nn.LayerNorm([96], eps=1e-6)#for lstm on length-wise
# self.dct_norm = nn.LayerNorm([36], eps=1e-6)#for lstm on length-wise on ill with input =36
def forward(self, x):
b, c, l = x.size() # (B,C,L) (32,96,512)
# y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
# y = self.avg_pool(x).view(b, c) # (B,C,L) -> (B,C,1)
# print("y",y.shape
# y = self.fc(y).view(b, c, 96)
list = []
for i in range(c):
freq=dct(x[:,i,:])
# print("freq-shape:",freq.shape)
list.append(freq)
stack_dct=torch.stack(list,dim=1)
stack_dct = torch.tensor(stack_dct)
'''
for traffic mission:f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic datasets
'''
lr_weight = self.dct_norm(stack_dct)
lr_weight = self.fc(stack_dct)
lr_weight = self.dct_norm(lr_weight)
# print("lr_weight",lr_weight.shape)
return x *lr_weight #result
if __name__ == '__main__':
tensor = torch.rand(8,7,96)
dct_model = dct_channel_block()
result = dct_model.forward(tensor)
print("result.shape:",result.shape)
这段代码实现了基于离散余弦变换(DCT)的通道块,用于增强神经网络中的特征表示。以下是代码的详细解释:
1. 导入库和兼容性处理
from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
导入了 PyTorch 的 nn
模块用于神经网络构建,math
和 numpy
用于数学运算。
try:
from torch import irfft
from torch import rfft
except ImportError:
def rfft(x, d):
t = torch.fft.fft(x, dim=(-d))
r = torch.stack((t.real, t.imag), -1)
return r
def irfft(x, d):
t = torch.fft.ifft(torch.complex(x[:, :, 0], x[:, :, 1]), dim=(-d))
return t.real
这一段代码用于兼容旧版本的 PyTorch,定义了 rfft
和 irfft
函数。在较新的 PyTorch 版本中,rfft
和 irfft
函数可能已被替换,因此这里使用自定义实现确保代码的兼容性。
2. DCT(离散余弦变换)
def dct(x, norm=None):
...
这个函数实现了 离散余弦变换 (DCT),具体是 DCT-II 类型。DCT 是一种频域转换,通常用于信号处理和特征提取,它可以将时域信号转换为频域表示。
主要步骤:
- 重新排列数据:将输入张量中的偶数和奇数索引部分组合在一起。
- 进行 FFT(快速傅里叶变换):通过
rfft
函数进行快速傅里叶变换,将信号转换到频域。 - 正弦和余弦加权:通过
cos
和sin
函数对 FFT 的结果进行加权,得到离散余弦变换的结果。 - 归一化处理:如果指定
norm='ortho'
,进行正交归一化。
3. DCT 通道块
class dct_channel_block(nn.Module):
def __init__(self, channel):
super(dct_channel_block, self).__init__()
self.fc = nn.Sequential(
nn.Linear(channel, channel * 2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear(channel * 2, channel, bias=False),
nn.Sigmoid()
)
self.dct_norm = nn.LayerNorm([96], eps=1e-6)
这个类实现了基于 DCT 的通道块,主要用于对输入张量进行变换,增强特征的表达能力。
主要组件:
- 全连接层(
fc
):使用了两层全连接层,结合ReLU
激活函数和Dropout
,来对 DCT 变换后的频域特征进行进一步处理。 - LayerNorm(
dct_norm
):用于标准化处理,以稳定训练过程。
前向传播 (forward
):
def forward(self, x):
b, c, l = x.size() # (B,C,L) (32,96,512)
list = []
for i in range(c):
freq = dct(x[:, i, :])
list.append(freq)
stack_dct = torch.stack(list, dim=1)
stack_dct = torch.tensor(stack_dct)
lr_weight = self.dct_norm(stack_dct)
lr_weight = self.fc(stack_dct)
lr_weight = self.dct_norm(lr_weight)
return x * lr_weight
- 输入:
x
的形状为(B, C, L)
,其中B
是批量大小,C
是通道数,L
是序列长度。 - DCT 变换:对于每个通道,计算 DCT,得到频域表示。
- 处理 DCT 结果:将 DCT 结果通过全连接层和标准化层处理,得到特征权重。
- 输出:将原始输入与 DCT 处理后的权重相乘,得到最终的输出。
4. 主函数
if __name__ == '__main__':
tensor = torch.rand(8, 7, 96)
dct_model = dct_channel_block(96)
result = dct_model.forward(tensor)
print("result.shape:", result.shape)
这部分代码用于测试 dct_channel_block
类:
- 输入:随机生成一个形状为
(8, 7, 96)
的张量,代表批量大小为 8,通道数为 7,序列长度为 96。 - 模型实例化:实例化
dct_channel_block
,并使用前向传播方法对输入进行处理。 - 输出:打印处理后的输出张量的形状。
总结
- DCT 的作用:DCT 可以将时域信号转换为频域信号,在信号处理和特征提取中非常有用。通过这种变换,可以捕捉输入数据中的频率模式,从而增强模型的表达能力。
- 通道块的作用:
dct_channel_block
结合 DCT 和全连接层来处理输入特征,并生成频域权重以调整输入特征。 - 代码的目的:实现了一种结合 DCT 的特征增强机制,旨在通过频域处理提升神经网络在时序数据上的表现。
LSTM.py代码:
from pip import main
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
# from dctnet import dct_channel_block,dct #for parameters calc
import argparse
from fvcore.nn import FlopCountAnalysis,parameter_count_table
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通
def __init__(self,configs,input_size=None,hidden_size=16, output_size=None,batch_size=None,num_layers=2):#input_size与output_size是通道数
super(Model, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.input_size = configs.enc_in #channel
self.hidden_size = hidden_size #输出维度 也就是输出通道
self.num_layers = num_layers
self.output_size = configs.enc_in #channel #输出个数
self.num_directions = 1 # 单向LSTM
self.batch_size = configs.batch_size
self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size)#通道数对齐层
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)#输出长度对齐层
# self.linear = nn.Linear()
#add dce-block
self.dct_layer=dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)#作为模块一般normal channel效果好点
# self.dct_norm = nn.LayerNorm([512], eps=1e-6)#作为模块一般normal channel效果好点
def forward(self, x):
# x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL
# b, c, l = x.size() # (B,C,L)
# batch_size, seq_len = x.shape[0], x.shape[1]
# h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
# c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
# output(batch_size, seq_len, num_directions * hidden_size)
output, _ = self.lstm(x, (h_0, c_0)) # output(5, 30, 64)
# print("output.shape:",output.shape)#result.shape: torch.Size([8, 96, 8])
result = self.linear(output) # (B,L,C)
# output.shape: torch.Size([8, 96, 16])#16是hidden_size
# result.shape: torch.Size([8, 96, 8])#8是为了符合通道数对齐,exchange_rate有8个变量
'''
dct
'''
result = self.dct_layer(result.permute(0,2,1))#加入dct模块,mse降低0.12个点
result_len = self.linear_out_len(result)#为了输出长度对齐 (8,8,96)
'''
dct
'''
# result_len = self.linear_out_len(result.permute(0,2,1))#为了输出长度对齐 (8,8,96)
# print("result.shape:",result.shape)#result.shape: torch.Size([8, 96, 8])
# result = result.permute(0,2,1)#(B,L,C)=》(B,C,L)
# result = self.dct_layer(result_len)#加入dct模块,mse降低0.12个点
# result = result.permute(0,2,1)#(B,C,L)=》(B,L,C)
return result_len.permute(0,2,1)
# lstm = LSTM(input_size=7,hidden_size=64, output_size=7,batch_size=8,num_layers=5).to(device)
# tensor = torch.rand(8, 96, 7).to(device)
# result = lstm(tensor)
# print("result.shape:",result.shape)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
# forecasting task
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--label_len', type=int, default=48, help='start token length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size') # DLinear with --individual, use this hyperparameter as the number of channels
parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
parser.add_argument('--c_out', type=int, default=7, help='output size')
parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
# optimization
parser.add_argument('--num_workers', type=int, default=10, help='data loader num workers')
parser.add_argument('--itr', type=int, default=2, help='experiments times')
parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
parser.add_argument('--des', type=str, default='test', help='exp description')
parser.add_argument('--loss', type=str, default='mse', help='loss function')
parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)
# GPU
parser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')
parser.add_argument('--gpu', type=int, default=0, help='gpu')
parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')
parser.add_argument('--test_flop', action='store_true', default=False, help='See utils/tools for usage')
args = parser.parse_args()
model = Model(args)
print(parameter_count_table(model))
这段代码实现了一个基于 LSTM(长短期记忆网络)和 DCT(离散余弦变换)的时间序列预测模型,并且可以处理不同的模型配置和超参数。这段代码主要用于时间序列数据的预测任务,具体解释如下:
1. 模型类 Model
class Model(nn.Module):
def __init__(self, configs, input_size=None, hidden_size=16, output_size=None, batch_size=None, num_layers=2):
super(Model, self).__init__()
...
configs
:通过命令行传递的配置参数(如序列长度、批量大小、隐藏层大小等)。input_size
:输入的特征数量(通道数),从configs.enc_in
中获取。hidden_size
:LSTM 隐藏层的大小,默认是16
,即 LSTM 输出的特征维度。num_layers
:LSTM 网络的层数,默认为2
。output_size
:模型的输出特征数,从configs.enc_in
中获取。
该模型的结构包含:
- 一个 LSTM 层,用于处理输入的时间序列数据。
- 两个 线性层(
linear
和linear_out_len
),用于通道数和序列长度的对齐。 - 一个基于 DCT 的增强模块
dct_channel_block
,用于提取频域特征。
2. LSTM 和线性层
self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size) # 通道数对齐
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len) # 输出长度对齐
- LSTM 层:输入维度为
input_size
,输出维度为hidden_size
,LSTM 层数为num_layers
。 - 线性层
linear
:用于将 LSTM 的输出从hidden_size
映射回输入的特征数量(通道数)。 - 线性层
linear_out_len
:用于将序列长度从seq_len
映射到预测的序列长度pred_len
。
3. DCT 通道块
self.dct_layer = dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)
dct_channel_block
:通过 DCT(离散余弦变换)提取频域信息,并对输入特征进行处理。它有助于改善模型对时间序列中的周期性或频域特征的捕捉能力。dct_norm
:标准化层,用于将经过 DCT 处理的特征进行归一化。
4. 前向传播函数 forward
def forward(self, x):
...
h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
output, _ = self.lstm(x, (h_0, c_0)) # LSTM 前向传播
result = self.linear(output) # 通道数对齐
result = self.dct_layer(result.permute(0, 2, 1)) # DCT 模块
result_len = self.linear_out_len(result) # 长度对齐
return result_len.permute(0, 2, 1)
- 初始化隐藏状态和细胞状态:
h_0
和c_0
是 LSTM 的初始隐藏状态和细胞状态。 - LSTM 前向传播:输入
x
经过 LSTM 层,得到output
,并进行通道数对齐。 - DCT 处理:经过 LSTM 输出后的结果通过 DCT 模块处理,并对频域特征进行操作。
- 长度对齐:通过
linear_out_len
线性层调整预测序列的长度为pred_len
。 - 输出:返回最终的预测结果,并调整形状。
5. 命令行参数配置
parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
...
args = parser.parse_args()
model = Model(args)
seq_len
:输入序列的长度。pred_len
:预测序列的长度。enc_in
:输入特征的通道数。batch_size
:批量大小。
通过 argparse
模块,可以通过命令行传入这些参数来控制模型的配置。
6. 模型参数和 FLOPs 计算
from fvcore.nn import FlopCountAnalysis, parameter_count_table
print(parameter_count_table(model))
parameter_count_table
:用于计算模型的参数数量。FlopCountAnalysis
:用于分析模型的浮点操作(FLOPs)。
这段代码通过 fvcore
库计算模型的参数量,并输出这些信息以便进行模型复杂度的评估。
总结
- 该模型结合了 LSTM 和 DCT,用于时间序列数据的预测。
- 通过 DCT 增强模型对频域特征的捕捉能力,有助于提高模型的预测性能。
- 通过命令行参数,模型的配置可以灵活调整,并且支持多 GPU 训练和 FLOPs 计算。
oneCNN.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通#CNN
def __init__(self,configs):
super(Model, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.channel_num = configs.enc_in
self.conv1 = nn.Conv1d(in_channels=self.channel_num,out_channels=self.channel_num,kernel_size=5,stride=1,padding=2) #输入通道数和输出通道数应该一样
# input = torch.randn(32,7,96)
# batch_size x text_len x embedding_size -> batch_size x embedding_size x text_len
self.dct_layer=dct_channel_block(self.seq_len)
self.linear = nn.Linear(self.seq_len,self.pred_len)
# self.dct_norm = nn.LayerNorm([7], eps=1e-6)#作为模块一般normal channel效果好点
def forward(self, x):
# print("x.shape:",x.shape)
x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL and 1-d conv
# b, c, l = x.size() # (B,C,L)
out = self.conv1(x) #b,c,l
out = self.linear(x)#b,c,l
# print(out.size())
# out = self.dct_layer(out)#加入dct模块,mse降低0.12个点
# out = self.linear(x)#b,c,l
# x = x+mid
# x = x.permute(0,2,1)
# x = self.dct_norm(x) #norm 144
# x = x.permute(0,2,1)
return (out).permute(0,2,1)#b,l,c
这段代码定义了一个基于 CNN(卷积神经网络) 和 DCT(离散余弦变换) 的简单时间序列预测模型。模型通过 1D 卷积和 DCT 增强特征提取,结合线性层对输出进行预测。
代码详解
1. 模型初始化
class Model(nn.Module):
def __init__(self, configs):
super(Model, self).__init__()
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.channel_num = configs.enc_in
self.conv1 = nn.Conv1d(in_channels=self.channel_num, out_channels=self.channel_num, kernel_size=5, stride=1, padding=2)
self.dct_layer = dct_channel_block(self.seq_len)
self.linear = nn.Linear(self.seq_len, self.pred_len)
seq_len
:输入序列的长度。pred_len
:预测序列的长度。channel_num
:输入的通道数,代表时间序列的特征数量(例如 7 维度的输入表示有 7 个不同的特征)。
网络组件:
-
conv1
:一维卷积层,输入和输出通道数保持一致,卷积核大小为5
,步幅为1
,并使用padding=2
保证输入和输出序列的长度保持不变。 -
dct_layer
:通过dct_channel_block
提取频域特征,用离散余弦变换增强特征。 -
linear
:线性层用于将序列长度从seq_len
映射到预测长度pred_len
,在输出前对时间序列进行长度调整。
2. 前向传播函数 forward
def forward(self, x):
x = x.permute(0, 2, 1) # 转置输入,调整维度顺序 (B, L, C) -> (B, C, L)
out = self.conv1(x) # 应用一维卷积,提取特征 (B, C, L)
out = self.linear(x) # 应用线性层进行序列长度变换
return out.permute(0, 2, 1) # 再次转置,调整维度回到 (B, L, C)
-
输入:输入
x
的形状为(B, L, C)
,即B
表示批量大小,L
表示序列长度,C
表示输入的特征维度(通道数)。 -
转置:在进行 1D 卷积前,先通过
x.permute(0, 2, 1)
将输入从形状(B, L, C)
转为(B, C, L)
,使得输入符合 1D 卷积的要求。 -
卷积:通过
conv1
提取特征,卷积后维度仍然是(B, C, L)
。 -
线性层:将经过卷积后的特征通过线性层
linear
,将序列长度从seq_len
转换为pred_len
。 -
输出:最后将输出再转置回
(B, L, C)
,作为最终的预测结果。
可选模块(未激活的部分)
# out = self.dct_layer(out) # DCT 层:可选的 DCT 增强模块,用于通过离散余弦变换进一步处理特征。
# out = self.linear(x) # 可选的线性层,可以用于对卷积后输出再进行进一步处理。
# x = self.dct_norm(x) # 标准化层,用于标准化 DCT 后的特征。
这些部分暂时被注释掉了,表示 DCT 层和标准化层可能是可选的改进点。在某些情况下,通过 DCT 提取频域特征可以提高模型的性能。
3. 总结
- 卷积操作:1D 卷积用于捕捉输入时间序列的局部特征。
- DCT 操作:可选的 DCT 层能够将时域信号转换为频域信号,捕捉输入数据中的周期性或频率特征,从而进一步增强模型的表现。
- 线性映射:线性层负责调整序列的长度,保证输入和输出长度匹配。
这个模型设计相对简单,但由于使用了卷积操作和 DCT 层,因此在处理具有时空依赖性的数据(如时间序列预测)时有一定优势。
Transformer.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Transformer_EncDec import Decoder, DecoderLayer, Encoder, EncoderLayer, ConvLayer
from layers.SelfAttention_Family import FullAttention, AttentionLayer
from layers.Embed import DataEmbedding,DataEmbedding_wo_pos,DataEmbedding_wo_temp,DataEmbedding_wo_pos_temp
import numpy as np
from layers.dctnet import dct_channel_block,dct
class Model(nn.Module):
"""
Vanilla Transformer with O(L^2) complexity
"""
def __init__(self, configs):
super(Model, self).__init__()
self.pred_len = configs.pred_len
self.output_attention = configs.output_attention
# Embedding
if configs.embed_type == 0:
self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
elif configs.embed_type == 1:
self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
elif configs.embed_type == 2:
self.enc_embedding = DataEmbedding_wo_pos(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.dec_embedding = DataEmbedding_wo_pos(configs.dec_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
elif configs.embed_type == 3:
self.enc_embedding = DataEmbedding_wo_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.dec_embedding = DataEmbedding_wo_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
elif configs.embed_type == 4:
self.enc_embedding = DataEmbedding_wo_pos_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
self.dec_embedding = DataEmbedding_wo_pos_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
configs.dropout)
# Encoder
self.encoder = Encoder(
[
EncoderLayer(
AttentionLayer(
FullAttention(False, configs.factor, attention_dropout=configs.dropout,
output_attention=configs.output_attention), configs.d_model, configs.n_heads),
configs.d_model,
configs.d_ff,
dropout=configs.dropout,
activation=configs.activation
) for l in range(configs.e_layers)
],
norm_layer=torch.nn.LayerNorm(configs.d_model)
)
# Decoder
self.decoder = Decoder(
[
DecoderLayer(
AttentionLayer(
FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
configs.d_model, configs.n_heads),
AttentionLayer(
FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
configs.d_model, configs.n_heads),
configs.d_model,
configs.d_ff,
dropout=configs.dropout,
activation=configs.activation,
)
for l in range(configs.d_layers)
],
norm_layer=torch.nn.LayerNorm(configs.d_model),
projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
)
self.dct_layer=dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
enc_out = self.enc_embedding(x_enc, x_mark_enc)
enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
# print("enc_out.shape:",enc_out.shape)#enc_out.shape: torch.Size([32, 96, 512])
#加入dct模块
# mid = self.dct_layer(enc_out)
# enc_out = enc_out+mid
# enc_out = self.dct_norm(enc_out) #norm 144
dec_out = self.dec_embedding(x_dec, x_mark_dec)
dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
if self.output_attention:
return dec_out[:, -self.pred_len:, :], attns
else:
return dec_out[:, -self.pred_len:, :] # [B, L, D]
这段代码实现了一个结合 Transformer 和 DCT(离散余弦变换) 的神经网络模型,主要用于时间序列预测任务。模型结构基于 Transformer 编码器-解码器框架,支持多种嵌入方式,并集成了 DCT 通道块用于增强特征处理。
1. 模型简介
该模型的核心是标准的 Transformer 编码器-解码器结构,具有以下主要模块:
- 嵌入模块:通过
DataEmbedding
和其变种为输入数据生成嵌入表示。 - 编码器(Encoder):基于多层自注意力机制的编码器,用于提取输入序列的高维特征。
- 解码器(Decoder):结合自注意力和交叉注意力的解码器,用于生成预测序列。
- DCT 模块:用于对编码器输出进行频域增强处理(当前注释掉,作为可选模块)。
2. 嵌入模块
if configs.embed_type == 0:
self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
DataEmbedding
:为输入数据生成嵌入。嵌入方式由configs.embed_type
控制,支持五种不同类型的嵌入:- 0:默认的嵌入(值、时间、位置嵌入的组合)。
- 1:与
embed_type=0
相同。 - 2:没有位置嵌入。
- 3:没有时间嵌入。
- 4:没有时间和位置嵌入。
3. 编码器(Encoder)
self.encoder = Encoder(
[
EncoderLayer(
AttentionLayer(
FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=configs.output_attention),
configs.d_model, configs.n_heads),
configs.d_model,
configs.d_ff,
dropout=configs.dropout,
activation=configs.activation
) for l in range(configs.e_layers)
],
norm_layer=torch.nn.LayerNorm(configs.d_model)
)
Encoder
:由多个EncoderLayer
组成,每层包括自注意力机制和前馈神经网络。FullAttention
:注意力机制,False
表示不使用掩码(即标准自注意力)。LayerNorm
:标准化层,用于稳定训练。
4. 解码器(Decoder)
self.decoder = Decoder(
[
DecoderLayer(
AttentionLayer(
FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
configs.d_model, configs.n_heads),
AttentionLayer(
FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
configs.d_model, configs.n_heads),
configs.d_model,
configs.d_ff,
dropout=configs.dropout,
activation=configs.activation,
)
for l in range(configs.d_layers)
],
norm_layer=torch.nn.LayerNorm(configs.d_model),
projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
)
Decoder
:由多个DecoderLayer
组成,每层结合了自注意力和交叉注意力机制。projection
:最后一层是线性层,用于将解码器的输出投影到目标输出维度(configs.c_out
)。
5. DCT 通道块
self.dct_layer = dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
dct_channel_block
:DCT 模块用于对编码器的输出进行频域变换,但该部分目前被注释掉。dct_norm
:LayerNorm 层用于对经过 DCT 的特征进行标准化处理。
6. 前向传播函数
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
enc_out = self.enc_embedding(x_enc, x_mark_enc)
enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
# DCT 模块(可选)
# mid = self.dct_layer(enc_out)
# enc_out = enc_out + mid
# enc_out = self.dct_norm(enc_out)
dec_out = self.dec_embedding(x_dec, x_mark_dec)
dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
if self.output_attention:
return dec_out[:, -self.pred_len:, :], attns
else:
return dec_out[:, -self.pred_len:, :]
-
输入:
x_enc
和x_mark_enc
是编码器的输入和时间标记。x_dec
和x_mark_dec
是解码器的输入和时间标记。enc_self_mask
、dec_self_mask
、dec_enc_mask
是可选的注意力掩码。
-
编码器:通过
enc_embedding
和encoder
模块处理编码器输入。 -
DCT:注释掉的 DCT 模块可以对编码器输出进行频域处理。
-
解码器:通过
dec_embedding
和decoder
模块处理解码器输入,并结合编码器的输出进行预测。 -
输出:
- 如果
output_attention
为True
,返回解码器的输出和注意力矩阵。 - 否则,只返回解码器的最终预测。
- 如果
7. 总结
- 该模型是一个结合 Transformer 和 DCT 的时间序列预测模型。
- Transformer 编码器-解码器架构使模型能够捕捉复杂的时间依赖关系。
- DCT 模块作为可选部分,能够增强模型对频域特征的捕捉。
- 模型支持多种嵌入方式,通过调整
configs.embed_type
来选择不同的嵌入策略。
Linear.py代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
# class Model(nn.Module):SENET for ETTmx
# """
# Just one Linear layer
# """
# def __init__(self,configs,channel=7,ratio=1):
# super(Model, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
# self.fc = nn.Sequential(
# nn.Linear(7,14, bias=False),
# nn.Dropout(p=0.1),
# nn.ReLU(inplace=True) ,
# nn.Linear(14,7, bias=False),
# nn.Sigmoid()
# )
# self.seq_len = configs.seq_len
# self.pred_len = configs.pred_len
# self.Linear_More_1 = nn.Linear(self.seq_len,self.pred_len * 2)
# self.Linear_More_2 = nn.Linear(self.pred_len*2,self.pred_len)
# self.relu = nn.ReLU()
# self.gelu = nn.GELU()
# self.drop = nn.Dropout(p=0.1)
# # Use this line if you want to visualize the weights
#
# def forward(self, x):
# # x: [Batch, Input length, Channel]
#
# x = x.permute(0,2,1) # (B,L,C)->(B,C,L)
# b, c, l = x.size() # (B,C,L)
# y = self.avg_pool(x).view(b, c) # (B,C,L)
# # np.save('f_weight.npy', f_weight_np)
# # # np.save('%d f_weight.npy' %epoch, f_weight_np)
# # print("y",y.shape)
# # return (x * y).permute(0,2,1)
# return (z).permute(0,2,1)
class my_Layernorm(nn.Module):
"""
Special designed layernorm for the seasonal part
"""
def __init__(self, channels):
super(my_Layernorm, self).__init__()
self.layernorm = nn.LayerNorm(channels)
def forward(self, x):
x_hat = self.layernorm(x)
bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
return x_hat - bias
class Model(nn.Module):
def __init__(self,configs,channel=96,ratio=1):
super(Model, self).__init__()
# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.channel_num = configs.enc_in
self.fc = nn.Sequential(
nn.Linear(channel, channel*2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear( channel*2, channel, bias=False),
nn.Sigmoid()
)
self.fc_inverse = nn.Sequential(
nn.Linear(channel, channel//2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear( channel//2, channel, bias=False),
nn.Sigmoid()
)
# self.fc_plot = nn.Linear(channel, channel, bias=False)
self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)
self.Linear = nn.Linear(self.seq_len, self.pred_len)
self.Linear_1 = nn.Linear(self.seq_len, self.pred_len)
# self.dct_norm = nn.LayerNorm([self.channel_num], eps=1e-6)
self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
# self.my_layer_norm = nn.LayerNorm([96], eps=1e-6)
def forward(self, x):
x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL
b, c, l = x.size() # (B,C,L)
list = []
for i in range(c):#i represent channel
freq=dct.dct(x[:,i,:]) #dct
# print("freq-shape:",freq.shape)
list.append(freq)
stack_dct=torch.stack(list,dim=1)
stack_dct = torch.tensor(stack_dct)#(B,L,C)
stack_dct = self.dct_norm(stack_dct)#matters for traffic
f_weight = self.fc(stack_dct)
f_weight = self.dct_norm(f_weight)#matters for traffic
#visualization for fecam tensor
f_weight_cpu = f_weight
f_weight_np = f_weight_cpu.cpu().detach().numpy()
np.save('f_weight_weather_wf.npy', f_weight_np)
# np.save('%d f_weight.npy' %epoch, f_weight_np)
# f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic
# result = self.Linear(x)#forL
# f_weight_np = result.cpu().detach().numpy()
# np.save('f_weight.npy', f_weight_np)
# x = x.permute(0,2,1)
# result = self.Linear((x *(f_weight_inverse)))#forL
result = self.Linear((x *(f_weight)))#forL
return result.permute(0,2,1)
这段代码实现了一个时间序列预测模型,模型结合了 DCT(离散余弦变换) 和 线性层 来对输入的序列进行处理,并输出预测结果。模型在数据处理中引入了频域变换,并对频域特征进行了加权处理。以下是详细的解释:
1. my_Layernorm
层
class my_Layernorm(nn.Module):
"""
Special designed layernorm for the seasonal part
"""
def __init__(self, channels):
super(my_Layernorm, self).__init__()
self.layernorm = nn.LayerNorm(channels)
def forward(self, x):
x_hat = self.layernorm(x)
bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
return x_hat - bias
- 作用:实现了一种特殊的 LayerNorm 层,它对每个样本进行标准化,并减去每个样本的均值,目的是更好地处理序列中具有季节性或周期性的特征。
2. 模型 Model
初始化
class Model(nn.Module):
def __init__(self, configs, channel=96, ratio=1):
super(Model, self).__init__()
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.channel_num = configs.enc_in
self.fc = nn.Sequential(
nn.Linear(channel, channel*2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear(channel*2, channel, bias=False),
nn.Sigmoid()
)
self.fc_inverse = nn.Sequential(
nn.Linear(channel, channel//2, bias=False),
nn.Dropout(p=0.1),
nn.ReLU(inplace=True),
nn.Linear(channel//2, channel, bias=False),
nn.Sigmoid()
)
self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)
self.Linear = nn.Linear(self.seq_len, self.pred_len)
self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
fc
和fc_inverse
:两个线性层用于调整输入通道的权重。fc
是增大通道维度的网络,fc_inverse
是缩小通道维度的网络,主要用于特征的重整和处理。mid_Linear
和Linear
:分别用于序列长度的映射。mid_Linear
用于中间的特征处理,Linear
将输入的序列长度从seq_len
映射到pred_len
。dct_norm
:用于对经过 DCT 处理的频域特征进行标准化,以确保数据分布稳定,减少模型训练的波动。
3. 前向传播 forward
def forward(self, x):
x = x.permute(0, 2, 1) # 调整输入维度 (B, L, C) -> (B, C, L)
b, c, l = x.size() # 获取输入数据的维度 (Batch, Channel, Length)
list = []
# 对每个通道的输入数据进行 DCT
for i in range(c):
freq = dct.dct(x[:, i, :]) # 对第 i 个通道应用 DCT
list.append(freq)
# 将各通道的 DCT 结果堆叠在一起
stack_dct = torch.stack(list, dim=1)
stack_dct = torch.tensor(stack_dct)
# 对 DCT 结果进行 LayerNorm 归一化
stack_dct = self.dct_norm(stack_dct)
# 通过全连接层对 DCT 结果进行处理
f_weight = self.fc(stack_dct)
f_weight = self.dct_norm(f_weight)
# 保存处理后的权重用于可视化
f_weight_cpu = f_weight
f_weight_np = f_weight_cpu.cpu().detach().numpy()
np.save('f_weight_weather_wf.npy', f_weight_np)
# 将原始输入和经过处理的 DCT 权重相乘,并通过线性层进行序列预测
result = self.Linear(x * f_weight) # 结合权重后的输入数据进行预测
return result.permute(0, 2, 1) # 调整输出维度回 (B, L, C)
前向传播步骤:
- 维度调整:通过
permute
将输入的维度从(B, L, C)
转换为(B, C, L)
,方便后续进行 1D 操作。 - DCT 处理:对每个通道的数据应用 DCT(离散余弦变换),捕捉数据的频域特征。
- 堆叠 DCT 结果:将每个通道的 DCT 结果堆叠在一起,形成一个新的张量。
- 标准化:通过
LayerNorm
对 DCT 结果进行标准化,确保特征的稳定性。 - 全连接层处理:通过全连接层调整 DCT 结果的权重,提取出更高层次的频域特征。
- 可视化保存:将处理后的权重保存到
.npy
文件中,用于后续的可视化分析。 - 预测结果生成:将输入数据和处理后的权重相乘,然后通过线性层输出预测结果。
- 维度恢复:最后将输出的维度恢复为
(B, L, C)
,与输入数据的形状保持一致。
4. 总结
- DCT 应用:该模型在每个通道上应用了 DCT,能够提取出输入数据中的频域特征,这对于具有周期性或频率特征的时间序列数据非常有效。
- 全连接层权重调整:通过全连接层对 DCT 特征进行加权处理,有助于模型学习不同频率成分对预测的影响。
- 标准化:使用
LayerNorm
保证频域特征在不同通道之间的平衡,避免某些通道的特征在模型中过度放大或缩小。
这个模型结合了 DCT 和神经网络的优势,通过频域特征处理提高了对复杂时间序列数据的预测能力,特别适合具有周期性变化的时间序列任务。
run_longExp.py代码:
这段代码是一个基于命令行参数的实验框架,用于时间序列预测任务,使用了 Transformer 系列模型(如 Autoformer、Informer、Transformer)。通过命令行参数,可以灵活调整模型、数据集、超参数等配置,执行训练、测试和预测流程。
参考:
用于时间序列预测的频率增强信道注意力机制(Python代码实现)_fecam-CSDN博客