包含组件内容
RedisQueue:消息队列监听标识 RedisQueueInit:Redis队列监听器 RedisQueueListener:Redis消息队列监听实现 RedisQueueService:Redis消息队列服务工具
代码实现
RedisQueue
import java. lang. annotation. ElementType ;
import java. lang. annotation. Retention ;
import java. lang. annotation. RetentionPolicy ;
import java. lang. annotation. Target ;
@Target ( ElementType . TYPE )
@Retention ( RetentionPolicy . RUNTIME )
public @interface RedisQueue {
String value ( ) ;
}
RedisQueueInit
import jakarta. annotation. Resource ;
import java. util. Map ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. LinkedBlockingQueue ;
import java. util. concurrent. ThreadFactory ;
import java. util. concurrent. ThreadPoolExecutor ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. atomic. AtomicBoolean ;
import java. util. concurrent. atomic. AtomicInteger ;
import lombok. extern. slf4j. Slf4j ;
import org. jetbrains. annotations. NotNull ;
import org. redisson. RedissonShutdownException ;
import org. redisson. api. RBlockingQueue ;
import org. redisson. api. RedissonClient ;
import org. springframework. beans. BeansException ;
import org. springframework. context. ApplicationContext ;
import org. springframework. context. ApplicationContextAware ;
import org. springframework. stereotype. Component ;
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {
public static final String REDIS_QUEUE_PREFIX = "redis-queue" ;
final AtomicBoolean shutdownRequested = new AtomicBoolean ( false ) ;
@Resource
private RedissonClient redissonClient;
private ExecutorService executorService;
public static String buildQueueName ( String queueName) {
return REDIS_QUEUE_PREFIX + ":" + queueName;
}
@Override
public void setApplicationContext ( ApplicationContext applicationContext) throws BeansException {
Map < String , RedisQueueListener > queueListeners = applicationContext. getBeansOfType ( RedisQueueListener . class ) ;
if ( ! queueListeners. isEmpty ( ) ) {
executorService = createThreadPool ( ) ;
for ( Map. Entry < String , RedisQueueListener > entry : queueListeners. entrySet ( ) ) {
RedisQueue redisQueue = entry. getValue ( ) . getClass ( ) . getAnnotation ( RedisQueue . class ) ;
if ( redisQueue != null ) {
String queueName = redisQueue. value ( ) ;
executorService. submit ( ( ) -> listenQueue ( queueName, entry. getValue ( ) ) ) ;
}
}
}
}
private ExecutorService createThreadPool ( ) {
return new ThreadPoolExecutor (
Runtime . getRuntime ( ) . availableProcessors ( ) * 2 ,
Runtime . getRuntime ( ) . availableProcessors ( ) * 4 ,
60L , TimeUnit . SECONDS ,
new LinkedBlockingQueue < > ( 100 ) ,
new NamedThreadFactory ( REDIS_QUEUE_PREFIX ) ,
new ThreadPoolExecutor. CallerRunsPolicy ( )
) ;
}
private void listenQueue ( String queueName, RedisQueueListener redisQueueListener) {
queueName = buildQueueName ( queueName) ;
RBlockingQueue < ? > blockingQueue = redissonClient. getBlockingQueue ( queueName) ;
log. info ( "Redis队列监听开启: {}" , queueName) ;
while ( ! shutdownRequested. get ( ) && ! redissonClient. isShutdown ( ) ) {
try {
Object message = blockingQueue. take ( ) ;
executorService. submit ( ( ) -> redisQueueListener. consume ( message) ) ;
} catch ( RedissonShutdownException e) {
log. info ( "Redis连接关闭,停止监听队列: {}" , queueName) ;
break ;
} catch ( Exception e) {
log. error ( "监听队列异常: {}" , queueName, e) ;
}
}
}
public void shutdown ( ) {
if ( executorService != null ) {
executorService. shutdown ( ) ;
try {
if ( ! executorService. awaitTermination ( 60 , TimeUnit . SECONDS ) ) {
executorService. shutdownNow ( ) ;
}
} catch ( InterruptedException ex) {
executorService. shutdownNow ( ) ;
Thread . currentThread ( ) . interrupt ( ) ;
}
}
shutdownRequested. set ( true ) ;
if ( redissonClient != null && ! redissonClient. isShuttingDown ( ) ) {
redissonClient. shutdown ( ) ;
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger ( 1 ) ;
private final String namePrefix;
public NamedThreadFactory ( String prefix) {
this . namePrefix = prefix;
}
@Override
public Thread newThread ( @NotNull Runnable r) {
return new Thread ( r, namePrefix + "-" + threadNumber. getAndIncrement ( ) ) ;
}
}
}
RedisQueueListener
public interface RedisQueueListener < T > {
void consume ( T content) ;
}
RedisQueueService
import jakarta. annotation. Resource ;
import java. util. concurrent. TimeUnit ;
import org. redisson. api. RBlockingQueue ;
import org. redisson. api. RDelayedQueue ;
import org. redisson. api. RedissonClient ;
import org. springframework. stereotype. Component ;
@Component
public class RedisQueueService {
@Resource
private RedissonClient redissonClient;
public < T > void send ( String queueName, T content) {
RBlockingQueue < T > blockingQueue = redissonClient. getBlockingQueue ( RedisQueueInit . buildQueueName ( queueName) ) ;
blockingQueue. add ( content) ;
}
public < T > void sendDelay ( String queueName, T content, long delay, TimeUnit timeUnit) {
RBlockingQueue < T > blockingFairQueue = redissonClient. getBlockingQueue ( RedisQueueInit . buildQueueName ( queueName) ) ;
RDelayedQueue < T > delayedQueue = redissonClient. getDelayedQueue ( blockingFairQueue) ;
delayedQueue. offer ( content, delay, timeUnit) ;
}
public < T > void sendDelay ( String queueName, T content, long delay) {
RBlockingQueue < T > blockingFairQueue = redissonClient. getBlockingQueue ( RedisQueueInit . buildQueueName ( queueName) ) ;
RDelayedQueue < T > delayedQueue = redissonClient. getDelayedQueue ( blockingFairQueue) ;
delayedQueue. offer ( content, delay, TimeUnit . MILLISECONDS ) ;
}
}
测试
创建监听对象
import cn. yiyanc. infrastructure. redis. annotation. RedisQueue ;
import cn. yiyanc. infrastructure. redis. queue. RedisQueueListener ;
import lombok. extern. slf4j. Slf4j ;
import org. springframework. stereotype. Component ;
@Slf4j
@Component
@RedisQueue ( "test" )
public class TestListener implements RedisQueueListener < String > {
@Override
public void invoke ( String content) {
log. info ( "队列消息接收 >>> {}" , content) ;
}
}
测试用例
import jakarta. annotation. Resource ;
import org. springframework. web. bind. annotation. PostMapping ;
import org. springframework. web. bind. annotation. RequestMapping ;
import org. springframework. web. bind. annotation. RestController ;
@RestController
@RequestMapping ( "queue" )
public class QueueController {
@Resource
private RedisQueueService redisQueueService;
@PostMapping ( "send" )
public void send ( String message) {
redisQueueService. send ( "test" , message) ;
redisQueueService. sendDelay ( "test" , "delay messaege -> " + message, 1000 ) ;
}
}
测试结果