目录
写在前面
下载源码并解压
创建python项目
环境
过程
编译vnpy_ctp源码
验证可用性
写在前面
Windows系统中必须先安装Visual Studio,因为后面从源码安装时需要进行C++编译
下载源码并解压
GitHub - vnpy/vnpy_ctp: VeighNa框架的CTP交易接口
下载zip压缩包
解压
要在Python中调用该接口,必须有编译生成的.pyd文件;而解压后的源码文件夹内并没有.pyd文件,因此需要先自行编译
创建python项目
新建一个python项目,项目在一个新的虚拟环境中执行。
环境
操作系统:Windows 10 64位
开发工具:pycharm
python版本:python3.8.10
过程
编译vnpy_ctp源码
打开项目下方的terminal面板
cd 到解压后setup.py 所在文件夹
执行 python setup.py build
大概2到3分钟,编译完毕,在setup.py所在文件夹下多出一个build文件夹
在build下找到与操作系统和Python版本对应的文件夹。以本文为例:操作系统是64位,文件夹名称中会包含amd64;Python版本是3.8,文件夹名称中会包含3.8;所以本文对应的文件夹名为lib.win-amd64-3.8
我们需要的是 build>lib.win-amd64-3.8>vnpy_ctp文件夹下的api文件夹,我们把api文件夹复制到项目目录下
验证可用性
创建Md调用的类和Td调用的类
import datetime,sys,os,time,pytz
from api import (
TdApi,
MdApi
)
class CtpMdApi(MdApi):
    """Market-data client built on the ``MdApi`` class imported from ``api``.

    The instance keeps track of connection/login state, remembers every
    symbol passed to :meth:`subscribe` so that it can be subscribed again
    after a login, and prints each tick it receives.
    """

    def __init__(self) -> None:
        """Initialise the request counter, state flags and empty credentials."""
        super().__init__()

        self.reqid: int = 0                 # running request id sent with each request
        self.connect_status: bool = False   # True once init() has been issued
        self.login_status: bool = False     # True after a successful login response
        self.subscribed: set = set()        # symbols to (re)subscribe on login

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.date.today().strftime('%Y%m%d')

    def connect(self, address: str, userid: str, password: str, brokerid: str) -> None:
        """Store the credentials and, on the first call only, open the front.

        Args:
            address: Front address handed to ``registerFront``.
            userid: Account id used later by :meth:`login`.
            password: Account password used later by :meth:`login`.
            brokerid: Broker id used later by :meth:`login`.
        """
        self.userid = userid
        self.password = password
        self.brokerid = brokerid

        if self.connect_status:
            return

        # NOTE(review): CTP_MD_CON_DIR is not defined in the visible part of
        # this file; it is expected to exist at module level -- confirm.
        con_body_path = CTP_MD_CON_DIR.replace('/', '\\')
        self.createFtdcMdApi(con_body_path)
        self.registerFront(address)
        self.init()
        self.connect_status = True

    def login(self) -> None:
        """Send a login request built from the stored credentials."""
        ctp_req: dict = {
            'UserID': self.userid,
            'Password': self.password,
            'BrokerID': self.brokerid
        }
        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)

    def subscribe(self, req: dict):
        """Subscribe to ``req['symbol']`` and remember it for later logins.

        The request is only sent immediately when already logged in; the
        symbol is recorded either way so ``onRspUserLogin`` can subscribe it
        once a login succeeds.
        """
        symbol = req['symbol']
        if self.login_status:
            self.subscribeMarketData(symbol)
        self.subscribed.add(symbol)

    def close(self) -> None:
        """Call ``exit()`` on the underlying API if a connection was started."""
        if self.connect_status:
            self.exit()

    def update_date(self) -> None:
        """Refresh ``current_date`` to today's date in ``YYYYMMDD`` form."""
        self.current_date = datetime.date.today().strftime('%Y%m%d')

    def onFrontConnected(self) -> None:
        """Callback: the front is connected, so log in straight away."""
        self.login()

    def onFrontDisconnected(self, reason: int) -> None:
        """Callback: the front dropped; mark the session as logged out."""
        self.login_status = False

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback for the login response.

        On success every remembered symbol is subscribed. An empty ``error``
        dict is treated as success, matching the guard already used in
        ``onRspSubMarketData``; previously it raised ``KeyError``.
        """
        if error and error['ErrorID']:
            print(f"行情服务器登录失败。{error['ErrorID']}.{error['ErrorMsg']}")
            return

        self.login_status = True
        for symbol in self.subscribed:
            self.subscribeMarketData(symbol)

    def onRspError(self, error: dict, reqid: int, last: bool) -> None:
        """Callback for a generic error response; prints the error id and message."""
        print('行情接口报错。', error['ErrorID'], error['ErrorMsg'])

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
        """Callback for a subscription response; prints only when it failed."""
        if not error or not error['ErrorID']:
            return
        print('行情订阅失败。', error['ErrorID'], error['ErrorMsg'])

    def onRtnDepthMarketData(self, data: dict) -> None:
        """Callback for a tick; prints instrument id and last price.

        Ticks whose ``UpdateTime`` field is empty are ignored.
        """
        if not data['UpdateTime']:
            return
        print('tick返回', data['InstrumentID'], data['LastPrice'])
class CtpTdApi(TdApi):
    """Trading client built on the ``TdApi`` class imported from ``api``.

    Drives the sequence connect -> (authenticate) -> login -> settlement
    confirmation -> instrument query, printing the instruments it receives.
    """

    def __init__(self) -> None:
        """Initialise counters, state flags and empty credentials."""
        super().__init__()

        self.reqid: int = 0
        self.order_ref: int = 0

        self.connect_status: bool = False   # True once init() has been issued
        self.login_status: bool = False     # True after a successful login response
        self.auth_status: bool = False      # True after a successful authentication
        self.login_failed: bool = False     # set on a failed login; blocks retries
        self.auth_failed: bool = False      # set on a failed authentication; blocks retries
        self.contract_inited: bool = False  # True after the last instrument arrived

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""
        self.auth_code: str = ""
        self.appid: str = ""

        self.frontid: int = 0
        self.sessionid: int = 0

    def connect(self, address: str, userid: str, password: str, brokerid: str,
                auth_code: str, appid: str) -> None:
        """Store the credentials and open the front, or re-authenticate.

        On the first call a per-user directory is created and passed to
        ``createFtdcTraderApi``, both topics are subscribed with mode ``0``
        and the front is initialised. On later calls only
        :meth:`authenticate` is invoked.

        Args:
            address: Front address handed to ``registerFront``.
            userid: Account id (also used as the per-user directory name).
            password: Account password.
            brokerid: Broker id.
            auth_code: Authentication code; when empty, login is attempted
                without authenticating first (see ``onFrontConnected``).
            appid: Application id sent with authentication and login.
        """
        self.userid = userid
        self.password = password
        self.brokerid = brokerid
        self.auth_code = auth_code
        self.appid = appid

        if self.connect_status:
            self.authenticate()
            return

        # NOTE(review): CTP_TD_CON_DIR is not defined in the visible part of
        # this file; it is expected to exist at module level -- confirm.
        con_body_path = CTP_TD_CON_DIR + self.userid + os.path.sep
        # makedirs(exist_ok=True) also creates missing parent directories and
        # avoids the check-then-create race of exists() + mkdir().
        os.makedirs(con_body_path, exist_ok=True)
        con_body_path = con_body_path.replace('/', '\\')

        self.createFtdcTraderApi(con_body_path)
        self.subscribePrivateTopic(0)
        self.subscribePublicTopic(0)
        self.registerFront(address)
        self.init()
        self.connect_status = True

    def authenticate(self) -> None:
        """Send an authentication request unless a previous one failed."""
        if self.auth_failed:
            return

        ctp_req: dict = {
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "AuthCode": self.auth_code,
            "AppID": self.appid
        }
        self.reqid += 1
        self.reqAuthenticate(ctp_req, self.reqid)

    def login(self) -> None:
        """Send a login request unless a previous login failed."""
        if self.login_failed:
            return

        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
            "AppID": self.appid
        }
        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)

    def close(self) -> None:
        """Call ``exit()`` on the underlying API if a connection was started."""
        if self.connect_status:
            self.exit()

    def onFrontConnected(self) -> None:
        """Callback: authenticate first when an auth code is set, else log in."""
        if self.auth_code:
            self.authenticate()
        else:
            self.login()

    def onFrontDisconnected(self, reason: int) -> None:
        """Callback: the front dropped; mark the session as logged out."""
        self.login_status = False

    def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback for the authentication response; logs in on success.

        An empty ``error`` dict is treated as success instead of raising
        ``KeyError``.
        """
        if error and error['ErrorID']:
            self.auth_failed = True
            print('交易服务器验证失败。', error['ErrorID'], error['ErrorMsg'])
            return

        self.auth_status = True
        self.login()

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback for the login response.

        On success the front/session ids are stored and the settlement info
        is confirmed automatically. An empty ``error`` dict is treated as
        success instead of raising ``KeyError``.
        """
        if error and error["ErrorID"]:
            self.login_failed = True
            print("交易服务器登录失败", error['ErrorID'], error['ErrorMsg'])
            return

        self.frontid = data["FrontID"]
        self.sessionid = data["SessionID"]
        self.login_status = True

        # Confirm the settlement statement automatically.
        ctp_req: dict = {
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }
        self.reqid += 1
        self.reqSettlementInfoConfirm(ctp_req, self.reqid)

    def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback after settlement confirmation: query all instruments.

        The query is re-sent once per second, with a fresh request id each
        time, until ``reqQryInstrument`` returns a falsy value.
        """
        while True:
            self.reqid += 1
            if not self.reqQryInstrument({}, self.reqid):
                break
            time.sleep(1)

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback for each instrument record; prints it and marks completion.

        When ``last`` is true the contract list is flagged as initialised.
        An empty ``data`` dict is skipped rather than raising ``KeyError``.
        """
        if data:
            print(data['ProductClass'], data['InstrumentID'], data['ProductID'], reqid, last)

        if last:
            self.contract_inited = True
            print('合约信息查询完毕')
执行代码
if __name__ == '__main__':
    # Account credentials -- left blank here; fill in before running.
    investorid = ""
    password = ""
    brokerid = ""
    appid = ""
    auth_code = "0000000000000000"

    # Front addresses as host:port. The market-data address is defined but
    # not used by this demo, which only exercises the trading API.
    md_ip = "180.168.146.187:10211"
    trader_ip = "180.168.146.187:10201"

    # Connect the trading API; the rest of the flow runs in its callbacks.
    td_api = CtpTdApi()
    td_api.connect(
        "tcp://" + trader_ip,
        investorid,
        password,
        brokerid,
        auth_code,
        appid,
    )

    # Keep the process alive until the user presses Esc, then exit.
    import keyboard
    keyboard.wait('esc')
    sys.exit()
能登录td服务器,并正常查询合约,可用