【MQ02】基础简单消息队列应用

基础简单消息队列应用

在上一课中,我们已经学习到了什么是消息队列,有哪些消息队列,以及我们会用到哪个消息队列。今天,就直接进入主题,学习第一种,最简单,但也是最常用,最好用的消息队列模式。

最简单的队列功能

最简单的队列功能,无非就是将我们在数据结构与算法中学过的那个队列结构,变成一个外部功能组件。让各种语言和各种应用程序都可以通过这个队列来进行数据操作。这样的一个队列系统就称之为“消息队列中间件”。

一般,我们会将生产消息的程序,或者说,将数据放入到队列的一方称为 P (生产者,Producer);然后将队列称为Q(Queue);最后,将守候在队列前,等待从队列中获取数据的应用、程序或者代码段称为 C(消费者/客户端,Consumer)。

27021c06f480119d7c4c2696a2109768.png

这是 RabbitMQ 官网手册上的图,后面的相关图示我们也将直接使用它们的。在这个图中,有字母的部分就不多解释了。中间红色的一格一格的部分代表的就是 Q 。

通常来说,P 只管将数据放到 Q 中,Q 负责中间存储数据,然后 C 取出数据。数据的进出方式遵循经典数据结构队列中的规则,也就是 FIFO 先进先出。

是不是就和我们之前说过的一样,最简单的理解,它就是将队列这个数据结构抽成了一个第三方组件。这样就可以跨平台、跨应用、跨语言地进行数据存取了。

RebbitMQ 实现

好了,先来看 RabbitMQ 的实现。你需要先安装好 RabbitMQ ,我这里是使用的 Docker 安装的。

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

然后,也要安装好 PHP 的 Composer 组件,用于操作 RabbitMQ 消息队列,PHP 需要开启 sockets 扩展。

composer require php-amqplib/php-amqplib

5672 是 RabbitMQ 的服务端口,15672 则是它自带的一个管理工具的访问端口。具体的内容大家可以到官方文档中进行更加深入的学习。当然,也可以使用虚拟机方式来搭建测试环境,这个大家看自己的喜好吧。

接下来,我们先实现 P 端的代码,也就是生产者向消息队列中添加数据。

// 2.rq.p.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道

// 定义队列
$channel->queue_declare('hello', false, false, false, false);

// 创建消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // 将消息放入队列中

echo "生产者向消息队列中发送信息:Hello World!";

$channel->close();
$connection->close();

注释很清晰了吧,如果你之前没有用 PHP 操作过 RabbitMQ 的话,那么直接复制这段代码就可以了。其中比较特殊的是  channel ,它是共享单个 TCP 连接的轻量级信道。这个概念可能是 RabbitMQ 相较于其它消息队列系统比较特别的。然后就是消息内容是通过一个 AMQPMessage 对象承载的,这个 AMQP 其实就是 RabbitMQ 的核心协议。协议是通信双方能够相互看明白对方的基础,能够建立通信的基础,就像我们之前在 Redis 中学习过的 RESP 协议一样。这里大家只需要知道 RabbitMQ 是使用这个协议就好了,而且它也支持其它的一些协议。发送完消息之后,记得关闭连接哦。

好了,接下来是我们的消费者/客户端实现。

// 2.rq.c.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道

// 定义队列
$channel->queue_declare('hello', false, false, false, false);

echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;

// 定义接收数据的回调函数
$callback = function ($msg) {
    echo '接收到数据: ', $msg->body, PHP_EOL;
};
// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 频道是开启状态时,挂起程序,不停地执行
while ($channel->is_open()) {
    // 等待并监听频道中的队列信息
    // 发现上方 basic_consume 定义的队列有消息后
    // 就调用它对应的 callback
    $channel->wait();
}

看着好像多很多东西呀?其实上面一半和生产者是一样的。从中间开始,我们使用的是 Channel 对象的 basic_consume() 方法,这个方法最后有一个回调函数参数。然后在下面通过 wait() 方法持续监听队列中是否有数据。如果有数据了,就调用指定的回调函数。并将消息内容交给回调函数的参数。

注意哦,一般来说,消息队列的消费者,或者说是客户端,或者说是 C 端。大部分情况下可能都会是这样通过一个死循环挂起的。目的就是当我们运行起程序之后,可以不停地,不间断地一直处理队列中的消息数据。之前在学习 Swoole 时,另外如果你学习过 Go 语言的话,也会发现它们的 Http 服务中也是有类似的死循环代码来实现服务端挂起的。这个大家可以到我的 Swoole 系列中看看哦。

测试一下吧,运行一下生产者代码。

➜  source git:(master) ✗ php 2.rq.p.php 
生产者向消息队列中发送信息:Hello World!%

执行结束了,只是输出了一句话,没啥别的效果。那么我们再来运行一下消费者代码。

> php 2.rq.c.php 
等待消息,或者使用 Ctrl+C 退出程序。
接收到数据: Hello World!

可以看到,消费者先是输出了接收到的数据,这个数据其实是上一步我们运行生产者插入到队列中的数据。现在,这条数据打印出来了,其实就是相当于已经被我们消费了。当然,在实际业务中,你可能会对这些数据进行更复杂的业务操作。但在演示时,我这里只是打印了一下。然后,消费者会继续挂在这里等待下一条消息的到来。这时,你可以再次运行生产者代码,然后就会看到消费者这边直接就已经消费了。

Redis 实现

对于 Redis 的实现,其实非常简单,我们之前也已经学过的,那就是使用 List 这个数据结构。先来看生产者,直接 push 数据就好了。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

echo "生产者向消息队列中发送信息:Hello World!";

$redis->lpush('hello', 'Hello World!');

消费者呢?当然就是我们之前已经学过的 pop 啦。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;

while(1){
    $data = $redis->rpop('hello');
    if ($data){
        echo '接收到数据: ', $data, PHP_EOL;
    }
}

在这里,我们使用的是 lpush() + rpop() 的方式,当然你也可以使用 rpush() + lpop() 的方式,大家思考一下,如果是 lpush() + lpop() ,是实现的什么数据结构呢?

同样地,在 Redis 的消费者中,我们也需要通过一个死循环挂起消费者,然后不停地获取数据进行处理。剩下的测试过程就和上面的 RabbitMQ 一样了。

我的实践

之前我就说过,我的消息队列实践不多。唯一的业务场景下实现的高并发消息队列应用其实就是上面的 Redis 这两段代码。真的,核心就是这点东西。但为了抗高并发,我是使用的 Swoole ,生产者是在 Hyperf 框架中通过控制器接收到数据后,直接就放到 Redis 里。然后消费者就是一个命令行,接着开 100 个协程,将获取到的消息数据丢给协程处理。

业务应用是游戏的日志上报,最高并发 20000+ ,入库日志量 3000万+ 。使用的是阿里云最便宜的那个 Redis 服务,4G 大小单实例的那款。

是不是见到了消息队列的恐怖能力。这个量级,对于任何消息队列应用来说问题都不大,RabbitMQ 被认为是比较慢的,但是,它的处理能力是每秒几万次请求。而 Redis ,就是以 Redis 的读写性能为基础的,大概每秒11万的读和8万的写。这个在之前的 Redis 学习中都已经说过了。

总结

今天通过代码,我们其实就已经学习到了整个消息队列中最核心的内容。没错,消息队列就是这么地简单,但又这么地实用。我的业务例子其实是异步解耦的一种实现。而对于另一种常见的场景,秒杀,大家想一想,是不是也可以直接通过这样一个简单的队列就能够实现了。当然,可能还比较简陋,也需要考虑更多的东西,但是,一秒内处理几万条请求和一秒内让几万条请求入队,这个差别可大了去了。

其实,从队列的思想就可以看出,我们用数据库也可以实现队列,插入数据是入队,然后倒序查询出来一条就可以视为出队。但是呢,数据库的性能往往和专业的消息队列以及 NoSQL 工具都是有很大的差距的。因此,其实还是那句话,把握本质和思想,工具用啥都好说。

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/352807.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nginx安装以及具体应用

文章目录 Centos7安装NginxNginx命令Nginx具体应用反向代理 location指令说明负载均衡动静分离 Nginx.conf配置详解 Centos7安装Nginx 下载地址:nginx: download 中间这个就是tar.gz包 Centos7安装Nginx 下载nginx-1.16.1.tar.gz上传到Centos7中的/user/local目…

漏洞攻击中怎么去做最全面覆盖的sql注入漏洞攻击?表信息是如何泄露的?预编译就一定安全?最受欢迎的十款SQL注入工具配置及使用

漏洞攻击中怎么去做最全面覆盖的sql注入漏洞攻击?表信息是如何泄露的?预编译就一定安全?最受欢迎的十款SQL注入工具配置及使用。 SQL注入是因为后台SQL语句拼接了用户的输入,而且Web应用程序对用户输入数据的合法性没有判断和过滤,前端传入后端的参数是攻击者可控的,攻击…

一位网友开始设计一个叫 VisionPro 的VR现实交互操作系统,可以将电脑屏幕的图片拖拽到现实空间摆放

在 Figma 中创建的资源和组件可以在 ShapesXR 中导入和同步,这样您就可以在 Mixedreality 中开始设计,而无需任何 3d 专业技能 Figma是一个矢量图形编辑器和原型设计工具,主要基于网页进行工作,通过macOS或Windows的桌面应用程序…

Docker命令---搜索镜像

介绍 使用docker命令搜索镜像。 命令 docker search 镜像命令:版本号示例 以搜索ElasticSearch镜像为例 docker search ElasticSearch

Redis的应用问题

目录 一、缓存穿透 问题描述 解决方案 缓存击穿 问题描述 解决方案 缓存雪崩 问题描述 解决方案 二、分布式锁 问题描述 解/决方案:使用redis实现分布式锁 优化之设置锁的过期时间 优化之UUID防误删 LUA脚本保证删除的原子性 LUA脚本 LUA脚本在Red…

Dockerfile入门指南:轻松创建定制化Docker镜像

镜像的定制实际上就是定制每一层所添加的配置、文件。如果我们可以把每一层修改、安装、构建、操作的命令都写入一个脚本,用这个脚本来构建、定制镜像,那么无法重复的问题、镜像构建透明性的问题、体积的问题就都会解决。这个脚本就是 Dockerfile。 Doc…

(大众金融)SQL server面试题(3)-客户已用额度总和

今天,面试了一家公司,什么也不说先来三道面试题做做,第三题。 那么,我们就开始做题吧,谁叫我们是打工人呢。 题目是这样的: DEALER_INFO经销商授信协议号码经销商名称经销商证件号注册地址员工人数信息维…

Windows上安装Linux系统

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、WSL是什么?二、WSL安装步骤1.开启wsl支持2.安装wsl3.运行wsl4.环境配置 三、WSL删除引用 前言 提示:这里可以添加本文要记录的大概…

故障树分析蒙特卡洛仿真程序(附MATLAB完整代码)

故障树是一种特殊的倒立树状逻辑因果关系图,它用事件符号、逻辑门符号和转移符号描述系统中各种事件之间的因果关系,通过对引起系统故障的各种因素进行逻辑因果分析,确定导致故障发生的各种可能的原因,并通过定性和定量分析找出系…

MybatisPlus二级映射和关联对象ResultMap

文章目录 一、业务背景1. 数据库表结构2. 需求 二、使用映射直接得到指定结构三、其他文件1. Mapper2. Service3. Controller 四、概念理解一级映射二级映射聚合 五、标签使用1. \<collection\> 标签2. \<association\> 标签 在我们的教程中&#xff0c;我们设计了…

三十四岁的程序员转行送外卖的不在少数啊

疫情过后的就业形势&#xff0c;对于很多人来说&#xff0c;真的变得更为严峻。我有很多之前的朋友&#xff0c;他们待业的时间长达半年&#xff0c;这对他们的生活和心理都造成了很大的压力。我是一名程序开发人员&#xff0c;虽然相对于其他行业来说&#xff0c;我们的薪资待…

如何保证接口幂等性

接口幂等性是指对同一操作发起的一次或多次请求结果是一致的&#xff0c;并且不会因为重复请求而产生副作用。 例如前端应用对后端发出请求&#xff0c;可能由于网络原因&#xff0c;前端并未接收到后端响应&#xff0c;前端进行重试&#xff0c;对后端同一接口发出多次请求 假…

Tortoise-tts Better speech synthesis through scaling——TTS论文阅读

笔记地址&#xff1a;https://flowus.cn/share/a79f6286-b48f-42be-8425-2b5d0880c648 【FlowUs 息流】tortoise 论文地址&#xff1a; Better speech synthesis through scaling Abstract: 自回归变换器和DDPM&#xff1a;自回归变换器&#xff08;autoregressive transfo…

SpringBoot中阿里云OSS的使用

目录 1 登录/注册阿里云并进入控制台 2 进入OSS控制台 3 创建bucket 4 查看bucket 5 获取AccessKey 6 查看帮助文档 7 添加Maven依赖 8 获取示例代码并改造成工具类 9 测试 1 登录/注册阿里云并进入控制台 2 进入OSS控制台 3 创建bucket 4 查看bucket 5 获取AccessKe…

套接字的多种可选项(修改IO缓冲区大小及TCP_NODELAY)

标题套接字的多种可选项 我们进行套接字编程时往往只关注数据通信&#xff0c;而忽略了套接字具有的不同特性。但是&#xff0c;理解这些特性并根据实际需要进行更改也十分重要。 从上表可以看出&#xff0c;套接字可选项是分层的。IPPROTOIP层可选项是IP协议相关事项&#x…

全局视角,搞懂“新零售支付”

文章首发于微信公众号:PenguinPay &#xff0c;欢迎关注。 零售业就是商家将商品或服务在场所中卖给消费者。其中的关键词就是 人&#xff0c;货&#xff0c;场。 一、 初始新零售 1.1 传统零售货找人 传统零售业,只能在固定场所销售商品或服务&#xff0c;以商家店为中心只…

探究SpringWeb对于请求的处理过程

探究目的 在路径归一化被提出后&#xff0c;越来越多的未授权漏洞被爆出&#xff0c;而这些未授权多半跟spring自身对路由分发的处理机制有关。今天就来探究一下到底spring处理了什么导致了才导致鉴权被绕过这样严重的问题。 DispatcherServlet介绍 首先在分析spring对请求处…

AI编译器的前端优化策略

背景 工作领域是AI芯片工具链相关&#xff0c;很多相关知识的概念都是跟着项目成长建立起来&#xff0c;但是比较整个技术体系在脑海中都不太系统&#xff0c;比如项目参与中涉及到了很多AI编译器开发相关内容&#xff0c;东西比较零碎&#xff0c;工作中也没有太多时间去做复盘…

上升子序列的最大长度,递归-记忆化搜索-动态规划三步走

题目描述&#xff1a; 小明有一个数组&#xff0c;他想从数组任意元素开始向后遍历&#xff0c;找出所有上升子序列&#xff0c;并计算出最长的上升子序列的长度。 数据范围&#xff1a; 每组数据长度满足 1≤n≤200 1≤n≤200 &#xff0c; 数据大小满足 1≤val≤350 1≤val≤…

C++基础入门

前言&#xff1a;哈喽小伙伴们&#xff0c;从这篇文章开始&#xff0c;博主将开启新篇章的讲解——C语言&#xff0c;那么C是一门怎么样的语言呢&#xff1f;&#xff1f;&#xff1f;它的语法又是怎么样的呢&#xff1f;&#xff1f;&#xff1f;这篇文章将给你一一解答。 目录…