1. 系统概述
这是一个基于Streamlit和AKShare的股票数据查询系统,提供了便捷的股票数据查询和可视化功能。系统支持按板块筛选股票、多股票代码查询、数据导出等功能。
1.1 主要功能
- 股票代码直接输入查询
- 按板块筛选和选择股票
- 历史数据和实时行情查询
- 财务报表数据获取
- 新闻公告展示
- 数据导出Excel
1.2 技术栈
- 前端框架: Streamlit
- 数据源: AKShare
- 数据处理: Pandas
- 数据导出: OpenPyXL
2. 系统架构
2.1 模块组织
stock.py
├── 数据获取模块
│ ├── get_sector_list() # 获取板块列表
│ ├── get_sector_stocks() # 获取板块成分股
│ └── get_stock_data() # 获取股票详细数据
├── 数据处理模块
│ └── save_to_excel() # 数据导出Excel
└── 主界面模块
└── main() # 主程序入口
2.2 状态管理
使用Streamlit的Session State管理以下状态:
st.session_state:
├── stock_codes_input # 股票代码输入
├── selected_stocks # 已选择的股票列表
├── sector_stocks_df # 板块成分股数据
├── start_date # 开始日期
├── end_date # 结束日期
└── reset_sector # 板块重置标志
3. 核心功能实现
3.1 板块股票选择
# 获取板块列表
sector_list = get_sector_list()
sector_name = st.selectbox("选择板块", options=sector_list)
# 获取板块成分股
if sector_name:
sector_stocks = get_sector_stocks(sector_name)
selected_stocks = st.multiselect(
"选择要查看的股票",
options=sector_stocks['代码'].tolist(),
format_func=lambda x: f"{x} ({sector_stocks[sector_stocks['代码']==x]['名称'].iloc[0]})"
)
3.2 数据获取
def get_stock_data(stock_codes, sector_name, start_date, end_date):
results = {}
# 获取个股历史数据
stock_hist = ak.stock_zh_a_hist(
symbol=code,
period="daily",
start_date=start_date,
end_date=end_date
)
# 获取实时行情
stock_real = ak.stock_zh_a_spot_em()
# 获取财务数据
financial_data = ak.stock_financial_report_sina(
stock=code,
symbol="资产负债表"
)
return results
3.3 界面布局
使用Streamlit的列布局实现左右分栏:
left_column, right_column = st.columns([1, 3]) # 1:3的宽度比例
with left_column:
# 查询参数输入区
st.header("查询参数")
...
with right_column:
# 数据显示区
st.markdown("### 数据显示")
...
4. 使用流程
-
选择板块
- 从下拉列表选择股票板块
- 系统自动获取并显示板块成分股列表
-
选择股票
- 从成分股列表中选择感兴趣的股票
- 选中的股票代码自动添加到输入框
- 板块选择自动重置为空
-
设置查询参数
- 设置查询日期范围
- 可以手动添加或修改股票代码
-
获取数据
- 点击"获取数据"按钮
- 系统获取并显示所选股票的详细数据
- 可以下载数据到Excel文件
5. 数据展示
5.1 成分股列表
- 使用表格展示完整的成分股信息
- 支持多选操作
- 实时显示已选股票列表
5.2 股票数据
- 历史行情数据
- 实时市场数据
- 财务报表数据
- 相关新闻公告
5.3 新闻展示
- 新闻标题和发布时间双列布局
- 支持点击标题跳转到新闻详情
- 按时间顺序排列
6. 注意事项
-
数据刷新
- 实时数据每次查询都会更新
- 历史数据基于选择的日期范围
-
性能优化
- 使用缓存减少重复API调用
- 批量处理多个股票的数据请求
-
错误处理
- 对API调用异常进行捕获和提示
- 确保数据完整性和显示正确性
7. 后续优化方向
-
数据分析
- 添加技术分析指标
- 实现数据可视化图表
-
用户体验
- 添加数据加载进度条
- 优化大量数据的显示效果
-
功能扩展
- 添加自选股票组合
- 实现数据监控告警
- 支持更多类型的数据导出
8. 依赖安装
pip install streamlit
pip install akshare
pip install pandas
pip install openpyxl
9. 运行方式
streamlit run stock.py
10. 代码示例
import streamlit as st
import akshare as ak
import pandas as pd
from datetime import datetime, timedelta
import io
@st.cache_data(ttl=3600) # 缓存板块数据1小时
def get_sector_list():
"""获取所有板块名称列表"""
try:
sector_data = ak.stock_board_industry_name_em()
sector_list = sector_data['板块名称'].tolist()
return [''] + sector_list # 添加空选项作为默认值
except Exception as e:
st.error(f"获取板块列表时出错: {str(e)}")
return ['']
def get_sector_stocks(sector_name):
"""获取板块内的所有股票"""
try:
# 获取板块成分股
stocks = ak.stock_board_industry_cons_em(symbol=sector_name)
return stocks
except Exception as e:
st.error(f"获取板块成分股时出错: {str(e)}")
return pd.DataFrame()
def get_stock_data(stock_codes, sector_name, start_date, end_date):
"""获取股票数据"""
results = {}
# 1. 获取股票数据
if stock_codes:
for code in stock_codes:
try:
# 获取个股历史数据
stock_hist = ak.stock_zh_a_hist(symbol=code, period="daily",
start_date=start_date, end_date=end_date)
results[f"股票{code}历史数据"] = stock_hist
# 获取个股实时行情
stock_real = ak.stock_zh_a_spot_em()
stock_info = stock_real[stock_real['代码'] == code]
results[f"股票{code}实时行情"] = stock_info
# 获取财务报表数据
try:
financial_data = ak.stock_financial_report_sina(stock=code, symbol="资产负债表")
results[f"股票{code}财务数据"] = financial_data
except:
st.warning(f"获取股票{code}财务数据失败")
except Exception as e:
st.error(f"获取股票{code}数据时出错: {str(e)}")
# 2. 获取板块数据
if sector_name:
try:
# 获取板块数据
sector_data = ak.stock_board_industry_name_em()
sector_data = sector_data[sector_data['板块名称'] == sector_name]
results["板块数据"] = sector_data
# 获取板块成分股列表
sector_stocks = get_sector_stocks(sector_name)
if not sector_stocks.empty:
results["板块成分股列表"] = sector_stocks
except Exception as e:
st.error(f"获取板块数据时出错: {str(e)}")
# 3. 获取指数数据
try:
# 获取上证指数数据并按照时间筛选
sh_index = ak.stock_zh_index_daily(symbol="sh000001")
sh_index['日期'] = pd.to_datetime(sh_index['date']) # 确保日期格式一致
start_datetime = pd.to_datetime(start_date)
end_datetime = pd.to_datetime(end_date)
sh_index = sh_index[
(sh_index['日期'] >= start_datetime) &
(sh_index['日期'] <= end_datetime)
]
if not sh_index.empty:
results["上证指数数据"] = sh_index
else:
st.warning("选定时间范围内没有上证指数数据")
# 获取深证指数数据并按照时间筛选
sz_index = ak.stock_zh_index_daily(symbol="sz399001")
sz_index['日期'] = pd.to_datetime(sz_index['date'])
sz_index = sz_index[
(sz_index['日期'] >= start_datetime) &
(sz_index['日期'] <= end_datetime)
]
if not sz_index.empty:
results["深证指数数据"] = sz_index
else:
st.warning("选定时间范围内没有深证指数数据")
except Exception as e:
st.error(f"获取指数数据时出错: {str(e)}")
# 4. 获取新闻公告
try:
news_data = ak.stock_news_em()
# 确保新闻数据不为空且包含必要的列
if not news_data.empty and '新闻标题' in news_data.columns and '新闻链接' in news_data.columns and '发布时间' in news_data.columns:
# 只保留最新的20条新闻
news_data = news_data.head(20)
results["新闻公告"] = news_data
else:
st.warning("暂无新闻数据")
except Exception as e:
st.error(f"获取新闻数据时出错: {str(e)}")
return results
def save_to_excel(data_dict):
"""将数据保存到Excel文件"""
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
for sheet_name, df in data_dict.items():
if not df.empty:
df.to_excel(writer, sheet_name=sheet_name[:31]) # Excel sheet名称最大31字符
output.seek(0)
return output
def main():
st.set_page_config(layout="wide") # 设置宽屏模式
# 创建左右两列布局
left_column, right_column = st.columns([1, 3]) # 1:3的宽度比例
# 初始化session state
if 'stock_codes_input' not in st.session_state:
st.session_state.stock_codes_input = ""
if 'selected_stocks' not in st.session_state:
st.session_state.selected_stocks = []
if 'sector_stocks_df' not in st.session_state:
st.session_state.sector_stocks_df = None
if 'start_date' not in st.session_state:
st.session_state.start_date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d")
if 'end_date' not in st.session_state:
st.session_state.end_date = datetime.now().strftime("%Y%m%d")
if 'reset_sector' not in st.session_state:
st.session_state.reset_sector = False
# 左侧边栏:输入参数
with left_column:
st.header("查询参数")
# 股票代码输入
stock_codes_input = st.text_input(
"股票代码(用空格分隔)",
value=st.session_state.stock_codes_input,
help="沪市股票代码以60、68开头,如:600000(浦发银行)、688001(华兴源创)\n"
"深市股票代码以00、30开头,如:000001(平安银行)、300059(东方财富)、301469(德视佳)\n"
"多个股票代码用空格分隔,例如:600000 000001 300059",
key="stock_input"
)
stock_codes = [code.strip() for code in stock_codes_input.split() if code.strip()]
# 板块名称下拉选择
sector_list = get_sector_list()
sector_index = 0 if st.session_state.reset_sector else None
sector_name = st.selectbox(
"选择板块",
options=sector_list,
index=sector_index, # 如果需要重置,则选择第一个空选项
help="选择要查询的板块,不选则不获取板块数据",
key="sector_selector"
)
# 重置标志
st.session_state.reset_sector = False
# 日期选择
col1, col2 = st.columns(2)
with col1:
start_date = st.date_input(
"开始日期",
datetime.strptime(st.session_state.start_date, "%Y%m%d")
)
st.session_state.start_date = start_date.strftime("%Y%m%d")
with col2:
end_date = st.date_input(
"结束日期",
datetime.strptime(st.session_state.end_date, "%Y%m%d")
)
st.session_state.end_date = end_date.strftime("%Y%m%d")
# 如果选择了板块,立即获取成分股列表
if sector_name and sector_name != sector_list[0]:
with st.spinner("正在获取板块成分股..."):
sector_stocks = get_sector_stocks(sector_name)
if not sector_stocks.empty:
st.session_state.sector_stocks_df = sector_stocks
# 在右侧显示成分股列表
with right_column:
st.markdown("### 板块成分股列表")
# 创建两列:一列用于显示表格,一列用于显示选中的股票信息
table_col, info_col = st.columns([2, 1])
with table_col:
# 添加多选框
selected_stocks = st.multiselect(
"选择要查看的股票",
options=sector_stocks['代码'].tolist(),
format_func=lambda x: f"{x} ({sector_stocks[sector_stocks['代码']==x]['名称'].iloc[0]})",
key="stock_selector"
)
# 当选择股票时,立即更新股票代码输入框并重置板块选择
if selected_stocks != st.session_state.selected_stocks:
st.session_state.selected_stocks = selected_stocks
# 将新选择的股票代码添加到现有代码中
current_codes = set(stock_codes)
new_codes = set(selected_stocks)
all_codes = current_codes.union(new_codes)
st.session_state.stock_codes_input = " ".join(sorted(all_codes))
# 设置重置板块选择的标志
st.session_state.reset_sector = True
st.experimental_rerun()
st.dataframe(sector_stocks, use_container_width=True)
with info_col:
if selected_stocks:
st.markdown("#### 已选股票")
selected_df = sector_stocks[sector_stocks['代码'].isin(selected_stocks)]
for _, row in selected_df.iterrows():
st.markdown(f"- {row['代码']} ({row['名称']})")
# 添加分隔线
st.markdown("---")
# 获取数据按钮
if st.button("获取数据", use_container_width=True, key="query_button"):
if not stock_codes and not sector_name:
st.warning("请至少输入一个股票代码或选择板块")
return
with st.spinner("正在获取数据..."):
# 获取数据
results = get_stock_data(stock_codes, sector_name, st.session_state.start_date, st.session_state.end_date)
# 生成Excel文件下载按钮
if results:
excel_data = save_to_excel(results)
st.download_button(
label="下载Excel文件",
data=excel_data,
file_name=f"stock_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
use_container_width=True
)
# 在右侧显示数据
with right_column:
# 显示所有数据
for name, df in results.items():
if not df.empty:
if name == "新闻公告":
st.markdown("### 新闻公告")
# 为新闻添加可点击的链接
col1, col2 = st.columns([7, 3])
with col1:
st.write("新闻标题")
with col2:
st.write("发布时间")
for _, row in df.iterrows():
col1, col2 = st.columns([7, 3])
with col1:
st.markdown(f"- [{row['新闻标题']}]({row['新闻链接']})")
with col2:
st.write(row['发布时间'])
else:
st.markdown(f"### {name}")
st.dataframe(df, use_container_width=True)
st.markdown("---")
if __name__ == '__main__':
main()