序列模型是指一类特别设计来处理序列数据的神经网络模型。序列数据指的是数据中的每个元素都有先后顺序,比如时间序列数据(股票价格、天气变化等)、自然语言文本(句子中的单词顺序)、语音信号等。
1 统计工具
前面介绍了卷积神经网络架构,但是在处理序列数据时,需要新的神经网络架构,下面以股票价格为例:
我们用
x
t
x_{t}
xt表示价格,其中
t
t
t表示时间步(time step),也就是在时间步
t
t
t时观察到的价格
x
t
x_{t}
xt,我们通过下列公式来表示我们预测第
t
t
t日的价格:
x
t
∼
P
(
x
t
∣
x
t
−
1
,
…
,
x
1
)
.
x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1).
xt∼P(xt∣xt−1,…,x1).
即,在已知
1
1
1 到
t
−
1
t-1
t−1 的价格,求第
t
t
t 天的价格的概率分布。
1.1 自回归模型
为了实现这个预测,可以使用自回归模型:假设当前值
y
t
y_{t}
yt 与过去的值
y
t
−
1
,
y
t
−
2
,
.
.
.
y
t
−
p
y_{t-1} , y_{t-2} , ...y_{t-p}
yt−1,yt−2,...yt−p 之间存在线性关系,一般形式为 :
其中:
大致分为两种策略:
①自回归模型: 假设在现实情况下相当长的序列
x
t
−
1
,
…
,
x
1
x_{t-1}, \ldots, x_1
xt−1,…,x1可能是没价值的,因此我们只需要满足某个长度为
τ
\tau
τ的时间跨度, 即使用观测序列
x
t
−
1
,
…
,
x
t
−
τ
x_{t-1}, \ldots, x_{t-\tau}
xt−1,…,xt−τ。也就是说过长的历史序列可能并不必要,因此只需要关注较短的一段历史数据即可。因为只考虑观测值本身,所以叫自回归模型
②隐变量自回归模型: 即保留一些对过去观测的总结
h
t
h_{t}
ht,这个“总结”是无法直观解释的,它是模型自助捕捉的内部关系依赖,然后同时更新预测值
x
^
t
\hat{x}_t
x^t和
h
t
h_t
ht,即变为下列式子:
x
^
t
=
P
(
x
t
∣
h
t
)
和
h
t
=
g
(
h
t
−
1
,
x
t
−
1
)
\hat{x}_t = P(x_t \mid h_{t}) 和h_t = g(h_{t-1}, x_{t-1})
x^t=P(xt∣ht)和ht=g(ht−1,xt−1)由于
h
t
h_{t}
ht
h
t
h_{t}
ht从未被观测到,这类模型也被称为隐变量自回归模型,这里做出一个假设,即序列本身的动力学(数据随时间演变的方式)不会改变,意味着我们可以用过去的数据来推断未来的趋势,因为我们假定基本的动态规则是一致的。因此,整个序列的概率值可以表示为一系列条件概率的乘积:
P
(
x
1
,
…
,
x
T
)
=
∏
t
=
1
T
P
(
x
t
∣
x
t
−
1
,
…
,
x
1
)
.
P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}, \ldots, x_1).
P(x1,…,xT)=t=1∏TP(xt∣xt−1,…,x1).
注意,如果我们处理的是离散的对象(如单词), 而不是连续的数字,则上述的考虑仍然有效。我们需要使用分类器而不是回归模型来估计
1.2 马尔可夫模型
马尔可夫条件: 在自回归模型中,如果 t t t 时刻的数值,只与 x t − 1 , … , x t − τ x_{t-1}, \ldots, x_{t-\tau} xt−1,…,xt−τ 有关,而不是整个过去的序列,则称其满足马尔可夫条件。
如果
τ
=
1
\tau = 1
τ=1 ,则得到了一个一阶马尔可夫模型,
P
(
x
)
P(x)
P(x)由如下公式表示:
P
(
x
1
,
…
,
x
T
)
=
∏
t
=
1
T
P
(
x
t
∣
x
t
−
1
)
当
P
(
x
1
∣
x
0
)
=
P
(
x
1
)
.
P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}) \text{ 当 } P(x_1 \mid x_0) = P(x_1).
P(x1,…,xT)=t=1∏TP(xt∣xt−1) 当 P(x1∣x0)=P(x1).
若当假设
x
t
x_t
xt 仅是离散值时,可以使用动态规划可以沿着马尔可夫链精确地计算结果。
2 训练、预测
下面我们将用一个正弦函数和一些噪声生成1000个序列数据,并使用自回归模型进行训练和预测
2.1 生成数据
import torch
from torch import nn
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
T=1000
time=torch.arange(1,T+1,dtype=torch.float32)
x=torch.sin(0.01*time)+torch.normal(0,0.2,(T,))
# 绘制折线图
plt.plot(time, x)
plt.xlabel('Time')
plt.ylabel('Value')
plt.title('Time Series Data')
plt.show()
运行结果
2.2 构造数据集
我们是准备用 y t = F ( X t ) y_t=F(X_t) yt=F(Xt),其中 X t = [ x t − τ , … , x t − 1 ] X_t= [x_{t-\tau}, \ldots, x_{t-1}] Xt=[xt−τ,…,xt−1],我们这里假设 τ = 4 \tau=4 τ=4,即用前四个数据来预测下一个数据,但是这样的话,前 4 4 4 个数据就没有历史样本去描述了,一般的做法是直接舍弃,或者用零序列去填充。
这里我们用600个数据进行训练,剩余的用于预测。
构建数据集时,使用滑动窗口去构建:
# 构造数据集
tau=4
# 初始化特征矩阵,因为前四个值就是当前值的特征
features = torch.zeros((T - tau, tau))
for i in range(T - tau): # 用滑动窗口进行构建
features[i,:]=x[i:tau+i]
print('features:',features.shape)
print(features[:5])
labels = x[tau:].reshape((-1, 1))
print('labels:',labels.shape)
print(labels[:5])
batch_size = 16
n = 600 # 只有前600个样本用于训练
dataset = TensorDataset(features[:n], labels[:n])
train_iter = DataLoader(dataset, batch_size=batch_size, shuffle=False)
运行结果
2.3 构造模型进行训练
# 构造模型
def init_weights(m):
if type(m)==nn.Linear:
nn.init.xavier_uniform_(m.weight)
def net():
net=nn.Sequential(
nn.Linear(4,10),
nn.ReLU(),
nn.Linear(10,1)
)
net.apply(init_weights)
return net
# 评估模型在给定数据集上的损失
def evaluate_loss(net, data_iter, loss):
"""评估模型在给定数据集上的损失"""
net.eval() # 设置模型为评估模式
total_loss = 0.0
with torch.no_grad(): # 不计算梯度
for X, y in data_iter:
y_hat = net(X)
l = loss(y_hat, y)
total_loss += l.sum().item() # 计算总损失
net.train() # 恢复模型为训练模式
return total_loss / len(data_iter.dataset)
loss=nn.MSELoss(reduction='none')
lr=0.01
net=net()
optimzer=torch.optim.Adam(net.parameters(),lr)
loss_sum=[]
num_epoch=20
def train(net,num_epoch,train_iter,loss,optimzer,loss_sum):
for epoch in range(num_epoch):
for x,y in train_iter:
optimzer.zero_grad()
l=loss(net(x),y)
l.sum().backward()
optimzer.step()
temp=evaluate_loss(net,train_iter,loss)
loss_sum.append(temp)
print("epoch ",epoch+1,": loss:",temp)
train(net,num_epoch,train_iter,loss,optimzer,loss_sum)
# 绘制折线图
plt.plot(range(num_epoch), loss_sum)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()
运行结果
2.4 预测
# 使用模型进行预测
def predict(net, data_iter):
net.eval() # 设置模型为评估模式
predictions = []
with torch.no_grad(): # 不计算梯度
for X, y in data_iter:
y_hat = net(X)
predictions.extend(y_hat.numpy())
net.train() # 恢复模型为训练模式
return predictions
# 获取测试集的预测结果
predictions = predict(net, test_iter)
# 绘制预测结果与真实值的对比图
true_values = labels[n:].numpy()
plt.plot(true_values, label='True Values')
plt.plot(predictions, label='Predictions')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
运行结果
2.5 多步预测
# 多步预测
def multistep_predict(net, data_iter, steps):
net.eval()
multistep_predictions = []
with torch.no_grad():
for X, y in data_iter:
current_features = X.clone()
for _ in range(steps):
'''在每一步中,模型用 current_features 作为输入,并预测出 y_hat。
然后将 y_hat 拼接到 current_features 的末尾,
同时移除 current_features 的第一个时间步,
保持输入长度不变。这样,y_hat 成为下一步的输入'''
y_hat = net(current_features)
current_features = torch.cat([current_features[:, 1:], y_hat], dim=1)
multistep_predictions.extend(y_hat.numpy())
net.train()
return multistep_predictions
# 获取测试集的不同步数的多步预测结果
steps = [4, 16, 32]
multistep_predictions = {step: multistep_predict(net, test_iter, step) for step in steps}
# 绘制结果
plt.figure(figsize=(12, 6)) # 设置图像的宽度为12英寸,高度为6英寸
plt.plot(true_values, label='True Values')
plt.plot(ones_predictions, label='1-step Predictions')
for step, preds in multistep_predictions.items():
plt.plot(preds, label=f'{step}-step Predictions')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
上述的多步预测是迭代预测法,即用自己预测数据再去预测下一个数据,另一种方法是seq2seq,后面在介绍,迭代预测法如下图所示: