文章目录
- 概念
- 管理界面简介
- 4中常见交换器类型
- 1.Direct交换器:
- 2.Fanout交换器
- 3.Topic交换器
- 4.headers交换器
- 对象类型消息传递
- 同步等待
- 使用代码创建队列
- 待续......
概念
在微服务架构中项目之间项目A调用项目B 项目B调用项目C项目C调用项目D。。
用户必须等待项目之间内容依次的运行结束后才会给用户返回结果
这是一个同步的调用,用户等待时间可能比较长,用户体验度比较差
消息中间件: 可以理解成一个队列,把用户需要处理的任务交给放到队列,任务在队列中进行排队等待执行,用户无需等待代码的执行的时间.
管理界面简介
1.Overview: 此面板为RabbitMQ基础信息展示面板,列举了服务器的信息,如:节点名称、内存占用、磁盘占用等。
2.Connections: 此面板中展示所有连接到RabbitMQ的客户端链接。只展示基于5672端口的链接。
3.Channels: 此面板中展示各链接中的具体信道。标记方式为链接(编号),如:192.168.91.1:12345(1)。
4.Exchanges: 此面板中展示RabbitMQ中已有的交换器,并注明交换器名称、类型等基本信息。其中只有direct交换器有默认交换器(AMQP default),当使用direct交换器时,如果没有明确指定名称,使用AMQP default交换器,也可明确指定名称。但是其他类型交换器没有默认的,都需要指定名称。
5.Queues: 此面板展示RabbitMQ中的队列信息。
4中常见交换器类型
1.Direct交换器:
direct会根据路由键把消息放入到指定的队列中。
在Queues界面创建队列q1,在Exchanges创建direct交换机fs.direct绑定交换机q1,指定路由key,fs.q1
一.生产者
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
2.配置yml
# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
rabbitmq:
host: localhost # RabbitMQ服务器的IP。默认localhost
port: 32769 # RabbitMQ服务器的端口。
username: guest # RabbitMQ的访问用户名。默认guest。
password: guest # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
3.在测试包下发送消息:
类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
建议使用接口: 优先级是 RabbitOperations > AmqpTemplate
Spring AMQP可以发送的消息类型必须是Message类型。
Spring AMQP可以帮助程序员自动封装消息类型Message对象(默认封装),自动转换封装的消息体类型是Object,只要类型可序列化即可。
消息发送到队列后,无需等待,是异步操作,生产者接着往下执行.
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitOperations rabbitOperations;
@Test
void contextLoads() {
//仅发送,不需要等待返回值,异步,推荐
rabbitOperations.convertAndSend("fs.dire"(交换机名称),"xy.q1"(路由key),"你好 rabbitmq"(消息内容));
//发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,失去了rabbitmq的意义
//rabbitOperations.convertSendAndReceive("","","");
}
}
二.消费者
1.pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.yml
spring:
rabbitmq:
host: localhost
port: 32769
username: guest
password: guest
3.监听队列
修饰符: public
返回值: 异步消息必须是void
方法名: 自定义
参数表: 一个参数,类型可以是Message或者具体的消息体类型(Object)。message是Spring AMQP中消息的唯一类型。代表完整消息,有头和体组成。如果对消息头没有任何处理要求,则直接定义消息体具体类型即可。
方法实现: 根据具体要求,定义即可。
注意:方法可以抛出任意类型的异常。只要抛出异常,则代表消费错误,RabbitMQ不删除队列中的消息。
注解是RabbitListener。
如果有多个消费监听,默认采用轮询消费,消费监听不需要相互等待,并发执行.
@Component
public class StringMessageConsumer {
@RabbitListener(queues = {"q1"})(队列名称可以指定多个)
public void received1(String m){
System.out.println(m);
}
@RabbitListener(queues = {"q1"})(队列名称可以指定多个)
public void received2(String m){
System.out.println(m);
}
}
2.Fanout交换器
扇形交换器: 会把消息发送给所有的绑定在当前交换器上的队列,可以不写路由键
1.在Queues界面创建队列f1,f2,在Exchanges创建fanout交换机fs.fanout绑定交换机f1,f2,不指定路由
2.编写测试代码
@Autowired
private RabbitOperations rabbitOperations;
@Test
void contextLoad1s() {
//没有绑定路由键,设为null即可
rabbitOperations.convertAndSend("fs.fanout",null,"你好 rabbitmq");
}
3.f1,f2队列都多了一个消息
3.Topic交换器
主题交换器: 路由键可包括特殊字符实现通配。特殊字符包括: 星号和 ‘#’。
星号 : 代表一个单词。多个单词使用’.'分割。
‘#’ : 代表0~n个字符,即任意字符串。//如abc.# 代表以 abc. 开头的所有路由key
1.在Queues界面创建队列t1,t2,t3在Exchanges创建Topic交换机fs.topic绑定交换机t1,t2,t3
2.指定t1路由键: abc.t1,指定t2路由键: abc.*,指定t3路由键: abc.#
3.编写测试代码
@Autowired
private RabbitOperations rabbitOperations;
@Test
void contextLoawd1s() {
rabbitOperations.convertAndSend("fs.topic","abc.t1","你好 rabbitmq");//t1,t2,t3
rabbitOperations.convertAndSend("fs.topic","abc.123","你好 rabbitmq");//t2,t3
rabbitOperations.convertAndSend("fs.topic","abc.123.234","你好 rabbitmq");//t3
}
4.headers交换器
headers交换器和direct交换器的主要区别是在传递消息时可以传递header部分消息
生产消息
在Queues界面创建队列h1,在Exchanges创建headers交换机fs.headers绑定交换机h1,指定路由key,fs.h1
@Autowired
private RabbitOperations rabbitOperations;
@Test
void conwtextLoawd1s() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("fs","java");
Message message = new Message("我是消息".getBytes(), messageProperties);
rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
}
消费消息
@Test
void conwtextLoawd1s() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("fs","java");
Message message = new Message("我是消息".getBytes(), messageProperties);
rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
}
对象类型消息传递
1.生产者
创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private final Long serialVersionUID = 1L;
private String name;
private Integer age;
}
发送消息
@Autowired
private RabbitOperations rabbitOperations;
User user = new User("张三",18);
rabbitOperations.convertAndSend("fs.direct","xy.q1",user);
2.消费者
在消费者中创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致
直接用对应类型接收
@Component
public class StringMessageConsumer {
@RabbitListener(queues = {"q1"})
public void received(User(类型和发送来的一致即可) user){
System.out.println(user);
}
}
统一用Message对象接收,常用语Headers交换器
@Component
public class StringMessageConsumer {
@RabbitListener(queues = {"q1"})
public void received(Message message) throws IOException, ClassNotFoundException {
byte[] body = message.getBody();
ByteArrayInputStream bai = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(bai);
Object object = ois.readObject();
if(object instanceof User){
User user = (User) object;
System.out.println(user);
}
}
}
同步等待
等待结果返回,才能往下执行
默认等待5s,可以配置,超时返回null
yml配置
spring:
rabbitmq:
host: localhost # RabbitMQ服务器的IP。默认localhost
port: 32769 # RabbitMQ服务器的端口。
username: guest # RabbitMQ的访问用户名。默认guest。
password: guest # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
生产者:
@Test
void contextLowads() {
//发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,等待结果返回,才能往下执行
String aa = (String)rabbitOperations.convertSendAndReceive("fs.direct", "xy.q1", "aa");
System.out.println(aa);
}
消费者:
@Component
public class StringMessageConsumer {
@RabbitListener(queues = {"q1"})
public String received(Message message) {
return "hello";
}
}
使用代码创建队列
一.在生成者端创建队列
是发送消息时创建,而不是启动项目时。
@Configuration
public class RabbitMQConfig {
// 发送消息时如果不存在这个队列,会自动创建这个队列。
// 注意:是发送消息时,而不是启动项目时。
// 相当于:可视化操作时创建一个队列
// 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
@Bean
public Queue queue(){
return new Queue("queue.second");
}
// 如果没有这个交换器,在发送消息创建这个交换器
// 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct.first.ex");
}
// 配置类中方法参数,会由Spring 容器自动注入
@Bean
public Binding directBingding(DirectExchange directExchange,Queue queue){
// with(“自定义路由键名称”)
return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
// withQueueName() 表示队列名就是路由键名称
// return BindingBuilder.bind(queue).to(directExchange).withQueueName();
}
}
二.消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.second",autoDelete = "false", durable = "true"),
exchange = @Exchange(name = "direct.second.ex",autoDelete = "false", type = ExchangeTypes.DIRECT),
key = {"routing.key.second.1"}
)
})
public void onMessage(String messageBody){
System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
}