RabbitMQ 学习笔记

RabbitMQ学习笔记

 

一些概念

Broker :RabbitMQ服务。

virtual host: 其实就是分组。

Connection:连接,生产者消费者与Broker之间的TCP连接。

Channel:网络信道,轻量级的Connection,使用Channel可以减少Connection的建立,减少开销。

Message:消息,由 PropertiesBody组成,Properties可以对消息的优先级、延迟等特性进行记录,Body存储消息体的内容。

Exchange:交换机,没有消息存储功能,负责分发消息。

BindingExchangeQueue之间的虚拟连接,其中可以包含Routing Key

Routing Key:路由规则,用于确定如何分发、接收消息。

Queue:消息队列,保存消息并将其转发给消费者进行消费。

安装

Windows安装

安装erLang语言

进入官网

image-20220723085850289

 

 

下载完之后一直下一步安装即可,安装完成后进入目录,配置环境变量

image-20220723092150573

image-20220723092301127

安装RabbitMQ服务端

Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server (github.com)

image-20220723091828280

一直下一步安装即可

安装完成后打开安装目录,进入到这个文件夹打开命令行

image-20220723093324568

输入命令安装插件

rabbitmq-plugins enable rabbitmq_management

完成后双击rabbitmq-server.bat

打开http://localhost:15672/

用户名密码是guest/guest

image-20220723093515104

image-20220723093550183

Linux下使用 Docker 安装

直接拉取最新版

docker pull rabbitmq

运行容器

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

进入容器

docker exec -it rabbitmq /bin/bash

开启管理插件

rabbitmq-plugins enable rabbitmq_management

image-20220723103556298

打开管理网站 http://localhost:15672/

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

用户名密码均为 guest

image-20220723103a414689

实操

官网例子

简单模式

11111

配置文件 application-easy.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /
    queue: easy-queue

生产者:

package com.gettler.rabbitmq.easy;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        /*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello, this is an easy message";
        /*
          发送一个消息
          1.发送到那个交换机(空代表默认交换机)
          2.路由key
          3.其他的参数信息
          4.发送消息的消息体
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        logger.info("消息发送完毕");
    }
}

消费者:

package com.gettler.rabbitmq.easy;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerTest.class);

    @Test
    public void testConsumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20240620161232526

工作模式

在这里插入图片描述

配置文件 application-work.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /
    queue: work-queue

生产者:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Scanner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Test
    public void testProducer() throws Exception {
        System.out.println(this.host);
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        /*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /*
              发送一个消息
              1.发送到那个交换机(空代表默认交换机)
              2.路由key
              3.其他的参数信息
              4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20240620161656576

路由模式

配置文件 application-direct.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);


    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("info", "普通 info 信息");
        messageMap.put("warning", "警告 warning 信息");
        messageMap.put("error", "错误 error 信息");
        messageMap.put("debug", "调试 debug 信息");
        for (Map.Entry<String, String> mes : messageMap.entrySet()) {
            String routingKey = mes.getKey();
            String message = mes.getValue();
            channel.basicPublish("direct", routingKey, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 创建channel
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        // 声明临时队列
        channel.queueDeclare("console", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("console", "direct", "info");
        channel.queueBind("console", "direct", "warning");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("console", true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        // 声明临时队列
        channel.queueDeclare("disk", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("disk", "direct", "error");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("disk", true, deliverCallback, cancelCallback);
    }
}

image-20240620161838310

广播模式

配置文件 application-fanout.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 发送10条消息
        for (int i = 0; i < 10; i++) {
            String message = i + "";
            channel.basicPublish("fanout", "", null, message.getBytes());
            logger.info("消息发送完毕" + message);
        }
    }
}

消费者A:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列与交换机
        channel.queueBind(queueName, "fanout", "");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列与交换机
        channel.queueBind(queueName, "fanout", "");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

image-20240620162526952

主题模式

配置文件 application-topic.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("class1.DB.exam", "一班数据库考试通知");
        messageMap.put("class1.OS.exam", "一班操作系统考试通知");
        messageMap.put("class2.DB.exam", "二班数据库考试通知");
        messageMap.put("class2.OS.exam", "二班操作系统考试通知");
        for (Map.Entry<String, String> mes : messageMap.entrySet()) {
            String routingKey = mes.getKey();
            String message = mes.getValue();
            channel.basicPublish("topic", routingKey, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A(模拟一班的学生):

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class StudentOfClass1Consumer {
    private static final Logger logger = LoggerFactory.getLogger(StudentOfClass1Consumer.class);

    @Test
    public void testStudentOfClass1Consumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        // 创建Q1队列
        channel.queueDeclare("student_of_class1", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("student_of_class1", "topic", "class1.#");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("student_of_class1", true, deliverCallback, cancelCallback);
    }
}

消费者B(模拟操作系统老师):

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TeacherConsumer {
    private static final Logger logger = LoggerFactory.getLogger(TeacherConsumer.class);

    @Test
    public void testTeacherConsumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        // 创建Q1队列
        channel.queueDeclare("teacher_of_OS", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("teacher_of_OS", "topic", "#.OS.#");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("teacher_of_OS", true, deliverCallback, cancelCallback);
    }
}

image-20240620162754734

谷粒商城 RabbitMQ 学习笔记

新建Maven项目

添加依赖

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
</dependency>

编写发送端

package org.example;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send  
{  
    //队列名称  
    private final static String QUEUE_NAME = "helloMQ";  
  
    public static void main(String[] argv) throws java.io.IOException, TimeoutException  
    {  
        /** 
         * 创建连接连接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        //设置MabbitMQ所在主机ip或者主机名  
        factory.setHost("localhost");  
        //创建一个连接  
        Connection connection = factory.newConnection();  
        //创建一个频道  
        Channel channel = connection.createChannel();  
        //指定一个队列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //发送的消息  
        String message = "hello world!";  
        //往队列中发出一条消息  
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'");  
        //关闭频道和连接  
        channel.close();  
        connection.close();  
     }  
}  

编写接收端

package org.example;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "helloMQ";

    public static void main(String[] argv) throws Exception {

        // 打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

运行接收端

image-20220723101156639

运行发送端,每运行一次发送一次消息

image-20220723101246973

管理网站上有接收端的连接(发送端发送后便断开连接了)

image-20220723101256826

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring.rabbitmq.host=192.168.3.200
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
创建Exchange
public void createExchange() {
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
}
创建Queue
public void createQueue() {
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
}
连接Queue和Exchange
public void createBinding() {
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
    amqpAdmin.declareBinding(binding);
}
发送消息
public void sendMessage() {
    String msg = "hello world";
    List<String> s = new ArrayList<>();
    s.add(msg);
    s.add("List");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", s, new CorrelationData(UUID.randomUUID().toString()));
}
接收消息

想要接受对象消息,需使用JSON序列化机制,进行消息转换

编写MyRabbitConfig配置类

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

使用RabbitListener注解监听队列,该注解参数可以是Object content, Message message, Channel channel。

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message) {
    System.out.println("接受到消息内容:" + message);
}
可靠抵达

编写配置文件

# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 抵达队列后以异步发送优先回调抵达队列后的回调returnconfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

将MyRabbitConfig修改为

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 使用JSON序列化机制,进行消息转换
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @PostConstruct // MyRabbitConfig对象创建完成后执行该方法
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 消息抵达节点的话ack就为true
             * @param correlationData   当前消息的唯一关联数据(消息唯一ID)
             * @param ack 消息是否成功收到
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirming...correlationData{" + correlationData + "},ack{" + ack + "},cause{" + cause + "}");
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 没抵达队列,触发这个失败回调函数
             * @param message
             * @param replyCode
             * @param replyText
             * @param exchange
             * @param routingKey
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Unreachable...message{" + message + "},replyCode{" + replyText + "},exchange{" + exchange + "},routingKey{" + routingKey + "}");
            }
        });
    }
}

监听队列方法修改为

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, List list, Channel channel) throws IOException {
    System.out.println("接受到消息内容:" + list);
    // channel内按顺序递增
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println(deliveryTag);
    // 签收
    try {
        channel.basicAck(deliveryTag, false); // 是否批量签收
    } catch (Exception e) {
        // 网络中断
        // b1 = false 丢弃, b1 = true 发回服务器,服务器重新入队。
        channel.basicNack(deliveryTag, false, false);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/728465.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】一个极简但完整的C++程序

一、一个极简但完整的C程序 我们编写程序是为了解决问题和任务的。 1、任务&#xff1a; 某个书店将每本售出的图书的书名和出版社&#xff0c;输入到一个文件中&#xff0c;这些信息以书售出的时间顺序输入&#xff0c;每两周店主会手工计算每本书的销售量、以及每个出版社的…

任务调度框架革新:TASKCTL在Docker环境中的高级应用

Docker&#xff1a;轻量级容器化技术的魅力 Docker 作为一款开源的轻量级容器化技术&#xff0c;近年来在 IT 界掀起了一股热潮。它通过封装应用及其运行环境&#xff0c;使得开发者可以快速构建、部署和运行应用。Docker 的优势在于其轻量级、可移植性和可扩展性&#xff0c;它…

http和https的区别在哪

HTTP&#xff08;超文本传输协议&#xff09;和HTTPS&#xff08;超文本传输安全协议&#xff09;之间存在几个关键区别主要涉及安全性、端口、成本、加密方式、搜索引擎优化&#xff08;SEO&#xff09;、身份验证等方面 1、安全性&#xff1a;HTTP&#xff08;超文本传输协议…

Python | Leetcode Python题解之第171题Excel列表序号

题目&#xff1a; 题解&#xff1a; class Solution:def titleToNumber(self, columnTitle: str) -> int:number, multiple 0, 1for i in range(len(columnTitle) - 1, -1, -1):k ord(columnTitle[i]) - ord("A") 1number k * multiplemultiple * 26return n…

CASS界址点顺序重排

1、绘制一个宗地&#xff0c;如下&#xff1a; 2、注记界址点号&#xff0c;如下 3、【地籍】--【调整宗地内界址点顺序】&#xff0c;如下&#xff1a; 重排完成后&#xff0c;点击工具栏的【重】按钮&#xff0c;即可刷新标注。

Docker 拉取镜像失败处理 配置使用代理拉取

解决方案 1、在 /etc/systemd/system/docker.service.d/http-proxy.conf 配置文件中添加代理信息 2、重启docker服务 具体操作如下&#xff1a; 创建 dockerd 相关的 systemd 目录&#xff0c;这个目录下的配置将覆盖 dockerd 的默认配置 代码语言&#xff1a;javascript 复…

Golang | Leetcode Golang题解之第171题Excel列表序号

题目&#xff1a; 题解&#xff1a; func titleToNumber(columnTitle string) (number int) {for i, multiple : len(columnTitle)-1, 1; i > 0; i-- {k : columnTitle[i] - A 1number int(k) * multiplemultiple * 26}return }

Linux——ansible关于“文件操作”的模块

修改文件并将其复制到主机 一、确保受管主机上存在文件 使用 file 模块处理受管主机上的文件。其工作方式与 touch 命令类似&#xff0c;如果不存在则创建一个空文件&#xff0c;如果存在&#xff0c;则更新其修改时间。在本例中&#xff0c;除了处理文件之外&#xff0c;Ansi…

2024广东省职业技能大赛云计算赛项实战——Redis主从架构

Redis主从架构 前言 Redis是一个开源的内存数据结构存储系统&#xff0c;一般用于作为数据库、缓存和消息代理使用&#xff0c;而主从架构是许多分布式系统中常见的设计模式&#xff0c;用来提高系统的性能、可靠性和扩展性。 虚拟机使用的是自行创建的CentOS7&#xff0c;如…

自定义平台后台登录地址前缀的教程

修改平台后台地址默认的 admin 前缀 修改后端 config/admin.php 配置文件,为自定义的后缀 修改 平台后台前端源码中 src/settings.js 文件,修改为和上面一样的配置 修改后重新打包前端代码,并且覆盖到后端的 public 目录下 重启 swoole 服务即可

关于如何得到Mindspore lite所需要的.ms模型

关于如何得到Mindspore lite所需要的.ms模型 一、.ckpt模型文件转.mindir模型二、.mindir模型转.ms模型三、其它3.1 代码3.2 数据 四、参考文档 一、.ckpt模型文件转.mindir模型 由于要得到ms模型&#xff0c;mindspore的所有模型里面&#xff0c;是必须要用mindir模型才可以进…

计算机毕业设计Python深度学习房价预测 房价可视化 链家爬虫 房源爬虫 房源可视化 卷积神经网络 大数据毕业设计 机器学习 人工智能 AI

基于python一/二手房数据爬虫分析预测系统可视化 商品房数据Flask框架&#xff08;附源码&#xff09; 项目介绍python语言、Flask框架、MySQL数据库、Echarts可视化 sklearn机器学习 多元线性回归预测模型、requests爬虫框架 链家一手房 一手房数据商品房数据、分析可视化预测…

Unexpected tokens (use ‘;‘ to separate expressions on the same line)

idear配置gradle时,出现这样的错误&#xff1a; 一、dependencies dependencies { testImplementation(platform("org.junit.jupiter:junit-jupiter-api:5.9.1")) testImplementation("org.junit.jupiter:junit-jupiter-engine:5.9.1") // https://mvnr…

苹果电脑如何清理磁盘空间 苹果电脑如何清理系统数据

你是否遇到过电脑磁盘空间不足的情况呢&#xff1f;Mac电脑有着流畅的操作系统&#xff0c;但是随着日常使用&#xff0c;可能电脑里的垃圾文件越来越多&#xff0c;导致磁盘空间不足&#xff0c;随之会出现电脑卡顿、软件闪退等情况。及时清理磁盘空间可以有效避免电脑这些问题…

腾讯云开端口

轻量服务器 由于开发者计划&#xff0c;这些腾讯云 阿里云什么的小vps&#xff0c;是非常之便宜&#xff0c;甚至到了白送的地步&#xff08;小阿&#xff09;&#xff0c;但是作为一个web安全学习者必要的vps操作还是要会的 开启端口 腾讯云的轻量服务器是没有安全组的&…

【总线】AXI4第二课时:深入AXI4总线的基础事务

大家好,欢迎来到今天的总线学习时间!如果你对电子设计、特别是FPGA和SoC设计感兴趣&#xff0c;那你绝对不能错过我们今天的主角——AXI4总线。作为ARM公司AMBA总线家族中的佼佼者&#xff0c;AXI4以其高性能和高度可扩展性&#xff0c;成为了现代电子系统中不可或缺的通信桥梁…

数据结构之“算法的时间复杂度和空间复杂度”

&#x1f339;个人主页&#x1f339;&#xff1a;喜欢草莓熊的bear &#x1f339;专栏&#x1f339;&#xff1a;数据结构 目录 前言 一、算法效率 1.1算法的复杂度概念 1.2复杂度的重要性 二、时间复杂度 2.1时间复杂度的概念 2.2大O的渐进表示法 2.3常见的时间复杂度…

Spring Boot -- 图书管理系统(登录、展示+翻页、添加/修改图书)

文章目录 一、应用分层二、数据库的设计三、登录功能四、展示列表&#xff08;使用虚构的数据&#xff09;五、翻页 展示功能六、添加图书七、修改图书 一、应用分层 为什么我们需要应用分层&#xff1a;当代码量很多时&#xff0c;将其全部放在一起查找起来就会很麻烦&#…

【SQL每日一练】HackerRan-Basic Join-Challenges练习

文章目录 题目题析题解1.sqlserver 题目 编写一个查询来打印 hacker _ id、 name 和每个学生创建的挑战的总数。按照挑战的总数按降序对结果进行排序。如果不止一个学生创建了相同数量的挑战&#xff0c;那么按 hacker _ id 对结果进行排序。如果不止一个学生创建了相同数量的…

Node.js单点登录SSO详解:Session、JWT、CORS让登录更简单

文章目录 一、SSO介绍1、使用SSO的好处 二、中间件介绍1、Express安装导入使用 2、cors安装导入配置 3、express-session安装导入配置使用 4、jsonwebtoken安装导入使用 5、jwt和session对比 三、SSO实现方案1、安装依赖2、结构3、实现原理 三、示例代码1、nodejs端 server/ind…