股票量化实时行情接口WebSocket接入Python封装

Python做量化,如果是日内策略,需要更实时的行情数据,不然策略滑点太大,容易跑偏结果。

之前用行情网站提供的level1行情接口,实测平均更新延迟达到了6秒,超过10只股票并发请求频率过快很容易封IP。后面又尝试了买代理IP来请求,成本太高而且不稳定。

在Github上看到一个可转债的Golang高频T+0策略,对接的是WebSocket协议,拿来改了改,封装了一个Python版本的包,记录一下:

#!python3
# -*- coding:utf-8 -*-
import time
import datetime
import websocket
import zlib
import requests
import threading

# 行情订阅推送封装
class Construct:
    __token = ""
    __server_req_url = "http://jvquant.com/query/server?market=ab&type=websocket&token="
    __ws_ser_addr = ""
    __ws_conn = ""
    __lv1_field = ["time", "name", "price", "ratio", "volume", "amount", "b1", "b1p", "b2", "b2p", "b3", "b3p", "b4",
                   "b4p", "b5", "b5p", "s1", "s1p", "s2", "s2p", "s3", "s3p", "s4", "s4p", "s5", "s5p"]
    __lv2_field = ["time", "oid", "price", "vol"]

    def __init__(self, logHandle, token, onRevLv1, onRevLv2):
        if logHandle == "" or token == "" or onRevLv1 == "" or onRevLv2 == "":
            msg = "行情初始化失败:logHandle或token或onRevLv1或onRevLv2必要参数缺失。"
            print(msg)
            exit(-1)
        self.__log = logHandle
        self.__token = token
        self.__deal_lv1 = onRevLv1
        self.__deal_lv2 = onRevLv2
        self.__getSerAddr()
        self.__conn_event = threading.Event()
        self.th_handle = threading.Thread(target=self.__conn)
        self.th_handle.start()
        self.__conn_event.wait()

    def __getSerAddr(self):
        url = self.__server_req_url + self.__token
        try:
            res = requests.get(url=url)
        except Exception as e:
            self.__log(e)
            return
        if (res.json()["code"] == "0"):
            self.__ws_ser_addr = res.json()["server"]
            print("获取行情服务器地址成功:" + self.__ws_ser_addr)
        else:
            msg = "获取行情服务器地址失败:" + res.text
            self.__log(msg)
            exit(-1)

    def __conn(self):
        wsUrl = self.__ws_ser_addr + "?token=" + self.__token
        self.__ws_conn = websocket.WebSocketApp(wsUrl,
                                                on_open=self.__on_open,
                                                on_data=self.__on_message,
                                                on_error=self.__on_error,
                                                on_close=self.__on_close)
        self.__ws_conn.run_forever()
        self.__conn_event.set()
        self.__log("ws thread exited")

    def addLv1(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv1_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def addLv2(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv2_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def dealLv1(self, data):
        self.__deal_lv1(data)

    def dealLv2(self, data):
        self.__deal_lv1(data)

    def __on_open(self, ws):
        self.__conn_event.set()
        self.__log("行情连接已创建")

    def __on_error(self, ws, error):
        self.__log("行情处理error:", error)

    def __on_close(self, ws, code, msg):
        self.__log("行情服务未连接")

    def close(self):
        self.__ws_conn.close()

    def __on_message(self, ws, message, type, flag):
        # 命令返回文本消息
        if type == websocket.ABNF.OPCODE_TEXT:
            self.__log("Text响应:" + message)
        # 行情推送压缩二进制消息,在此解压缩
        if type == websocket.ABNF.OPCODE_BINARY:
            now = datetime.datetime.now()
            nStamp = time.mktime(now.timetuple())
            date = now.strftime('%Y-%m-%d')

            rb = zlib.decompress(message, -zlib.MAX_WBITS)
            text = rb.decode("utf-8")
            # self.__log("Binary响应:" + text)
            ex1 = text.split("\n")
            for e1 in ex1:
                ex2 = e1.split("=")
                if len(ex2) != 2:
                    continue
                code = ex2[0]
                hqs = ex2[1]
                if code.startswith("lv1_"):
                    code = code.replace("lv1_", "")
                    hq = hqs.split(",")
                    if len(hq) == len(self.__lv1_field):
                        hqMap = dict(zip(self.__lv1_field, hq))
                        timeStr = hqMap['time']
                        date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                        tStamp = int(time.mktime(date_obj.timetuple()))
                        if abs(tStamp - nStamp) <= 2:
                            self.__deal_lv1(code, hqMap)

                if code.startswith("lv2_"):
                    code = code.replace("lv2_", "")
                    hqArr = hqs.split("|")
                    for hq in hqArr:
                        hqEx = hq.split(",")
                        if len(hqEx) == len(self.__lv2_field):
                            hqMap = dict(zip(self.__lv2_field, hqEx))
                            timeEx = hqMap['time'].split('.')
                            if len(timeEx) == 2:
                                timeStr = timeEx[0]
                                date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                                tStamp = int(time.mktime(date_obj.timetuple()))
                                if abs(tStamp - nStamp) <= 2:
                                    self.__deal_lv2(code, hqMap)

引用地址:https://github.com/freevolunteer/bondTrader/blob/main/pyscript/jvUtil/HanqQing.py

订阅指令参考:https://jvquant.com/wiki.html#--9

原文地址:https://zhuanlan.zhihu.com/p/6059899873

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/913082.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Cursor的chat与composer的使用体验分享

经过一段时间的试用&#xff0c;下面对 Composer 与 Chat 的使用差别进行总结&#xff1a; 一、长文本及程序文件处理方面 Composer 在处理长文本时表现较为稳定&#xff0c;可以对长文进行更改而不会出现内容丢失的情况。而 Chat 在更改长的程序文件时&#xff0c;有时会删除…

MATLAB课程:AI工具辅助编程——MATLAB+LLMs

给出一些可能有用的方法辅助大家写代码。 方法一&#xff1a;MATLAB软件LLM (不太懂配置的同学们为了省事可以主要用这个方法) 方法一特别针对本门MATLAB教学课程&#xff0c;给出一种辅助ai工具的操作指南。MATLAB中可以安装MatGPT插件&#xff0c;该插件通过调用ChatGPT的API…

2.索引:SQL 性能分析详解

SQL性能分析是数据库优化中重要的一环。通过分析SQL的执行频率、慢查询日志、PROFILE工具以及EXPLAIN命令&#xff0c;能够帮助我们识别出数据库性能的瓶颈&#xff0c;并做出有效的优化措施。以下将详细讲解这几种常见的SQL性能分析工具和方法。 一、SQL 执行频率 SQL执行频率…

使用Go语言编写一个简单的NTP服务器

NTP服务介绍 NTP服务器【Network Time Protocol&#xff08;NTP&#xff09;】是用来使计算机时间同步化的一种协议。 应用场景说明 为了确保封闭局域网内多个服务器的时间同步&#xff0c;我们计划部署一个网络时间同步服务器&#xff08;NTP服务器&#xff09;。这一角色将…

深度学习经典模型之VGGNet

1 VGGNet 1.1 模型介绍 ​ VGGNet是由牛津大学视觉几何小组&#xff08;Visual Geometry Group, VGG&#xff09;提出的一种深层卷积网络结构&#xff0c;他们以7.32%的错误率赢得了2014年ILSVRC分类任务的亚军&#xff08;冠军由GoogLeNet以6.65%的错误率夺得&#xff09;和…

Android的BroadcastReceiver

1.基本概念&#xff1a;BroadCast用于进程间或者线程间通信 本质上是用Binder方法&#xff0c;以AMS为订阅中心&#xff0c;完成注册&#xff0c;发布&#xff0c;监听的操作。 2.简单实现的例子 package com.android.car.myapplication;import android.content.BroadcastRe…

分布式数据库中间件mycat

MyCat MyCat是一个开源的分布式数据库系统&#xff0c;它实现了MySQL协议&#xff0c;可以作为数据库代理使用。 MyCat(中间件)的核心功能是分库分表&#xff0c;即将一个大表水平分割为多个小表&#xff0c;存储在后端的MySQL服务器或其他数据库中。 它不仅支持MySQL&#xff…

Java多线程编程(四)- 阻塞队列,生产者消费者模型,线程池

目录&#xff1a; 一.阻塞队列 二.线程池 一.阻塞队列 1.阻塞队列是⼀种特殊的队列. 也遵守 "先进先出" 的原则 阻塞队列能是⼀种线程安全的数据结构, 并且具有以下特性&#xff1a; 1.1.当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素 1.…

深度剖析JUC中LongAdder类源码

文章目录 1.诞生背景2.LongAdder核心思想3.底层实现&#xff1a;4.额外补充 1.诞生背景 LongAdder是JDK8新增的一个原子操作类&#xff0c;和AtomicLong扮演者同样的角色&#xff0c;由于采用AtomicLong 保证多线程数据同步&#xff0c;高并发场景下会导致大量线程同时竞争更新…

大数据面试题--kafka夺命连环问

1、kafka消息发送的流程&#xff1f; 在消息发送过程中涉及到两个线程&#xff1a;一个是 main 线程和一个 sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给双端队列&#xff0c;sender 线程不断从双端队列 RecordAccumulator 中拉取…

树形结构数据

树形结构数据 树形结构数据是一种基础且强大的数据结构&#xff0c;广泛应用于计算机科学和软件开发的各个领域。它模拟了自然界中树的层级关系&#xff0c;通过节点和它们之间的连接来组织数据。在本文中&#xff0c;我们将深入探讨树形结构数据的概念、特点、类型以及它们在…

dell服务器安装ESXI8

1.下载镜像在官网 2.打开ipmi&#xff08;idrac&#xff09;&#xff0c;将esxi镜像挂载&#xff0c;然后服务器开机 3.进入bios设置cpu虚拟化开启&#xff0c;进入boot设置启动选项为映像方式 4..进入安装引导界面3.加载完配置进入安装 系统提示点击继 5.选择安装磁盘进行…

信息安全数学基础(46)域和Galois理论

域详述 定义&#xff1a; 域是一个包含加法、减法、乘法和除法&#xff08;除数不为零&#xff09;的代数结构&#xff0c;其中加法和乘法满足交换律、结合律&#xff0c;并且乘法对加法满足分配律。同时&#xff0c;域中的元素&#xff08;通常称为数&#xff09;在加法和乘法…

Windows端口占用/Java程序启动失败-进程占用的问题解决

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

Python酷库之旅-第三方库Pandas(204)

目录 一、用法精讲 951、pandas.IntervalIndex.values属性 951-1、语法 951-2、参数 951-3、功能 951-4、返回值 951-5、说明 951-6、用法 951-6-1、数据准备 951-6-2、代码示例 951-6-3、结果输出 952、pandas.IntervalIndex.from_arrays类方法 952-1、语法 952…

AndroidStudio-文本显示

一、设置文本的内容 1.方式&#xff1a; &#xff08;1&#xff09;在XML文件中通过属性&#xff1a;android:text设置文本 例如&#xff1a; <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.andr…

微星爆破弹ddr4wifi接线梳理研究

主板(微星爆破弹ddr4 wifi) mac用久了&#xff0c;windows的键盘都有点不习惯了。 理清了这些接口都是干啥的&#xff0c;接线就非常简单了。

机器视觉基础—双目相机

机器视觉基础—双目相机与立体视觉 双目相机概念与测量原理 我们多视几何的基础就在于是需要不同的相机拍摄的同一个物体的视场是由重合的区域的。通过下面的这种几何模型的目的是要得到估计物体的长度&#xff0c;或者说是离这个相机的距离。&#xff08;深度信息&#xff09…

【GPTs】EmojiAI:轻松生成趣味表情翻译

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 &#x1f4af;GPTs指令&#x1f4af;前言&#x1f4af;EmojiAI主要功能适用场景优点缺点 &#x1f4af;小结 &#x1f4af;GPTs指令 中文翻译&#xff1a; 此 GPT 的主要角色是为英文文本提供幽默…