概念
关于发布订阅这个词,其实不仅仅出现在Spring框架当中,其实在Redis中也有存在(其对应的是convertAndSend()方法),还有在MQ消息队列里也是有的,但这里就主要介绍的是关于Spring框架的ApplicationEventPublisher如何做到消息的发布与订阅。随着现在的业务量和需求量越来越大,其实基本都是分布式微服务集群的使用了,所以基本都是用到Redis与MQ。但是对于单体Spring Boot应用时,用Spring自带的发布订阅就已经绰绰有余了。
需要知道的是发布订阅需要有三个对象:事件、事件发布源、事件接收源(监听器)。
事件
对于事件来说,需要去继承ApplicationEvent。至于为什么需要继承ApplicationEvent就需要研究源码,等我后续看完再回来更新笔记。
public class LogEvent extends ApplicationEvent {
public LogEvent(Object message) {
super(message);
}
}
事件发布源
这个基本上都是业务层的代码,但是作为学习阶段,我将其直接放在控制层。关于如何去发布,就需要用到Spring中的ApplicationEventPublisher。
@Slf4j
@RestController
@RequestMapping("/log")
public class LogController {
@Autowired
private ApplicationEventPublisher publisher;
@GetMapping("/publisher")
public void log() {
log.info("进入到log方法,开始发送事件");
publisher.publishEvent(new LogEvent("log方法生成事件信息"));
log.info("log方法发送事件完毕");
}
}
监听器
/**
* 管理员日志监听器
*/
@Slf4j
@Component
public class AdminListener {
@EventListener(LogEvent.class)
public void adminListen(LogEvent logEvent) {
try {
log.info("管理员监听到的日志信息为,{}", logEvent);
Thread.sleep(5000); // 睡眠5s
log.info("管理员监听完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 用户日志事件监听器
*/
@Slf4j
@Component
public class UserListener {
@EventListener(LogEvent.class)
public void userListen(LogEvent logEvent) {
try {
log.info("用户监听到的日志信息为,{}", logEvent);
Thread.sleep(10000); // 睡眠10s
log.info("用户监听完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果
从执行结果可以发现管理员线程睡眠了5s后执行完毕才会执行用户的代码块。所以表明是同步执行的。
于是为了将同步执行变为异步执行,又在两个监听器的方法上添加了@Async注解,并且一定要在启动类上添加@EnableAsync注解。再次执行会发现两次的执行结果并不一致。两个监听器是异步执行的。甚至在监听器上放置@Order注解可以实现先后顺序,但这里就不演示了。
源码追踪
这里只进行粗略的源码查看,因为作者功力有限无法解释得完全仔细,以后看懂了再回来更新笔记。可以发现其已经使用了JDK8新特性了。从ApplicationEventPublisher的publisher(Application event)方法就会进入到其子类AbstartApplicationContext中。
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
Object applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent)event;
} else {
applicationEvent = new PayloadApplicationEvent(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
// 在这步完成真正的发布订阅 只有监听完才会向下继续执行
this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}