从浅入深 学习 SpringCloud 微服务架构(十六)
一、SpringCloudStream:自定义消息通道
1、在子工程 stream_product (子模块)中,创建 自定义的消息通道类 MyProcessor.java
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\channel\MyProcessor.java
*
* 2024-5-11 创建 自定义的消息通道类 MyProcessor.java
*/
package djh.it.stream.channel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
//消息生产者的配置
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
//消息消费者的配置
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
2、在子工程 stream_product (子模块)中,修改 消息发送的工具类 MessageSender.java 使用自定义消息通道。
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\producer\MessageSender.java
*
* 2024-5-10 抽取一个消息发送的工具类 MessageSender.java
*/
package djh.it.stream.producer;
import djh.it.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
//@EnableBinding(Source.class)
@EnableBinding(MyProcessor.class)
public class MessageSender {
// @Autowired
// private MessageChannel output;
//
// //发送消息
// public void send(Object obj){
// output.send(MessageBuilder.withPayload((obj)).build());
// }
@Autowired
@Qualifier(value = "myoutput")
private MessageChannel myoutput;
//发送消息
public void send(Object obj){
myoutput.send(MessageBuilder.withPayload((obj)).build());
}
}
3、在子工程 stream_product (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。
## spring_cloud_demo\stream_product\src\main\resources\application.yml
server:
port: 7001 #服务端口
spring:
application:
nmae: stream_product #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output: #管道交互
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myoutput: # 自定义消息通道
destination: djh-custom-output
binders: #配置绑定器
defaultRabbit:
type: rabbit
4、在子工程 stream_consumer (子模块)中,创建 自定义的消息通道类 MyProcessor.java
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\channel\MyProcessor.java
*
* 2024-5-11 创建 自定义的消息通道类 MyProcessor.java
*/
package djh.it.stream.channel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
//消息生产者的配置
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
//消息消费者的配置
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
5、在子工程 stream_consumer (子模块)中,修改 获取消息工具类 MessageListener.java 使用自定义消息通道。
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\consumer\MessageListener.java
*
* 2024-5-10 创建一个获取消息工具类 MessageListener.java
*/
package djh.it.stream.consumer;
import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {
// //监听 binding 中的消息
// @StreamListener(Sink.INPUT)
// public void input(String message) {
// System.out.println("获取到的消息: " + message);
// }
//监听 binding 中的消息
@StreamListener(MyProcessor.MYINPUT)
public void input(String message) {
System.out.println("获取到的消息: " + message);
}
}
6、在子工程 stream_consumer (子模块)中,修改 application.yml 配置文件, 添加自定义消息配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.yml
server:
port: 7002 #服务端口
spring:
application:
nmae: stream_consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
binders: #配置绑定器
defaultRabbit:
type: rabbit
7、在子工程 stream_product (子模块)中,运行 启动类 ProducerApplication.java 进行测试
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ProducerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)发送消息的话,定义一个通道接口,通过接口中内置的 messagechannel,(sprngcloudtream 中内置接口 Source)
* 4)@EnableBinding 注解 :绑定对应通道。
* 5)发送消息的话,通过 MessageChannel 发送消息,如果需要 MessageChannel --> 通过绑定内置接口获取。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
}
8、在子工程 stream_consumer (子模块)中,运行 启动类 ConsumerApplication.java 进行测试。
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ConsumerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)定义一个通道接口,通过内置获取消息的接口:Sink
* 4)绑定对应通道。
* 5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
9、在子工程 stream_product (子模块)中,运行 一个测试类 ProducterTest.java 进行测试。
/**
* spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java
*
* 2024-5-10 创建一个测试类 ProducterTest.java
*/
package djh.it.stream;
import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
messageSender.send("hello 测试 工具类");
}
}
10、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类,在 idea Run Dashboard 控制面板,
同样会输出 “获取到的消息: hello 测试 工具类”
二、SpringCloudStream:消息分组
1、SpringCloudStream:消息分组
-
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
-
实现的方式非常简单,我们只需要在服务消费者端设置 spring.c1oud.stream.bindings.input.group 属性即可。
2、在子工程 stream_consumer (子模块),复制一个更名为:在子工程 stream_consumer_2 (子模块),并把 application.yml 配置文件中的端口号改为:7003
1)子工程 stream_consumer_2 (子模块)中的 pom.xml 文件。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_cloud_demo</artifactId>
<groupId>djh.it</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stream_consumer_2</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</project>
<!-- spring_cloud_demo\stream_consumer_2\pom.xml -->
2)子工程 stream_consumer_2 (子模块)中的 application.yml 文件。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.yml
server:
port: 7003 #服务端口
spring:
application:
nmae: stream_consumer_2 #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
binders: #配置绑定器
defaultRabbit:
type: rabbit
3)子工程 stream_consumer_2 (子模块)中的 自定义的消息通道类 MyProcessor.java
/**
* spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\channel\MyProcessor.java
*
* 2024-5-11 创建 自定义的消息通道类 MyProcessor.java
*/
package djh.it.stream.channel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
//消息生产者的配置
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
//消息消费者的配置
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
4)子工程 stream_consumer_2 (子模块)中的 获取消息工具类 MessageListener.java
/**
* spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\consumer\MessageListener.java
*
* 2024-5-11 创建一个获取消息工具类 MessageListener.java
*/
package djh.it.stream.consumer;
import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {
// //监听 binding 中的消息
// @StreamListener(Sink.INPUT)
// public void input(String message) {
// System.out.println("获取到的消息: " + message);
// }
//监听 binding 中的消息
@StreamListener(MyProcessor.MYINPUT)
public void input(String message) {
System.out.println("获取到的消息: " + message);
}
}
5)子工程 stream_consumer_2 (子模块)中的 启动类 ConsumerApplication_2.java
/**
* spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\ConsumerApplication_2.java
*
* 2024-5-11 SpringCloudStream 入门案例:启动类 ConsumerApplication_2.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)定义一个通道接口,通过内置获取消息的接口:Sink
* 4)绑定对应通道。
* 5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication_2 {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication_2.class);
}
}
3、启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,
在 idea Run Dashboard 控制面板,两个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”
4、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,添加 消息分组配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.yml
server:
port: 7002 #服务端口
spring:
application:
nmae: stream_consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
group: group1 #消息分组(同一组只能有一个消息者获取消息)
binders: #配置绑定器
defaultRabbit:
type: rabbit
5、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分组配置。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.yml
server:
port: 7003 #服务端口
spring:
application:
nmae: stream_consumer_2 #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
group: group1 #消息分组(同一组只能有一个消息者获取消息)
binders: #配置绑定器
defaultRabbit:
type: rabbit
6、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,
在 idea Run Dashboard 控制面板,发现只有一个消费都启动类都会输出 “获取到的消息: hello 测试 工具类”
三、SpringCloudStream:消息分区
1、消息分区
有一些场景需要满足,同一个特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同-个实例统计计算分析,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.
2、在子工程 stream_producer (子模块)的 application.yml 配置文件中,添加 消息分区配置。
## spring_cloud_demo\stream_product\src\main\resources\application.yml
server:
port: 7001 #服务端口
spring:
application:
nmae: stream_product #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output: #管道交互
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myoutput: # 自定义消息通道
destination: djh-custom-output
producer: # 配置分区
partition-key-expression: payload # 分区关键字,对象中的 id 或 对象。
partition-count: 2 # 分区大小
binders: #配置绑定器
defaultRabbit:
type: rabbit
3、在子工程 stream_consumer (子模块)的 application.yml 配置文件中,也添加 消息分区配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.yml
server:
port: 7002 #服务端口
spring:
application:
nmae: stream_consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
instanceCount: 2 # 消费者总数。
instanceIndex: 0 # 当前消费者的索引,从 0 开始。
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
group: group1 #消息分组(同一组只能有一个消息者获取消息)
consumer:
partitioned: true # 开启分区支持
binders: #配置绑定器
defaultRabbit:
type: rabbit
3、在子工程 stream_consumer_2 (子模块)的 application.yml 配置文件中,也添加 消息分区配置。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.yml
server:
port: 7003 #服务端口
spring:
application:
nmae: stream_consumer_2 #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
instanceCount: 2 # 消费者总数。
instanceIndex: 1 # 当前消费者的索引,从 0 开始。
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
myinput: #自定义消息通道
destination: djh-custom-output
group: group2 #消息分组(同一组只能有一个消息者获取消息)
consumer:
partitioned: true # 开启分区支持
binders: #配置绑定器
defaultRabbit:
type: rabbit
4、修改 子工程 stream_producer (子模块)的 测试类 ProducterTest 进行测试。
/**
* spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java
*
* 2024-5-10 创建一个测试类 ProducterTest.java
*/
package djh.it.stream;
import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
// messageSender.send("hello 测试 工具类");
for(int i=0;i<5;i++){
messageSender.send("0");
}
}
}
5、重新启动 rabbitmqctl-server.bat 服务,并运行 测试类 ProducterTest 和 ConsumerApplication 启动类 和 ConsumerApplication_2 启动类,
在 idea Run Dashboard 控制面板,发现只有 ConsumerApplication 一个消费者启动类都会输出 “获取到的消息: 0”
上一节关联链接请点击:
# 从浅入深 学习 SpringCloud 微服务架构(十五)