『笔记』可扩展架构设计之消息队列

前言

众所周知,开发低耦合系统是软件开发的终极目标之一。低耦合的系统更加容易扩展,低耦合的模块更加容易复用,更易于维护和管理。我们知道,消息队列的主要功能就是收发消息,但是它的作用不仅仅只是解决应用之间的通信问题这么简单。消息队列作为常用的中间件,经常被用来对系统解耦,对模块解耦。增强系统的可扩展性和模块的可复用性。

除了对用于对系统、模块解耦,消息队列还有以下几种通途:

  • 服务异步处理
  • 流量控制
  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
  • 连接流计算任务和数据
  • 用于将消息广播给大量接收者

事物的存在总会有对立的一面,引入消息队列可能会带来延迟问题、产生数据不一致的问题、增加系统复杂度的问题等等。

EDA 架构之生产者与消费者模式

事件驱动架构(Event Driven Architecture, EDA)

EDA 架构原理

事件驱动架构由事件发起者和事件使用者组成。事件的发起者检测或感知事件,并以消息的形式来表示事件。它并不知道事件的使用者或事件引起的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

生产者-消费者模型

操作系统中常见的 EDA 架构就是生产者-消费者模型。消息队列常用来作为生产者和消费者之间的缓冲带,平衡生产者和消费者的处理能同时对服务进行解耦。有了这层缓冲带,生产者和消费者可能都不知道对方的存在。

生产者与消费者模式

以下为生产者-消费者模型的简单实现,(内存消息队列)

import time

from queue import Queue
from random import randint
from threading import Thread

class Producer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            productA = randint(0, 10)
            productB = randint(90, 100)
            print('Produce A「number」: {}, Produce B「number」: {}'.format(productA, productB))
            self.queue.put((productA, productB))
            time.sleep(2)

class Consumer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # block=True, if queue is empty, block(阻塞)
            products_tuple = self.queue.get(block=True)
            print(f'Consume products: {products_tuple[0]} & {products_tuple[1]}')
            time.sleep(randint(0, 10))

def main():
    queue = Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)

    producer.start()
    consumer.start()

main()
"""
Produce A「number」: 8, Produce B「number」: 95
Consume products: 8 & 95
Produce A「number」: 4, Produce B「number」: 92
Consume products: 4 & 92
Produce A「number」: 9, Produce B「number」: 90
... """

基于ZeroMQ PubSub模式的观察者模式实例

ZeroMQ

PubSub模式

# publisher1.py
import time
import zmq

def publisher1():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")

    while True:
        count = 99

        while True:
            time.sleep(1)
            socket.send_string('publisher1 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher1()
# publisher2.py

import time
import zmq

def publisher2():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    while True:
        count = 1

        while True:
            time.sleep(1)
            socket.send_string('publisher2 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher2()
# subscriber1.py

import zmq

def subscriber1():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber1()
# subscriber2.py

import zmq

def subscriber2():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber2()

秒杀系统的架构设计与消息队列

某秒杀系统的主要处理步骤如下:

  • 风险控制
  • 库存锁定
  • 生成订单
  • 短信通知
  • 更新统计数据

使用消息队列进行异步处理

由于秒杀成功的关键取决于风险控制、库存锁定这两步骤,所以 server 端处理了这两步之后可以给 client 端返回结果了,后续的步骤可放入消息队列中异步执行。不一定要在秒杀请求中完成。集中资源处理关键步骤(同步),碎片时间(全部秒杀请求处理结束)处理次要步骤(异步)。

demo

使用消息队列进行流量控制(削峰)

秒杀开始后,将超过 server 端处理上限(短时间内)的秒杀请求放入消息队列中,后续有能力处理时再对消息队列中消费请求进行处理。对于超时的请求可以直接丢弃(秒杀失败)。

demo

参考

  • 大型网站技术架构
  • 什么是事件驱动架构
  • 为什么需要消息队列-极客时间
  • ZeroMQ
  • pyzmq

本文由博客群发一文多发等运营工具平台 OpenWrite 发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/488427.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据库高级查询【mysql】

数据库高级查询【数据库】 前言版权推荐数据库高级查询行转列统计数据 CASE WHEN 条件 THEN 结果1 ELSE 结果2 END数据库查询带排名建表查询带排名,排名连续查询带排名,排名不连续查询带排名,按行号 Mysql数据库函数常用函数最后 前言 2024-…

(四)图像的%2线性拉伸

环境:Windows10专业版 IDEA2021.2.3 jdk11.0.1 OpenCV-460.jar 系列文章: (一)PythonGDAL实现BSQ,BIP,BIL格式的相互转换 (二)BSQ,BIL,BIP存储格式的相互转换算法 (三…

SQLite中的原子提交(四)

返回:SQLite—系列文章目录 上一篇:SQLite数据库成为内存中数据库(三) 下一篇:SQLite使用的临时文件(二) 1. 引言 SQLite等事务数据库的一个重要特性 是“原子提交”。 原子提交意味着所有数据库都在…

【文献分享】WimPyDD 程序:用于计算 WIMP 直接检测信号的面向对象的 Python 代码

题目:WimPyDD: An object–oriented Python code for the calculation of WIMP direct detection signals 链接:DOI: 10.1016/j.cpc.2022.108342 Program Title: WimPyDD (first release: v1.6.1) CPC Library link to program files: https://doi.…

【哈希专题】【蓝桥杯备考训练】:星空之夜、模拟散列表、字符串哈希、四平方和、扫雷【已更新完成】

目录 1、星空之夜(usaco training 5.1) 2、模拟散列表(模板) 3、字符串哈希(模板) 4、四平方和(第七届蓝桥杯省赛C A组/B组 & JAVA B组/C组) 5、扫雷(Google Ki…

AOI检测是如何逐步渗透进半导体领域

欢迎关注GZH《光场视觉》 一直以来AOI检测都是制造业视觉检测系统产业的核心要素。 AOI检测技术应运而生的背景是:电子元件集成度与精细化程度高,检测速度与效率更高、检测零缺陷的发展需求。 在制造业视觉检测系统中下游应用领域中,AOI检测…

linux-开发板移植MQTT

将源码复制到共享文件夹 链接:https://pan.baidu.com/s/1kvvO-HhDMDXkQ_wlNtyW_A?pwd332i 提取码:332i 以下步骤教程里都写了,我这里边进行,方便大家对照 pc端 1.进入mqtt_lib, 解压open压缩包 2.按照教程复制这一句并运行&…

EAK研发制造片式厚膜高压电阻

用于制造的工艺是丝网印刷和模板印刷。使用的修整是磨料或激光。 使用的电阻材料是氧化钌浆料,阻值范围:-10GQ超出阻值范围可协商订货 阻值精度:士0.5% ~士10%(根据需要可生产士0.1%) 温度系数范围:25ppm/c~80ppmC(25C~105℃)其它要求可协商订货 最高工作温度:22…

ESCTF-密码赛题WP

*小学生的爱情* Base64解码获得flag *中学生的爱情* 社会主义核心价值观在线解码得到flag http://www.atoolbox.net/Tool.php?Id850 *高中生的爱情* U2FsdG开头为rabbit密码,又提示你密钥为love。本地toolfx密码工具箱解密。不知道为什么在线解密不行。 *大学生的爱情* …

各种排序介绍

1.排序的概念 排序 :所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 稳定性 :假定在待排序的记录序列中,存在多个具有相同的关键字的记录,若经过排…

UE4_旋转节点总结一

一、Roll、Pitch、Yaw Roll 围绕X轴旋转 飞机的翻滚角 Pitch 围绕Y轴旋转 飞机的俯仰角 Yaw 围绕Z轴旋转 飞机的航向角 二、Get Forward Vector理解 测试: 运行: 三、Get Actor Rotation理解 运行效果: 拆分旋转体测试一&a…

一文整合工厂模式、模板模式、策略模式

为什么使用设计模式 今天终于有时间系统的整理一下这几个设计模式了, 这几个真是最常用的,用好了它们,你就在也不用一大堆的if else 了。能更好的处理大量的代码冗余问题。 在我们的实际开发中,肯定会有这样的场景:我…

MySQL中char与varchar的区别

文章连接 : MySQL中char与varchar的区别:存储机制、性能差异 | 毛英东的个人博客 (maoyingdong.com) varchar和char在MySQL层的区别 根据MySQL的官方文档The CHAR and VARCHAR Types中的描述, varchar和char的区别主要有: 最大长度:char是2…

基于OneAPI+ChatGLM3-6B+FastGPT搭建LLM大语言模型知识库问答系统

搭建大语言模型知识库问答系统 部署OneAPI部署一个LLM模型部署嵌入模型部署FastGPT新建FastGPT对话应用新建 FastGPT 知识库应用 部署OneAPI 拉取镜像 docker pull justsong/one-api创建挂载目录 mkdir -p /usr/local/docker/oneapi启动容器 docker run --name one-api -d …

官宣了?百度将为苹果今年国行iPhone16、Mac和iOS18提供AI功能!

大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,所以创建了“AI信息Gap”这个公众号,专注于分享AI全维度知识…

Android视角看鸿蒙第九课-鸿蒙的布局

鸿蒙的四大布局 导读 前面八篇文章描述了鸿蒙app的配置文件,关于版本号,开发版本,桌面图标等等配置方式。从这一篇文章开始学习鸿蒙的UI使用方式。 前面我们学习到鸿蒙有ability和page的区分,ability类似Activity但又不完全一样…

目前国内体验最佳的AI问答助手:kimi.ai

文章目录 简介图片理解长文档解析 简介 kimi.ai是国内初创AI公司月之暗面推出的一款AI助手,终于不再是四字成语拼凑出来的了。这是一个非常存粹的文本分析和对话工具,没有那些东拼西凑花里胡哨的AIGC功能,实测表明,这种聚焦是对的…

ESCTF-Web赛题WP

0x01-初次见面-怦然心动:your name? 随便输入一个字 根据提示可以看到 我们需要看源代码 直接 搜索 secret 关键字或者 ESCTF flag ESCTF{K1t0_iS_S0_HAPPy} 0x02-小k的请求 更安全的传参 post 参数为ESCTF 值为 love 自己的ip 同时还有个要求 是需要从度娘转过来 https://www…

说说Loader和Plugin的区别?编写Loader,Plugin的思路?

文章目录 一、区别二、编写loader三、编写plugin参考文献 一、区别 前面两节我们有提到Loader与Plugin对应的概念,先来回顾下 loader 是文件加载器,能够加载资源文件,并对这些文件进行一些处理,诸如编译、压缩等,最终…

KDB+Q | D1 | 学习资源 基础数据类型

官网会是主要的学习资源:https://code.kx.com/q/ 中文教程可能读起来会快一点: https://kdbcn.gitee.io/ 参考了还不错的学习经验帖:https://www.jianshu.com/p/488764d42627 KDB擅长处理时序数据, KDB数据库是后端数据库&…