1.概述
在我们平时的项目业务系统开发过程中,一个需求功能的业务逻辑经常出现主线业务和副线业务之分。比如,在当下移动端电商app进行注册账号操作,注册成功之后会发送短信、邮箱、站内信等通知,发放红包活动抵用券,推送用户注册信息给大数据系统进行数据分析以便后期个性化推荐等等。由此看出一个注册接口代码逻辑需要干这么多事情,业余逻辑高度耦合,并且串行执行耗时严重,所以我们接下来将围绕如何解决这两个问题进行叙述。串行执行耗时这个问题只需要改成异步,也就是主线逻辑注册成功之后接口就可以返回,而剩下的副线业务逻辑异步执行即可,说到异步解耦我想很多同学就想到了消息队列MQ,因为其一大核心作用就是异步解耦,不过消息队列中间件引入系统相对来说是一个比较重的操作,而我们这里采取的是今天的主角Spring event来实现业务解耦。
Spring事件(Spring Event)是Spring框架的一项功能,它允许不同组件之间通过发布-订阅机制进行解耦的通信。在Spring中,事件是表示应用程序中特定事件的对象,例如用户注册、订单创建、数据更新等。当这些事件发生时,可以通知其他组件来执行相应的操作。
具体来说,Spring事件机制包含以下几个主要的部分:
- 事件(Event): 事件是一个普通的POJO类,用于封装与应用程序状态变化相关的信息。通常情况下,事件类继承自
ApplicationEvent
抽象类,Spring中提供了一些内置的事件,也可以自定义事件。 - 事件发布者(ApplicationEventPublisher): 事件发布者是一个接口,用于发布事件。在Spring中,ApplicationContext就是一个事件发布者,可以通过ApplicationContext的
publishEvent()
方法来发布事件。 - 事件监听器(ApplicationListener): 事件监听器是一个接口,用于监听事件并在事件发生时执行相应的逻辑。在Spring中,我们可以通过实现
ApplicationListener
接口或使用@EventListener
注解来定义事件监听器。 - 事件监听器注册: 事件监听器需要注册到事件发布者(ApplicationContext)中,以便在事件发生时被正确调用。在Spring中,通常通过XML配置、注解或者编程方式将事件监听器注册到ApplicationContext中。
2.Spring Event使用示例
2.1 用户注册
下面我就基于用户注册成功之后进行短信邮箱、站内信通知,发放红包优惠券,推送用户信息给大数据系统进行示例展示
首先先定义一个用户类信息User
:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Long id;
private String userNo;
private String nickname;
private String email;
private String phone;
private Integer gender;
private Date birthday;
private Integer isDelete;
}
自定义一个注册事件
@getter
public class RegisterEvent extends ApplicationEvent {
// 携带用户信息
private User user;
public RegisterEvent(Object source, User user) {
super(source);
this.user = user;
}
}
定义事件监听器
事件监听器有两种实现方式,一种是实现ApplicationListener
接口,另一种是使用@EventListener
注解。
三个监听器如下所示:消息通知和发送红包监听器通过实现ApplicationListener接口
@Slf4j
@Component // 把监听器注册到spring容器中
public class RegisterMsgNoticeListener implements ApplicationListener<RegisterEvent> {
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("=========>>>站内信通知了");
log.info("=========>>>短信通知了");
log.info("=========>>>邮箱通知了");
}
}
@Slf4j
@Component
@Order(1)
public class RegisterSendRedPacketListener implements ApplicationListener<RegisterEvent> {
@SneakyThrows
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>发放红包了");
// 睡眠一下,模拟发送优惠券比较复杂
TimeUnit.SECONDS.sleep(2);
log.info("======>>>发放优惠券了");
}
}
使用@EventListener
实现推送用户信息监听器
@Slf4j
@Component
public class RegisterPushDataListener{
@EventListener
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>推送用户信息到大数据系统了,user={}", event.getUser());
}
}
事件发布:
@Slf4j
@Service
public class UserServiceImpl implements UserService {
@Resource
private ApplicationContext applicationContext;
@Override
public void registerUser(User user) {
log.info("=====>>>user注册成功了");
applicationContext.publishEvent(new RegisterEvent(this, user));
}
}
单元测试用例:
@SpringBootTest
@RunWith(SpringRunner.class)
public class UserServiceImplTest {
@Resource
private UserService userService;
@Test
public void testEvent() {
User user = User.builder().userNo("1111").birthday(new Date()).gender(0)
.phone("12345677890").email("shepherd@163.com").nickname("芽儿哟").build();
userService.registerUser(user);
}
}
执行结果如下:
=====>>>user注册成功了
=========>>>站内信通知了
=========>>>短信通知了
=========>>>邮箱通知了
======>>>推送用户信息到大数据系统了,user=User(id=null, userNo=1111, nickname=芽儿哟, email=shepherd@163.com, phone=12345677890, gender=0, birthday=Tue Apr 09 17:12:25 CST 2024, isDelete=null)
======>>>发放红包了
======>>>发放优惠券了
=====>>>user注册完成结束了
如果我们要控制监听器的执行顺序,使用@Order
即可,注意如果是实现了ApplicationListener
,我们把@Order
放到bean类上即可,但如果是通过注解@EventListener
实现的,就必须写到方法上,下面就是先执行发送红包优惠券监听器,再执行消息通知监听器,最后才执行推送用户数据监听器。注意异步的情况下只保证按顺序将监听器丢入进线程池,具体事件处理执行顺序是不确定的。
@Slf4j
@Component
@Order(1)
public class RegisterSendRedPacketListener implements ApplicationListener<RegisterEvent> {
@SneakyThrows
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>发放红包了");
// 睡眠一下,模拟发送优惠券比较复杂
TimeUnit.SECONDS.sleep(2);
log.info("======>>>发放优惠券了");
}
}
@Slf4j
@Component // 把监听器注册到spring容器中
@Order(2)
public class RegisterMsgNoticeListener implements ApplicationListener<RegisterEvent> {
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("=========>>>站内信通知了");
log.info("=========>>>短信通知了");
log.info("=========>>>邮箱通知了");
}
}
@Slf4j
@Component
public class RegisterPushDataListener{
@EventListener
@Order(3)
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>推送用户信息到大数据系统了,user={}", event.getUser());
}
}
你知道Spring Event
发布订阅事件处理默认是同步还是异步的?基于前面示例执行结果知道默认是同步的,很多同学因为基于消息队列MQ异步解耦的思想,自然而然以为是Spring Event
的事件处理是异步的,这是一个误区。Spring Boot并不会自动默认维护一个线程池来处理event事件,要想异步处理事件使用 @Async
标记即可,注意前提条件是:使用 @EnableAsync
开启 Spring 异步:
@SpringBootApplication
@EnableAsync
public class BaseDemoApplication {
public static void main(String[] args) {
SpringApplication.run(BaseDemoApplication.class, args);
}
}
使用@Async
的时候,一般都会自定义线程池,因为@Async
的默认线程池为 SimpleAsyncTaskExecutor
,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
@Configuration
public class InitConfig {
/**
* 初始化一个线程池,放入spring beanFactory
* @return
*/
@Bean(name = "asyncExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("asyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
分别在监听器加上@Async
注解:
@Slf4j
@Component // 把监听器注册到spring容器中
@Order(2)
@Async("asyncExecutor")
public class RegisterMsgNoticeListener implements ApplicationListener<RegisterEvent> {
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("=========>>>站内信通知了");
log.info("=========>>>短信通知了");
log.info("=========>>>邮箱通知了");
}
}
@Slf4j
@Component
@Order(1)
@Async("asyncExecutor")
public class RegisterSendRedPacketListener implements ApplicationListener<RegisterEvent> {
@SneakyThrows
@Override
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>发放红包了");
// 睡眠一下,模拟发送优惠券比较复杂
TimeUnit.SECONDS.sleep(2);
log.info("======>>>发放优惠券了");
}
}
@Slf4j
@Component
public class RegisterPushDataListener{
@EventListener
@Order(3)
@Async("asyncExecutor")
public void onApplicationEvent(RegisterEvent event) {
log.info("======>>>推送用户信息到大数据系统了,user={}", event.getUser());
}
}
通过执行结果日志打印可以看到开启多线程异步执行了,并且每次执行结果不确定,验证了上面所说的异步情况下@Order
不再能控制监听器的执行顺序了。
2.2 借助事件进行启动初始化
在日常开发中,我们经常碰到需要再项目系统服务启动时进行一些业务上逻辑处理、数据初始化等操作,比如基础数据的写入、缓存的加载、任务的开启等等。实现这个功能的方式有很多,这里我们就用Spring提供的事件ContextRefreshedEvent
来实现,当ApplicationContext被初始化或刷新之后触发该事件。
@Slf4j
@Component
public class InitListener implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.info("========>>>服务启动了,执行业务初始化操作了");
}
}
项目推荐:基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba企业级系统架构底层框架封装,解决业务开发时常见的非功能性需求,防止重复造轮子,方便业务快速开发和企业技术栈框架统一管理。引入组件化的思想实现高内聚低耦合并且高度可配置化,做到可插拔。严格控制包依赖和统一版本管理,做到最少化依赖。注重代码规范和注释,非常适合个人学习和企业使用
Github地址:https://github.com/plasticene/plasticene-boot-starter-parent
Gitee地址:https://gitee.com/plasticene3/plasticene-boot-starter-parent
微信公众号:Shepherd进阶笔记
交流探讨qun:Shepherd_126
3.Spring Event实现原理
Spring Event
是一种基于观察者模式(Observer Pattern)的实现。观察者模式(Observer Design Pattern)也被称为发布订阅模式。其定义是:在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。Spring Event发布订阅的流程如下图所示:
直接从入口发布applicationContext.publishEvent()
开始分析,会来到AbstractApplicationContext#publishEvent()
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
// ApplicationEventMulticaster 未初始化完成时先将applicationEvent 暂存
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 获取监听管理器ApplicationEventMulticaster并进行广播,事件处理核心入口所在
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException {
if (this.applicationEventMulticaster == null) {
throw new IllegalStateException("ApplicationEventMulticaster not initialized - " +
"call 'refresh' before multicasting events via the context: " + this);
}
return this.applicationEventMulticaster;
}
ApplicationEventMulticaster
是在Spring启动时核心方法AbstractApplicationContext#refresh()
中进行注入的:
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
//初始化beanfactor
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
// beanFactory赋值
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
//空实现,提供子类覆盖的额外处理,即子类处理自定义的beanFactorypostProcess
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
//增强beanFactory功能如自动装配等
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
//创建注册beanPostProcessor
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
//国际化处理
initMessageSource();
// Initialize event multicaster for this context.
//初始化多播器
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
// 初始化web服务器等bean
onRefresh();
// Check for listener beans and register them.
//将所有的ApplicationListener添加到事件多播器中
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
//实例化所有非懒加载的单例bean
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
//启动servelt服务器等
finishRefresh();
}
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
注册ApplicationEventMulticaster
的逻辑很简单,如果Spring容器中有了ApplicationEventMulticaster
就使用自定义的,不然就会创建默认的SimpleApplicationEventMulticaster
放入容器中。
接下来我们就进入事件处理核心所在:SimpleApplicationEventMulticaster#multicastEvent()
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 获取线程池
Executor executor = getTaskExecutor();
// 循环遍历调用监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 是否存在线程池 异步执行逻辑
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
// 非异步线程处理
invokeListener(listener, event);
}
}
}
这里就可以看出在处理事件会先获取线程池,没有的话就同步执行,这也解释上面所说的Spring Event默认是同步处理事件的。接着往下看,执行监听器处理逻辑
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
// 是否存在 ErrorHandler
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 执行监听器的onApplicationEvent()
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
从方法#invokeListener()
可以看出会先判断是否定义了ErrorHandler
,有的话在事件处理过程中出现异常会进行异常捕获并做相应处理,如果没有就是直接报错毫无处理。结论就是:**最终事件的执行是由同一个线程按顺序来完成的,任何一个报错,都会导致后续的监听器执行不了。**这里我就不做演示了,把上面案例改为同步,然后某个监听器的处理逻辑报错就可以验证了,我们可以通过自定义一个事件广播器来解决,从上面Spring启动初始化可以看出只是new了一个SimpleApplicationEventMulticaster
对象放入容器中,并没有为其线程池Exector
属性进行赋值,这也是为啥默认是单线程同步处理事件的原因所在,所以我们可以自定义一个事件广播器设置好线程池,这样事件处理默认就是异步的了,不需要再在监听器是使用@Async
。与此同时我们也可以自定义一个事件异常处理器来对处理事件过程中发生异常进行相应处理,保证不同监听器的事件处理互不干扰,逻辑如下所示
@Slf4j
@Configuration
public class InitConfig {
/**
* 自定义事件广播器,异步处理事件,这样监听器就不需要使用@Async注解了
* @param executor
* @return
*/
@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(
@Qualifier("asyncExecutor") Executor executor,
ErrorHandler errorHandler) {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(executor);
simpleApplicationEventMulticaster.setErrorHandler(errorHandler);
return simpleApplicationEventMulticaster;
}
/**
* 注入一个事件异常处理器
* @return
*/
@Bean
public ErrorHandler errorHandler() {
return (t) -> {
log.error("listener handle error: ", t);
};
}
/**
* 初始化一个线程池,放入spring beanFactory
* @return
*/
@Bean(name = "asyncExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("asyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
整体类图如下:
4.Spring Event与消息队列MQ的区别
Spring Event和消息队列(MQ)是两种不同的消息传递机制,它们在实现消息通信方面有各自的优缺点。
Spring Event的优缺点: 优点:
简单易用: Spring Event是Spring框架提供的一个内置的事件发布-订阅机制,使用起来非常简单,无需引入额外的依赖。
无中间件依赖: Spring Event不依赖于任何消息中间件,适用于小型项目或者简单的消息通信场景。
模块解耦: Spring Event可以帮助实现模块之间的解耦,提高系统的灵活性和可维护性。
缺点:
单点问题: Spring Event是在单个应用内部的事件通知机制,如果应用崩溃或者重启,事件将会丢失。
不支持分布式: Spring Event只能在单个应用内部传递消息,不支持分布式环境下的消息传递。
性能问题: Spring Event在大规模消息通信场景下可能会存在性能问题,因为它是同步执行的,消息发布者需要等待所有订阅者处理完消息后才能继续执行。
消息队列(MQ)的优缺点: 优点:
异步处理: 消息队列支持异步消息处理,提高系统的并发能力和响应速度。
可靠性: 消息队列通常具有消息持久化、消息重试等特性,能够保证消息传递的可靠性。
分布式支持: 消息队列支持分布式环境下的消息传递,可以实现跨服务、跨应用的消息通信。
缺点:
复杂性: 使用消息队列需要引入额外的消息中间件,并且需要配置和管理这些中间件,增加了系统的复杂性。
维护成本: 消息队列需要维护消息中间件的稳定性和可用性,需要投入一定的维护成本。
一致性问题: 消息队列在消息传递过程中可能会出现一致性问题,需要额外的设计和处理。
综上所述,Spring Event适用于简单的应用内部消息通信场景,操作简单但有一定的局限性;消息队列适用于分布式、高并发的消息通信场景,可以提供更高的可靠性和灵活性,但需要考虑复杂性和维护成本。在选择使用哪种方式时,需要根据具体的业务需求和系统架构来进行权衡和选择。
5.总结
综上所述,Spring Event在业务系统中的实际使用案例包括订单支付成功事件、用户注册事件等,可以带来模块解耦、异步处理、增强扩展性等优点。然而,对于复杂的业务场景、事件失效风险以及调试困难等缺点需要进行注意和权衡。在使用Spring Event时,需要根据具体业务需求和系统特点进行合理的选择和使用。在使用需注意一下几点:
-
监听器默认同步执行,不要误认为和消息队列MQ一样异步消费消息的,Spring Event是应用内部发布-订阅机制,如果事件处理逻辑过于复杂同步阻塞可能对当前主流程带来影响,建议使用异步的方式。
-
不要依赖监听器执行顺序:首先我认为监听器之间有依赖关系就说明设计是有问题的,这不就是耦合依赖吗?和我们使用Spring Event的初衷有点背道而驰,如果两个监听器事件处理有前后依赖顺序,就应该想办法合并成一个。虽然我们可以使用
@Order
来控制监听器之间的执行顺序,但是仅在同步执行的场景下有效,监听器异步执行的情况下实际执行顺序仍然是不可控的。 -
监听器的事件处理并不绝对可靠
- 多个监听器事件处理的执行是由同一个线程按顺序来完成的,任何一个报错,都会导致后续的监听器执行不了
- 程序关闭时可能发生监听事件未处理完成。
-
事务事件:Spring Event同步执行的时候,是和主业务方法事务一起的,可能会出现下面这种异常情况,用户注册成功后发布消息通知事件,但在后续的事务处理中处理异常导致事务回滚,会出现用户收到注册成功短信但实际没有注册成功。所以我们一般认为Spring 事件是子任务,和主业务事务不需要强一致性。