1.拉取镜像
docker pull rabbitmq:3.9.15-management
2.运行容器
docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.15-management
3.访问地址
安装ip加端口号
http://192.168.123.3:15672/
客户端如下:
登录账号密码:
username:guest
password:guest
4.新增用户
创建管理员账号:
admin
admin
点击add user保存
5.新增虚拟空间
名字要以/开头
/mqname1
创建成功
查看是否授予权限
授权给guest用户权限,根据自己需要授权
授权成功
6.原生RabbitMq代码实现
加入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
package com.mq.pruducer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: 简单模式生产者
* @Date: 2024/01/29/15:16
* @Description: good good study,day day up
*/
public class SimpleProducer {
/**
* 简单模式消息的生产者发送消息
* @param args
*/
public static void main(String[] args) throws Exception{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ服务主机地址
connectionFactory.setHost("192.168.3.123");
//设置RabbitMQ服务端口,默认5672
connectionFactory.setPort(5672);
//设置虚拟主机名字,默认/
connectionFactory.setVirtualHost("/mqname1");
//设置用户连接名,默认guest
connectionFactory.setUsername("admin");
//设置连接密码,默认guest
connectionFactory.setPassword("admin");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列
/**
* 1.队列的名字
* 2.持久化
* 3.是否独占队列,ture:只有这个对象可以操作这个队列,其他的对象如果要操作,只能等这个队列操作结束,相当于加锁
* 4.在本次连接释放以后,是否删除队列---类似数据库临时表
* 5.队列的附加属性
*/
channel.queueDeclare("simple_queue", true, false, false, null);
for (int i = 0; i < 10; i++) {
//创建消息
String message = "这是RabbitMQ的第" + i + "条消息!";
//消息发送
/**
* 1.交换机
* 2.routingkey是什么:简单模式下和队列的名字保持一致
* 3.消息的附加属性是什么
* 4.消息的内容是什么
*/
channel.basicPublish("","simple_queue", null, message.getBytes());
//关闭资源
}
channel.close();
connection.close();
}
}
查看发送消息
发送了10条消息
消费消息
package com.mq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* @Author: 简单模式消息消费者
* @Date: 2024/01/29/15:31
* @Description: good good study,day day up
*/
public class SimpleConsumer {
/**
* 简单模式消息消费者接受消息
* @param args
*/
public static void main(String[] args) throws Exception{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ服务主机地址,默认localhost
connectionFactory.setHost("192.168.3.123");
//设置RabbitMQ服务端口,默认5672
connectionFactory.setPort(5672);
//设置虚拟主机名字,默认/
connectionFactory.setVirtualHost("/mqname1");
//设置用户连接名,默认guest
connectionFactory.setUsername("admin");
//设置连接密码,默认guest
connectionFactory.setPassword("admin");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列
/**
* 1.队列的名字
* 2.持久化
* 3.是否独占队列
* 4.在本次连接释放以后,是否删除队列---临时表
* 5.队列的附加属性
*/
channel.queueDeclare("simple_queue", true, false, false, null);
//创建消费者,并设置消息处理:自定义的操作
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 真实自定义处理消息的逻辑
* @param consumerTag:消息的标签
* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
* @param properties
* @param body:消息的内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {
String s = new String(body, "UTF-8");
System.out.println("收到的消息的内容为:" + s);
long deliveryTag = envelope.getDeliveryTag();//消息的编号
String exchange = envelope.getExchange();//交换机的信息
String routingKey = envelope.getRoutingKey();//routingKey的信息
System.out.println("收到的消息的编号为:" + deliveryTag);
System.out.println("收到的消息的所属的为:" + exchange);
System.out.println("收到的消息所属的队列为:" + routingKey);
//保存消息到数据库
}
};
//消息监听
/**
* 1.监听队列的名字
* 2.是否自动确认消息
*/
channel.basicConsume("simple_queue", true, defaultConsumer);
//关闭资源(不建议关闭,建议一直监听消息)
}
}
已经消费
广播模式
package com.mq.pruducer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: 广播模式生产者
* @Date: 2024/01/29/15:58
* @Description: good good study,day day up
*/
public class FanoutProducer {
/**
* 广播模式消息的生产者发送消息
* @param args
*/
public static void main(String[] args) throws Exception{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ服务主机地址,默认localhost
connectionFactory.setHost("192.168.3.123");
//设置RabbitMQ服务端口,默认5672
connectionFactory.setPort(5672);
//设置虚拟主机名字,默认/
connectionFactory.setVirtualHost("/mqname1");
//设置用户连接名,默认guest
connectionFactory.setUsername("admin");
//设置连接密码,默认guest
connectionFactory.setPassword("admin");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列
/**
* 1.队列的名字
* 2.持久化
* 3.是否独占队列
* 4.在本次连接释放以后,是否删除队列---临时表
* 5.队列的附加属性
*/
channel.queueDeclare("fanout_queue_1", true, false, false, null);
channel.queueDeclare("fanout_queue_2", true, false, false, null);
//声明交换机
/**
* 1.交换机的名字
* 2.交换机的类型
*/
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
//绑定
/**
* 1.队列
* 2.交换机
* 3.routingkey
*/
channel.queueBind("fanout_queue_1", "fanout_exchange", "");
channel.queueBind("fanout_queue_2", "fanout_exchange", "");
for (int i = 0; i < 10; i++) {
//创建消息
String message = "这是广播模式的第" + i + "条消息!";
//消息发送
/**
* 1.交换机
* 2.routingkey是什么:简单模式下和队列的名字保持一致
* 3.消息的附加属性是什么
* 4.消息的内容是什么
*/
if(i % 3 == 0){
channel.basicPublish("fanout_exchange","", null, message.getBytes());
}else{
channel.basicPublish("fanout_exchange","", null, message.getBytes());
}
//关闭资源
}
channel.close();
connection.close();
}
}
消费者
package com.mq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Author: 广播模式消费者1
* @Date: 2024/01/29/16:03
* @Description: good good study,day day up
*/
public class FanoutConsumer1 {
/**
* 广播模式消息消费者接受消息
* @param args
*/
public static void main(String[] args) throws Exception{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ服务主机地址,默认localhost
connectionFactory.setHost("192.168.3.123");
//设置RabbitMQ服务端口,默认5672
connectionFactory.setPort(5672);
//设置虚拟主机名字,默认/
connectionFactory.setVirtualHost("/mqname1");
//设置用户连接名,默认guest
connectionFactory.setUsername("admin");
//设置连接密码,默认guest
connectionFactory.setPassword("admin");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列
/**
* 1.队列的名字
* 2.持久化
* 3.是否独占队列
* 4.在本次连接释放以后,是否删除队列---临时表
* 5.队列的附加属性
*/
channel.queueDeclare("fanout_queue_1", true, false, false, null);
//创建消费者,并设置消息处理:自定义的操作
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 真实自定义处理消息的逻辑
* @param consumerTag:消息的标签
* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
* @param properties
* @param body:消息的内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("收到的消息的内容为:" + s);
long deliveryTag = envelope.getDeliveryTag();//消息的编号
String exchange = envelope.getExchange();//交换机的信息
String routingKey = envelope.getRoutingKey();//routingKey的信息
System.out.println("收到的消息的编号为:" + deliveryTag);
System.out.println("收到的消息的所属的为:" + exchange);
System.out.println("收到的消息所属的队列为:" + routingKey);
//保存消息到数据库
}
};
//消息监听
/**
* 1.监听队列的名字
* 2.是否自动确认消息
*/
channel.basicConsume("fanout_queue_1", true, defaultConsumer);
//关闭资源(不建议关闭,建议一直监听消息)
}
}
package com.mq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Author: 广播模式消费者2
* @Date: 2024/01/29/16:03
* @Description: good good study,day day up
*/
public class FanoutConsumer2 {
/**
* 广播模式消息消费者接受消息
* @param args
*/
public static void main(String[] args) throws Exception{
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ服务主机地址,默认localhost
connectionFactory.setHost("192.168.3.123");
//设置RabbitMQ服务端口,默认5672
connectionFactory.setPort(5672);
//设置虚拟主机名字,默认/
connectionFactory.setVirtualHost("/mqname1");
//设置用户连接名,默认guest
connectionFactory.setUsername("admin");
//设置连接密码,默认guest
connectionFactory.setPassword("admin");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列
/**
* 1.队列的名字
* 2.持久化
* 3.是否独占队列
* 4.在本次连接释放以后,是否删除队列---临时表
* 5.队列的附加属性
*/
channel.queueDeclare("fanout_queue_2", true, false, false, null);
//创建消费者,并设置消息处理:自定义的操作
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 真实自定义处理消息的逻辑
* @param consumerTag:消息的标签
* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
* @param properties
* @param body:消息的内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("收到的消息的内容为:" + s);
long deliveryTag = envelope.getDeliveryTag();//消息的编号
String exchange = envelope.getExchange();//交换机的信息
String routingKey = envelope.getRoutingKey();//routingKey的信息
System.out.println("收到的消息的编号为:" + deliveryTag);
System.out.println("收到的消息的所属的为:" + exchange);
System.out.println("收到的消息所属的队列为:" + routingKey);
//保存消息到数据库
}
};
//消息监听
/**
* 1.监听队列的名字
* 2.是否自动确认消息
*/
channel.basicConsume("fanout_queue_2", true, defaultConsumer);
//关闭资源(不建议关闭,建议一直监听消息)
}
}
广播模式队列
广播模式交换机
7.springboot整合RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
server:
port: 19012
spring:
rabbitmq:
host: 192.168.3.123
port: 5672
virtual-host: /mqname1
username: admin
password: admin
配置类
package com.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: 配置类
* @Date: 2024/01/29/16:37
* @Description: good good study,day day up
*/
@Configuration
public class RabbitMQConfig {
//创建队列
@Bean("myQueue")
public Queue myQueue(){
return QueueBuilder.durable("springboot_queue").build();
}
//创建交换机
@Bean("myExchange")
public Exchange myExchange(){
return ExchangeBuilder.topicExchange("springboot_exchange").build();
}
//创建绑定
@Bean
public Binding myBinding(@Qualifier("myQueue") Queue myQueue,
@Qualifier("myExchange") Exchange myExchange){
return BindingBuilder.bind(myQueue).to(myExchange).with("user.#").noargs();
}
}
发送消息测试
package com.mq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: 测试类
* @Date: 2024/01/29/13:36
* @Description: good good study,day day up
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/one")
public String one(){
rabbitTemplate.convertAndSend("springboot_exchange","user.insert","1新增类型的消息");
rabbitTemplate.convertAndSend("springboot_exchange","user.update","2修改类型的消息");
rabbitTemplate.convertAndSend("springboot_exchange","user.delete","3删除类型的消息");
return "发送成功";
}
}
新建一个监听服务
package com.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: 监听类mq
* @Date: 2024/01/29/17:19
* @Description: good good study,day day up
*/
@Component
public class MessageListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}