手把手教你如何开发一个属于自己的延时队列的功能组件
- 前提介绍
- 解决痛点
- 延时队列组件的架构
- 延时队列组件的初始化流程
- 延时队列组件的整体核心类架构
- 延时队列组件的整体核心类功能
- 延时队列的开发组件
- 延迟队列的机制配置初始化类
- 源码 - DelayedQueueConfiguration
- Redission客户端的实现
- 源码 - DelayedRedissionClientTool
- 核心方法源码分析
- Offer方法存储元素 - 添加阻塞队列-元素
- poll方法获取元素 - 从阻塞队列中拉取数据
- 定义和初始化执行线程池
- 定义和初始化轮询线程池
- 源码 - DelayedThreadPoolExecutor
- 延迟队列机制支持Redis客户端
- 源码 - DelayedRedisClientSupport
- RedissonClientTool的工具的封装操作
- 源码 - RedissonClientTool
- 辅助类定义处理
- DelayedThreadPoolSupport
- 线程池的构建和初始化
- 延时队列启动DelayedBootstrapInitializer
- 源码 - DelayedBootstrapInitializer
- 注入监听器+异常处理器
- init初始化操作机制控制
- 获取延时队列数据信息的模型
- 执行线程组机制
- 定义延时队列的消费接口
- 定义延时队列的消费接口扩展接口
- DelayedBootstrapRunnable
- 延时队列的使用案例
- 延时队列投递数据方
- 延时队列消费数据方:
- 延时轮询线程异常处理器:
- 问题反馈
前提介绍
针对于目前,系统中的延时队列的开发复杂度以及统一化管理没有完成相关的标准,故此本人封装了一款,基于Redssion的框架为基础的也是基于我们现在framework为基础的延时队列框架开发机制组件,方便未来大家去开发属于自己的延时队列的开发规范以及开发成本!
解决痛点
-
基于原始的redis失效的EntryExpiredListener的定时监听器,因为考虑周期性和性能和延迟问题过大,所以有了本次版本组件封装的优化
-
简化开发,系统多出使用原生的redission客户端,因为这无形中给开发人员带来了很大的工作量,考虑未来的开发过程中会存在很多延时队列的场景
-
无标准,使用的延时开发实现原理的种类非常的多,有内存机制的延时队列、消息队列的延时实现、redis的延时队列,为了达成标准化。
-
统一化管理,防止问题重复出现或者多点问题出现机制。
延时队列组件的架构
-
延时队列采用redis 大key或者业务组、业务类型进行划分出不同的分割领域,每个组都是属于相互隔离。
-
自己消费自己的数据信息以及异常处理和轮询和执行机制
延时队列组件的初始化流程
-
主要针对于轮询线程、执行线程的初始化
-
主要针对于注册监听器、异常处理器机制
延时队列组件的整体核心类架构
延时队列组件的整体核心类功能
延时队列的开发组件
延迟队列的机制配置初始化类
DelayedQueueConfiguration:主要集中于延时队列的配置参数类,主要用于定义针对于初始化一些基础核心的基础类服务组件的集合。
源码 - DelayedQueueConfiguration
@Configuration
@ComponentScan(basePackages = "com.hyts.assemble.redisdelayer")
public class DelayedQueueConfiguration {
/**
* redission客户端的实现
* @return
*/
@Bean
public DelayedRedissionClientTool delayedRedissionClientTool(){
return new DelayedRedissionClientTool();
}
/**
* 执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
* @return
*/
@Bean("delayedExecuteThreadPoolExecutor")
public Executor delayedExecuteThreadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor =
DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
// 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* 执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
* @return
*/
@Bean("delayedExecuteThreadPoolCycle")
public Executor delayedExecuteThreadPoolCycle() {
ThreadPoolTaskExecutor threadPoolTaskExecutor =
DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
threadPoolTaskExecutor.setQueueCapacity(0);
// 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* 延迟线程池支持机制
* @return
*/
@Bean
public DelayedThreadPoolSupport delayedThreadPoolSupport(@Autowired @Qualifier("delayedExecuteThreadPoolExecutor") Executor execute,
@Autowired @Qualifier("delayedExecuteThreadPoolCycle") Executor recycle){
return new DelayedThreadPoolSupport(execute,recycle);
}
/**
* 延迟队列机制支持Redis客户端
* @return
*/
@Bean
public DelayedRedisClientSupport delayedRedisClientSupport(){
return new DelayedRedisClientSupport(delayedRedissionClientTool());
}
/**
* 线程池的构建和初始化
* @return
*/
@Bean(initMethod = "init")
public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
return new DelayedBootstrapInitializer();
}
@Bean
@ConditionalOnMissingBean(RedissonClientTool.class)
public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
return new RedissonClientTool(redissonClient);
}
}
主要包含了一下几个部件:
- DelayedRedissionClientTool:Redission客户端的实现
- delayedExecuteThreadPoolExecutor:执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
- delayedExecuteThreadPoolCycle:执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
- DelayedThreadPoolSupport:延迟线程池支持机制
- DelayedRedisClientSupport:延迟线程池支持机制
Redission客户端的实现
DelayedRedissionClientTool主要属于Redisson延时队列客户端实现类,主要包含了相关的对应的处理维护延时队列的元素数据信息操作类。
源码 - DelayedRedissionClientTool
@AutoConfigureAfter(value = RedissonClientTool.class)
@Slf4j
public class DelayedRedissionClientTool {
/**
* redissionCLientTool工具机制
*/
@Autowired
RedissonClientTool redissonClientTool;
/**
* 自动注册
*/
public DelayedRedissionClientTool() {
}
/**
* 手动注册
* @param redissonClientTool
*/
public DelayedRedissionClientTool(RedissonClientTool redissonClientTool) {
this.redissonClientTool = redissonClientTool;
}
/**
* 添加阻塞队列-元素
* @param <T>
*/
public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
//预先进行构建初始化参数条件机制
executeInvokerEvent.preCondition(executeInvokerEvent);
redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
}
/**
* 获取相关的
* @param executeInvokerEvent
* @param <T>
* @return
*/
public <T> RBlockingQueue<T> takeBlockingQueue(ExecuteInvokerEvent<T> executeInvokerEvent) {
return redissonClientTool.getRedissonClient().getBlockingQueue(executeInvokerEvent.getBizGroup());
}
/**
* 操作梳理
* @param trBlockingQueue
* @param <T>
* @return
* @throws InterruptedException
*/
public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
}
}
核心方法源码分析
Offer方法存储元素 - 添加阻塞队列-元素
public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
//预先进行构建初始化参数条件机制
executeInvokerEvent.preCondition(executeInvokerEvent);
redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
}
poll方法获取元素 - 从阻塞队列中拉取数据
public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
}
定义和初始化执行线程池
执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制,执行线程池(公共默认)
@Bean("delayedExecuteThreadPoolExecutor")
public Executor delayedExecuteThreadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor =
DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
// 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
定义和初始化轮询线程池
主要负责轮询获取对应的redis服务队列中的数据的线程所在的线程池。
@Bean("delayedExecuteThreadPoolCycle")
public Executor delayedExecuteThreadPoolCycle() {
ThreadPoolTaskExecutor threadPoolTaskExecutor =
DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
threadPoolTaskExecutor.setQueueCapacity(0);
// 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
其中内部可以看到调用了DelayedThreadPoolExecutor.initParameter的方法进行控制和初始化对应的线程池,接下来,我们来看看该线程池方法以及他 的作用是什么?
源码 - DelayedThreadPoolExecutor
public class DelayedThreadPoolExecutor {
/**
* 获取到服务器的cpu内核:逻辑内核核心数
*/
private static int DEFAULT_THREAD_CORE_BASE_SIZE = Runtime.getRuntime().availableProcessors();
/**
* IO密集型机制控制*2
*/
private static int DEFAULT_THREAD_CORE_SIZE_IO_TYPE = DEFAULT_THREAD_CORE_BASE_SIZE<<1;
/**
* 序号分配器
*/
private static AtomicInteger atomicInteger = new AtomicInteger();
/**
* 初始化参数信息
* @return
*/
public static ThreadPoolTaskExecutor initParameter(String threadGroup){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE);//核心池大小
executor.setMaxPoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE<<4);//最大线程数 = 核心*核心池大小;
executor.setQueueCapacity(1000);//队列程度
executor.setKeepAliveSeconds(30);//线程空闲时间
executor.setThreadGroupName(threadGroup);
executor.setThreadFactory(r -> new Thread(r,String.format("%s-%s",threadGroup,atomicInteger.getAndDecrement())));
executor.setThreadNamePrefix(threadGroup+"-");//线程前缀名称
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//配置拒绝策略
return executor;
}
}
上面可以看出来它主要属于一个公共方法进行控制我们的通用线程池的参数类。
主要是讲对应的Spring的ThreadPoolTaskExecutor的对象实现类进行模板化的一个功能。降低使用着的开发量。
延迟队列机制支持Redis客户端
延迟队列机制支持Redis客户端支持类,主要目的是为了作为一个讲对应的Redis客户端类的静态引用操作。
@Bean
public DelayedRedisClientSupport delayedRedisClientSupport(){
return new DelayedRedisClientSupport(delayedRedissionClientTool());
}
源码 - DelayedRedisClientSupport
public class DelayedRedisClientSupport {
@Getter
private static DelayedRedissionClientTool delayedRedissionClientTool;
/**
* 延迟队列控制redis服务机制
* @param delayedRedissionClientTool
*/
public DelayedRedisClientSupport(DelayedRedissionClientTool delayedRedissionClientTool) {
DelayedRedisClientSupport.delayedRedissionClientTool = delayedRedissionClientTool;
}
}
RedissonClientTool的工具的封装操作
@Bean
@ConditionalOnMissingBean(RedissonClientTool.class)
public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
return new RedissonClientTool(redissonClient);
}
源码 - RedissonClientTool
@Slf4j
public class RedissonClientTool {
private RedissonClient redissonClient;
public RedissonClientTool(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public <T> void addDelayQueueElement(String key, T t, long delay, TimeUnit timeUnit) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
public <T> T takeDelayQueueElement(String key) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
T t = null;
try {
t = blockingFairQueue.take();
} catch (InterruptedException e) {
log.error("takeDelayQueueElement error key: " + key, e);
}
return t;
}
/**
* 阻塞队列添加元素
* @param key
* @param t
* @param <T>
*/
public <T> void addBlockingQueueElement(String key, T t) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
blockingFairQueue.offer(t);
}
/**
*
* 取出队列的元素且删除
* @param key
* @param t
* @param <T>
* @return
*/
public <T> T pollBlockQueueElement(String key, T t) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
return blockingFairQueue.poll();
}
/**
*
* 取出队列的元素但是不删除
* @param key
* @param t
* @param <T>
* @return
*/
public <T> T peekBlockQueueElement(String key, T t) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
return blockingFairQueue.peek();
}
/**
* 队列添加元素
* @param key
* @param t
* @param <T>
*/
public <T> void addQueueElement(String key, T t) {
RQueue<T> queue = redissonClient.getQueue(key);
queue.offer(t);
}
/**
*
* 取出队列的元素且删除
* @param key
* @param t
* @param <T>
* @return
*/
public <T> T pollQueueElement(String key, T t) {
RQueue<T> queue = redissonClient.getQueue(key);
return queue.poll();
}
public RedissonClient getRedissonClient () {
return this.redissonClient;
}
}
辅助类定义处理
DelayedThreadPoolSupport
主要通过的对应的延时队列线程池的工具支持类,主要包含了对应的这两种线程池的引用操作处理模型,如下图所示。
public class DelayedThreadPoolSupport {
/**
* 任务执行线程机制
*/
@Getter
private static Executor taskExecuteThread;
/**
* 任务轮询线程机制
*/
@Getter
private static Executor taskRecycleThread;
/**
* 操作处理机制
* @param taskExecuteThread
* @param taskRecycleThread
*/
public DelayedThreadPoolSupport(Executor taskExecuteThread, Executor taskRecycleThread) {
DelayedThreadPoolSupport.taskExecuteThread = taskExecuteThread;
DelayedThreadPoolSupport.taskRecycleThread = taskRecycleThread;
}
}
线程池的构建和初始化
主要通过线程池进行构建对应的启动初始化对象实现类,用于绑定和初始化所有需要进行延时队列监听的线程。
@Bean(initMethod = "init")
public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
return new DelayedBootstrapInitializer();
}
延时队列启动DelayedBootstrapInitializer
- 开始初始化加载完成系统内部所有的相关的延迟队列监听上下文接口服务数据
- 开始开展完成线程任务分配为每个分组的监听器以及任务队列分配资源
- 开始生产相关的监听绑定关系机制
- 开始初始化相关的异常信息处理机制
- 初始化线程机制
源码 - DelayedBootstrapInitializer
@Slf4j
public class DelayedBootstrapInitializer {
@Setter
@Getter
@DelayedQueueListener(value="delayedListenerContextMap")
Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();
@Setter
@Getter
@DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();
/**
* 初始化操作机制控制
*/
public void init(){
log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
"系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
if(MapUtils.isEmpty(delayedListenerContextMap)){
log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
"请检查是否有关于实现的接口" +
"EventExecutableInvokerListener,以及相关@DelayedQueueListener");
return ;
}
log.info("启动与生产相关的侦听绑定机制");
Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));
log.info("开始初始化相关的异常信息处理机制");
Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
log.info("启动资源分配机制");
//推荐同一个组里面采用一个线程池进行处理机制
getAnnotationMetadataGroup.entrySet().forEach(param->{
log.info("初始化线程机制 {}:",param.getValue());
executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
});
}else{
log.warn("资源转换失败!无法执行资源执行机制");
}
}
/**
* 支持动态添加延时队列控制
*/
// public void addExecuteDelayeQueue(ExecuteDelayedQueue executeDelayedQueue){
// Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
// executor.execute(new DelayedBootstrapRunnable(executeDelayedQueue.getQueueName(),param.getValue(),
// DelayedBootstrapInitializer.getExecutorByGroup(executeDelayedQueue.getValue()),
// new ExecutableExceptionHandler(Lists.newArrayList(new DefaultSampleDelayedExceptionHandler()))));
// }
/**
* @param eventExecutableInvokerListener
* @return
*/
public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
}
public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
}
/**
* 获取相关的组信息
* @param object
* @return
*/
public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
if(annotationInstance instanceof DelayedQueueListener) {
DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
if(Objects.isNull(annotationInstance)){
return Strings.EMPTY;
}else{
return delayedQueueListener.group();
}
}
else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
if(Objects.isNull(annotationInstance)){
return Strings.EMPTY;
}else{
return delayedExceptionHandler.group();
}
}
return Strings.EMPTY;
}
/**
* 执行线程组机制
* @return
*/
public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
filter(Objects::nonNull).findAny().orElse(null);
}
}
注入监听器+异常处理器
由于Spring框架可以帮我自动进行获取对象模型注入的数据集合,此部分我们采用的是Map
@Setter
@Getter
@DelayedQueueListener(value="delayedListenerContextMap")
Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();
@Setter
@Getter
@DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();
init初始化操作机制控制
主要是进行初始化操作init方法,之后进行获取对应的监听器以及对象,并将这些对象直接进行注入到对应的轮询线程和执行任务的线程中,方便我们整体的延时队列进行运行处理操作。
public void init(){
log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
"系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
if(MapUtils.isEmpty(delayedListenerContextMap)){
log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
"请检查是否有关于实现的接口" +
"EventExecutableInvokerListener,以及相关@DelayedQueueListener");
return ;
}
log.info("启动与生产相关的侦听绑定机制");
Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));
log.info("开始初始化相关的异常信息处理机制");
Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
log.info("启动资源分配机制");
//推荐同一个组里面采用一个线程池进行处理机制
getAnnotationMetadataGroup.entrySet().forEach(param->{
log.info("初始化线程机制 {}:",param.getValue());
executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
});
}else{
log.warn("资源转换失败!无法执行资源执行机制");
}
}
获取延时队列数据信息的模型
主要通过延时队列处理类上面的注解的元数据信息,获取注解的分组Group操作属性。
/**
* @param eventExecutableInvokerListener
* @return
*/
public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
}
public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
}
/**
* 获取相关的组信息
* @param object
* @return
*/
public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
if(annotationInstance instanceof DelayedQueueListener) {
DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
if(Objects.isNull(annotationInstance)){
return Strings.EMPTY;
}else{
return delayedQueueListener.group();
}
}
else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
if(Objects.isNull(annotationInstance)){
return Strings.EMPTY;
}else{
return delayedExceptionHandler.group();
}
}
return Strings.EMPTY;
}
执行线程组机制
public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
filter(Objects::nonNull).findAny().orElse(null);
}
定义延时队列的消费接口
调用延时队列的执行抽象接口处理模型
@FunctionalInterface
public interface ExecutableInvokerListener<P,R> {
/**
* 执行方法
* @param param 返回值为以后callable使用
* @return
*/
R handle(P param);
}
定义延时队列的消费接口扩展接口
public interface EventExecutableInvokerListener<P,R> extends ExecutableInvokerListener<ExecuteInvokerEvent <P>,R> {
/**
* 延时偏移量
*/
long DEFAULT_DELAYED_OSFFET = 10;
/**
* 延时超时时间时间戳
*/
TimeUnit DEFAULT_DELAYED_TIMEUNIT = TimeUnit.SECONDS;
/**
* 是否可以执行异步操作(暂不支持)
*/
boolean DEFAULT_IS_ASYNC_FLAG = Boolean.TRUE;
/**
* 暂时不支持重试机制,会造成数据重复执行机制,主要面向与执行失败后的重试机制(暂不支持)
*/
int DEFAULT_RETRY_NUM = 0;
/**
* 存放在同一个线程执行
*/
String DEFAULT_BIZ_GROUP = "DEFAULT_GROUP";
/**
* 如果没有定义直接采用默认线程池进行执行
*/
Executor getExecutor();
}
DelayedBootstrapRunnable
主要用于处理对应的DelayedBootstrapRunnable的控制对象模型机制,用于轮询查询获取redis队列种的数据信息,之后回调给业务端的Listener监听器操作。
@RequiredArgsConstructor
@Slf4j
public class DelayedBootstrapRunnable implements Runnable{
/**
* 直接传递相关的执行客户端访问器
*/
public DelayedRedissionClientTool delayedRedissionClientTool = DelayedRedisClientSupport.getDelayedRedissionClientTool();
/**
* 绑定的线程组,只会执行相关的线程组之间的关系机制
*/
public final String bizGroup;
/**
* 注入参数进入
*/
public final List<EventExecutableInvokerListener> eventExecutableInvokerListeners;
/**
* 执行线程池
*/
public final Executor executorThreadPool;
/**
* 异常信息控制
*/
public final ExecutableExceptionHandler exceptionHandlers;
/**
* 启动服务处理机制
*/
@Override
public void run() {
try {
RBlockingQueue<ExecuteInvokerEvent> blockingQueue = delayedRedissionClientTool.takeBlockingQueue(new ExecuteInvokerEvent(bizGroup));
Executor executor = Objects.isNull(executorThreadPool) ? DelayedThreadPoolSupport.getTaskExecuteThread() : executorThreadPool;
Thread.currentThread().setUncaughtExceptionHandler(exceptionHandlers);
for(;;) {
try{
ExecuteInvokerEvent data = delayedRedissionClientTool.poll(blockingQueue);
log.info("侦听队列任务组:{},获得值:{}", bizGroup, data);
log.info(MessageFormat.format("【1】Execute parse complete call: the execution time should be:{0,date,yyyy-MM-dd HH:mm:ss}," +
"Actual execution time:{1,date,yyyy-MM-dd HH:mm:ss},createTime:{2,date,yyyy-MM-dd HH:mm:ss}",
data.getFiredTime(), new Date(),new Date(data.getCreateTime())));
executor.execute(() -> {
for(EventExecutableInvokerListener eventExecutableInvokerListener : eventExecutableInvokerListeners){
eventExecutableInvokerListener.handle(data);
}
});
}catch (Exception e){
log.error("无法执行处理",e);
}
}
} catch (Exception e) {
log.error("无法执行处理",e);
// throw new RuntimeException(e);
}
}
}
延时队列的使用案例
延时队列投递数据方
@Autowired(required = false)
public DelayedRedissionClientTool delayedRedissionClientTool;
public void testProducerElement(){
AtomicInteger atomicInteger = new AtomicInteger();
IntStream.range(0,200).forEach(param->{
log.info("开始投递数据信息");
// 业务编号必须传入,为了去重;此外分组必须穿,如同mq的topic
ExecuteInvokerEvent executeInvokerEvent = new ExecuteInvokerEvent(String.valueOf(atomicInteger.incrementAndGet()),"TEST_GROUP");
executeInvokerEvent2.setDelayedTime(10L); // 延时时长度
executeInvokerEvent2.setDataModel("asdasda"); //传输数据模型。泛型类型
executeInvokerEvent2.setTimeUnit(TimeUnit.SECONDS); // 延时时间单位
delayedRedissionClientTool.offer(executeInvokerEvent); //数据存储
});
}
延时队列消费数据方:
@Slf4j
@DelayedQueueListener(value="delayedQueueTest",group="TEST_GROUP")
public class DelayedQueueTest implements EventExecutableInvokerListener<ExecuteInvokerEvent<Object>,Object> {
/**
* 可以自定义线程池,但是一个组中,只会采用其中一个线程池去执行,防止过多使用资源
* @return
*/
@Override
public Executor getExecutor() {
return null;
}
/**
* 任务执行机制控制服务
* @param param 返回值为以后callable使用
* @return
*/
@Override
public Object handle(ExecuteInvokerEvent<ExecuteInvokerEvent<Object>> param) {
try {
System.out.println(MessageFormat.format("【1】执行解析完成调用:应该执行时间:{0,date,yyyy-MM-dd HH:mm:ss}," +
"实际执行时间:{1,date,yyyy-MM-dd HH:mm:ss},创建时间:{2,date,yyyy-MM-dd HH:mm:ss}",param.getFiredTime(), new Date(),new Date(param.getCreateTime())));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
延时轮询线程异常处理器:
*/
@DelayedQueueExceptionHandler(value="delayedHandler",group="TEST_GROUP")
public class DelayedTestQueueExceptionHandler implements DelayedExceptionHandler {
@Override
public void catchException(Throwable e, Thread currentThread) {
System.out.println("asdasdasda---------------------");
// e.printStackTrace();
}
}
问题反馈
- 大家是不是觉得非常便利开发相关的延迟队列?
- 异常处理机制待优化
- 性能提升带优化
- 循环线程属于非常痛点和薄弱的问题