目录
0.交换机种类和区别
1.声明队列和交换机以及RoutingKey
2.初始化循环绑定
3.声明交换机
4.监听队列
4.1 监听普通队列
4.2监听死信队列
5.削峰填谷的实现
0.交换机种类和区别
- Direct Exchange(直连交换机):
  - 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。
  - 当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
- Fanout Exchange(扇出交换机):
  - 扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。
  - 当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
- Topic Exchange(主题交换机):
  - 主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。
  - 绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
- Headers Exchange(标头交换机):
  - 标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。
  - 在将队列绑定到标头交换机时,可以指定一组标头键值对。默认(x-match=all)情况下,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列;若绑定参数设置为 x-match=any,则只要匹配其中任意一个键值对即可。
只要路由键匹配,绑定到同一个交换机的每个队列都会各自收到一份完整的消息,而不是平分消息。
例如交换机此时收到 20 条消息,且队列 a、b 都匹配,则 a、b 各自都会收到这 20 条消息。
同一个队列有多个消费者时,默认以轮询方式把消息分发给各个消费者;也可以设置每个消费者最多持有多少条未确认(unacked)的消息(prefetch),并根据消费者的消费能力来调整这个值。
1.声明队列和交换机以及RoutingKey
package com.example.config; import lombok.Getter; @Getter public enum RabbitmqBind { DATA_CLEAN_PROCESS_DEAD( RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP, RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD, RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD, false, false, null, null ), DATA_CLEAN_PROCESS( RabbitMqExchangeEnum.E_DIRECT_RCP, RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS, RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS, true, true, RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP, RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD), SMS_CLEAN_DEAD( RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP, RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD, RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD, true, false, null, null ), SMS_CLEAN( RabbitMqExchangeEnum.E_TOPIC_RCP, RabbitMqQueueConstants.Q_API_TO_DCN_SMS, RabbitmqRoutingKey.K_API_TO_DCN_SMS, true, true, RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP, RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD ), ; /** * 交换机 */ private RabbitMqExchangeEnum exchange; /** * 队列名称 */ private String queueName; /** * 路由Key */ private RabbitmqRoutingKey routingKey; /** * 绑定标识 * 是否启用 */ private Boolean isBind; /** * 是否绑定死信 */ private Boolean isDeathBelief; /** * 绑定的死信交换机 */ private RabbitMqExchangeEnum boundDeadExchange; /** * 死信key */ private RabbitmqRoutingKey deadRoutingKey; RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind, Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey ) { this.exchange = exchange; this.queueName = queueName; this.routingKey = routingKey; this.isBind = isBind; this.isDeathBelief = isDeathBelief; this.boundDeadExchange = boundDeadExchange; this.deadRoutingKey = deadRoutingKey; } /** * 交换机 */ @Getter public enum RabbitMqExchangeEnum { /** * 交换机定义,类型 - 名称 */ E_DIRECT_RCP("direct", "E_DIRECT_RCP"), DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"), E_TOPIC_RCP("topic", "E_TOPIC_RCP"), E_TOPIC_PAY("topic", "E_TOPIC_PAY"); private String exchangeType; private String 
exchangeName; RabbitMqExchangeEnum(String exchangeType, String exchangeName) { this.exchangeType = exchangeType; this.exchangeName = exchangeName; } } /** * 队列名定义 */ public interface RabbitMqQueueConstants { /** * 接收清洗数据 */ String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS"; /** * 清洗结束通知 */ String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS"; /** * 死信队列 */ String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD"; /** * 清洗结束通知死信队列 */ String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD"; } /** * routingKey */ @Getter public enum RabbitmqRoutingKey { /** * 路由 */ K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"), K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"), // 路由绑定死信路由 DEAD("DEAD"), //死信路由 K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"), K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"), ; private String keyName; RabbitmqRoutingKey(String keyName) { this.keyName = keyName; } } }
2.初始化循环绑定
package com.example.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Arrays; @Configuration @ConditionalOnClass(EnableRabbit.class) public class MqConfig { @Resource protected RabbitTemplate rabbitTemplate; @Resource ConnectionFactory connectionFactory; // // @Lazy // @Autowired // protected RabbitAdmin rabbitAdmin; // // // public static final int DEFAULT_CONCURRENT = 10; // // @Bean("customContainerFactory") // public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, // ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConcurrentConsumers(DEFAULT_CONCURRENT); // factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT); // configurer.configure(factory, connectionFactory); // return factory; // } // 
// @Bean // @ConditionalOnMissingBean // public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { // return new RabbitTransactionManager(connectionFactory); // } // // @Bean // @ConditionalOnMissingBean // public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { // return new RabbitAdmin(connectionFactory); // } @PostConstruct protected void init() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitTemplate.setChannelTransacted(true); //创建exchange Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values()) .forEach(rabbitMqExchangeEnum -> { Exchange exchange = RabbitmqExchange .getInstanceByType(rabbitMqExchangeEnum.getExchangeType()) .createExchange(rabbitMqExchangeEnum.getExchangeName()); rabbitAdmin.declareExchange(exchange); } ); //创建队列并绑定exchange Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> { if (RabbitmqBind.getIsBind()) { if (RabbitmqBind.getIsDeathBelief()) { //需要绑定死信交换机的队列 rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName()) .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName()) .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build()); rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(), Binding.DestinationType.QUEUE, RabbitmqBind.getExchange().getExchangeName(), RabbitmqBind.getRoutingKey().getKeyName(), null)); } else { //不需要绑定死信交换机的队列 rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(), true, false, false, null)); rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(), Binding.DestinationType.QUEUE, RabbitmqBind.getExchange().getExchangeName(), RabbitmqBind.getRoutingKey().getKeyName(), null)); } } }); } }
绑定关系由枚举类 RabbitmqBind 定义
3.声明交换机
package com.example.config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;
import java.util.Arrays;
/**
 * Factory for Spring AMQP {@link Exchange} objects, keyed by the exchange type string.
 */
@Getter
@Slf4j
public enum RabbitmqExchange {

    /** Builds {@link DirectExchange} instances (durable = true, autoDelete = false). */
    DIRECT("direct") {
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    /** Builds {@link TopicExchange} instances (durable = true, autoDelete = false). */
    TOPIC("topic") {
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    /** Exchange type string this constant is looked up by. */
    private final String type;

    RabbitmqExchange(String type) {
        this.type = type;
    }

    /**
     * Finds the constant whose type string equals {@code type} (case-sensitive).
     *
     * @param type exchange type string, e.g. {@code "direct"} or {@code "topic"}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has that type (including {@code null})
     */
    public static RabbitmqExchange getInstanceByType(String type) {
        return Arrays.stream(values())
                .filter(candidate -> candidate.type.equals(type))
                .findAny()
                // Report the rejected value so a misconfigured type is easy to trace.
                .orElseThrow(() -> new IllegalArgumentException("无效的exchange type: " + type));
    }

    /**
     * Creates an exchange of this constant's type.
     *
     * @param exchangeName name of the exchange to create
     * @return the new exchange definition
     */
    public abstract Exchange createExchange(String exchangeName);
}
4.监听队列
4.1 监听普通队列
package com.example.listener; import com.example.config.RabbitmqBind; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component @RabbitListener(queues = { RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5") //, containerFactory = "customContainerFactory" public class MqListener { @RabbitHandler public void processMessage(String message) { log.info("DataClean recive message :{} ", message); process(message); } @RabbitHandler public void processMessage(byte[] message) { String msg = new String(message); log.info("DataClean recive message :{} ", msg); process(msg); } /** * 处理推送消息 * @param message */ private void process(String message) { log.info("process message :{}" , message); if(StringUtils.isBlank(message)) { log.error("process message is blank , message:{}" , message); return; } } }
监听并处理任务
4.2监听死信队列
package com.example.listener; import com.example.config.RabbitmqBind; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component @RabbitListener(queues = { RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5") public class DeadListener { @RabbitHandler public void processMessage(String message) { log.info("DataClean recive message :{} ", message); process(message); } @RabbitHandler public void processMessage(byte[] message) { String msg = new String(message); log.info("DataClean recive message :{} ", msg); process(msg); } /** * 处理推送消息 * @param message */ private void process(String message) { log.info("Dead process message :{}" , message); if(StringUtils.isBlank(message)) { log.error("Dead process message is blank , message:{}" , message); return; } } }
5.削峰填谷的实现
削峰填谷就是把高峰期堆积的消息留在队列中,等到低峰期再消费。
可以用消费者主动拉取(pull)的方式来实现,由消费者按自己的处理能力取消息;
或者通过设置消费者并发数量的最小值和最大值来实现(例如上文 @RabbitListener 的 concurrency = "1-5")。
// Caps the number of unacknowledged messages a consumer may hold at once; giving consumers
// different caps weights the message distribution. basicQos has no zero-argument overload,
// so a prefetch count must be passed. prefetchCount is a placeholder here: choose the value
// from the consumer's processing capacity.
channel.basicQos(prefetchCount);