RabbitMQ常用工作模式+整合springboot

目录

1.MQ的相关概念

1.1 什么是MQ消息中间件

1.2 为什么使用MQ

(1) 应用解耦

(2) 异步提速  

(3)削峰填谷

1.3 使用MQ的劣势

1.4 常见的MQ组件​​​​​​​

2. RabbitMQ的概述

2.1 RabbitMQ的概念

2.2 RabbitMQ的原理

2.3 安装RabbitMQ

3. RabbitMQ 的工作模式

3.1 simple (简单模式)

3.2 Work queues(工作模式)

3.3.Publish/Subscribe(发布订阅模式)

3.4.Routing(路由模式)

3.5.Topics(主题模式)

4.springboot整合RabbitMQ

4.1.生产方

4.2.消费方

4.3.通过代码创建交换机和队列


1.MQ的相关概念

1.1 什么是MQ消息中间件

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。它是应用程序和应用程序之间的通信方法。

1.2 为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

MQ总结为三个好处:

(1) 应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。

 

(2) 异步提速  

上面要完成下单需要花费的时间: 20 + 300 + 300 + 300 = 920ms 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

使用MQ可以解决上述问题

 用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。 提升用户体验和系统吞吐量(单位时间内处理请求的数目)

(3)削峰填谷

举个例子,如果订单系统最多能处理一千次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两千次下单操作系统是处理不了的,只能限制订单超过一千后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。 简单来说: 就是在访问量剧增的情况下,但是应用仍然不能停,比如“双十一”下单的人多,但是淘宝这个应用仍然要运行,所以就可以使用消息中间件采用队列的形式减少突然访问的压力

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性

1.3 使用MQ的劣势

  • 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
  • 系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  • 一致性问题 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.4 常见的MQ组件

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征

2. RabbitMQ的概述

2.1 RabbitMQ的概念

  • 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
  • RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message
  • ​ RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

2.2 RabbitMQ的原理

名词解释:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销.
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

2.3 安装RabbitMQ

安装详情------>虚拟机安装RabbitMQ

3. RabbitMQ 的工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

3.1 simple (简单模式)

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

演示:

创建工程

 加依赖

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>
    </dependencies>

 生产者:

package com.wqg.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:简单模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建一个队列对象
        /**
         * String queue, 队列的名称. 如果该名称不存在 则创建  如果存在则不创建
         * boolean durable, 该对象是否持久化  当rabbitmq重启后 队列就会消失
         * boolean exclusive, 该队列是否被一个消费者独占
         * boolean autoDelete,当没有消费者时,该队列是否被自动删除
         * Map<String, Object> arguments: 额外参数的设置
         *
         */
        channel.queueDeclare("simple-queue",true,false,false,null);

        //发送信息
        /**
         * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机
         * String routingKey, 路由标识  如果是简单模式起名为队列的名称
         * BasicProperties props, 消息的属性设置。 设置为null
         * byte[] body: 消息的内容
         */
        String msg = "Hello RabbitMQ~~~";
        channel.basicPublish("","simple-queue",null,msg.getBytes());

        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

消费者:

3.2 Work queues(工作模式)

 Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者:

package com.wqg.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:工作模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class WorkProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建一个队列对象
        /**
         * String queue, 队列的名称. 如果该名称不存在 则创建  如果存在则不创建
         * boolean durable, 该对象是否持久化  当rabbitmq重启后 队列就会消失
         * boolean exclusive, 该队列是否被一个消费者独占
         * boolean autoDelete,当没有消费者时,该队列是否被自动删除
         * Map<String, Object> arguments: 额外参数的设置
         *
         */
        channel.queueDeclare("Work-Queue", true, false, false, null);

        //发送信息
        /**
         * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机
         * String routingKey, 路由标识  如果是简单模式起名为队列的名称
         * BasicProperties props, 消息的属性设置。 设置为null
         * byte[] body: 消息的内容
         */
        for (int i=0; i<10; i++){
            String msg = "Hello RabbitMQ~~~工作模式";
            channel.basicPublish("", "Work-Queue", null, msg.getBytes());
        }


        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:工作模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class WorkConsumer01 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Work-Queue", true, consumer);
    }
}

总结: 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

例如:短信服务部署多个,只需要有一个节点成功发送即可。

3.3.Publish/Subscribe(发布订阅模式)

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    1. Fanout:广播,将消息交给所有绑定到交换机的队列

    2. Direct:定向,把消息交给符合指定routing key 的队列

    3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者:

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:发布订阅模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class PublishProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        /**
         * String exchange, 交换机的名称 如果不存在则创建 存在则不创建
         * BuiltinExchangeType type, 交换机的类型
         * boolean durable: 是否持久化。
         */
        channel.exchangeDeclare("Publish-exchange", BuiltinExchangeType.FANOUT,true);

        //创建队列
        channel.queueDeclare("Publish-Queue01", true, false, false, null);
        channel.queueDeclare("Publish-Queue02", true, false, false, null);

        //队列和交换机绑定
        /**
         * String queue,
         * String exchange,
         * String routingKey: 发布订阅模式 没有routingKey 则写为""
         */
        channel.queueBind("Publish-Queue01","Publish-exchange","");
        channel.queueBind("Publish-Queue02","Publish-exchange","");

        //发送信息
        String msg = "Hello RabbitMQ~~~发布订阅模式";
        channel.basicPublish("Publish-exchange","", null, msg.getBytes());


        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

 消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:发布订阅模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class PublishConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        //Publish-Queue02和Publish-Queue01只有一个里面有数据
        channel.basicConsume("Publish-Queue02", true,consumer);
    }
}
  1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

  2. 发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

3.4.Routing(路由模式)

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

生产者:

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:路由模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class RouterProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare("Router-exchange", BuiltinExchangeType.DIRECT,true);

        //创建队列
        channel.queueDeclare("Router-queue001",true,false,false,null);
        channel.queueDeclare("Router-queue002",true,false,false,null);

        //队列和交换机绑定
        channel.queueBind("Router-queue001","Router-exchange","error");
        channel.queueBind("Router-queue002","Router-exchange","error");
        channel.queueBind("Router-queue002","Router-exchange","info");
        channel.queueBind("Router-queue002","Router-exchange","warning");
        String msg = "Hello RabbitMQ~~~路由模式";
        channel.basicPublish("Router-exchange","info",null,msg.getBytes());

        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:路由模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class RouterConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Router-queue002", true,consumer);
    }
}

总结:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

3.5.Topics(主题模式)

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.” 分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词, 例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

生产者:  

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:主题模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class TopicsProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare("Topics-exchange", BuiltinExchangeType.TOPIC,true);

        //创建队列
        channel.queueDeclare("Topics-queue001",true,false,false,null);
        channel.queueDeclare("Topics-queue002",true,false,false,null);

        //队列和交换机绑定
        channel.queueBind("Topics-queue001","Topics-exchange","*.orange.*");
        channel.queueBind("Topics-queue002","Topics-exchange","*.*.rabbit");
        channel.queueBind("Topics-queue002","Topics-exchange","lazy.#");
        String msg = "Hello RabbitMQ~~~主题模式";
        channel.basicPublish("Topics-exchange","lazy.rabbit.orange",null,msg.getBytes());

        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:路由模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class TopicsConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Topics-queue002", true,consumer);
    }
}

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,

只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

4.springboot整合RabbitMQ

4.1.生产方

创建springboot项目----生产者
(1) 引入 rabbitmq 整合的依赖
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
(2) 再配置文件中添加 rabbit 服务的信息
#rabbitmq的配置
spring.rabbitmq.host=192.168.75.129
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
(3) 调用 RabbitTemplate 中发送消息的方法
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("Topics-exchange","lazy.aaa","Hello RabbitMQ...");
    }

}

4.2.消费方

创建springboot项目----消费者

(1)引入rabbitmq整合的依赖------同上

(2)再配置文件中添加rabbit服务的信息------同上

(3) 创建类 --- 创建监听方法即可 @RabbitListener
@Component
public class MyListener {

    //queues:表示你监听的队列名
    @RabbitListener(queues = {"Topics-queue002"})
    public void h(Message message){
        //把监听到的消息封装到Message类对象中
        byte[] body = message.getBody();
        System.out.println("消息内容==="+new String(body));
    }
}

4.3.通过代码创建交换机和队列

package com.wqg.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ fileName:RabbitConfig
 * @ description:
 * @ author:wqg
 * @ createTime:2023/7/12 19:41
 */
@Configuration
public class RabbitConfig {
    //定义了一个名为EXCHANGE_NAME的常量,用于表示交换器的名称
    public static final String EXCHANGE_NAME = "Topics-queue002";

    @Bean
    public Exchange exchange() {
        // 创建一个Topic类型的Exchange,名称为EXCHANGE_NAME,持久化
        Exchange topic_exchange02 = ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        return topic_exchange02;
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列,名称为"Topics-queue003"
        Queue topic_queue03 = QueueBuilder.durable("Topics-queue003").build();
        return topic_queue03;
    }

    @Bean
    public Binding binding() {
        // 创建一个绑定关系,将队列绑定到Exchange上,并指定routing key为"qy165.#",不使用任何参数
        Binding noargs = BindingBuilder.bind(queue()).to(exchange()).with("qy165.#").noargs();
        return noargs;
    }

    //如果交换机要绑定多个队列 需要再写一个bind方法

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/38011.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【NLP】Word2Vec原理和认识

一、介绍 Word2Vec是NLP领域的最新突破。Tomas Mikolov是捷克计算机科学家&#xff0c;目前是CIIRC&#xff08;捷克信息学&#xff0c;机器人和控制论研究所&#xff09;的研究员&#xff0c;是word2vec研究和实施的主要贡献者之一。词嵌入是解决NLP中许多问题不可或缺的一部分…

基于B/S架构SaaS服务的实验室信息系统(LIS)

实验室信息系统LIS源码 实验室信息系统&#xff08;Laboratory Information System&#xff09;&#xff0c;简称LIS&#xff0c;是一个全面基于网络化应用&#xff0c;能够帮助用户按照规范内容和规范流程进行多角色、多层次检验信息及资源管理的系统。通过条码管理系统从HIS…

云计算的学习(三)

三、云计算中的网络基础知识 1.虚拟化中网络的架构 1.1虚拟化中网络的架构 二层交换机作为接入交换机使用&#xff0c;三层交换机可以作为汇聚交换机或核心交换机&#xff0c;在抛开网络安全设备时&#xff0c;路由器直接连接在互联网上。 1.2广播和单播 物理服务器内部主要…

Iceberg从入门到精通系列之十七:Apache InLong往Iceberg同步数据

Iceberg从入门到精通系列之十七&#xff1a;Apache InLong往Iceberg同步数据 一、概览二、版本支持三、依赖项四、SQL API 用法五、多表写入六、动态表名映射七、动态建库、建表八、动态schema变更九、Iceberg Load 节点参数十、数据类型映射 一、概览 Apache Iceberg是一种用…

Flutter系列文章-Flutter环境搭建和Dart基础

Flutter是Google推出的一个开源的、高性能的移动应用开发框架&#xff0c;可以用一套代码库开发Android和iOS应用。Dart则是Flutter所使用的编程语言。让我们来看看如何搭建Flutter开发环境&#xff0c;并了解Dart语言的基础知识。 一、Flutter环境搭建 1. 安装Flutter SDK …

springboot+ElasticSearch+Logstash+Kibana实现日志采集ELK

ElasticSearchLogstashKibana日志管理 一、什么是ELK? ELK是Elasticsearch、Logstash、Kibana的简称&#xff0c;这三者是核心套件&#xff0c;但并非全部。一般情况下我们可以把日志保存在日志文件当中&#xff0c;也可以把日志存入数据库当中。但随着业务量的增加&#xf…

搭建 Java 部署环境,部署 Web 项目到 Linux

为了进行部署&#xff0c;把写好的 java web 程序放到 Linux 上&#xff0c;需要先把对应的依赖的软件 (环境) 搭建好&#xff0c;安装一些必要的软件程序 JDKTomcatMySqL jdk 直接使用包管理器进行安装(基于yum安装) 一、yum 1、认识 yum yum (Yellow dog Updater, Modified…

6. Java + Selenium 环境搭建

前提&#xff1a;Java 版本最低要求为 8&#xff1b;推荐使用 chrome 浏览器 chrome Java 1. 下载 chrome 浏览器&#xff08;推荐&#xff09; 2. 查看 chrome 浏览器版本 重点记住前两位即可。 3. 下载 chrome 浏览器驱动 下载链接&#xff1a; https://chromedriver.…

我爱学QT-仿写智能家居界面 上 中 下

学习链接&#xff1a; 仿写一个智能家居界面&#xff08;上&#xff09;_哔哩哔哩_bilibili 上 给QT工程添加资源文件 在这里 然后选这个&#xff0c;choose后会有起名&#xff0c;之一千万不能是中文&#xff0c;要不就等报错吧 然后把你要添加的图片托到文件夹下&#xf…

云计算基础教程(第2版)笔记——基础篇与技术篇介绍

文章目录 前言 第一篇 基础篇 一 绪论 1.1 云计算的概念以及特征 1.1.1云计算的基本概念 1.1.2云计算的基本特征 1.2 云计算发展简史 1.3 三种业务模式介绍 1. 基础设施即服务&#xff08;IaaS&#xff09; 2. 平台即服务&#xff08;PaaS&#xff09; 3. 软…

F#奇妙游(12):并行编程与π

核越多&#xff0c;越快乐 多核CPU对于计算机程序的开发带来了很大的挑战&#xff0c;但是也带来了很大的机遇。在多核CPU上&#xff0c;程序的性能可以通过并行化来提升&#xff0c;但是并行化的难度也随之提升。本文将介绍多核CPU的基本概念&#xff0c;以及如何在多核CPU上…

OpenCv图像基本变换

目录 一、图像翻转 二、图像旋转 三、仿射变换之平移 四、仿射变换之获取变换矩阵 五、仿射变换之透视变换 一、图像翻转 图像翻转不等同于旋转&#xff0c;类似于一些视频的拍摄&#xff0c;拍摄后实际是左右颠倒的&#xff0c;通过图像翻转可进行还原 案例代码如下: …

Android之Intent

意图介绍 一个意图(Intent)对象包含了目标组件、动作、数据、类别、附加数据、标志六个部分。 目标组件 目标组件可以帮助应用发送显式意图调用请求。在创建Intent时&#xff0c;可以通过setComponent方法来设置一个组件&#xff0c;如&#xff1a; //设置组件 intent.setC…

Linux - CentOS 二进制安装 MySQL 8.0.31(非常实用)

一、下载 mysql-8.0.31-linux-glibc2.12-x86_64.tar.xz 下载地址&#xff1a;MySQL :: Download MySQL Community Server (Archived Versions) 具体如下图所示&#xff1a; 二、将 mysql-8.0.31-linux-glibc2.12-x86_64.tar.xz 放入到服务器的 /usr/local &#xff08;路径可…

C++万字自学笔记

[TOC] 一、 C基础 C的IDE有CLion、Visual Studio、DEV C、eclipse等等&#xff0c;这里使用CLion进行学习。 0. C初识 0.1 第一个C程序 编写一个C程序总共分为4个步骤 创建项目创建文件编写代码运行程序 #include <iostream>int main() {using namespace std;cout…

SpringCloud Alibaba——Ribbon底层怎样实现不同服务的不同配置

目录 一、Ribbon底层怎样实现不同服务的不同配置二、源码角度分析 一、Ribbon底层怎样实现不同服务的不同配置 为不同服务创建不同的spring上下文&#xff0c;不同的spring上下文中存放对应这个服务所有的配置。 二、源码角度分析 SpringClientFactory中可以获取到所有ribbon…

基于MSP432P401R跟随小车(一)【2022年电赛】

文章目录 一、赛前准备1. 硬件清单2. 工程环境 二、赛题思考三、软件设计1. 路程、时间、速度计算2. 距离测量3. 双机通信4. 红外循迹 四、技术交流 一、赛前准备 1. 硬件清单 主控板&#xff1a; MSP432P401R测距模块&#xff1a; GY56数据显示&#xff1a; OLED电机&#x…

7.5 SpringBoot 拦截器Interceptor实战 统一角色权限校验

文章目录 前言一、定义注解annotation二、拦截角色注解1. 在拦截器哪里拦截&#xff1f;2. 如何拦截角色注解&#xff1f;3. 角色如何读取?4. 最后做角色校验 三、应用&#xff1a;给管理员操作接口加注解四、PostMan测试最后 前言 在【7.1】管理员图书录入和修改API&#xf…

原生js实现将图片内容复制到剪贴板

核心代码 /*复制图片*/ copyImg(dom) {/* 警告&#xff1a;dom不能是img标签&#xff0c;建议用DIV标签包裹img标签&#xff0c;否者会报错&#xff01;不支持复制背景图&#xff01; */dom.style.userSelect auto;let selection getSelection(), range document.createRan…

支持向量机(SVM)的超参数调整 C 和 Gamma 参数

作者:CSDN @ _养乐多_ 支持向量机(Support Vector Machine,SVM)是一种广泛应用的监督式机器学习算法。它主要用于分类任务,但也适用于回归任务。在本文中,我们将深入探讨支持向量机的两个重要参数:C和gamma。在阅读本文前,我假设您对该算法有基本的了解,并专注于这些…