一、流程图
二、导包
<!--消息队列 AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
三、配置文件
#消息队列
spring:
rabbitmq:
host: 192.168.88.130
port: 5672
virtual-host: my_vhost #使用的虚拟主机
username: root
password: root
listener:
simple:
acknowledge-mode: manual #开启手动应答
四、配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
/**
* 订单交换机
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 订单队列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 订单路由键
*/
public static final String ORDER_ROUTING = "order_routing";
/**
* 死信交换机
*/
public static final String ORDER_DEAD_EXCHANGE = "order_dead_exchange";
/**
* 死信队列
*/
public static final String ORDER_DEAD_QUEUE = "order_dead_queue";
/**
* 死信路由键
*/
public static final String ORDER_DEAD_ROUTING = "order_dead_routing";
/**
* 订单交换机
*/
@Bean("orderExchange")
public Exchange getOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}
/**
* 订单队列
*/
@Bean("orderQueue")
public Queue getOrderQueue() {
Map<String, Object> map = new HashMap<>(3);
map.put("x-dead-letter-exchange", ORDER_DEAD_EXCHANGE);//死信交换机
map.put("x-dead-letter-routing-key", ORDER_DEAD_ROUTING);//死信路由键
map.put("x-message-ttl", 1000 * 60 * 15);//队列过期时间
return QueueBuilder
.durable(ORDER_QUEUE)
.withArguments(map)
.build();
}
/**
* 将订单交换机与订单队列绑定
*/
@Bean
Binding orderExchangeBindingOrder(@Qualifier("orderExchange") Exchange exchange,
@Qualifier("orderQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();
}
/**
* 死信交换机
*/
@Bean("orderDeadExchange")
public Exchange getOrderDeadExchange() {
return new DirectExchange(ORDER_DEAD_EXCHANGE);
}
/**
* 死信队列
*/
@Bean("orderDeadQueue")
public Queue getOrderDeadQueue() {
return new Queue(
ORDER_DEAD_QUEUE,//队列名
true,//是否持久化
false,//是否具有排他性,只在首次声明时可见,不允许其他用户访问,连接断开时自动删除
false,//是否自动删除,经历过至少一次连接后,所有消费者都断开了连接,此队列会自动删除
null
);
}
/**
* 将死信交换机与死信队列绑定
*/
@Bean
Binding deadExchangeBindingDeadQueue(@Qualifier("orderDeadExchange") Exchange exchange,
@Qualifier("orderDeadQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_ROUTING).noargs();
}
}
五、发送消息的类
import com.sky.configuration.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息队列发送消息
*/
@Component
public class SendRabbitMQ {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param orderId 15分钟后要检查的订单编号
*/
public void sendDelayOrder(Long orderId) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,//订单交换机
RabbitMQConfig.ORDER_ROUTING,//订单路由键
orderId//要取消的订单编号
);
}
}
六、接收消息的类
import com.rabbitmq.client.Channel;
import com.sky.configuration.RabbitMQConfig;
import com.sky.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* 消息队列接收消息
*/
@Component
public class ReceiveRabbitMQ {
@Autowired
private OrderMapper orderMapper;
/**
* @param orderId 要取消的订单的编号
* @param msg 包含了要回复的队列
* @param channel 有回复功能的参数
*/
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_QUEUE)
public void ReceiveDeadOrder(Long orderId, Channel channel, Message msg) throws IOException {
orderMapper.delCancelOrder(orderId);//查询数据库,订单是否付款,未付款:改为已取消
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),//应答的消息
false//是否批量应答
);
}
}
七、在业务代码中注入发送类,并调用发送类的发送方法
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private SendRabbitMQ sendRabbitMQ;
/**
* 用户下单
*/
public void submitOrder(OrdersSubmitDTO ordersSubmitDTO) {
sendRabbitMQ.sendDelayOrder(order.getId());//发送延迟消息到消息队列
}
}