Python 算法交易实验74 QTV200第二步(改): 数据清洗并写入Mongo

说明

之前第二步是打算进入Clickhouse的,实测下来有一些bug
在这里插入图片描述
可以看到有一些分钟数据重复了。简单分析原因:

  • 1 起异步任务时,还是会有两个任务重复的问题,这个在同步情况下是不会出现的
  • 2 数据库没有upsert模式。clickhouse是最近刚应用的库,我还没有完善其操作模式。

解决思路:

  • 1 既然采用了异步,就没有办法去控制其前置的依赖和顺序,否则就会退回到同步状态。而且从效率上,n次异步IO的cpu开销,可能也只相当于1次的同步开销。可以认为,异步是更轻松,但是更’粗心’的工作状态。所以在设计上,如果每次的操作都是“无害”的,那么就没问题。这里的数据同步任务,最重要的是不重不漏,所以只要能够确保数据不重不漏即可。
  • 2 每次负责crawl的worker不直接操作数据库是对的,这可以避免过多的数据库操作开销。在同步结果的队列中,每个周期执行一次Mongo操作是完全没问题的。同步队列中可以有一些冗余的数据,在整合数据时就删除了。剩余的部分,可以直接采用upsert的方式存入。

结论:使用Mongo作为第一个数据节点的持久化。

反思点:

  • 1 对于数据的集成,可能还是Mongo更合适。因为不必事先定义表结构,而且之前做了一些开发,Mongo的操作方式非常完善。擅长在记录级数据的复杂度操作。
  • 2 clickhouse更适合用于在我的数据系统中直接输出的数据,特别是空间数据,按UCS方式规范。擅长在块级别数据的效率操作。

内容

1 目标数据库准备

采用m4.24086

很巧,QTV102的数据也在这里,所以QTV200的数据可以继续放在这里 。

回顾一下WMongo的操作,有好一阵子没用了。

from Basefuncs import *
# analysis 
target_server = 'm4.24086'
# machine_name = 'm4'
machine_name = get_machine_name()

# 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
try:
    target_w = from_pickle(target_server)
    color_print('【Loading target_w】from pickle')
except:
    w = WMongo('w')
    target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
    to_pickle(target_w, target_server)

有一些设计是好的,只要给出目标服务器名称,对象就会自动寻找合适的连接方式(local、lan、wan)来完成连接,对应的连接保存为本地文件。之后可以考虑通过GlobalBuffer来简化判断,还有neo4j来存储和管理关系。

进入队列的字段名,不允许有 _msg_id 字段
Wmongo_v9000.012
设置当前连接 local
>>> Switching To Mymeta
设置当前连接 local
在CN001访问mymeta,通用
当前机器的名称: m4
1.当前使用的MongeAgent:http://172.17.0.1:24011/
2.Tier1:meta, Tier2:servers
3.ConnectionHash:e8d1bc791049988d89465d5ce24d993b
4.FilterDict:{'my_server_pkey': 'm4.24086'}
5.Limits:1
6.Sort:
7.Skip:0
>>> Hit Records
当前机器的局网: my.cn001
【I】目标服务的机器:m4, 目标服务的机器局网:my.cn001
【I】采用local方式连接目标主机
Wmongo_v9000.012
设置当前连接 local
获取已有连接
target connection hash: d35632b63b77f17d4d12808fb707cb1f
data save to pickle:  ./m4.24086.pkl

然后就可以通过对象操作了

target_w.cname_recs()
{'data': {'QTV102': {'log_monitor': 264276,
   'log_sniffer': 792827,
   'log_worker': 264276,
   'stats': 5177,
   'step1_mongo_in': 2895114,
   'step1_mongo_meta': 2895114,
   'step1_mongo_out': 2895114},
  'QTV102_Capital_Data': {'capital_daily': 32787},
  'QTV102_Model_Signal': {'log_monitor': 264277,
   'log_sniffer': 792827,
   'log_worker': 0,
   'stats': 5218,
   'step1_mongo_in': 2436678,
   'step1_mongo_meta': 0,
   'step1_mongo_out': 16860560},
  'QTV102_Strategy': {'strategy_online': 58,
   'trade_orders': 128,
   'trade_strategy': 23},
  'QuantData001': {'log_monitor': 264276,
   'log_sniffer': 792830,
   'log_worker': 264276,
   'stats': 5178,
   'step1_mongo_in': 460297,
   'step1_mongo_meta': 460297,
   'step1_mongo_out': 460297},
  'QuantData_510500': {'log_monitor': 264277,
   'log_sniffer': 792831,
   'log_worker': 264278,
   'stats': 5178,
   'step1_mongo_in': 657702,
   'step1_mongo_meta': 657702,
   'step1_mongo_out': 657702},
  'SmartQuant_512660': {'log_monitor': 264277,
   'log_sniffer': 792830,
   'log_worker': 0,
   'stats': 5219,
   'step1_mongo_in': 460297,
   'step1_mongo_meta': 0,
   'step1_mongo_out': 450098},
  'Strategy_512660': {'capitals': 261,
   'monthly_report': 66,
   'orders': 130,
   'slog': 430056,
   'summary_report': 124,
   'yearly_report': 8},
  'test_for_mongo_engine': {'user': 2}},
 'msg': 'ok',
 'status': True}

很早前随便做的一版,看起来业务效果还是不错的。这部分内容,以后就不必放在mongo,在clickhouse里一个查询就好了。
在这里插入图片描述

仍然(在逻辑上)设置表的结构为:qtv200.market_data,需要的索引有:

  • 1 pid: 主键。这个是确定的主键,对后续的基础操作来说是必须的。
  • 2 UCS(shard、part、block、brick): 管理块级数据的键,在后续的块级任务来说非常重要。
  • 3 code: 业务筛选字段
  • 4 ts: 时间,排序字段

mongo方便之处就在于:当你的逻辑明确了,建立好索引,一切就好了

# 主键 pkey
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'pid')
# UCS
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'shard')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'part')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'block')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'brick')
# 业务
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'code')
# 排序
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'ts')

Out[4]: {'data': {'ts_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}

在这里插入图片描述

改造1:修改获取最大最小值的部分 etl_worker

变的简单了,不需要关心数据库里有什么,只要把当前有重复的pid去掉就可以了

...
    data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

    keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
    data_df2 =data_df1[keep_cols1].drop_duplicates(['pid'])

    output_df = data_df2
    output_data_listofdict = output_df.to_dict(orient='records')
    output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)
    for some_data_listofdict in output_data_listofdict2:
        qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

在脚本里做相应修改

#conda init
conda activate base

cd  /home/workers && python3 etl_worker_mongo.py

改造2:修改入库的部分 s2mongo

暂时先以脚本方式执行,不固化到接口中。

现在可以采用一些更好的方式来初始化队列。

from Basefuncs import * 
import logging
from logging.handlers import RotatingFileHandler
def get_logger(name , lpath = '/var/log/' ):
    logger = logging.getLogger(name)
    fpath = lpath + name + '.log'
    handler = RotatingFileHandler(fpath , maxBytes=100*1024*1024, backupCount=10)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

logger = get_logger('etf_raw_data')

# IO
machine_host = '192.168.0.4'
source_redis_agent_host = f'http://{machine_host}:24118/'

stream_cfg = StreamCfg(q_max_len = 1000000, batch_size = 10000, redis_agent_host = source_redis_agent_host)
qm = QManager(**stream_cfg.dict())
# qm.info()

# analysis 
target_server = 'm4.24086'
target_w = from_pickle(target_server)
# machine_name = 'm4'
# machine_name = get_machine_name()
# # 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
# try:
#     target_w = from_pickle(target_server)
#     color_print('【Loading target_w】from pickle')
# except:
#     w = WMongo('w')
#     target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
#     to_pickle(target_w, target_server)
# target_w.cname_recs()

# Name
ss_name = 'xxx'
t_tier1 = 'xxx'
t_tier2 = 'xxx'

keep_cols =['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
num_cols = ['open','close','high','low', 'vol','amt','ts']
# Process 
## 1 read source -- 这里本身也可以用pydantic 
ss_data_list = qm.xrange(ss_name)['data']
if len(ss_data_list):
    ss_data_df0 = pd.DataFrame(ss_data_list)
    msg_id_list = list(ss_data_df0['_msg_id'])
    ss_data_df = ss_data_df0[keep_cols].dropna()
    for the_col in num_cols:
        ss_data_df[the_col] = ss_data_df[the_col].apply(float)
    # 写入mongo
    resp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = ss_data_df.to_dict(orient='records'), key_name ='pid')
    qm.xdel(ss_name,msg_id_list)
    logger.info(get_time_str1() + 'efl_s2mongo insert recs %s' % len(ss_data_df))
else:
    logger.info(get_time_str1() + 'efl_s2mongo insert not recs')

以上,规定了几部分。

  • 1 IO部分。队列和数据的handler现在通过pydantic的对象,可以非常简洁的定义。然后约定好入队列和目标数据库表的必要信息。
  • 2 处理。主要就是将需要保留的字段,以及需要转数值的字段明确。然后就是读取,保留,转换,插入,最后删除。

在测试中,就一次的数据反复插了几次,数据是不会重复的。

在这里插入图片描述
对应的日志可以看到一开始插入过n次,后面加入了定时任务,然后就转入运行了

└─ $ cat /var/log/etf_raw_data.log
2024-06-29 18:38:27efl_s2mongo insert recs 12
2024-06-29 18:40:53efl_s2mongo insert recs 12
2024-06-29 18:41:31efl_s2mongo insert recs 12
2024-06-29 18:41:42efl_s2mongo insert recs 12
2024-06-29 18:43:27efl_s2mongo insert recs 12
2024-06-29 18:44:31efl_s2mongo insert recs 12
2024-06-29 18:46:49efl_s2mongo insert recs 12
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:31efl_s2mongo insert not recs
2024-06-29 18:48:01efl_s2mongo insert not recs
2024-06-29 18:48:31efl_s2mongo insert not recs
2024-06-29 18:49:01efl_s2mongo insert not recs
2024-06-29 18:49:02efl_s2mongo insert not recs
2024-06-29 18:49:31efl_s2mongo insert not recs
2024-06-29 18:49:32efl_s2mongo insert not recs
2024-06-29 18:50:01efl_s2mongo insert not recs
2024-06-29 18:50:02efl_s2mongo insert not recs
2024-06-29 18:50:31efl_s2mongo insert not recs
2024-06-29 18:50:32efl_s2mongo insert not recs
2024-06-29 18:51:01efl_s2mongo insert not recs
2024-06-29 18:51:02efl_s2mongo insert not recs
2024-06-29 18:51:31efl_s2mongo insert not recs
2024-06-29 18:51:32efl_s2mongo insert not recs
关于定时任务

我偷了个懒,就是把这脚本和etl脚本放在一起。这两个任务被绑在一起串行了。主要是懒的再去定一个定时任务。

└─ $ cat exe_qtv200_etl_worker.sh
#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etl_worker_mongo.py
cd  /home/workers && python3 etf_raw_data_s2mongo.py

对于后续其他的etl,每一个还是应该另起一个任务,这样才能利用异步来确保多个etf的数据及时获取。

【调整完毕】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/757797.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

除了重塑千行百业,生成式AI还能改善运动健康

飞速发展的生成式AI与大模型技术,不但正在重塑千行百业,而且还能有效改善人们的运动健康。 生成式AI技术应用的挑战 随着生活品质的不断提升,人们对于健康问题也越来越重视。作为一家以“AI重塑健康与美”为使命的AI数字健康解决方案提供商&a…

langchain学习总结

大模型开发遇到的问题及langchain框架学习 背景: 1、微场景间跳转问题,无法实现微场景随意穿插 2、大模型幻读(推荐不存在的产品、自己发挥) 3、知识库检索,语义匹配效果较差,匹配出的结果和客户表述的…

解决指南:如何应对错误代码 0x80070643

在使用Windows操作系统过程中,用户可能会遭遇各种错误代码,其中错误 0x80070643是比较常见的一种。这个错误通常在安装更新或某些软件时发生,尤其是在微软的Windows Defender或其他Microsoft安全产品以及.NET Framework更新过程中更为常见。本…

动画重定向——当给一个人物模型用别人物的动画时,会遇到人物与动画不匹配问题,怎么解决呢?

每日一句:实践出真知,试错方确信 目录 最开始我想的原因! 分析一下动画相关参数 Animator组件参数详解: 人物模型的导入设置参数: Skinned Mesh Renderer组件详解: Skinned Mesh Renderer工作原理 设置Skinned …

【吴恩达深度学习笔记系列】Logistic Regression 【理论】

Binary Classification: Logistic Regression: y ^ σ ( w T x b ) \hat{y}\sigma{(w^T xb)} y^​σ(wTxb) using sigmoid function σ 1 1 e − z \sigma \frac{1}{1e^{-z}} σ1e−z1​. 【torch.sigmoid(x)】 Sigmoid ( x ) 1 1 e − x \text{Sigmoid}(x)\frac{1}{…

nginx优势以及应用场景,编译安装和nginx

一. Nginx是什么? 1. Nginx概述 高性能、轻量级Web服务软件系统资源消耗低对HTTP并发连接的处理能力高单台物理服务器可支持30,000~50,000个并发请求Nginx(发音同 “engine x”)是一个高性能的反向代理和Web服务器软件&#xff0c…

【05】从0到1构建AI生成思维导图应用 -- 前端交互实现

【05】从0到1构建AI生成思维导图应用 – 前端交互实现 大家好!最近自己做了一个完全免费的AI生成思维导图的网站,支持下载,编辑和对接微信公众号,可以在这里体验:https://lt2mind.zeabur.app/ 上一章:http…

【C++】初识C++(一)

一.什么是C C语言是结构化和模块化的语言,适合处理较小规模的程序。对于复杂的问题,规模较大的程序,需要高度 的抽象和建模时,C语言则不合适。为了解决软件危机, 20世纪80年代, 计算机界提出了OOP(object o…

Mathematica训练课(46)-- 一些常用的画图函数

在前面的课程中,我们已经梳理了Plot的画图用法,今天就详细梳理一下其他的画图函数用法; 1. 画一条直线 2. Circle(圆) 3. Disk(圆盘) 4. 画出一个矩形 5. 箭头

itext生成pdf文件demo示例

需求 在PDF文件中植入一些信息(pdf模版) 制作模版 可以看到下面红色箭头标注位置,这都是我们需要动态写入数据的表单域,可以使用wps等工具来制作 点击编辑表单,可以给对应空间添加表单域,表单域名称是ke…

ic基础|功耗篇04:门级低功耗技术

大家好,我是数字小熊饼干,一个练习时长两年半的IC打工人。我在两年前通过自学跨行社招加入了IC行业。现在我打算将这两年的工作经验和当初面试时最常问的一些问题进行总结,并通过汇总成文章的形式进行输出,相信无论你是在职的还是…

UE5材质之HLSL:深度

UE4/5的Custom节点:在VScode使用HLSL(新手入门用)_vscode写hlsl-CSDN博客 效果: 材质节点: 自定义节点代码: float3 rayStepViewDir*-1; float4 inputTexTexture2DSample(TexObject,TexObjectSampler,uv)…

AGPT•intelligence:带你领略全新量化交易的风采

随着金融科技的快速发展,量化交易已经成为了投资领域的热门话题。越来越多的投资者开始关注和使用量化交易软件来进行投资决策。在市场上有许多量化交易软件可供选择。 Delaek,是一位资深的金融科技专家,在 2020年成立一家专注于数字资产量化…

【第三方JSON库】org.json.simple用法初探—Java编程【Eclipse平台】【不使用项目管理工具】【不添加依赖解析】

本文将重点介绍,在不使用项目管理工具,不添加依赖解析情况下,【第三方库】JSON.simple库在Java编程的应用。 JSON.simple是一种由纯java开发的开源JSON库,包含在JSON.simple.jar中。它提供了一种简单的方式来处理JSON数据和以JSO…

计算机类主题会议推荐之——AIIIP 2024

【ACM出版 |IEEE&ACM院士、CCF杰出会员担任组委| 往届会后4个月检索 】 第三届人工智能与智能信息处理国际学术会议(AIIIP 2024) 2024 3rd International Conference on Artificial Intelligence and Intelligent Information Processing 中国-天…

QT QSlider控件-主介绍 触发函数常用函数

QSlider控件是Qt库中用于提供一个可拖动滑块以选择数值或范围的界面元素。它广泛应用于需要用户进行数值调节的场景,如音量控制、亮度调整等。 一、QAbstractSlider的6个信号量触发函数: 1、void actionTriggered (int action): 当滑块上的某个可定义动…

EXCEL 复制后转置粘贴

nodepad 转置参考: https://editor.csdn.net/md/?articleId140014651 1. WPS复制后转置粘贴 复制-》右键-》顶部第一行-》粘贴行列转置,如下图: 2. Excel office365 本地版 2. Excel office365 在线版

module java.base does not “opens java.lang“ to unnamed module

目录 原因:解决方法:方法一:方法二:方法三: SpringBoot项目运行报如下错误 Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make protected final java.lang.Class java.lang.ClassLoader.def…

关于组织赴俄罗斯(莫斯科)第 28 届国际汽车零部件、汽车维修设备和商品展览会商务考察的通知

关于组织赴俄罗斯(莫斯科) 第 28 届国际汽车零部件、汽车维修设备和商品展览会商务考察的通知 展会名称:俄罗斯(莫斯科)第 28 届国际汽车零部件、汽车零部件、汽车维修设备和商品展览会 时间:2024 年 8 月…

仓库管理系统19--盘存管理

原创不易,打字不易,截图不易,多多点赞,送人玫瑰,留有余香,财务自由明日实现 1、什么是盘存 盘存也叫盘库,盘库是指对一个仓库、库房或者商店的库存进行全面清点和核对的过程。在盘库过程中&am…