源码地址
由于用的是tensflow,其实和pytorch差不多看懂就行(pytorch版本我还在改,有些难度)因为这里面的tcn结构是用的
别人的代码https://github.com/philipperemy/keras-tcn
首先说一下什么是TCN
首先我想说 我们预测股票价格的时候,我们是用以前的数据预测当前数据
所以我们TCN的概念就是让我们的网络通过卷积的方式使得我们的模型能够大量记忆我们之前的状态,来保证我们网络的预测能力。
它包含三个结构
因果卷积,膨胀卷积和残差链接。
因果卷积
很简单,一句话就是通过之前的状态预测现在的状态。但是我们希望每次进行卷积的时候,我们的原始层(也就是被卷积核卷积的那个时间序列)长度最好不变,因为我们用常规思维思考的时候,我们肯定会觉得想获得更多的以前数据,只需要增加序列长度就行了,那么假如这样你可能不知道参数是如何增长的。**好的,我的意思是想保证每次卷积的时候序列不变,但是能获得更多的以前数据我该怎么做?**这个时候就不得不提到膨胀卷积了
膨胀卷积
很简单,一句话就是更多的卷积层,来获得更多的以前数据,其实就是感受野打个比方就是第1层卷积后变成第二层,第二层就是第一层以前数据的浓缩,那么我们可以在通过改变卷积核的大小(添0)来让我们获得更多以前数据。
残差链接
就是防止梯度消失,说人话就是怕卷着卷着最开始的数据或者卷积过程中的数据没了,毕竟我们要卷积很多次。
实现
##环境安装
tushare 1.4.6
tensorflow 2.16.1
其他的都是直接安装
首先是获取数据集
这里用tushare的00001号股票做好的csv文件
代码如下先生成但是你得先取tushare里面找到自己的api替换到下面的xxx中
但是要先安装相应接口
pip install tushare
import torch
import tushare as ts
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch import nn
from torch.utils.data import TensorDataset
from tqdm import tqdm
# 替换成你的Tushare Pro Token
pro = ts.pro_api('xxx')#xxx替换为你的api
# 获取股票数据 (深圳交易所股票代码为 000001 的股票数据) 获取2013 7月11到2024 4月8号的
df = pro.daily(ts_code='000001.SZ', start_date='20130711', end_date='20240408')
# 确保trade_date列存在
if 'trade_date' in df.columns:
df.index = pd.to_datetime(df.trade_date) # 索引转为日期
df = df.iloc[::-1] # 数据倒序调整
# 保存为CSV文件
df.to_csv('000001_Daily_2013_2024.csv', index=False)
print("数据已保存为 '000001_SZ_20130711_20240408.csv'")
else:
print("数据中没有找到 'trade_date' 列,请检查数据结构!")
定义一些参数
window_size = 10 # 窗口大小(也就是时间步数)
batch_size = 32 # 训练批次大小
epochs = 30 # 训练轮数
filter_nums = 10 # filter数量 也就是输出通道的数量。
kernel_size = 4 # kernel大小卷积核大小
划分数据集
def get_dataset():
df = pd.read_csv('./keras-tcn/000001_Daily_2013_2024.csv')
scaler = MinMaxScaler()
open_arr = scaler.fit_transform(df['open'].values.reshape(-1, 1)).reshape(-1)
X = np.zeros(shape=(len(open_arr) - window_size, window_size))
label = np.zeros(shape=(len(open_arr) - window_size))
for i in range(len(open_arr) - window_size):
X[i, :] = open_arr[i:i+window_size]
label[i] = open_arr[i+window_size]
train_size = 2000
test_size = 611
train_X = X[:train_size, :]
train_label = label[:train_size]
test_X = X[train_size:train_size + test_size, :]
test_label = label[train_size:train_size + test_size]
print("train_X" + str(train_X.shape), "train_label_shape" + str(train_label.shape))
return train_X, train_label, test_X, test_label, scaler, open_arr
定义模型并训练保存参数
def build_model():
train_X, train_label, test_X, test_label, scaler, open_arr = get_dataset()
train_X = train_X.reshape(-1, window_size, 1) # 把输入变成TCN的维度输入[batchsize,时间步数,特征数] 因为是每个批次中一列数据是10个以前数据来预测今日数据所以我们只需要每10个时间步数的开盘价(open)就行
test_X = test_X.reshape(-1, window_size, 1) # 同理
model = keras.models.Sequential([
keras.layers.Input(shape=(window_size, 1)),
TCN(nb_filters=filter_nums, kernel_size=kernel_size, dilations=[1, 2, 4, 8]),
keras.layers.Dense(units=1, activation='relu')#输出只需要我们的预测值
])
model.summary()
model.compile(optimizer='adam', loss='mae', metrics=['mae'])
model.fit(train_X, train_label, validation_split=0.2, epochs=epochs)
model.save('my_model.h5') # 保存模型
prediction = model.predict(test_X)
scaled_prediction = scaler.inverse_transform(prediction.reshape(-1, 1)).reshape(-1)
scaled_test_label = scaler.inverse_transform(test_label.reshape(-1, 1)).reshape(-1)
print('RMSE ', RMSE(scaled_prediction, scaled_test_label))
plot_and_save(scaled_prediction, scaled_test_label)
return model, scaler, open_arr
利用模型预测股价
def load_model_and_predict():
model = keras.models.load_model('my_model.h5', custom_objects={'TCN': TCN, 'mae': tf.keras.losses.MeanAbsoluteError()})
_, _, _, _, scaler, open_arr = get_dataset()
# 预测明天的股价
last_window = open_arr[-window_size:] # 获取最近的window_size天的数据
last_window_scaled = scaler.transform(last_window.reshape(-1, 1)).reshape(1, window_size, 1)
next_day_prediction = model.predict(last_window_scaled)
next_day_price = scaler.inverse_transform(next_day_prediction.reshape(-1, 1)).reshape(-1)
print('Predicted price for the next day (loaded model):', next_day_price[0])
def predict_next_day():
model, scaler, open_arr = build_model()
# 预测明天的股价
last_window = open_arr[-window_size:] # 获取最近的window_size天的数据
last_window_scaled = scaler.transform(last_window.reshape(-1, 1)).reshape(1, window_size, 1)
next_day_prediction = model.predict(last_window_scaled)
next_day_price = scaler.inverse_transform(next_day_prediction.reshape(-1, 1)).reshape(-1)
print('Predicted price for the next day:', next_day_price[0])
全部代码
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tcn.tcn import TCN
from tensorflow import keras
import tensorflow as tf
window_size = 10 # 窗口大小(也就是时间步数)
batch_size = 32 # 训练批次大小
epochs = 30 # 训练轮数
filter_nums = 10 # filter数量 也就是输出通道的数量。
kernel_size = 4 # kernel大小卷积核大小
def get_dataset():
df = pd.read_csv('./keras-tcn/000001_Daily_2013_2024.csv')
scaler = MinMaxScaler()
open_arr = scaler.fit_transform(df['open'].values.reshape(-1, 1)).reshape(-1)
X = np.zeros(shape=(len(open_arr) - window_size, window_size))
label = np.zeros(shape=(len(open_arr) - window_size))
for i in range(len(open_arr) - window_size):
X[i, :] = open_arr[i:i+window_size]
label[i] = open_arr[i+window_size]
train_size = 2000
test_size = 611
train_X = X[:train_size, :]
train_label = label[:train_size]
test_X = X[train_size:train_size + test_size, :]
test_label = label[train_size:train_size + test_size]
print("train_X" + str(train_X.shape), "train_label_shape" + str(train_label.shape))
return train_X, train_label, test_X, test_label, scaler, open_arr
def RMSE(pred, true):
return np.sqrt(np.mean(np.square(pred - true)))
def plot(pred, true):
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(range(len(pred)), pred)
ax.plot(range(len(true)), true)
plt.show()
def build_model():
train_X, train_label, test_X, test_label, scaler, open_arr = get_dataset()
train_X = train_X.reshape(-1, window_size, 1) # 把输入变成TCN的维度输入[batchsize,时间步数,特征数] 因为是每个批次中一列数据是10个以前数据来预测今日数据所以我们只需要每10个时间步数的开盘价(open)就行
test_X = test_X.reshape(-1, window_size, 1) # 同理
model = keras.models.Sequential([
keras.layers.Input(shape=(window_size, 1)),
TCN(nb_filters=filter_nums, kernel_size=kernel_size, dilations=[1, 2, 4, 8]),
keras.layers.Dense(units=1, activation='relu')#输出只需要我们的预测值
])
model.summary()
model.compile(optimizer='adam', loss='mae', metrics=['mae'])
model.fit(train_X, train_label, validation_split=0.2, epochs=epochs)
model.save('my_model.h5') # 保存模型
prediction = model.predict(test_X)
scaled_prediction = scaler.inverse_transform(prediction.reshape(-1, 1)).reshape(-1)
scaled_test_label = scaler.inverse_transform(test_label.reshape(-1, 1)).reshape(-1)
print('RMSE ', RMSE(scaled_prediction, scaled_test_label))
plot_and_save(scaled_prediction, scaled_test_label)
return model, scaler, open_arr
def plot_and_save(pred, true, filename="plot.png"):
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(range(len(pred)), pred, label='Prediction')
ax.plot(range(len(true)), true, label='True')
ax.legend()
plt.xlabel("Time")
plt.ylabel("Value")
plt.title("Prediction vs True Values")
plt.savefig(filename)
plt.close(fig)
def predict_next_day():
model, scaler, open_arr = build_model()
# 预测明天的股价
last_window = open_arr[-window_size:] # 获取最近的window_size天的数据
last_window_scaled = scaler.transform(last_window.reshape(-1, 1)).reshape(1, window_size, 1)
next_day_prediction = model.predict(last_window_scaled)
next_day_price = scaler.inverse_transform(next_day_prediction.reshape(-1, 1)).reshape(-1)
print('Predicted price for the next day:', next_day_price[0])
def load_model_and_predict():
model = keras.models.load_model('my_model.h5', custom_objects={'TCN': TCN, 'mae': tf.keras.losses.MeanAbsoluteError()})
_, _, _, _, scaler, open_arr = get_dataset()
# 预测明天的股价
last_window = open_arr[-window_size:] # 获取最近的window_size天的数据
last_window_scaled = scaler.transform(last_window.reshape(-1, 1)).reshape(1, window_size, 1)
next_day_prediction = model.predict(last_window_scaled)
next_day_price = scaler.inverse_transform(next_day_prediction.reshape(-1, 1)).reshape(-1)
print('Predicted price for the next day (loaded model):', next_day_price[0])
if __name__ == '__main__':
build_model() # 训练并保存模型
load_model_and_predict() # 加载模型并预测
效果
拟合还不错
也是成功的可以取买股票了哈哈哈哈
后记
pytorch版本之后写好再发