前言
系列专栏:【深度学习:算法项目实战】✨︎
涉及医疗健康、财经金融、商业零售、食品饮料、运动健身、交通运输、环境科学、社交媒体以及文本和图像处理等诸多领域,讨论了各种复杂的深度神经网络思想,如卷积神经网络、循环神经网络、生成对抗网络、门控循环单元、长短期记忆、自然语言处理、深度强化学习、大型语言模型和迁移学习。
在当今快速发展的智能交通系统中,准确预测交通流量对于城市规划、交通管理以及缓解交通拥堵等问题具有重要意义。传统的交通流量预测方法往往依赖于统计模型或简单的机器学习算法,这些方法在处理复杂、非线性且高度动态变化的交通流量数据时显得力不从心。随着深度学习技术的兴起,特别是在自然语言处理(NLP)和图像识别等领域取得显著成功后,越来越多的研究者开始探索深度学习在交通流量预测中的应用潜力。
本文旨在探讨一种基于BiLSTM(双向长短期记忆网络)与Transformer混合模型的时间序列预测方法,专门用于交通流量的预测。BiLSTM模型以其能够捕捉序列数据中前后依赖关系的能力而闻名,尤其适合处理具有时间序列特性的数据。而Transformer模型,自2017年由Vaswani等人提出以来,凭借其自注意力机制(self-attention)和并行计算能力,在自然语言处理等多个领域取得了突破性进展。将这两种模型结合起来,旨在充分利用BiLSTM对时间序列局部依赖性的建模能力以及Transformer的全局上下文捕捉能力,从而提高交通流量预测的准确性和效率。
本文的混合模型设计思路如下:首先,利用BiLSTM层对交通流量时间序列进行初步处理,捕捉数据的局部依赖性和趋势信息;随后,将BiLSTM的输出作为Transformer模型的输入,通过自注意力机制进一步挖掘数据中的全局上下文信息,实现更为精确的预测。此外,本文还将介绍如何使用PyTorch这一深度学习框架来实现这一混合模型,详细阐述模型的构建、训练及评估过程,并提供相应的Python代码示例。
文章目录
- 1. 数据集介绍
- 2. 数据集加载
- 3. 数据可视化
- 4. 特征工程
- 4.1 正态分布检验
- 4.2 特征缩放(归一化)
- 4.3 构建时间序列数据
- 4.4 数据集划分
- 4.5 数据集张量
- 5. 构建时序模型(TSF)
- 5.1 构建BiLSTM-Transformer模型
- 5.2 定义模型、损失函数与优化器
- 5.3 模型概要
- 6. 模型训练与可视化
- 6.1 定义训练与评估函数
- 6.2 绘制训练与验证损失曲线
- 7.1 构建预测函数
- 7.2 验证集预测
- 7.3 回归拟合图
- 7.4 评估指标
1. 数据集介绍
PEMS03数据集是由Caltrans Performance Measurement System (PeMS)收集的加州交通流量数据集。数据来源于横跨加州所有主要城市地区的探测器。PeMS系统每30秒收集一次数据,每5分钟对数据进行一次聚合。该数据集常用于交通流量预测研究,有助于提升运输效率、提高公共交通安全、助力智慧城市的建设。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, \
mean_absolute_percentage_error, \
mean_squared_error, root_mean_squared_error, \
r2_score
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset
from torchinfo import summary
np.random.seed(0)
2. 数据集加载
pems03_data = np.load('./PEMS03/pems03.npz')
traffic_data = pems03_data['data']
print(traffic_data.shape) # 91天*24小时*12(5分钟统计一次流量数据),探测器数量358,特征数 1
(26208, 358, 1)
我们可以清楚观察到数据集的形状为 (26208, 358, 1)
,即数据量、探测器和特征数。
data = traffic_data[:, 0, 0] # 第一个探测器的交通流量数据
3. 数据可视化
这里我们通过 matplotlib
实现数据可视化。可视化时间序列有助于快速识别趋势或季节性影响等模式。图形是了解数据动态并发现其中任何异常的简单方法。
fig = plt.figure(figsize=(18, 6))
plt.style.use('_mpl-gallery')
plt.plot(np.arange(len(data)), data, linestyle= '-')
fig.autofmt_xdate(rotation= 45)
plt.show()
4. 特征工程
4.1 正态分布检验
# 计算偏度
skewness = stats.skew(data)
print("skewness: {:.2f}".format(skewness))
# 计算峰度
# fisher=False表示使用Pearson的定义,峰度应该接近 3
kurtosis = stats.kurtosis(data, fisher=False)
print("kurtosis: {:.2f}".format(kurtosis))
skewness: 0.22
kurtosis: 2.10
- 偏度 = 0.22:这个值比较接近0,说明数据的分布形态在偏度上比较接近正态分布。
- 峰度 = 2.10:这个值比正态分布的标准峰度3要小,说明数据的分布形态在峰度上比正态分布要平缓一些,即数据的峰值没有正态分布那么尖锐,尾部也相对较短。
4.2 特征缩放(归一化)
reshape
方法用于改变数组的形状。第一个参数 -1
表示自动推断该维度的大小,使得数组在保持总元素数量不变的情况下,根据第二个参数的要求进行重塑。第二个参数 1
指定了新数组的第二维度为 1
。举例说明,如果原始的 data
是一个一维数组 [1, 2, 3, 4]
,执行 data.reshape(-1, 1)
后,data
将变为一个二维数组 [[1], [2], [3], [4]]
。如果原始的 data
是一个二维数组,比如 [[1, 2], [3, 4]]
,执行该操作后会将其转换为一个二维数组,其中每个元素都被转换为一个包含单个元素的列表,即 [[1], [2], [3], [4]]
。
# 重塑数组
data = data.reshape(-1, 1)
在机器学习中 StandardScaler()
函数将数据的特征值转换为符合正态分布的形式,它将数据缩放到均值为0,标准差为1的区间。常用于不同尺度特征数据的标准化,以提高模型的泛化能力。fit_transform()
方法首先计算特征数据 features
的均值和方差,然后对数据进行标准化,使其具有零均值和单位方差。
# 创建 StandardScaler实例,对特征进行拟合和变换
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)
print(scaled_data.shape)
4.3 构建时间序列数据
我们创建一个时间序列数据,时间步 time_steps
假设设置为10
time_steps = 10
X_list = []
y_list = []
for i in range(len(scaled_data) - time_steps):
X_list.append(scaled_data[i:i+time_steps])
y_list.append(scaled_data[i+time_steps])
X = np.array(X_list) # [samples, time_steps, num_features]
y = np.array(y_list) # [target]
上述代码的目的是进行时间序列数据的预处理,将原始的时间序列数据转换为适合机器学习模型输入的格式。具体来说,它通过滑动窗口的方式将时间序列数据分割成多个样本,每个样本包含一定数量的时间步 time_steps
的特征数据以及对应的一个目标值。time_steps
:表示每个样本中包含的时间步数。它决定了模型在预测时考虑的历史数据长度。X_list
:用于存储分割后的特征数据样本的列表。y_list
:用于存储每个特征数据样本对应的目标值的列表。
X_list.append(features_scaled[i:i + time_steps])
:将从当前位置 i
开始,长度为 time_steps
的特征数据切片添加到 X_list
中。这样就得到了一系列连续的时间步的特征数据样本。
y_list.append(target_scaled[i + time_steps])
:将当前位置 i + time_steps
的目标值添加到 y_list
中。这个目标值对应于当前特征数据样本之后的一个时间步的目标值。
samples, time_steps, num_features = X.shape
4.4 数据集划分
train_test_split
函数将数组或矩阵随机分成训练子集和测试子集。
X_train, X_valid,\
y_train, y_valid = train_test_split(X, y,
test_size=0.2,
random_state=45,
shuffle=False)
print(X_train.shape, X_valid.shape, y_train.shape, y_valid.shape)
以上代码中 random_state=45
设置了随机种子,以确保每次运行代码时分割结果的一致性。shuffle=False
表示在分割数据时不进行随机打乱。如果设置为True
(默认值),则会在分割之前对数据进行随机打乱,这样可以增加数据的随机性,但时间序列数据具有连续性,所以设置为False
。
4.5 数据集张量
# 将 NumPy数组转换为 tensor张量
X_train_tensor = torch.from_numpy(X_train).type(torch.Tensor)
X_valid_tensor = torch.from_numpy(X_valid).type(torch.Tensor)
y_train_tensor = torch.from_numpy(y_train).type(torch.Tensor).view(-1, 1)
y_valid_tensor = torch.from_numpy(y_valid).type(torch.Tensor).view(-1, 1)
print(X_train_tensor.shape, X_valid_tensor.shape, y_train_tensor.shape, y_valid_tensor.shape)
以上代码通过 train_test_split
划分得到的训练集和验证集中的特征数据 X_train
、X_valid
以及标签数据 y_train
、y_valid
从 numpy
数组转换为 PyTorch
的张量(tensor)类型。.type(torch.Tensor)
确保张量的数据类型为标准的 torch.Tensor
类型,.view(-1, 1)
对张量进行维度调整
class DataHandler(Dataset):
def __init__(self, X_train_tensor, y_train_tensor, X_valid_tensor, y_valid_tensor):
self.X_train_tensor = X_train_tensor
self.y_train_tensor = y_train_tensor
self.X_valid_tensor = X_valid_tensor
self.y_valid_tensor = y_valid_tensor
def __len__(self):
return len(self.X_train_tensor)
def __getitem__(self, idx):
sample = self.X_train_tensor[idx]
labels = self.y_train_tensor[idx]
return sample, labels
def train_loader(self):
train_dataset = TensorDataset(self.X_train_tensor, self.y_train_tensor)
return DataLoader(train_dataset, batch_size=32, shuffle=True)
def valid_loader(self):
valid_dataset = TensorDataset(self.X_valid_tensor, self.y_valid_tensor)
return DataLoader(valid_dataset, batch_size=32, shuffle=False)
在上述代码中,定义了一个名为 DataHandler
的类,它继承自 torch.utils.data.Dataset
__init__
方法用于接收数据和标签。__len__
方法返回数据集的长度。__getitem__
方法根据给定的索引 idx
返回相应的数据样本和标签。
data_handler = DataHandler(X_train_tensor, y_train_tensor, X_valid_tensor, y_valid_tensor)
train_loader = data_handler.train_loader()
valid_loader = data_handler.valid_loader()
5. 构建时序模型(TSF)
5.1 构建BiLSTM-Transformer模型
class BiLSTM_Transformer(nn.Module):
def __init__(self, input_dim, hidden_dim, num_layers, num_heads, output_dim, dropout=0.5):
super(BiLSTM_Transformer, self).__init__()
# BiLSTM层
self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, bidirectional=True)
# Transformer编码器层
encoder_layer = nn.TransformerEncoderLayer(hidden_dim * 2, num_heads, dim_feedforward=hidden_dim * 4, dropout=dropout, batch_first=True)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
# 输出层
self.fc = nn.Linear(hidden_dim * 2, output_dim)
def forward(self, x):
# BiLSTM输出
lstm_out, _ = self.lstm(x)
# Transformer输入:由于BiLSTM是双向的,所以输出维度是hidden_dim * 2
transformer_input = lstm_out
# Transformer输出
transformer_out = self.transformer_encoder(transformer_input)
# 预测输出
output = self.fc(transformer_out[:, -1, :])
return output
5.2 定义模型、损失函数与优化器
model = BiLSTM_Transformer(input_dim = num_features, # 输入特征维度
hidden_dim = 4,
num_layers = 2, # BiLSTM 中 num_layers为 2
num_heads = 4,
output_dim = 1)
criterion_mse = nn.MSELoss() # 定义均方误差损失函数
criterion_mae = nn.L1Loss() # 定义平均绝对误差损失
optimizer = torch.optim.Adam(model.parameters(), lr=1e-04) # 定义优化器
PyTorch中,损失函数 nn.MSELoss()
用于计算模型预测值和真实值之间的均方误差(Mean Squared Error, MSE),nn.L1Loss()
用于计算模型预测值和真实值之间的平均绝对误差(Mean Absolute Error, MAE)
5.3 模型概要
# batch_size, seq_len(time_steps), input_dim
summary(model, (32, time_steps, num_features))
===============================================================================================
Layer (type:depth-idx) Output Shape Param #
===============================================================================================
BiLSTM_Transformer [32, 1] --
├─LSTM: 1-1 [32, 10, 8] 672
├─TransformerEncoder: 1-2 [32, 10, 8] --
│ └─ModuleList: 2-1 -- --
│ │ └─TransformerEncoderLayer: 3-1 [32, 10, 8] 600
│ │ └─TransformerEncoderLayer: 3-2 [32, 10, 8] 600
├─Linear: 1-3 [32, 1] 9
===============================================================================================
Total params: 1,881
Trainable params: 1,881
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.22
===============================================================================================
Input size (MB): 0.00
Forward/backward pass size (MB): 0.02
Params size (MB): 0.00
Estimated Total Size (MB): 0.02
===============================================================================================
6. 模型训练与可视化
6.1 定义训练与评估函数
在模型训练之前,我们需先定义 train
函数来执行模型训练过程
def train(model, iterator, optimizer):
epoch_loss_mse = 0
epoch_loss_mae = 0
model.train() # 确保模型处于训练模式
for batch in iterator:
optimizer.zero_grad() # 清空梯度
inputs, targets = batch # 获取输入和目标值
outputs = model(inputs) # 前向传播
loss_mse = criterion_mse(outputs, targets) # 计算损失
loss_mae = criterion_mae(outputs, targets)
combined_loss = loss_mse + loss_mae # 可以根据需要调整两者的权重
combined_loss.backward()
optimizer.step()
epoch_loss_mse += loss_mse.item() # 累计损失
epoch_loss_mae += loss_mae.item()
average_loss_mse = epoch_loss_mse / len(iterator) # 计算平均损失
average_loss_mae = epoch_loss_mae / len(iterator)
return average_loss_mse, average_loss_mae
上述代码定义了一个名为 train
的函数,用于训练给定的模型。它接收模型、数据迭代器、优化器作为参数,并返回训练过程中的平均损失。
def evaluate(model, iterator):
epoch_loss_mse = 0
epoch_loss_mae = 0
model.eval() # 将模型设置为评估模式,例如关闭 Dropout 等
with torch.no_grad(): # 不需要计算梯度
for batch in iterator:
inputs, targets = batch
outputs = model(inputs) # 前向传播
loss_mse = criterion_mse(outputs, targets) # 计算损失
loss_mae = criterion_mae(outputs, targets)
epoch_loss_mse += loss_mse.item() # 累计损失
epoch_loss_mae += loss_mae.item()
return epoch_loss_mse / len(iterator), epoch_loss_mae / len(iterator)
上述代码定义了一个名为 evaluate
的函数,用于评估给定模型在给定数据迭代器上的性能。它接收模型、数据迭代器作为参数,并返回评估过程中的平均损失。这个函数通常在模型训练的过程中定期被调用,以监控模型在验证集或测试集上的性能。通过评估模型的性能,可以了解模型的泛化能力和训练的进展情况。
epoch = 100
train_mselosses = []
valid_mselosses = []
train_maelosses = []
valid_maelosses = []
for epoch in range(epoch):
train_loss_mse, train_loss_mae = train(model, train_loader, optimizer)
valid_loss_mse, valid_loss_mae = evaluate(model, valid_loader)
train_mselosses.append(train_loss_mse)
valid_mselosses.append(valid_loss_mse)
train_maelosses.append(train_loss_mae)
valid_maelosses.append(valid_loss_mae)
print(f'Epoch: {epoch+1:02}, Train MSELoss: {train_loss_mse:.5f}, Train MAELoss: {train_loss_mae:.3f}, Val. MSELoss: {valid_loss_mse:.5f}, Val. MAELoss: {valid_loss_mae:.3f}')
Epoch: 01, Train MSELoss: 0.45846, Train MAELoss: 0.516, Val. MSELoss: 0.10861, Val. MAELoss: 0.253
Epoch: 02, Train MSELoss: 0.15595, Train MAELoss: 0.298, Val. MSELoss: 0.08986, Val. MAELoss: 0.223
Epoch: 03, Train MSELoss: 0.11736, Train MAELoss: 0.255, Val. MSELoss: 0.06817, Val. MAELoss: 0.189
Epoch: 04, Train MSELoss: 0.09147, Train MAELoss: 0.222, Val. MSELoss: 0.05284, Val. MAELoss: 0.163
Epoch: 05, Train MSELoss: 0.07257, Train MAELoss: 0.197, Val. MSELoss: 0.04172, Val. MAELoss: 0.145
******
Epoch: 96, Train MSELoss: 0.03495, Train MAELoss: 0.136, Val. MSELoss: 0.03072, Val. MAELoss: 0.129
Epoch: 97, Train MSELoss: 0.03517, Train MAELoss: 0.136, Val. MSELoss: 0.03066, Val. MAELoss: 0.129
Epoch: 98, Train MSELoss: 0.03507, Train MAELoss: 0.136, Val. MSELoss: 0.03144, Val. MAELoss: 0.131
Epoch: 99, Train MSELoss: 0.03503, Train MAELoss: 0.136, Val. MSELoss: 0.03061, Val. MAELoss: 0.129
Epoch: 100, Train MSELoss: 0.03528, Train MAELoss: 0.136, Val. MSELoss: 0.03062, Val. MAELoss: 0.129
6.2 绘制训练与验证损失曲线
# 绘制 MSE损失图
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_mselosses, label='Train MSELoss')
plt.plot(valid_mselosses, label='Validation MSELoss')
plt.xlabel('Epoch')
plt.ylabel('MSELoss')
plt.title('Train and Validation MSELoss')
plt.legend()
plt.grid(True)
# 绘制 MAE损失图
plt.subplot(1, 2, 2)
plt.plot(train_maelosses, label='Train MAELoss')
plt.plot(valid_maelosses, label='Validation MAELoss')
plt.xlabel('Epoch')
plt.ylabel('MAELoss')
plt.title('Train and Validation MAELoss')
plt.legend()
plt.grid(True)
plt.show()
7.1 构建预测函数
定义预测函数prediction
方便调用
# 定义 prediction函数
def prediction(model, iterator):
all_targets = []
all_predictions = []
model.eval()
with torch.no_grad():
for batch in iterator:
inputs, targets = batch
predictions = model(inputs)
all_targets.extend(targets.numpy())
all_predictions.extend(predictions.numpy())
return all_targets, all_predictions
这段代码定义了一个名为 prediction
的函数,其主要目的是使用给定的模型对输入数据进行预测,并收集所有的目标值和预测值。
7.2 验证集预测
# 模型预测
targets, predictions = prediction(model, valid_loader)
# 反归一化
denormalized_targets = scaler.inverse_transform(targets)
denormalized_predictions = scaler.inverse_transform(predictions)
targets
是经过标准化处理后的目标值数组,predictions
是经过标准化处理后的预测值数组。scaler
是StandardScaler()
标准化类的实例,inverse_transform
方法会将标准化后的数组还原为原始数据的尺度,即对预测值进行反标准化操作。
# Visualize the data
plt.figure(figsize=(12,6))
plt.style.use('_mpl-gallery')
plt.title('Comparison of validation set prediction results')
plt.plot(denormalized_targets, color='blue',label='Actual Value')
plt.plot(denormalized_predictions, color='orange', label='Valid Value')
plt.legend()
plt.show()
7.3 回归拟合图
使用 regplot()
函数绘制数据图并拟合线性回归模型。
plt.figure(figsize=(5, 5), dpi=100)
sns.regplot(x=denormalized_targets, y=denormalized_predictions, scatter=True, marker="*", color='orange',line_kws={'color': 'red'})
plt.show()
7.4 评估指标
以下代码使用了一些常见的评估指标:平均绝对误差(MAE)、平均绝对百分比误差(MAPE)、均方误差(MSE)、均方根误差(RMSE)和决定系数(R²)来衡量模型预测的性能。这里我们将通过调用 sklearn.metrics
模块中的 mean_absolute_error
mean_absolute_percentage_error
mean_squared_error
root_mean_squared_error
r2_score
函数来对模型的预测效果进行评估。
mae = mean_absolute_error(targets, predictions)
print(f"MAE: {mae:.4f}")
mape = mean_absolute_percentage_error(targets, predictions)
print(f"MAPE: {mape * 100:.4f}%")
mse = mean_squared_error(targets, predictions)
print(f"MSE: {mse:.4f}")
rmse = root_mean_squared_error(targets, predictions)
print(f"RMSE: {rmse:.4f}")
r2 = r2_score(targets, predictions)
print(f"R²: {r2:.4f}")
MAE: 0.1287
MAPE: 50.5869%
MSE: 0.0306
RMSE: 0.1750
R²: 0.9676