服务异步通讯——springcloud
文章目录
- 服务异步通讯——springcloud
- 初始MQ
- RabbitMQ快速入门
- 单机部署
- 1.1.下载镜像
- 安装MQ
- SpringAMQP
- work Queue 工作队列
- Fanout Exchange广播模式
- DirectExchange路由模式
- TopicExchange话题模式
- 消息转换器
初始MQ
RabbitMQ快速入门
官网https://www.rabbitmq.com/
单机部署
我们在Centos7虚拟机中使用Docker来安装。
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
安装MQ
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
SpringAMQP
https://spring.io/projects/spring-amqp/
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.10.88 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName,message);
}
}
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者接受到消息:[" + msg + "]");
}
}
work Queue 工作队列
Fanout Exchange广播模式
@Configuration
public class FanoutConfig {
// itcast.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
// 绑定队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
// 绑定队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg){
System.out.println("消费者接受到fanoutQueue1 消息:[" + msg + "]");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
System.out.println("消费者接受到fanoutQueue2 消息:[" + msg + "]");
}
@Test
public void testSendFanoutExchange(){
//交换机名称
String exchanneName = "itcast.fanout";
//消息
String message = "hello every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchanneName,"",message);
}
DirectExchange路由模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void LinstenDirectQueue1(String msg){
System.out.println("消费者接受到directQueue1 消息:[" + msg + "]");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void LinstenDirectQueue2(String msg){
System.out.println("消费者接受到directQueue2 消息:[" + msg + "]");
}
@Test
public void testSendDirectExchange(){
//交换机名称
String exchanneName = "itcast.direct";
//消息
String message = "hello,red";
// 发送消息
rabbitTemplate.convertAndSend(exchanneName,"red",message);
}
TopicExchange话题模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void LinstenTopicQueue1(String msg){
System.out.println("消费者接受到topicQueue1消息:[" + msg + "]");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void LinstenTopicQueue2(String msg){
System.out.println("消费者接受到topicQueue2消息:[" + msg + "]");
}
@Test
public void testSendTopicExchange(){
//交换机名称
String exchanneName = "itcast.topic";
//消息
String message = "小米汽车倒闭了";
// 发送消息
rabbitTemplate.convertAndSend(exchanneName,"china.news",message);
}
消息转换器
@Test
public void testSendOnjectQueue() {
Map<String, Object> msg = new HashMap<>();
msg.put("name","柳岩");
msg.put("age",18);
rabbitTemplate.convertAndSend("object.queue",msg);
}
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@RabbitListener(queues = "object.queue")
public void ListenObjectQueue(Map<String,Object> msg){
System.out.println("接收到object.queue的消息:" + msg);
}