文章目录
- 前言
- 1、stream设计思想
- 2、编码常用的注解
- 3、编码步骤
- 3.1、添加依赖
- 3.2、修改配置文件
- 3.3、生产
- 3.4、消费
- 3.5、延迟队列
- 3.5.1、修改配置文件
- 3.5.2、生产端
- 3.5.2、消息确认机制 消费端
前言
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。
1、stream设计思想
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
2、编码常用的注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
3、编码步骤
3.1、添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.2、修改配置文件
server:
port: 8088
spring:
cloud:
stream:
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于bidding整合
type: rabbit #消息组件类型
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: localhost #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: tiger #rabbitmq 用户名
password: tiger #rabbitmq 密码
virtual-host: tiger_vh #虚拟路径
bindings: #服务的整合处理
saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道
destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
3.3、生产
/**
* 订单消息输出通道处理器
*/
@Component
public interface OrderOutputChannelProcesor {
@Output("saveOrderOutput")
MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
@Autowired
@Output("saveOrderOutput")
private MessageChannel messageChannel;
public void sentMsg(UserInfo userInfo){
messageChannel.send(MessageBuilder.withPayload(userInfo).build());
log.info("消息发送成功:" + userInfo);
}
}
3.4、消费
/**
* 订单消息输入通道处理器
*/
@Component
public interface OrderInputChannelProcesor {
@Input("saveOrderInput")
SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){
log.info("接收消息成功:" + userInfoMessage.getPayload());
}
}
3.5、延迟队列
安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.5.1、修改配置文件
server:
port: 8088
spring:
cloud:
stream:
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于bidding整合
type: rabbit #消息组件类型
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: localhost #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: tiger #rabbitmq 用户名
password: tiger #rabbitmq 密码
virtual-host: tiger_vh #虚拟路径
bindings: #服务的整合处理
saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道
destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit
group: saveOrderGroup #分组
rabbit:
bindings: #服务的整合处理
saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道
producer:
delayed-exchange: true
saveOrderInput:
consumer:
delayed-exchange: true
3.5.2、生产端
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
@Autowired
@Output("saveOrderOutput")
private MessageChannel messageChannel;
public void sentMsg(UserInfo userInfo){
messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
log.info("消息发送成功:" + userInfo);
}
}
3.5.2、消息确认机制 消费端
rabbit:
bindings: #服务的整合处理
saveOrderInput:
consumer:
acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){
log.info("接收消息成功:" + userInfoMessage.getPayload());
Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
/*
* deliveryTag:Channel的消息投递的唯一标识符。
* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
* 如果设置为false,则消息被丢弃或发送到死信Exchange。
*/
try {
channel.basicAck(delieverTag,true);
} catch (IOException e) {
e.printStackTrace();
}
}
定义交换机类型为direct
rabbit:
bindings: #服务的整合处理
saveOrderInput:
consumer:
bindingRoutingKey: orderRoutingKey
bindQueue: true
exchangeType: direct
saveOrderOutput:
producer:
routingKeyExpression: '''orderRoutingKey'''
exchangeType: direct