tp8 使用rabbitMQ(3)发布/订阅

发布/订阅

当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

下面的图才是 rabbitmq 的完整模式, 中间是有交换机的
在这里插入图片描述

交换机

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

我们之前的 简单队列和工作队列中,没有提来交换机的概念。

默认交换机

当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "

交换机的种类有多种

直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了

$channel->basic_publish($msg, '', 'hello');

发布订阅模式中,我们使用 扇形交换机 fanout 代码如下

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称


因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)

交换机和队列的绑定(这里应该是在消费者代码中出现的)

我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中

$channel->queue_bind($queue_name, 'hello');  //这样就把队列名称和交换机名称做了绑定

下面的 完整的代码示例

生产者

<?php
declare (strict_types = 1);

namespace app\command;

use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class PubSubMQProduce extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('pubsubmqproduce')
            ->setDescription('发布订阅模式的生产者');
    }

    protected function execute(Input $input, Output $output)
    {
        //获取连接
        $connection = $this->getConnectRabbitMQ();
        //创建通道
        $channel = $connection->channel();
        //创建交换机
        /**
         * params exchange  自定义交换机名称
         * params type  交换机的类型, 一般都会使用 扇形(fanout)
         * params passive 是否消极声名
         * params durable 是否持久化
         * params auto_delete 是否自动删除
         * params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
         * params  nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行
         */
        $channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);
        //现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)

        for ($i = 0; $i < 20; $i++) {
            $msgArr = [
                "name"=>"haha".$i,
                "age"=>'10'.$i,
                "sex"=>"female".$i
            ];
            $msg = new AMQPMessage(json_encode($msgArr),[
                "delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]);
            sleep(1);
            $channel->basic_publish($msg,"exchangeName");
        }

        $channel->close();
        $connection->close();
    }

    protected function getConnectRabbitMQ(){
        try{
            $connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
            return $connection;
        }catch(Exception $e){
            throw new Exception("队列连接失败");
        }

    }
}

消费者

<?php
declare (strict_types = 1);

namespace app\command;

use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class PubSubMQConsumer extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('pubsubmqconsumer')
            ->setDescription('发布订阅模式的消费者');
    }

    protected function execute(Input $input, Output $output)
    {
        $connection = $this->connectRabbitMQ();
        $channel = $connection->channel();
        //创建两个队列
        $channel->queue_declare("queueName1",false,false,false,false,false);
        $channel->queue_declare("queueName2",false,false,false,false,false);
        //绑定交换机和队列,交换机的名称是在生产者中定义的
        $channel->queue_bind("queueName1","exchangeName");
        $channel->queue_bind("queueName2","exchangeName");
        //设置消息处理函数
        $callback1 = function($msg){
            $msgArr = json_decode($msg->body,true);
            echo "这是(显示)处理数据的队列NO1  ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了
        };
        $callback2 = function($msg){
            $msgArr = json_decode($msg->body,true);
            echo "这是(保存)处理数据的队列NO2  ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了
        };
        $channel->basic_consume("queueName1","",false,false,false,false,$callback1);
        $channel->basic_consume("queueName2","",false,false,false,false,$callback2);
        while(count($channel->callbacks)){
            $channel->wait();
        }
    }

    protected function connectRabbitMQ(){
        try{
            $connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
            return $connection;
        }catch(Exception $e){
            throw new Exception("队列连接失败");
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/186565.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

生物识别访问面临风险

安全公司 Blackwing Intelligence 发现了多个允许您绕过Windows Hello 身份验证的漏洞。 戴尔 Inspiron 灵越 15、联想 ThinkPad T14 和 Microsoft Surface Pro X笔记本电脑上会出现这种情况&#xff0c;原因是设备中集成了来自Goodix、Synaptics 和 ELAN的指纹传感器。 所有…

Windows核心编程 跨进程操作

目录 进程A拿到进程B句柄是否能用 句柄的权限 关于句柄表 跨进程使用句柄-继承 CreateProcess&#xff1a;bInheritHandles OpenProcess FindWinodw GetCurrentProcess 跨进程使用句柄-拷贝 跨进程操作内存 WriteProcessMemory VirtualProtectEx ReadProcessMemo…

情感对话机器人的任务体系

人类在处理对话中的情感时&#xff0c;需要先根据对话场景中的蛛丝马迹判断出对方的情感&#xff0c;继而根据对话的主题等信息思考自身用什么情感进行回复&#xff0c;最后结合推理出的情感形成恰当的回复。受人类处理情感对话的启发&#xff0c;情感对话机器人需要完成以下几…

npm pnpm yarn(包管理器)的安装及镜像切换

安装Node.js 要安装npm&#xff0c;你需要先安装Node.js。 从Node.js官方网站&#xff08;https://nodejs.org&#xff09;下载并安装Node.js。 根据你的需要选择相应的版本。 一路Next&#xff0c;直到Finish 打开CMD&#xff0c;输入命令来检查Node.js和npm是否成功安装 nod…

授时小课堂——北斗卫星信号和GPS卫星信号谁更强?

北斗卫星信号好还是GPS信号更胜一筹呢&#xff1f;下面小编带大家一起来比较一下看看吧。 1. 系统覆盖范围 北斗卫星导航系统是中国自主研发的授时定位系统&#xff0c;其覆盖范围包括全球各个地区。但在海外地区&#xff0c;主要还是东南亚、南亚、中亚等地区&#xff0c;北斗…

精通Nginx(18)-FastCGI/SCGI/uWSGI支持

最初用浏览器浏览的网页只能是静态html页面。随着社会发展,动态获取数据、操作数据需要变得日益强烈,CGI应运而生。CGI(Common Gateway Interface)公共网关接口,是外部扩展应用程序与静态Web服务器交互的一个标准接口。它可以使外部程序处理浏览器送来的表单数据并对此作出…

NX二次开发UF_CURVE_ask_curve_struct_data 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_curve_struct_data Defined in: uf_curve.h int UF_CURVE_ask_curve_struct_data(UF_CURVE_struct_p_t curve_struct, int * type, double * * curve_data ) overview…

数据结构与算法编程题20

统计二叉树的叶结点个数。 #define _CRT_SECURE_NO_WARNINGS#include <iostream> using namespace std;typedef char ElemType; #define ERROR 0 #define OK 1 typedef struct BiNode {ElemType data;BiNode* lchild, * rchild; }BiNode,*BiTree;bool Create_tree(BiTre…

JWT和Token之间的区别

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a;每天一个知识点 ✨特色专栏&#xff1a…

【点云surface】 凹包重构

1 处理过程可视化 原始数据 直通滤波过滤后 pcl::ProjectInliers结果 pcl::ExtractIndices结果 凹包结果 凸包结果 2 处理过程分析&#xff1a; 原始点云 ---> 直通滤波 --> pcl::SACSegmentation分割出平面 -->pcl::ProjectInliers投影 --> pcl::ConcaveHull凹包…

模板初阶(1):函数模板,类模板

一、函数模板 1.1 概念 函数模板代表了一个函数家族&#xff0c;该函数模板与类型无关&#xff0c;在使用时被参数化&#xff0c;根据实参类型产生函数的特定类型版本。 格式&#xff1a; template <typename T>或template <class T> template <class T>…

一起学docker系列之九docker运行mysql 碰到的各种坑及解决方法

目录 前言1 Docker 运行mysql命令2 坑一&#xff1a;无法读取/etc/mysql/conf.d目录的问题3 坑二&#xff1a;/tmp/ibnr0mis 文件无法创建/写入的问题4 坑三&#xff1a;Navicat 连接错误&#xff08;1045-access denied&#xff09;5 坑四&#xff1a;MySQL 登录失败问题结语 …

micro_ros

原文链接Supported Hardware | micro-ROS Supported Hardware The main targets of micro-ROS are mid-range 32-bits microcontroller families. Usually, the minimum requirements for running micro-ROS in an embedded platform are memory constraints. Since memory u…

Spring Boot - 瘦身大作战:优雅应对Spring Boot Fat Jar

文章目录 Fat Jar瘦身pom修改copy lib启动 -Dloader.path验证 源码分析前置阅读spring-boot-loader 依赖类继承关系PropertiesLauncher属性配置 附 pom.xml Fat Jar 【pom.xml】 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"ht…

支持Arm CCA的TF-A威胁模型

目录 一、简介 二、评估目标 2.1 假定 2.2 数据流图 三、威胁分析 3.1 威胁评估 3.1.1 针对所有固件镜像的一般威胁 3.1.2 引导固件可以缓解的威胁 3.1.3 运行时EL3固件可缓解的威胁 一、简介 本文针对支持Arm Realm Management Extension (RME)、实现Arm Confidentia…

【Amazon】安装卸载AWS CLI操作流程(Windows 、Linux系统)

AWS 命令行界面&#xff08;AWS CLI&#xff09;是用于管理 AWS 产品的统一工具。只需要下载和配置一个工具&#xff0c;您就可以使用命令行控制多个 AWS 产品并利用脚本来自动执行这些服务。 AWS CLI v2 提供了多项新功能&#xff0c;包括改进的安装程序、新的配置选项&#…

(Matalb分类预测)GA-BP遗传算法优化BP神经网络的多维分类预测

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分代码&#xff1a; 四、本文代码数据说明手册分享 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matalb平台编译&am…

NX二次开发UF_CURVE_ask_curve_inflections 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_curve_inflections Defined in: uf_curve.h int UF_CURVE_ask_curve_inflections(tag_t curve_eid, double proj_matrx [ 9 ] , double range [ 2 ] , int * num_infpt…

如何处理git多分支

本篇文章主要处理以下两种多分支问题 如何将自己在本地的修改上传到一个新的Git分支&#xff08;比如用于测试&#xff0c;不合并进main分支&#xff09;&#xff1f;如何在一个新的本地仓库拉取一个项目的非main分支&#xff0c;并处理他们关联关系&#xff1f; 1. 将自己在…

如何用低代码的思路设计文字描边渐变组件

前言 文字特效设计一直是困扰 Web 前端 Css 世界多年的问题, 比如如何用纯 Css 实现文字描边, 渐变, 阴影等, 由于受限于浏览器兼容性的问题, 我们不得不使用其他替代方案来实现. 平时工作中我们使用 PS 等设计工具能很容易的实现文字渐变等特效, 但是随着可视化技术的成熟, 我…