RabbitMQ中4种交换机的Java连接代码

目录

1.直连交换机(Direct)

生产者代码示例

消费者代码示例

2.RabbitMQ连接工具类

3.Fanout交换机(扇出交换机,广播)

生产者

消费者

4.Topic交换机(主题交换机)

生产者

消费者

5.Header交换机(头部交换机)

生产者

消费者

6.附录说明

相关依赖


1.直连交换机(Direct)

直连交换机通过routingKey绑定交换机和队列,同时在发送消息时,也是通过routingKey找到相对应的队列,特点是一对一发送

生产者代码示例
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectProducer {


    public static void main(String[] args) throws IOException, TimeoutException {


        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("xxx.xxx.xxx.xxx");// 这里写上你自己的ip

        connectionFactory.setUsername("admin");  // 这里写上你的MQ服务器对应的账号

        connectionFactory.setPassword("123456");// 这里写上你的MQ服务器对应的密码

        connectionFactory.setPort(5672);

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        // 创建交换机,名称为"direct_exchange_test", 并交换机类型为direct(即下面第二个参数)
        channel.exchangeDeclare("direct_exchange_test", BuiltinExchangeType.DIRECT,true,false,null);


        // 创建队列,名称为“direct_queue_test”
        channel.queueDeclare("direct_queue_test",true,false,false,null);

        // 绑定队列,并设置routingKey的名称为 “direct_routing”
        channel.queueBind("direct_queue_test","direct_exchange_test","direct_routing");

        String message = "消息发送成功!";

        //开启监听
        channel.basicPublish("direct_exchange_test","direct_routing",null,message.getBytes());

        channel.close();

        connection.close();

    }


}
消费者代码示例
import com.rabbitmq.client.*;

import java.io.IOException;

public class DirectConsumer {

    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("xxx.xxx.xxx.xxx");

        connectionFactory.setUsername("admin");

        connectionFactory.setPassword("123456");

        connectionFactory.setPort(5672);

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        //deliverCallback  消息接收后的回调

        DeliverCallback deliverCallback = (new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {

                System.out.println("接收到的消息:" + new String(delivery.getBody(),"UTF-8"));

            }
        });


        //cancelCallback  消费者取消时的回调
        CancelCallback cancelCallback = (new CancelCallback() {
            @Override
            public void handle(String s) throws IOException {

                System.out.println("消息被拒绝");
            }
        });

        channel.basicConsume("direct_queue_test",true,deliverCallback,cancelCallback);

        channel.close();

        connection.close();

    }

}

2.RabbitMQ连接工具类

可以看到,我们在上面编写direct类型的交换机代码时,无论是生产者还是消费者的一方,都需要先连接上RabbitMQ服务器,并再最后关闭信道和连接,对于每次都需要连接和关闭的重复性代码,我们可以将其封装,打包成工具类:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitUtil {

     private final static String ip = "xxx.xxx.xxx.xxx";  // 你的RabbitMQ服务器ip

     private final static String username = "admin";  //  你的RabbitMQ服务器的用户名

     private final static String password = "123456"; // 你的RabbitMQ服务器的密码

     private final static String port = "5672"; // 你的RabbitMQ服务器的端口

    // 获取连接工厂
    public static Connection getConnectionFactory() throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(ip);

        connectionFactory.setPort(Integer.parseInt(port));

        connectionFactory.setUsername(username);

        connectionFactory.setPassword(password);

        Connection connection = connectionFactory.newConnection();


        return connection;
    }

    // 创建通道
    public static Channel getChannel(Connection connection){

        try {
            return connection.createChannel();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

     // 关闭信道和连接
    public static void close(Channel channel, Connection connection) {
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}

3.Fanout交换机(扇出交换机,广播)

Fanout交换机如其名,特点是会广播,即只要发送消息到其中一个交换机中的一个队列,则同个交换机中的其它队列也会收到消息,因此就减少了routingKey设置的必要

生产者
import com.rabbitmq.client.*;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {

    public static void main(String[] args) throws IOException, TimeoutException {


        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = RabbitUtil.getChannel(connection);

        channel.exchangeDeclare("fanout_exchange_test", BuiltinExchangeType.FANOUT,true, false, null);


        // 创建队列
        channel.queueDeclare("fanout_queue_test",true,false,false,null);
        channel.queueDeclare("fanout_queue_test2",true,false,false,null);

        channel.queueBind("fanout_queue_test","fanout_exchange_test", "");
        channel.queueBind("fanout_queue_test2","fanout_exchange_test", "");

        String message = "这条消息来自Fanout交换机中的队列!";



        // 发送消息到交换机(广播到所有队列)
        channel.basicPublish("fanout_exchange_test","fanout_queue_test",null,message.getBytes("UTF-8"));
        // 上面我把消息指定发送到
        // fanout_queue_test队列,所以这条消息发送到了所有的队列!等价于
        
       //  channel.basicPublish("fanout_exchange_test","",null,message.getBytes("UTF-8"));
        

        RabbitUtil.close(channel,connection);

    }

}
消费者
import com.rabbitmq.client.*;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {


        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = RabbitUtil.getChannel(connection);

        DeliverCallback deliverCallback = (new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {

                System.out.println("接收到的消息是:" + new String(delivery.getBody(),"UTF-8"));
            }
        });

        CancelCallback cancelCallback = (new CancelCallback() {
            @Override
            public void handle(String s) throws IOException {

                System.out.println("消息被取消消费了!");
            }
        });


        // 测试是否两个队列都可以收到消息
        channel.basicConsume("fanout_queue_test",true,deliverCallback,cancelCallback);
        channel.basicConsume("fanout_queue_test2",true,deliverCallback,cancelCallback);

        channel.close();

        connection.close();

    }

}

4.Topic交换机(主题交换机)

Topic交换机与Direct交换机是相对的,direct通过routingKey做到了一对一消息发送,而topic交换机更像是通过routingKey的设置来做模糊查询

  1. Topic中,将routingkey通过"."来分为多个部分
  2. "*":代表一个部分(不能为空)
  3. "#":代表0个或多个部分(如果绑定的路由键为 "#" 时,则接受所有消息,因为路由键所有都匹配)

例如:

 

然后发送一条信息,routingkey为"key1.key2.key3.key4",那么根据"."将这个路由键分为了4个部分,此条路由键,将会匹配:

1.key1.key2,key3.*      成功匹配      2.key1.#     成功匹配

3.*.key2.*.key4       成功匹配           4.#.key3.key4  成功匹配

如果发送消息routingkey为"key1",那么将只能匹配中key1.#,#可以代表0个部分

生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = RabbitUtil.getChannel(connection);

        // 创建交换机

        channel.exchangeDeclare("topic_exchange_test", BuiltinExchangeType.TOPIC,true, false, false, null);

        String routingKey = "*.com.zhan";

        String routingKey2 = "#.zhan";

        String routingkey3 = "zhan.com";

        // 创建队列

        channel.queueDeclare("topic_queue_test1",true,false,false,null);

        channel.queueDeclare("topic_queue_test2",true,false,false,null);

        channel.queueDeclare("topic_queue_test3",true,false,false,null);

        // 队列绑定到交换机上

        channel.queueBind("topic_queue_test1", "topic_exchange_test", routingKey);

        channel.queueBind("topic_queue_test2", "topic_exchange_test", routingKey2);

        channel.queueBind("topic_queue_test3", "topic_exchange_test", routingkey3);

        String message = "这条消息来自Fanout交换机";

        // 发送消息到交换机,routingKeyzhan
        channel.basicPublish("topic_exchange_test", "com.com.zhan", null, message.getBytes());
        // 预期结果:
        // topic_queue_test1 接收到消息
        // topic_queue_test2 接收到消息

        // topic_queue_test3 接收不能到消息

       RabbitUtil.close(channel,connection);

    }

}
消费者
import com.rabbitmq.client.*;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = RabbitUtil.getChannel(connection);

        DeliverCallback deliverCallback = (new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("接收到的消息"+new String(delivery.getBody()));
            }
        });

        CancelCallback cancelCallback = (new CancelCallback() {
            @Override
            public void handle(String s) throws IOException {
                System.out.println("消息被拒绝");
            }
        });

        channel.basicConsume("topic_queue_test1",true,deliverCallback,cancelCallback);

        channel.basicConsume("topic_queue_test2",true,deliverCallback,cancelCallback);

        channel.basicConsume("topic_queue_test3",true,deliverCallback,cancelCallback);

        // 预期结果:
        // test1 和  test2 队列里都会消息

        // test3 不会收到

        RabbitUtil.close(channel,connection);

    }

}

5.Header交换机(头部交换机)

Header交换机与Direct交换机的区别在于,前者使用map来作为消息发送的标识,类似于HTTP协议中的消息头,而后者是通过routingKey,此外 基本没区别,但Header交换机性能差很多,如今基本用不上

消费方指定的headers中必须包含一个"x-match"的键。

键"x-match"的值有2个

x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息

假设现在发送消息,携带的参数是{"name":"xiaomingXX"}想一想上述queue1和queue2哪个能收到消息?

答案是queue2 

虽然queue1和queue的map中要求的参数信息基本一致,但匹配规则不同,queue1是“x-match:all” ,即需要完全匹配,而后者是“x-match:any”,即只要一个满足就可发送到,从图中可以看到queue2里 " "sex":男" "可以匹配到的。

生产者
import com.rabbitmq.client.*;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class HeaderProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = RabbitUtil.getChannel(connection);

        channel.exchangeDeclare("header_exchange_test", BuiltinExchangeType.HEADERS,true, false, false, null);

        channel.queueDeclare("header_queue_test1",true,false,false,null);

        channel.queueDeclare("header_queue_test2",true,false,false,null);

        channel.queueDeclare("header_queue_test3",true,false,false,null);

        HashMap<String, Object> map = new HashMap<>();

        map.put("x-match","all");

        map.put("name","zhangsan");

        map.put("age","20");

        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(map);

        String message = "这条消息来自Header交换机";

        channel.basicPublish("header_exchange_test","header_queue_test1",properties.build(),message.getBytes());

        RabbitUtil.close(channel,connection);

    }

}
消费者
import com.rabbitmq.client.*;
import org.Utils.RabbitUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HeaderConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitUtil.getConnectionFactory();

        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {

            System.out.println("Header Consumer 收到消息:" + new String(delivery.getBody()));

        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };


        channel.basicConsume("header_queue_test1", true, deliverCallback, cancelCallback);


        RabbitUtil.close(channel, connection);
    }


}

6.附录说明

相关依赖
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.12.0</version>
    </dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/445602.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据库-第六/七章 关系数据理论和数据库设计【期末复习|考研复习】

前言 总结整理不易&#xff0c;希望大家点赞收藏。 给大家整理了一下数据库系统概论中的重点概念&#xff0c;以供大家期末复习和考研复习的时候使用。 参考资料是王珊老师和萨师煊老师的数据库系统概论(第五版)。 数据库系统概论系列文章传送门&#xff1a; 第一章 绪论 第二/…

【Docker】容器的概念

容器技术&#xff1a;容器技术是基于虚拟化技术的&#xff0c;它使应用程序从一个计算机环境快速可靠地转移到另一个计算机环境中&#xff0c;可以说是一个新型地虚拟化技术。 一、docker容器 Docker:是一个开源地容器引擎Docker 是一种轻量级的容器化技术&#xff0c;其主要原…

阿里云服务器租用多少钱一个月?9元1个月?

阿里云服务器租用多少钱一个月&#xff1f;9元1个月&#xff1f;已经降价到5元一个月了。阿里云服务器1个月最低5元/月起&#xff0c;阿里云服务器价格可以按年、按月和按小时购买&#xff0c;本文阿里云服务器网aliyunfuwuqi.com来详细说下阿里云服务器一个月收费价格表&#…

计算机系统结构-中断例题笔记

背景&#xff1a;计算机系统结构考试中&#xff0c;中断处理程序、运行程序的过程示意图是重要考点。 中断概念&#xff1a;CPU中止正在执行的程序&#xff0c;转去处理随机提出的请求&#xff0c;待处理完后&#xff0c;再回到原先被打断的程序继续恢复执行的过程。 考点1.设…

WPF 自定义彩色控制台功能

文章目录 前言环境流内容一个简单的控制台 自动添加数据无法添加数据模板代码添加参数简单的案例添加和清空功能完善代码 额外功能添加移动到底部添加样式 总结 前言 在WPF中添加模拟控制台&#xff0c;可以试试的看到最新的日志信息。但是普通的TextBlock只是纯粹的黑色&…

分布式执行引擎ray入门--(2)Ray Data

目录 一、overview 基础代码 核心API&#xff1a; 二、核心概念 2.1 加载数据 从S3上读 从本地读&#xff1a; 其他读取方式 读取分布式数据&#xff08;spark&#xff09; 从ML libraries 库中读取&#xff08;不支持并行读取&#xff09; 从sql中读取 2.2 变换数据…

html--彩虹马

文章目录 htmljscss 效果 html <!DOCTYPE html> <html lang"en" > <head> <meta charset"UTF-8"> <title>Rainbow Space Unicorn</title> <link rel"stylesheet" href"css/style.css"> &l…

TCP/IP 七层架构模型

传输控制协议&#xff08;TCP&#xff0c;Transmission Control Protocol&#xff09;是一种面向连接的、可靠的、基于字节流的传输层通信协议。 套接字&#xff08;socket&#xff09;是一个抽象层&#xff0c;应用程序可以通过它发送或接收数据&#xff0c;可对其进行像对文…

【Linux】常用操作命令

目录 基本命令关机和重启帮助命令 用户管理命令添加用户&#xff1a;useradd 命令修改密码&#xff1a;passwd 命令查看登录用户&#xff1a;who 命令查看登录用户详细信息 :w切换用户 目录操作命令cdpwd命令目录查看 ls [-al] 目录操作【增&#xff0c;删&#xff0c;改&#…

NUMA(Non-Uniform Memory Access)架构的介绍

1. NUMA由来 最早的CPU是以下面这种形式访问内存的&#xff1a; 在这种架构中&#xff0c;所有的CPU都是通过一条总线来访问内存&#xff0c;我们把这种架构叫做SMP架构&#xff08;Symmetric Multi-Processor&#xff09;&#xff0c;也就是对称多处理器结构。可以看出来&…

Uniapp开发模板unibest

&#x1f3e0;简介 unibest 是一个集成了多种工具和技术的 uniapp 开发模板&#xff0c;由 uniapp Vue3 Ts Vite4 UnoCss uv-ui VSCode 构建&#xff0c;模板具有代码提示、自动格式化、统一配置、代码片段等功能&#xff0c;并内置了许多常用的基本组件和基本功能&#…

【PowerMockito:编写单元测试过程中原方法使用@Value注解注入的属性出现空指针】

错误场景 执行到Value的属性时会出现空指针&#xff0c;因为Value的属性为null 解决方法 在测试类调用被测试方法前&#xff0c;提前设置属性值&#xff0c;属性可以先自己定义好 ReflectionTestUtils.setField(endpointConnectionService, "exportUdpList", lis…

Linux 之七:Linux 防火墙 和进程管理

防火墙 查看防火墙 查看 Centos7 的防火墙的状态 sudo systemctl status firewalld。 查看后&#xff0c;看到active(running)就意味着防火墙打开了。 关闭防火墙&#xff0c;命令为&#xff1a; sudo systemctl stop firewalld。 关闭后查看是否关闭成功&#xff0c;如果…

【机器学习】一文掌握逻辑回归全部核心点(上)。

逻辑回归核心点-上 1、引言2、逻辑回归核心点2.1 定义与目的2.2 模型原理2.2.1 定义解析2.2.2 公式2.2.3 代码示例 2.3 损失函数与优化2.3.1 定义解析2.3.2 公式2.3.3 代码示例 2.4 正则化2.4.1 分类2.4.2 L1正则化2.4.3 L2正则化2.4.4 代码示例 3、总结 1、引言 小屌丝&#…

从空白镜像创建Docker hello world

文章目录 写在前面基础知识方法一&#xff1a;使用echo工具方法二&#xff0c;使用c语言程序方法三&#xff0c;使用汇编语言小结 写在前面 尝试搞了下docker&#xff0c;网上的教程大多是让下载一个ubuntu这种完整镜像&#xff0c;寥寥几篇从空白镜像开始创建的&#xff0c;也…

Oracle VM VirtualBox安装Ubuntu桌面版

背景&#xff1a;学习Docker操作 虚拟机软件&#xff1a;Oracle VM VirtualBox 7.0 系统镜像&#xff1a;ubuntu-20.04.6-desktop-amd64.iso 在Oracle VM VirtualBox新建一个虚拟电脑 选择好安装的目录和选择系统环境镜像 设置好自定义的用户名、密码、主机名 选择一下运行内…

执行除法运算返回浮点数结果operator.truediv()返回商的整数部分operator.floordiv()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 执行除法运算 返回浮点数结果 operator.truediv() 返回商的整数部分 operator.floordiv() 下列选项可以执行除法运算并得到浮点数结果的是&#xff08;&#xff09; import operator print(&…

凌鲨微应用架构

微应用是静态网页加上凌鲨提供的扩展能力而形成的一种应用&#xff0c;主要特点是开发便捷&#xff0c;安全。 微应用架构 组件说明 名称 说明 微应用 webview窗口&#xff0c;显示web服务器上的页面 接口过滤器 根据权限配置,屏蔽非授权接口访问 接口提供者 tauri注入…

文件操作上(c语言)

目录 1. 文件的作用2. 什么是文件2.1 程序文件2.2 数据文件2.3 文件名 3. 二进制文件和文本文件4. 文件的打开和关闭4.1 流和标准流4.1.1 流4.1.2 标准流 4.2 文件指针4.3 文件的打开与关闭4.3.1 文件的打开模式4.3.2 实例代码 1. 文件的作用 使用文件可以将数据进行持久化的保…

P1958 上学路线

难度&#xff1a;普及- 题目描述 你所在城市的街道好像一个棋盘&#xff0c;有 a 条南北方向的街道和 b 条东西方向的街道。南北方向的 a 条街道从西到东依次编号为 1 到 a&#xff0c;而东西方向的 b 条街道从南到北依次编号为 1 到 b&#xff0c;南北方向的街道 i 和东西方…