Python 全栈系列243 S2S flask_celery

说明

按现有的几个架构部件,构建数据流。

S = Redis Stream。这个可以作为缓冲队列和简单任务队列,速度非常快,至少是万条/秒的速度。
Q = RabbitMQ。这个作为任务队列,消息也主要是元数据。读速比较慢,但有一些特性,然后自带前端,作为任务队列比较合适。
M = Mongo。这个作为数据主库还是比较合适的。具有丰富的数据操作模式,同时性能也不错。
C = ClickHouse。这个特别适合作为任务数据库。因为列式存储的特性,其吞吐性能,简单统计功能甚至逼近了程序处理的速度。例如,存储10万条数据,大约也就3秒;统计900万数据某个字段的长度,时间也不到5秒。(过去在处理上,基本上按照100万条/秒来评估默认的程序处理能力)

RabbitMQ和Redis Streams都是流行的队列系统,用于处理消息传递任务,但它们在效率和应用场景上有所不同。
RabbitMQ是基于AMQP(高级消息队列协议)的开源消息代理,它提供了可靠的消息传递机制,能够保证消息的持久性,即使在发送或接收过程中出现故障也不会丢失消息。RabbitMQ适用于需要高可靠性和复杂路由策略的生产环境,特别是在分布式系统中,它能够很好地处理复杂的异步消息传递任务。
另一方面,Redis Streams是Redis 5.0版本引入的新特性,它提供了一个持久化的消息队列系统。Redis Streams的设计理念在于提供高性能的发布/订阅模型,尤其适合于即时消息处理场景。与RabbitMQ相比,Redis Streams在性能上具有优势,因为它利用了Redis本身的高性能特性,使得消息的读写速度非常快。
在效率方面,Redis Streams通常被认为比RabbitMQ更快,特别是在处理大量实时数据流时。这是因为Redis作为一个内存中的键值存储系统,本身就具有很高的读写速度,而Streams作为其一部分,也继承了这种高效性。此外,Redis Streams的无锁设计进一步提升了其性能,使得它在处理并发请求时更加高效。
然而,RabbitMQ在某些情况下可能更适合使用,特别是当需要处理复杂的异步任务、保证消息的顺序性以及实现可靠的消息传递时。RabbitMQ的这些特性使得它在金融、医疗等关键行业中被广泛应用。
总之,在选择RabbitMQ还是Redis Streams时,应考虑到具体的应用场景、性能需求和可靠性要求。如果追求极致的性能和实时性,Redis Streams可能是更好的选择;而如果需要更高的可靠性和复杂的路由功能,RabbitMQ可能更为合适。

本次目标是搭建一个worker,可以通过参数化方式,完成两个S间的流转。除了M和C之前一般不会直接流转,那么应该有 4*3 - 2 = 10 种组件间的流转。

内容

整体的实现逻辑顺序为:

  • 1 使用QManager完成S2S的动作(函数)
  • 2 将函数定义为celery task
  • 3 将flask-celery发布为systemd服务

1 S2S 函数

S2S应该是一种最常见的任务

首先是QManager, 这个是对RedisAgent进行封装和集成的对象,本质上是个二传手。

QManager 集成了:

  • 1 判断队列是否可以写入
  • 2 并行写入
  • 3 fetch和range两种方式取数
  • 4 删除消息
import requests as req 
class QManager:
    def __init__(self , batch_size = 1000, 
                        redis_agent_host = 'http://172.17.0.1:24021/',
                        redis_connection_hash =None,
                        q_max_len = 100000):
        self.batch_size = batch_size
        self.redis_agent_host = redis_agent_host
        self.redis_connection_hash = redis_connection_hash
        self.q_max_len = q_max_len

    def auto_connect(self, db_server_name):
        print('这里应该根据某个参数值,自动切换为合适的连接')

    def info(self):
        return req.post(self.redis_agent_host + 'info/',json = {'connection_hash':self.redis_connection_hash}).json()

    # redis没有提供命令来列出streams

    # def qname_list(self, stream_name = '*'):
    #     return req.post(self.redis_agent_host + 'info_stream/',json = {'stream_name':stream_name}).json()
    
    # 查看队列长度
    def stream_len(self, stream_name):
        cur_len_resp = req.post(self.redis_agent_host + 'len_of_queue/',
                                json ={'stream_name':stream_name,'connection_hash':self.redis_connection_hash}).json()
        return cur_len_resp['data']

    # 创建队列和分组
    def ensure_group(self, stream_name, group_name ='group1', start_point='0'):
        return req.post(self.redis_agent_host +'ensure_group/',json ={'stream_name':stream_name,
                                                                      'group_name':group_name,
                                                                      'start_point':start_point}).json()

    # 判断队列是否可以插入
    def _is_q_available(self,stream_name):
        cur_len = self.stream_len(stream_name)
        if cur_len + self.batch_size >=self.q_max_len:
            return False 
        else:
            return True 

    #  基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右
    def parrallel_write_msg(self,stream_name, data_listofdict = None, time_out = 30,
                            is_return_msg_id_list=False):
            
        resp_dict = req.post(self.redis_agent_host + 'batch_add_msg/',json ={'connection_hash':self.redis_connection_hash,
                                                                    'stream_name':stream_name,
                                                                    'msg_dict_list':data_listofdict,
                                                                    'maxlen':self.q_max_len,
                                                                    'is_return_msg_id_list':is_return_msg_id_list},
                                                            timeout=time_out).json()
        return resp_dict

    # 读取
    # 批量获取数据 get
    def xrange(self, stream_name, count = None):
        cur_count = count or self.batch_size 
        recs_resp = req.post(self.redis_agent_host + 'xrange/',
                json ={'connection_hash':self.redis_connection_hash, 
                        'stream_name':stream_name,
                        'count':cur_count}).json()
        return recs_resp
    # 批量获取数据 fetch
    def xfetch(self, stream_name, count = None,group_name = 'group1' , consumer_name = 'consumer1'):
        cur_count = count or self.batch_size

        return req.post(self.redis_agent_host + 'fetch_msg/',json = {'connection_hash':self.redis_connection_hash,
                                                                'stream_name':stream_name,
                                                                'group_name':group_name,
                                                                'consumer_name':consumer_name,
                                                                'count':cur_count}).json()

    # 批量删除消息
    def xdel(self,stream_name,mid_or_list =None):
        if len(mid_or_list):
            return req.post(self.redis_agent_host  + 'del_msg/',
                    json ={'connection_hash':self.redis_connection_hash, 
                            'stream_name':stream_name,
                            'mid_or_list':mid_or_list}).json()

    @staticmethod
    def extract_msg_id(some_msg_list):
        return [x['_msg_id'] for x in some_msg_list]

基于此,稍微修改就可以完成S2S的任务

按照边的方式,给到left和right的参数信息。使用这些信息分别初始化left和right的QManager。最后按照配置里的约定,执行n次同步。每次执行时,都会看下目标队列是否已满,若已满则放弃写入,否则执行写入,然后删除消息。

# local
cfg = {'target_q_max_len': 10,'source_read_batch_num':1,'target_write_batch_num':1,
        'source_redis_agent_host':'http://172.17.0.1:24021/','source_connection_hash':None,
        'target_redis_agent_host':'http://172.17.0.1:24021/','target_connection_hash':None,
        'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),
        'target_stream':'.'.join(['STREAM','test','test', 'stream_out'])
            }


# read
source_qm = QManager(batch_size =cfg['source_read_batch_num'],
                     redis_agent_host = cfg['source_redis_agent_host'],
                     redis_connection_hash = cfg['source_connection_hash']
                    )
# write
target_qm = QManager(batch_size =cfg['target_write_batch_num'],
                     redis_agent_host = cfg['target_redis_agent_host'],
                     redis_connection_hash = cfg['target_connection_hash']
                    )

# 确保队列的存在
if True:
    source_qm.ensure_group(cfg['source_stream'])
    target_qm.ensure_group(cfg['target_stream'])

'''
主逻辑:

- 1 判断目标队列是否满,如果是,那么直接退出
- 2 从源队列取数(采用xrange方法),如果没有数据,直接退出【每对stream之间,只会有一个 sniffer 】
- 3 将源队列数据写入目标队列
- 4 从源队列中删除这些数据

'''

print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))

for _ in range(cfg['max_exec_cnt']):
    if target_qm._is_q_available(cfg['target_stream']):
        print('target q ok')
        msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])
        msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']
        if len(msg_list) == 0:
            print('source q empty')
            break
        else:
            # 写入目标队列
            target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)
            # 将写入的消息从源队列删除
            to_del_msg_id_list = source_qm.extract_msg_id(msg_list)
            source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)

    else:
        break
    

2 Celery Task

然后将上述功能函数写入Flask-Celery

第一部分是在 celery的修饰器下,将任务函数搬进去。然后在app下定义了任务的调用,主要是用到了delay方法,实现异步调用。

# =======================以下是正式的内容
@celery_.task
def s2s_handler(cfg_dict = None):
    cfg = cfg_dict
    # read
    source_qm = QManager(batch_size =cfg['source_read_batch_num'],
                        redis_agent_host = cfg['source_redis_agent_host'],
                        redis_connection_hash = cfg['source_connection_hash']
                        )
    # write
    target_qm = QManager(batch_size =cfg['target_write_batch_num'],
                        redis_agent_host = cfg['target_redis_agent_host'],
                        redis_connection_hash = cfg['target_connection_hash']
                        )
    print('source q len ', source_qm.stream_len(cfg['source_stream']))
    print('target q len ', target_qm.stream_len(cfg['target_stream']))

    for _ in range(cfg['max_exec_cnt']):
        if target_qm._is_q_available(cfg['target_stream']):
            print('target q ok')
            msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])
            msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']
            if len(msg_list) == 0:
                print('source q empty')
                break
            else:
                # 写入目标队列
                target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)
                # 将写入的消息从源队列删除
                to_del_msg_id_list = source_qm.extract_msg_id(msg_list)
                source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)

        else:
            break

# 执行任务的路由 POST
@app.route("/s2s/", methods=['GET','POST'] )
def s2s():
    input_data = request.get_json()
    # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果
    result = s2s_handler.delay(input_data)
    return result.id

调用测试,存入一万条消息(之前还有70条残留),任务执行后,source_q中的数据将会逐渐流转到target_q

# debug - 样例数据写入源队列
data_listofdict = [{'msg_id': i, 'data':'test'} for i in range(10000)]
source_qm.parrallel_write_msg(cfg['source_stream'], data_listofdict= data_listofdict)

print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))

source q len  10070
target q len  230

import requests as req 
# 假设是发往本机: 注意,地址是127.0.0.1
cfg1 = {'target_q_max_len': 100000,'source_read_batch_num':1,'target_write_batch_num':1,
        'source_redis_agent_host':'http://127.0.0.1:24021/','source_connection_hash':None,
        'target_redis_agent_host':'http://127.0.0.1:24021/','target_connection_hash':None,
        'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),
        'target_stream':'.'.join(['STREAM','test','test', 'stream_out']),
        'max_exec_cnt':10}

resp = req.post('http://127.0.0.1:24104/s2s/',json = cfg1 )

# 返回任务号
In [9]: resp.text
Out[9]: '177e57b7-09c5-43f0-ae1f-0cbe8e41dbf5'

# 流转了10条消息
In [10]: print('source q len ', source_qm.stream_len(cfg['source_stream']))
    ...: print('target q len ', target_qm.stream_len(cfg['target_stream']))
source q len  10060
target q len  240

3 Systemd Service

由于服务是在宿主机启动的,而且是基础服务,所以使用systemd配置自启动。启动命令有点小坑,可参考 一次搞定 Linux systemd 服务脚本

本次要点就在于要用forking启动【采用sh脚本启动其他进程时Type须为forking】,因为要启动flask和celery两个服务才行。

[Unit]   
Description=test        # 简单描述服务
After=network.target    # 描述服务类别,表示本服务需要在network服务启动后在启动
Before=xxx.service      # 表示需要在某些服务启动之前启动,After和Before字段只涉及启动顺序,不涉及依赖关系

[Service] 
Type=forking            # 设置服务的启动方式
User=USER               # 设置服务运行的用户
Group=USER              # 设置服务运行的用户组
WorkingDirectory=/PATH  # 设置服务运行的路径(cwd)
KillMode=control-group  # 定义systemd如何停止服务
Restart=no              # 定义服务进程退出后,systemd的重启方式,默认是不重启
ExecStart=/start.sh     # 服务启动命令,命令需要绝对路径(采用sh脚本启动其他进程时Type须为forking)

[Install]   
WantedBy=multi-user.target  # 多用户

然后就好了
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/613574.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【激活函数--中】激活函数和阶跃函数的可视化及对比

文章目录 一、Python中绘制阶跃函数的图形二、实现和可视化Sigmoid函数2.1 Python实现2.2 可视化Sigmoid函数 三、比较Sigmoid函数与阶跃函数3.1 Sigmoid函数与阶跃函数的差异3.2 Sigmoid函数与阶跃函数的共同点 一、Python中绘制阶跃函数的图形 在Python中实现阶跃函数的代码…

智能AI个人名片小程序源码系统 带完整的安装代码包以及搭建部署教程

在当今数字化时代,个人名片不再仅仅是一张简单的纸质卡片,而是演变成了一种更加智能、便捷的数字化工具。为了满足这一需求,小编给大家分享一款智能AI个人名片小程序源码系统,该系统不仅提供了完整的安装代码包,还附带…

(41)5.6-5.8数据结构(栈和队列的应用和数组)

1.栈在括号匹配中的应用 #define _CRT_SECURE_NO_WARNINGS #define MaxSize 10 typedef struct { char data[MaxSize];//静态数组存放栈中元素 int top; //栈顶指针 }SqStack;//初始化栈 void InitStack(SqStack& S);//判断栈是否为空 bool StackEmpty(SqStack S…

轮式机器人简介

迄今为止,轮子一般是移动机器人学和人造交通车辆中最流行的运动机构。它可达到很高的效率, 如图所示, 而且用比较简单的机械就可实现它的制作。 另外,在轮式机器人设计中,平衡通常不是一个研究问题。 因为在所有时间里,轮式机器人一般都被设计成在任何时间里所有轮子均与地接…

vue3父子组件相互调用方法详解

💟 上一篇文章 Vue2中父子组件互相传值和方法调用 📝 系列专栏 vue从基础到起飞 目录 1、前言 2、子组件调用父组件方法(setup组合式) 2.1 父组件Father.vue 2.2 子组件Child.vue 3、父组件调用子组件方法(setup组…

每天五分钟计算机视觉:使用极大值抑制来寻找最优的目标检测对象

本文重点 在目标检测领域,当模型预测出多个候选框(bounding boxes)时,我们需要一种方法来确定哪些候选框最有可能表示真实的目标。由于模型的不完美性和图像中目标的重叠性,往往会有多个候选框对应于同一个目标。此时,极大值抑制(Non-Maximum Suppression,NMS)技术就…

租用香港Windows服务器要注意的几种安全防护措施

在网络世界里,永远没有绝对的安全,但我们可以通过采取适当的措施使风险降低。对于选择香港Windows服务器租赁的企业和个人来说,保护数据的安全性与隐私至关重要。下面将介绍几种关键的租用香港Windows服务器时应注意的安全防护措施。 1.使用本…

汽车线控转向系统介绍

汽车线控转向系统由方向盘总成、转向执行总成和主控制器(ECU)三个主要部分以及自动防故障系统、电源等辅助系统组成。 线控转向系统(Steering-By-Wire),取消了方向盘和转向车轮之间的机械连接部件,彻底摆脱了机械固件的限制,完全由电能来实现…

【qt】联合容器和集合容器

联合容器和集合容器 一.QMap1.应用场景2.添加数据3.删除数据4.修改数据5.查找数据6.数据个数7.是否包含8.返回所有的键名 二.QHash1.应用场景: 三.QMultiMap四.QMultiHash五.QSet1.应用场景2.交集3.并集4.差集 总结: 一.QMap 1.应用场景 QMap的底层实现…

智能座舱语音助手产品方案

一、用户调研与痛点分析 1.目标用户分析 用户画像 性别女性年龄50地域2-3线城市职业退休或退居二线教育中专、 大专、 本科财务家庭财务管理者爱好享受生活、 照顾家庭标签有闲有小钱二、产品定位与卖点提炼 购车目的 愉悦自我, 专属于自己的座驾: 家…

接口自动化入门: Requests请求头设置详解!

在进行接口自动化测试时,设置请求头是非常重要的一步。请求头可以包含各种信息,例如身份验证、内容类型、接受语言等。在实际的测试中,我们使用Python的Requests库来发送HTTP请求,并设置请求头来模拟不同的场景和需求。 下面将通…

【系统架构师】-案例篇(八)数据流图

数据流:数据流是系统中数据的流动,它可以是输入、输出或存储在系统中的数据。 数据处理过程:数据处理过程是对数据进行处理的单元,可以是一个物理设备或软件模块。 数据存储:数据存储是系统中存储数据的单元&#xff0…

共享云桌面如何助力企业信息化和数字化?

随着科技的飞速发展,信息化和数字化已经成为企业转型的重要方向。共享云桌面作为一种新兴的信息化工具,正以其独特的优势助力企业实现信息化和数字化的目标。本文将详细探讨共享云桌面如何助力企业信息化和数字化的过程,以及它所带来的效益。…

使用图网络和视频嵌入预测物理场

文章目录 一、说明二、为什么要预测?三、流体动力学模拟的可视化四、DeepMind神经网络建模五、图形编码六、图形处理器七、图形解码器八、具有不同弹簧常数的轨迹可视化九、预测的物理编码和推出轨迹 一、说明 这是一篇国外流体力学专家在可视化流体物理属性的设计…

vcomp140.dll丢失怎么修复,四种vcomp140.dll丢失的修复办法

vcomp140.dll文件丢失可能会导致一些程序无法正常运行。这些程序通常是使用Microsoft Visual Studio 2015开发的,并且依赖于该动态链接库文件来处理并行计算相关的功能。一旦vcomp140.dll文件丢失或损坏,这些程序在启动或执行特定任务时可能会遇到各种问…

视频批量剪辑神器,一键修改尺寸,轻松打造专业视觉盛宴!

视频已经成为我们生活中不可或缺的一部分。无论是制作精美的短视频,还是编辑专业的宣传片,视频剪辑都是一项必不可少的工作。然而,面对大量的视频素材,如何高效地进行批量剪辑,特别是修改视频尺寸,成为了许…

【大华可见光摄像头】ffmpeg获取视频流并下载mp4 报错‘subtype‘ 不是内部或外部命令,也不是可运行的程序

我现在要通过ffmpeg获取大华摄像头视频流并下载成mp4,但我在cmd窗口运行下面命令的时候,发现报错: D:\Java\ffmpeg\ffmpeg-master-latest-win64-gpl\bin\ffmpeg.exe -y -i rtsp://admin:123xxx.xxx.xxx.xxx/cam/realmonitor?channel1&s…

[Kubernetes] Istio on Kubernetes 实践

文章目录 1.Kubernetes 创建2.Istio 部署2.1 下载 Istio2.2 安装 Istio 3.Istio on Kubernetes 实践3.1 部署 Bookinfo 示例应用3.2 确定入站 IP 和端口 1.Kubernetes 创建 主机名内部ip外部ipmaster192.168.66.2139.198.36.40node1192.168.66.3139.198.1.192node2192.168.66.…

SG-PEG-SG能与许多生物分子如蛋白质和核酸等进行有效结合

【试剂详情】 英文名称 SG-PEG-SG 中文名称 聚乙二醇二琥珀酰亚胺戊二酸酯, 琥珀酰亚胺酯-聚乙二醇-琥珀酰亚胺酯 外观性状 由分子量决定,固体或者液体。 分子量 0.4k,0.6k,1k,2k,3.4k,5…

(已解决)org.springframework.amqp.rabbit.support.ListenerExecutionFailedException

报错截图 解决方案 1、登录rabbitMQ网址,删除所有队列 2、重启rabbitMQ 亲测有效!!!亲测有效!!!亲测有效!!!