Python 算法交易实验72 QTV200第一步: 获取原始数据并存入队列

说明

最近的数据流往前进了一步,我觉得基本可以开始同步的推进QTV200了。上次规划了整体的数据流,现在开始第一步。

内容

1 结构位置

这是上次的总体图:
在这里插入图片描述
以下是这次要实现的一小部分:

在这里插入图片描述
从结构上,这个是整体数据流的起点,系统因为这些不断 运行的数据才开始“动”了起来,可以称为源点。

2 规范与约束

源点是基于每分钟的节拍从外界读取数据,这部分目前我没用用付费接口(数据的需求量很小),所以基于自律(类似与吃自助餐)的原则,增加一些规范与约束。

  • 我获取的数据不会多,可以约束在60个ETF之内。
  • 每次请求只会查询当前时刻的前10分钟数据(数据少),每只ETF一天最多有60610 ~ 3600条数据
  • 每次请求10条的目的是为了防止某个时隙程序中断或者失效,通过冗余的数据可以在10分钟之内的终端内无缝恢复(从这角度,用某个云服务器做这件事比较合适,下一版考虑)
  • 任务按照秒进行划分,每秒最多提交6只ETF的请求,数据请求总量为60条
  • 周末一定不会发起查询请求

这样可以确保非必要不请求数据,即使请求数据,请求也被均匀分摊,每次的请求量非常之小(环保)

3 工具与方法

通过FLask-APS执行秒级的任务调度,通过Flask-Celery实现各ETF的异步抓取,确保时效的同时,减少CPU开销。(同步方式会独占一个核,很浪费的)。

有的时候倒也不纯粹是为了节约这点计算成本,而是总体成本。设想,一开始只跟踪3~4个ETF,同步状态下并发,可能抢占4个核一小会,还不至于出现卡顿(主机有32核)。但是如果跟踪60个ETF,那么整个机器就会因为这个原因处于卡顿状态,那就真的很没必要。

即使是现阶段,QTV102与Mongo通信的时候更新少量数据,但是是同步状态的,都让我的CPU负载处于一个很奇怪的状态。
在这里插入图片描述
虽然看着很满,其实我知道很多是处于浪费的状态的。

所以,如果用合适的方法进行调度,那么即使是60个ETF,甚至是600个ETF可能单核都足够了。这种效率的提升是很夸张的。目前在IO密集处理这块可以做到的提升最大。未来QTV200实现后,应该会把QTV102整个卸载掉。

3.1 worker

对于每一个ETF来说,处理的流程是相同的。所以,可以先做一个worker,然后调用的时候按不通的ETF代码进行参数化就可以了。

etf_crawl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)


# 1 允许传入一个参数
import argparse
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    # 制程名
    parser.add_argument('--etf_code')
    parser.add_argument('--assigned_work_dt')
    # 准备解析参数
    args = parser.parse_args()

    res_dict = {}
    res_dict['etf_code'] = args.etf_code
    res_dict['assigned_work_dt'] = args.assigned_work_dt
    return res_dict

arg_dict = get_arg()

etf_code = arg_dict['etf_code']
assigned_work_dt = arg_dict.get('assigned_work_dt')

from Basefuncs import * 
import time

# 2 判断是否是可执行时间
if assigned_work_dt is None:
    ts = time.time()
else:
    if assigned_work_dt.strip() == '':
        ts = time.time()
    else:
        ts = inverse_time_str(assigned_work_dt)

cur_dt_str = get_time_str1(ts)
cur_time = cur_dt_str.split()[-1]
 
morning_start = '09:25:00'
morning_end = '11:41:00'
afternoon_start = '12:55:00'
afternoon_end = '15:11:00'

is_moring_work = False 
is_afternoon_work = False
if cur_time >= morning_start and cur_time < morning_end:
    is_moring_work = True 
if cur_time >= afternoon_start and cur_time < afternoon_end:
    is_afternoon_work = True 

is_work_time = is_moring_work or is_afternoon_work


if is_work_time:
    start_dt = get_time_str1( (ts//60)*60 - 600 )
    end_dt = get_time_str1( ts + 60 )

    # 目标队列设置
    qm = QManager(redis_agent_host = 'http://192.168.0.4:24118/',redis_connection_hash = None)
    # qm.info()

    # 3 执行
    # etf_code = '510300'
    import akshare as ak 
    para_dict  ={}
    para_dict['symbol'] = etf_code
    para_dict['period'] = "1"
    para_dict['adjust'] = ''
    para_dict['start_date'] = start_dt
    para_dict['end_date'] = end_dt
    # 如果时间段不对,那么就是空
    df = ak.fund_etf_hist_min_em(**para_dict)

    # 是否获取到了数据
    is_query_data = True if len(df) else False 

    if is_query_data:
        # ak的变量字典映射
        ak_dict = {}
        ak_dict['时间'] = 'data_dt'
        ak_dict['开盘'] = 'open'
        ak_dict['收盘'] = 'close'
        ak_dict['最高'] = 'high'
        ak_dict['最低'] = 'low'
        ak_dict['成交量'] = 'vol'
        ak_dict['成交额'] = 'amt'

        keep_cols = ['data_dt','open','close','high','low','vol','amt']

        cols = list(df.columns)
        new_cols = [ak_dict.get(x) or x for x in cols ]
        df.columns = new_cols
        df1 = df[keep_cols]
        df1['data_source'] = 'AK'
        df1['code'] = etf_code
        df1['market'] = 'SH'
    
        df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                                + '_' + df1['data_dt']

        # 调整股和手
        vol_approximal = df1['amt'] / df1['close']
        maybe_wrong = (vol_approximal / df1['vol']) > 10
        if maybe_wrong.sum() > 0:
            df1['vol'] = df1['vol'] * 100

        stream_name = 'YOURS.stream_in'
        # 写入结果队列
        data_listofdict = df1.to_dict(orient='records')
        resp = qm.parrallel_write_msg(stream_name, data_listofdict)
        logger.info('%s %s 【%s】' % (cur_dt_str,'etf_crawl_worker',resp['msg'] ))   

    else:
        logger.info('%s %s 【未获取到数据】' % (cur_dt_str,'etf_crawl_worker'))   

else:
    logger.info('%s %s 【不在工作时间】' % (cur_dt_str,'etf_crawl_worker'))        

worker分为几部分:

  • 1 设定使用rotate日志,记录每次执行的效果
  • 2 get_arg 获取调用时传入的关键字参数
  • 3 判断是否在工作时间。默认情况,使用当前时间;也可接受使用指定的时间
  • 4 如果是在工作时间,那么推算对应时间的前10分钟和后1分钟,作为参数发起请求
  • 5 获取数据后,还有一个判断交易量是手还是股的小逻辑
  • 6 处理完成后,推入队列,然后记录日志,worker执行完毕

两种调用方法:

  • 1 实时获取
python3 etf_crawl_worker.py --etf_code=510300
  • 2 指定历史时间的获取
python3 etf_crawl_worker.py --etf_code=510300 --assigned_work_dt='2024-06-21 10:00:00'

获取日志查看,第三条数据是因为指定了工作时间。

/var/log/workers.log

└─ $ cat workers.log
2024-06-23 20:27:07 etf_crawl_worker 【不在工作时间】
2024-06-23 20:40:41 etf_crawl_worker 【不在工作时间】
2024-06-23 20:57:48 etf_crawl_worker 【ok,add 12 of 12  messages】

3.2 shell

exe_etf_crawl_worker.sh
chmod +x exe_etf_crawl_worker.sh

本来想让脚本可以接受第二个参数的,但是似乎隔空传带空格的参数很麻烦,另外实时调用(脚本)的时候没有必要再筛选时间了。

#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etf_crawl_worker.py --etf_code=$1

3.3 flask_celery

将flask_celery升级为可执行脚本的版本

In [1]: import requests as req
   ...: param_dict = {'the_cmd': 'bash /home/test_exe.sh'}
   ...: resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )

In [2]: resp
Out[2]: <Response [200]>

└─ $ cat tem.log
2024-06-23 22:16:25 - 脚本执行

3.4 将任务发布为flask_aps任务

任务参数如下

# 任务6:执行脚本-qtv200 510300 get 
task006 = {}
task006['machine'] = 'm4'
task006['task_id'] = 'task006'
task006['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取510300的数据。在秒0执行'
task006['pid'] = '.'.join([task006['machine'],task006['task_id']  ])
task006['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task006['set_to_status'] = 'running'
task006['running_status'] = ''
task006['start_dt'] = '2024-05-01 00:00:00'
task006['end_dt'] = '2099-06-01 00:00:00'
task006['task_kwargs'] = {'para_dict': 
                                {'url':'http://172.17.0.1:24104/exe_sh/',
                                 'json_data':
                                    {
                                    'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 510300'
                                    }

                                }
                            }
task006['interval_para'] ={'second':'0',
                            'day_of_week':'0-4',
                            'hour':'9-16'}
task006 = TaskTable(**task006)
task006.save()

ok,明天等着看吧

In [11]: the_task_obj = TaskTable.objects(machine='m4',task_id ='task006').first()
    ...: exe_a_task(the_task_obj)
set to status running  current state  init
Publish a task
Out[11]: 1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/737469.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

每日AI资讯-20240622

1. 可灵AI全新功能上线&#xff01; 可灵AI全新功能上线&#xff01;图生视频和视频续写来啦&#xff01; 图生视频&#xff1a;上传任意图片&#xff0c;生成5秒精彩视频。支持添加提示词控制图像运动视频续写&#xff1a;对生成视频一键续写4&#xff5e;5秒&#xff0c;支持…

LeetCode:经典题之1491、896 题解与延伸

系列目录 88.合并两个有序数组 52.螺旋数组 567.字符串的排列 643.子数组最大平均数 150.逆波兰表达式 61.旋转链表 160.相交链表 83.删除排序链表中的重复元素 389.找不同 1491.去掉最低工资和最高工资后的工资平均值 896.单调序列 206.反转链表 92.反转链表II 141.环形链表 …

基于uni-app和图鸟UI开发上门服务小程序

一、技术栈选择 uni-app&#xff1a;我们选择了uni-app作为开发框架&#xff0c;因为它基于Vue.js&#xff0c;允许我们编写一次代码&#xff0c;发布到多个平台&#xff0c;包括iOS、Android、Web以及各种小程序。uni-app的丰富组件库、高效的状态管理以及便捷的预览调试功能&…

LightGBM算法详解

LightGBM算法详解 LightGBM&#xff08;Light Gradient Boosting Machine&#xff09;是由微软开发的高效梯度提升决策树&#xff08;GBDT&#xff09;实现。它以速度和效率著称&#xff0c;特别适用于大规模数据集和高维特征的场景。本文将详细介绍LightGBM的原理、特点、常用…

用于世界上最先进的医疗应用的精密电阻器

EAK的高性能电阻器使医疗产品设计人员能够继续改善全球患者的生活质量。我们的电阻器专为用于医疗诊断、治疗和预防的各种产品而设计。从小型植入式和非侵入性设备到大型诊断成像设备&#xff0c;医疗制造商之所以选择EAK 电阻器&#xff0c;是因为操作环境是高电压和磁场&…

AI-算力产业链之存力

在数字经济大潮下&#xff0c;数据已经成为新型的生产资料。 目前数据中心有三大力量&#xff1a;计算的力量——算力、存储的力量——存力、运输的力量——运力&#xff0c;即网络的力量。 算力产业链正在火热发展的同时&#xff0c;存力的需求也大幅度提升。2023年上半年&…

总结 CSS 选择器的常见用法

一&#xff0c;什么是css 在前端网页中&#xff0c;css就相当于化妆术&#xff0c;把一个很生硬的网页页面变得排版有序起来。 CSS可以对网页中的元素位置进行像素级精准控制&#xff0c;实现美化页面的效果&#xff0c;也能做到页面的样式和结构分离。 二&#xff0c;css的基…

MySQL中的ibd2sdi—InnoDB表空间SDI提取实用程序

ibd2sdi 是一个用于从 InnoDB 表空间文件中提取序列化字典信息&#xff08;Serialized Dictionary Information, SDI&#xff09;的实用程序。这个实用程序可以用于提取存储在持久化 InnoDB 表空间文件中的 SDI 数据。 可以对以下类型的表空间文件使用 ibd2sdi&#xff1a; 每…

消息认证码解析

1. 什么是消息认证码 消息认证码(Message Authentication Code)是一种确认完整性并进行认证的技术&#xff0c;取三个单词的首字母&#xff0c;简称为MAC。 消息认证码的输入包括任意长度的消息和一个发送者与接收者之间共享的密钥&#xff0c;它可以输出固定长度的数据&#x…

C语言之详解预处理

前言&#xff1a; 预处理也叫预编译&#xff0c;是编译代码时的第一步&#xff0c;经过预处理后生成一个.i文件&#xff0c;如果不明白编译与链接作用的小伙伴可以先看看博主的上一篇博客—— &#xff0c;不然知识连贯性可能会显得很差哦。 正文目录&#xff1a; 预定义符号#…

discuz迪恩cul!教育课程培训网站模板

Discuz x3.2模板 迪恩cul!教育课程培训 GBK&#xff0c;程序包中内附详细的安装教程&#xff0c;下载后按照教程安装即可 discuz迪恩cul!教育课程培训网站模板

qemu 安装ubuntu22.04虚拟机 -纯命令行-可ssh-带网络-编译安装 linux kernel-编译安装 kernel module

tar -xjf xxx.tar.bz2 1&#xff0c;预备系统盘数据 1.1 下载光盘 注意需要 liver-server $ wget https://releases.ubuntu.com/22.04.4/ubuntu-22.04.4-live-server-amd64.iso 1.2 挂载并拷贝 $ sudo mkdir /mnt/iso_ubuntu-22.04.4-live-server-amd64 $ sudo mount u…

星闪指向遥控,做家电交互的破壁人

“面壁者罗辑&#xff0c;我是你的破壁人。” 科幻小说《三体》中&#xff0c;当人类的基础科学被三体人封锁&#xff0c;变得停步不前&#xff0c;人类启动了自救的面壁计划&#xff0c;通过一次又一次破壁&#xff0c;找到战胜三体人的办法。 现实中&#xff0c;有一点已经成…

html--好看的手机充值单页

<!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title>线上充值-首页</title><meta content"widthdevice-width,initial-scale1.0,maximum-scale1.0,user-scalable0" name"viewport&…

【Vue-Vben-Admin】1、初次运行和介绍

【Vue-Vben-Admin】1、初次运行和介绍 Vben-Admin 初次运行和介绍 小小的介绍规定版本文件树安装依赖运行项目 小小的介绍 一款 Vue3 Typescript4 Vite2 后台管理项目&#xff0c;功能挺多的&#xff0c;还有组件库 规定版本 此个人文档规定版本为 2.8.0&#xff0c;可能版本…

AI大模型企业应用实战(16)-langchain核心组件

1 stuff 将文档列表插入到提示词中&#xff0c;适合文档较小或少量文档的应用。 2 refine 通过循环输入文档并迭代更新答案来构建响应&#xff0c;一次只传递给LLM一个文档&#xff0c;适合LLM上下文大小不能容纳的小文档。 参考&#xff1a; https://js.langchain.com/v0.1…

QT中利用qss来创建一个圆角矩形窗口,并利用Qt::WA_TranslucentBackground属性解决留白问题

1、效果 2、实现 QWidget#centralwidget {border-radius: 30px solid default;border-image: url(:/images/bk<

【Golang - 90天从新手到大师】Day06 - 数组

系列文章合集 Golang - 90天从新手到大师 数组是golang中最常用的一种数据结构,数组就是同一类型数据的有序集合 定义一个数组 格式: var name [n]type n为数组长度,n>0 且无法修改,type为数组的元素类型如: var a [2]int上面的例子定义了一个长度为2,元素类型为int的数组…

MySQL数据库(三):读取数据库数据

上一节&#xff0c;我们介绍了数据库的基本操作&#xff0c;以及最后演示了如何使用库来连接数据库&#xff0c;在实际应用中&#xff0c;我们通常需要按照指定的条件对数据库进行操作&#xff0c;即增删改查操作&#xff0c;这是非常重要的&#xff01;这一节我们继续通过一个…

Open3D 点云FPS最远点下采样

目录 一、概述 二、代码实现 2.1实现原理 2.2完整代码 三、实现效果 3.1原始点云 3.2采样后点云 3.3数据对比 一、概述 最远点采样&#xff08;Farthest Point Sampling, FPS&#xff09;是一种有效的下采样方法&#xff0c;用于从大量点云数据中选择具有代表性的子集。…