交换机-Exchanges

交换机

Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

Exchanges 的类型

  • 直接:direct 路由模式
  • 主题:topic
  • 标题:headers(不常用)
  • 扇出:fanout 广播模式,发布订阅模式
无名exchange

第一个参数是交换机的名称,空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
绑定bindings

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。

在这里插入图片描述

Fanout模式介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。
在这里插入图片描述

代码示例
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

import java.util.Scanner;

public class Producer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明扇出类型交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //向交换机发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.printf("消息:%s发送成功!",message);
        }
    }
}
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});
    }
}
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});
    }
}

Direct模式介绍

Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct(直接) 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
在这里插入图片描述

  • 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green
  • 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 black/green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
多重绑定

在这里插入图片描述

如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多

代码实战
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;
import java.util.UUID;

public class Producer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.232");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明路由模式交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //向交换机发送消息
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNext()){
            String message = UUID.randomUUID().toString();
            String routingKey = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.printf("发送消息:%s成功!RoutingKey:%s\n",message,routingKey);
        }
    }
}
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"error");
        channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});
    }
}
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"warn");
        channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});
    }
}

Topics主题模式介绍

存在的问题:尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节

  • *星号可以代替一个单词

  • #井号可以替代零个或多个单词
    在这里插入图片描述

  • 当一个队列绑定键是#那么这个队列将接收所有数据,就有点像fanout了

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

代码实现
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

import java.util.HashMap;
import java.util.Map;

public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        System.out.println("连接RabbitMQ服务器成功!");
        Channel channel = connection.createChannel();
        //声明主题模式交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        System.out.println("交换机创建成功!");
        Thread.sleep(15*1000);
        //发布消息
        HashMap<String,String> msgMap = new HashMap<>();
        msgMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        msgMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        msgMap.put("quick.orange.fox","被队列 Q1 接收到");
        msgMap.put("lazy.brown.fox","被队列 Q2 接收到");
        msgMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        msgMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        msgMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        msgMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
        for (Map.Entry<String, String> entry : msgMap.entrySet()) {
            channel.basicPublish(EXCHANGE_NAME,entry.getKey(),null,entry.getValue().getBytes("UTF-8"));
        }
    }
}
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");

        DeliverCallback deliverCallback = (tag,msg)->{
            String message = new String(msg.getBody());
            System.out.println(message+"\tRouting Key:"+msg.getEnvelope().getRoutingKey());
        };

        CancelCallback cancelCallback = (tag)->{

        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q2";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

        DeliverCallback deliverCallback = (tag,msg)->{
            String message = new String(msg.getBody());
            System.out.println(message+"Routing Key:"+msg.getEnvelope().getRoutingKey());
        };

        CancelCallback cancelCallback=(tag)->{

        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/17434.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

正则表达式-基本元字符和语法规则

© Ptw-cwl 文章目录 字符匹配元字符.元字符[]元字符[^]元字符*元字符元字符?元字符{}元字符|元字符()元字符^元字符$元字符\元字符\d元字符\w元字符\s元字符\b元字符\B元字符*?、?、??、{n,m}?元字符(?)、(?!)元字符(?:)元字符\1、\2等元字符^、$元字符&#x…

JavaSE基础(二)—— 类型转换、运算符、键盘录入

目录 一、类型转换 1. 自动类型转换 1.1 自动类型转换的底层原理&#xff1a; ​1.2 自动类型转换的其他形式​编辑 2. 表达式的自动类型转换 3. 强制类型转换 3.1 强制类型转换底层原理​编辑 3.2 注意事项 二、运算符 1. 算数运算符 1.1 案例&#xff1a;数值拆分…

PCA主成成分分析例题详解

主成分分析是一种降维算法&#xff0c;它能将多个指标转换为少数几个主成分&#xff0c;这些主成分是原始变量的线性组合&#xff0c;且彼此之间互不相关&#xff0c;其能反映出原始数据的大部分信息 需要了解具体细节可看此视频&#x1f449;&#xff1a;什么是主成成分分析PC…

Linux安装MongoDB数据库,并内网穿透远程连接

文章目录 前言1. 配置Mongodb源2. 安装MongoDB3. 局域网连接测试4. 安装cpolar内网穿透5. 配置公网访问地址6. 公网远程连接7. 固定连接公网地址8. 使用固定地址连接 转载自Cpolar Lisa文章&#xff1a;Linux服务器安装部署MongoDB数据库 - 无公网IP远程连接「内网穿透」 前言 …

SpringBoot访问静态资源

SpringBoot项目中没有WebApp目录&#xff0c;只有src目录。在src/main/resources下面有static和templates两个文件夹。SpringBoot默认在static目录中存放静态资源&#xff0c;而templates中放动态页面。 static目录 SpringBoot通过/resources/static目录访问静态资源&#xff…

完成A轮融资,倍思如何发力场景化为品牌创造广阔未来?

凛冬过后的消费电子正在重新凝聚资本的目光。 近日&#xff0c;深圳市倍思科技有限公司宣布完成由深创投、中金资本联合领投&#xff0c;越秀产业基金、高榕资本跟投&#xff0c;金额数亿元人民币的A轮融资。 分析人士指出&#xff0c;消费电子的行业景气度在逐渐恢复&#x…

中国社科院与美国杜兰大学金融管理硕士项目——迎接立夏,切莫忘记自我成长

五月的风吹走了春季&#xff0c;今天我们迎来立夏。作为夏季的第一个节气&#xff0c;立夏常被人们当做万物蓄满能量&#xff0c;即将加速生长的标志。而在职的我们&#xff0c;也应该跟这世间万物一样&#xff0c;在季节交替之时沉淀自己、努力向上成长。在社科院与杜兰大学金…

“人工智能教父”从谷歌离职 称后悔发展AI,为世人敲响警钟?

在加入谷歌的第十年、深度学习迎来爆发式发展的当下&#xff0c;被誉为“人工智能教父”的Geoffrey Hinton已从谷歌离职&#xff0c;只是为了告诫人们AI已经变得很危险。 公开资料显示&#xff0c;Geoffrey Hinton在2013年加入谷歌&#xff0c;曾任副总裁&#xff0c;研究机器学…

成为数据分析师,需要具备哪些技能?

随着互联网的发展&#xff0c;数据分析师的特点越来越明显&#xff0c;对数据分析师综合素质的要求也较高。 1、较强的数据挖掘、信息整理、和逻辑分析能力 数据分析&#xff0c;也是数据分析师的一个方向。 制作日常性的经营报表&#xff0c;对公司或者行业KPI指标进行拆解…

Mysql索引(3):索引分类

1 索引分类 在MySQL数据库&#xff0c;将索引的具体类型主要分为以下几类&#xff1a;主键索引、唯一索引、常规索引、全文索引。 分类含义特点关键字主键索引针对于表中主键创建的索引 默认自动创建, 只能有一个 PRIMARY 唯一索引 避免同一个表中某数据列中的值重复可以有多…

【Android入门到项目实战-- 8.4】—— 如何解析JSON格式数据

目录 一、准备工作 二、使用JSONObject 三、使用GSON 比起XML&#xff0c;JSON的主要优势在于它的体积更小&#xff0c;在网络上传输的时候可以更省流量&#xff0c;但缺点是语义性较差&#xff0c;看起来不直观。 一、准备工作 还是使用前面文章的方法&#xff0c;在服务器…

每日学术速递4.29

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.LG 1.A Cookbook of Self-Supervised Learning 标题&#xff1a;自监督学习食谱 作者&#xff1a;Randall Balestriero, Mark Ibrahim, Vlad Sobal, Ari Morcos, Shashank Shekhar, Tom…

【黑马程序员 C++教程从0到1入门编程】【笔记8】 泛型编程——模板

https://www.bilibili.com/video/BV1et411b73Z?p167 C泛型编程是一种编程范式&#xff0c;它的核心思想是编写通用的代码&#xff0c;使得代码可以适用于多种不同的数据类型。 而模板是C中实现泛型编程的一种机制&#xff0c;它允许我们编写通用的代码模板&#xff0c;然后在需…

【Spring篇】IOC/DI注解开发

&#x1f353;系列专栏:Spring系列专栏 &#x1f349;个人主页:个人主页 目录 一、IOC/DI注解开发 1.注解开发定义bean 2.纯注解开发模式 1.思路分析 2.实现步骤 3.注解开发bean作用范围与生命周期管理 1.环境准备 2.Bean的作用范围 3.Bean的生命周期 4.注解开发依赖…

相当Python程序员,选择培训班还是自学?我结合自己的经历谈谈看法

前几天我写了一篇文章&#xff0c;分享了自己当上程序员的经历。然后&#xff0c;我收到了很多小伙伴的提问&#xff0c;都在问同一个问题&#xff0c;即如何选择报培训班还是自学。今天&#xff0c;我结合自己的个人经历&#xff0c;来谈一下个人的看法。 我认为这个问题的第…

Linux线程:死锁

1. 死锁 &#xff08;1&#xff09;概念 死锁&#xff08;DeadLock&#xff09;指两个或两个以上的进程或线程执行时&#xff0c;由于竞争临界资源而造成阻塞的现象&#xff1b;若不干涉&#xff0c;则无法推进下去。 &#xff08;2&#xff09;死锁的原因 ① 竞争临界资源…

06_Uboot顶层Makefile分析_前期所做内容

目录 U-Boot顶层Makefile分析 版本号 MAKEFLAGS变量 命令输出 静默输出 设置编译结果输出目录 代码检查 模块编译 获取主机架构和系统 设置目标架构、交叉编译器和配置文件 调用scripts/Kbuild.include 交叉编译工具变量设置 导出其他变量 U-Boot顶层Makefile分析…

TCP/IP网络编程(一)

TCP/IP网络编程读书笔记 第1章 理解网络编程和套接字1.1 理解网络编程和套接字1.1.1 构建打电话套接字1.1.2 编写 Hello World 套接字程序 1.2 基于Linux的文件操作1.2.1 底层访问和文件描述符1.2.2 打开文件1.2.3 关闭文件1.2.4 将数据写入文件1.2.5 读取文件中的数据1.2.6 文…

操作系统考试复习——第四章 存储器管理 4.1 4.2

存储器的层次结构&#xff1a; 存储器的多层结构&#xff1a; 存储器至少分为三级&#xff1a;CPU寄存器&#xff0c;主存和辅存。 但是一般分为6层为寄存器&#xff0c;高速缓存&#xff0c;主存储器&#xff0c;磁盘缓存&#xff0c;固定磁盘&#xff0c;可移动存储介质。…

( “ 图 “ 之 拓扑排序 ) 207. 课程表 ——【Leetcode每日一题】

❓207. 课程表 难度&#xff1a;中等 你这个学期必须选修 numCourses 门课程&#xff0c;记为 0 到 numCourses - 1 。 在选修某些课程之前需要一些先修课程。 先修课程按数组 prerequisites 给出&#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示如果要学习课…