文章目录
- 前言
- 一、配置
- 1. 添加依赖
- 2. 配置文件/类
- 3. 注入redission
- 3. 封装工具类
- 二、应用
- 1. RedisUtils工具类的基本使用
- 三、队列
- 1. 工具类
- 2. 普通队列
- 3. 有界队列(限制数据量)
- 4. 延迟队列(延迟获取数据)
- 5. 优先队列(数据可插队)
前言
redission是一个开源的java redis的客户端,在其基础上进行了进一步扩展。这些扩展极大地丰富了Redis的应用场景,尤其是在构建分布式系统时。
一、配置
1. 添加依赖
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
<exclusions>
<exclusion>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-30</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-27</artifactId>
<version>${redisson.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redisson-spring-boot-starter</artifactId>
<version>${lock4j.version}</version>
</dependency>
2. 配置文件/类
spring:
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码(如没有密码请注释掉)
password: asd60787533
# 连接超时时间
timeout: 10s
# 是否开启ssl
ssl: false
redisson:
# redis key前缀
keyPrefix: demo
# 线程池数量
threads: 4
# Netty线程池数量
nettyThreads: 8
# 单节点配置
singleServerConfig:
# 客户端名称
clientName: demo
# 最小空闲连接数
connectionMinimumIdleSize: 8
# 连接池大小
connectionPoolSize: 32
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
/**
* redis缓存key前缀
*/
private String keyPrefix;
/**
* 线程池数量,默认值 = 当前处理核数量 * 2
*/
private int threads;
/**
* Netty线程池数量,默认值 = 当前处理核数量 * 2
*/
private int nettyThreads;
/**
* 单机服务配置
*/
private SingleServerConfig singleServerConfig;
/**
* 集群服务配置
*/
private ClusterServersConfig clusterServersConfig;
@Data
@NoArgsConstructor
public static class SingleServerConfig {
/**
* 客户端名称
*/
private String clientName;
/**
* 最小空闲连接数
*/
private int connectionMinimumIdleSize;
/**
* 连接池大小
*/
private int connectionPoolSize;
/**
* 连接空闲超时,单位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超时,单位:毫秒
*/
private int timeout;
/**
* 发布和订阅连接池大小
*/
private int subscriptionConnectionPoolSize;
}
@Data
@NoArgsConstructor
public static class ClusterServersConfig {
/**
* 客户端名称
*/
private String clientName;
/**
* master最小空闲连接数
*/
private int masterConnectionMinimumIdleSize;
/**
* master连接池大小
*/
private int masterConnectionPoolSize;
/**
* slave最小空闲连接数
*/
private int slaveConnectionMinimumIdleSize;
/**
* slave连接池大小
*/
private int slaveConnectionPoolSize;
/**
* 连接空闲超时,单位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超时,单位:毫秒
*/
private int timeout;
/**
* 发布和订阅连接池大小
*/
private int subscriptionConnectionPoolSize;
/**
* 读取模式
*/
private ReadMode readMode;
/**
* 订阅模式
*/
private SubscriptionMode subscriptionMode;
}
}
3. 注入redission
@Slf4j
@Configuration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {
@Autowired
private RedissonProperties redissonProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public RedissonAutoConfigurationCustomizer redissonCustomizer() {
return config -> {
config.setThreads(redissonProperties.getThreads())
.setNettyThreads(redissonProperties.getNettyThreads())
.setCodec(new JsonJacksonCodec(objectMapper));
RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
if (ObjectUtil.isNotNull(singleServerConfig)) {
// 使用单机模式
config.useSingleServer()
//设置redis key前缀
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(singleServerConfig.getTimeout())
.setClientName(singleServerConfig.getClientName())
.setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
.setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
.setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
}
// 集群配置方式 参考下方注释
RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
if (ObjectUtil.isNotNull(clusterServersConfig)) {
config.useClusterServers()
//设置redis key前缀
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(clusterServersConfig.getTimeout())
.setClientName(clusterServersConfig.getClientName())
.setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
.setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
.setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
.setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
.setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
.setReadMode(clusterServersConfig.getReadMode())
.setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
}
log.info("初始化 redis 配置");
};
}
/**
* redis集群配置 yml
*
* --- # redis 集群配置(单机与集群只能开启一个另一个需要注释掉)
* spring:
* redis:
* cluster:
* nodes:
* - 192.168.0.100:6379
* - 192.168.0.101:6379
* - 192.168.0.102:6379
* # 密码
* password:
* # 连接超时时间
* timeout: 10s
* # 是否开启ssl
* ssl: false
*
* redisson:
* # 线程池数量
* threads: 16
* # Netty线程池数量
* nettyThreads: 32
* # 集群配置
* clusterServersConfig:
* # 客户端名称
* clientName: ${ruoyi.name}
* # master最小空闲连接数
* masterConnectionMinimumIdleSize: 32
* # master连接池大小
* masterConnectionPoolSize: 64
* # slave最小空闲连接数
* slaveConnectionMinimumIdleSize: 32
* # slave连接池大小
* slaveConnectionPoolSize: 64
* # 连接空闲超时,单位:毫秒
* idleConnectionTimeout: 10000
* # 命令等待超时,单位:毫秒
* timeout: 3000
* # 发布和订阅连接池大小
* subscriptionConnectionPoolSize: 50
* # 读取模式
* readMode: "SLAVE"
* # 订阅模式
* subscriptionMode: "MASTER"
*/
}
public class KeyPrefixHandler implements NameMapper {
private final String keyPrefix;
public KeyPrefixHandler(String keyPrefix) {
//前缀为空 则返回空前缀
this.keyPrefix = StringUtils.isBlank(keyPrefix) ? "" : keyPrefix + ":";
}
/**
* 增加前缀
*/
@Override
public String map(String name) {
if (StringUtils.isBlank(name)) {
return null;
}
if (StringUtils.isNotBlank(keyPrefix) && !name.startsWith(keyPrefix)) {
return keyPrefix + name;
}
return name;
}
/**
* 去除前缀
*/
@Override
public String unmap(String name) {
if (StringUtils.isBlank(name)) {
return null;
}
if (StringUtils.isNotBlank(keyPrefix) && name.startsWith(keyPrefix)) {
return name.substring(keyPrefix.length());
}
return name;
}
}
3. 封装工具类
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public class RedisUtils {
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 限流
*
* @param key 限流key
* @param rateType 限流类型
* @param rate 速率
* @param rateInterval 速率间隔
* @return -1 表示失败
*/
public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {
RRateLimiter rateLimiter = CLIENT.getRateLimiter(key);
rateLimiter.trySetRate(rateType, rate, rateInterval, RateIntervalUnit.SECONDS);
if (rateLimiter.tryAcquire()) {
return rateLimiter.availablePermits();
} else {
return -1L;
}
}
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
/**
* 发布通道消息
*
* @param channelKey 通道key
* @param msg 发送数据
* @param consumer 自定义处理
*/
public static <T> void publish(String channelKey, T msg, Consumer<T> consumer) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.publish(msg);
consumer.accept(msg);
}
public static <T> void publish(String channelKey, T msg) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.publish(msg);
}
/**
* 订阅通道接收消息
*
* @param channelKey 通道key
* @param clazz 消息类型
* @param consumer 自定义处理
*/
public static <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.addListener(clazz, (channel, msg) -> consumer.accept(msg));
}
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
*/
public static <T> void setCacheObject(final String key, final T value) {
setCacheObject(key, value, false);
}
/**
* 缓存基本的对象,保留当前对象 TTL 有效期
*
* @param key 缓存的键值
* @param value 缓存的值
* @param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)
* @since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案
*/
public static <T> void setCacheObject(final String key, final T value, final boolean isSaveTtl) {
RBucket<T> bucket = CLIENT.getBucket(key);
if (isSaveTtl) {
try {
bucket.setAndKeepTTL(value);
} catch (Exception e) {
long timeToLive = bucket.remainTimeToLive();
setCacheObject(key, value, Duration.ofMillis(timeToLive));
}
} else {
bucket.set(value);
}
}
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
* @param duration 时间
*/
public static <T> void setCacheObject(final String key, final T value, final Duration duration) {
RBatch batch = CLIENT.createBatch();
RBucketAsync<T> bucket = batch.getBucket(key);
bucket.setAsync(value);
bucket.expireAsync(duration);
batch.execute();
}
/**
* 注册对象监听器
* <p>
* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
*
* @param key 缓存的键值
* @param listener 监听器配置
*/
public static <T> void addObjectListener(final String key, final ObjectListener listener) {
RBucket<T> result = CLIENT.getBucket(key);
result.addListener(listener);
}
/**
* 设置有效时间
*
* @param key Redis键
* @param timeout 超时时间
* @return true=设置成功;false=设置失败
*/
public static boolean expire(final String key, final long timeout) {
return expire(key, Duration.ofSeconds(timeout));
}
/**
* 设置有效时间
*
* @param key Redis键
* @param duration 超时时间
* @return true=设置成功;false=设置失败
*/
public static boolean expire(final String key, final Duration duration) {
RBucket rBucket = CLIENT.getBucket(key);
return rBucket.expire(duration);
}
/**
* 获得缓存的基本对象。
*
* @param key 缓存键值
* @return 缓存键值对应的数据
*/
public static <T> T getCacheObject(final String key) {
RBucket<T> rBucket = CLIENT.getBucket(key);
return rBucket.get();
}
/**
* 获得key剩余存活时间
*
* @param key 缓存键值
* @return 剩余存活时间
*/
public static <T> long getTimeToLive(final String key) {
RBucket<T> rBucket = CLIENT.getBucket(key);
return rBucket.remainTimeToLive();
}
/**
* 删除单个对象
*
* @param key 缓存的键值
*/
public static boolean deleteObject(final String key) {
return CLIENT.getBucket(key).delete();
}
/**
* 删除集合对象
*
* @param collection 多个对象
*/
public static void deleteObject(final Collection collection) {
RBatch batch = CLIENT.createBatch();
collection.forEach(t -> {
batch.getBucket(t.toString()).deleteAsync();
});
batch.execute();
}
/**
* 检查缓存对象是否存在
*
* @param key 缓存的键值
*/
public static boolean isExistsObject(final String key) {
return CLIENT.getBucket(key).isExists();
}
/**
* 缓存List数据
*
* @param key 缓存的键值
* @param dataList 待缓存的List数据
* @return 缓存的对象
*/
public static <T> boolean setCacheList(final String key, final List<T> dataList) {
RList<T> rList = CLIENT.getList(key);
return rList.addAll(dataList);
}
/**
* 注册List监听器
* <p>
* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
*
* @param key 缓存的键值
* @param listener 监听器配置
*/
public static <T> void addListListener(final String key, final ObjectListener listener) {
RList<T> rList = CLIENT.getList(key);
rList.addListener(listener);
}
/**
* 获得缓存的list对象
*
* @param key 缓存的键值
* @return 缓存键值对应的数据
*/
public static <T> List<T> getCacheList(final String key) {
RList<T> rList = CLIENT.getList(key);
return rList.readAll();
}
/**
* 缓存Set
*
* @param key 缓存键值
* @param dataSet 缓存的数据
* @return 缓存数据的对象
*/
public static <T> boolean setCacheSet(final String key, final Set<T> dataSet) {
RSet<T> rSet = CLIENT.getSet(key);
return rSet.addAll(dataSet);
}
/**
* 注册Set监听器
* <p>
* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
*
* @param key 缓存的键值
* @param listener 监听器配置
*/
public static <T> void addSetListener(final String key, final ObjectListener listener) {
RSet<T> rSet = CLIENT.getSet(key);
rSet.addListener(listener);
}
/**
* 获得缓存的set
*
* @param key 缓存的key
* @return set对象
*/
public static <T> Set<T> getCacheSet(final String key) {
RSet<T> rSet = CLIENT.getSet(key);
return rSet.readAll();
}
/**
* 缓存Map
*
* @param key 缓存的键值
* @param dataMap 缓存的数据
*/
public static <T> void setCacheMap(final String key, final Map<String, T> dataMap) {
if (dataMap != null) {
RMap<String, T> rMap = CLIENT.getMap(key);
rMap.putAll(dataMap);
}
}
/**
* 注册Map监听器
* <p>
* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
*
* @param key 缓存的键值
* @param listener 监听器配置
*/
public static <T> void addMapListener(final String key, final ObjectListener listener) {
RMap<String, T> rMap = CLIENT.getMap(key);
rMap.addListener(listener);
}
/**
* 获得缓存的Map
*
* @param key 缓存的键值
* @return map对象
*/
public static <T> Map<String, T> getCacheMap(final String key) {
RMap<String, T> rMap = CLIENT.getMap(key);
return rMap.getAll(rMap.keySet());
}
/**
* 获得缓存Map的key列表
*
* @param key 缓存的键值
* @return key列表
*/
public static <T> Set<String> getCacheMapKeySet(final String key) {
RMap<String, T> rMap = CLIENT.getMap(key);
return rMap.keySet();
}
/**
* 往Hash中存入数据
*
* @param key Redis键
* @param hKey Hash键
* @param value 值
*/
public static <T> void setCacheMapValue(final String key, final String hKey, final T value) {
RMap<String, T> rMap = CLIENT.getMap(key);
rMap.put(hKey, value);
}
/**
* 获取Hash中的数据
*
* @param key Redis键
* @param hKey Hash键
* @return Hash中的对象
*/
public static <T> T getCacheMapValue(final String key, final String hKey) {
RMap<String, T> rMap = CLIENT.getMap(key);
return rMap.get(hKey);
}
/**
* 删除Hash中的数据
*
* @param key Redis键
* @param hKey Hash键
* @return Hash中的对象
*/
public static <T> T delCacheMapValue(final String key, final String hKey) {
RMap<String, T> rMap = CLIENT.getMap(key);
return rMap.remove(hKey);
}
/**
* 获取多个Hash中的数据
*
* @param key Redis键
* @param hKeys Hash键集合
* @return Hash对象集合
*/
public static <K, V> Map<K, V> getMultiCacheMapValue(final String key, final Set<K> hKeys) {
RMap<K, V> rMap = CLIENT.getMap(key);
return rMap.getAll(hKeys);
}
/**
* 设置原子值
*
* @param key Redis键
* @param value 值
*/
public static void setAtomicValue(String key, long value) {
RAtomicLong atomic = CLIENT.getAtomicLong(key);
atomic.set(value);
}
/**
* 获取原子值
*
* @param key Redis键
* @return 当前值
*/
public static long getAtomicValue(String key) {
RAtomicLong atomic = CLIENT.getAtomicLong(key);
return atomic.get();
}
/**
* 递增原子值
*
* @param key Redis键
* @return 当前值
*/
public static long incrAtomicValue(String key) {
RAtomicLong atomic = CLIENT.getAtomicLong(key);
return atomic.incrementAndGet();
}
/**
* 递减原子值
*
* @param key Redis键
* @return 当前值
*/
public static long decrAtomicValue(String key) {
RAtomicLong atomic = CLIENT.getAtomicLong(key);
return atomic.decrementAndGet();
}
/**
* 获得缓存的基本对象列表
*
* @param pattern 字符串前缀
* @return 对象列表
*/
public static Collection<String> keys(final String pattern) {
Stream<String> stream = CLIENT.getKeys().getKeysStreamByPattern(pattern);
return stream.collect(Collectors.toList());
}
/**
* 删除缓存的基本对象列表
*
* @param pattern 字符串前缀
*/
public static void deleteKeys(final String pattern) {
CLIENT.getKeys().deleteByPattern(pattern);
}
/**
* 检查redis中是否存在key
*
* @param key 键
*/
public static Boolean hasKey(String key) {
RKeys rKeys = CLIENT.getKeys();
return rKeys.countExists(key) > 0;
}
}
二、应用
1. RedisUtils工具类的基本使用
创建接口
@GetMapping("key")
public String getKey(String key){
return RedisUtils.getCacheObject(key);
}
@GetMapping("setKey")
public String setKey(String key,String value){
RedisUtils.setCacheObject(key,value);
return "success";
}
设置key
获取key对应的值
其他方法的作用,可以自行测试。这里就不再演示使用
三、队列
redission也支持队列,下面封装了一些队列的相关方法。可以处理了一些简单的队列任务,如果业务复杂可以选择mq
1. 工具类
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {
private static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
/**
* 添加普通队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.offer(data);
}
/**
* 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.poll();
}
/**
* 通用删除队列数据(不支持延迟队列)
*/
public static <T> boolean removeQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.delete();
}
/**
* 添加延迟队列数据 默认毫秒
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
}
/**
* 添加延迟队列数据
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
* @param timeUnit 单位
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.offer(data, time, timeUnit);
}
/**
* 获取一个延迟队列数据 没有数据返回 null
*
* @param queueName 队列名
*/
public static <T> T getDelayedQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.poll();
}
/**
* 删除延迟队列数据
*/
public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.remove(data);
}
/**
* 销毁延迟队列 所有阻塞监听 报错
*/
public static <T> void destroyDelayedQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.destroy();
}
/**
* 添加优先队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addPriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
return priorityBlockingQueue.offer(data);
}
/**
* 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getPriorityQueueObject(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.poll();
}
/**
* 优先队列删除队列数据(不支持延迟队列)
*/
public static <T> boolean removePriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyPriorityQueue(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.delete();
}
/**
* 尝试设置 有界队列 容量 用于限制数量
*
* @param queueName 队列名
* @param capacity 容量
*/
public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
return boundedBlockingQueue.trySetCapacity(capacity);
}
/**
* 尝试设置 有界队列 容量 用于限制数量
*
* @param queueName 队列名
* @param capacity 容量
* @param destroy 已存在是否销毁
*/
public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
if (boundedBlockingQueue.isExists() && destroy) {
destroyQueue(queueName);
}
return boundedBlockingQueue.trySetCapacity(capacity);
}
/**
* 添加有界队列数据
*
* @param queueName 队列名
* @param data 数据
* @return 添加成功 true 已达到界限 false
*/
public static <T> boolean addBoundedQueueObject(String queueName, T data) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
return boundedBlockingQueue.offer(data);
}
/**
* 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getBoundedQueueObject(String queueName) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.poll();
}
/**
* 有界队列删除队列数据(不支持延迟队列)
*/
public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyBoundedQueue(String queueName) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.delete();
}
/**
* 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
*/
public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
if (isDelayed) {
// 订阅延迟队列
CLIENT.getDelayedQueue(queue);
}
queue.subscribeOnElements(consumer);
}
}
2. 普通队列
添加数据到队列
@GetMapping("add")
public String add(){
QueueUtils.addQueueObject("queue:simple",1);
QueueUtils.addQueueObject("queue:simple",2);
QueueUtils.addQueueObject("queue:simple",3);
return "ok";
}
消费队列数据
遵循先进先出,获取数据后就会删除。如果队列中没有数据,获取到的就为null
@GetMapping("get")
public Integer get(){
return QueueUtils.getQueueObject("queue:simple");
}
移除队列数据
@GetMapping("remove")
public String remove(){
QueueUtils.removeQueueObject("queue:simple",3);
return "ok";
}
销毁队列
@GetMapping("destroy")
public String destroy(){
QueueUtils.destroyQueue("queue:simple");
return "ok";
}
订阅队列消息
- 订阅的消息一般在项目启动的时候使用,只能订阅一次
- 当监听到队列新增数据的时候会立即取出来进行消费
@PostConstruct
public void sub(){
QueueUtils.subscribeBlockingQueue("queue:simple",(o)->{
System.out.println("接收到消息:"+o);
},false);
}
我们再次调用新增
3. 有界队列(限制数据量)
设置队列最大容量
有界队列在使用前必须设置容量
@GetMapping("set")
public String set(){
boolean b = QueueUtils.trySetBoundedQueueCapacity("queue:bound", 10);
return "ok";
}
新增有界队列数据
@GetMapping("add")
public String add(){
QueueUtils.addBoundedQueueObject("queue:bound",1);
return "ok";
}
新增完毕后我们可以发现,我们直接设置的最大容量变成来了9。每次添加数据都会查询当前最大容量是否>0,如果大于0添加成功并且减一,否则添加失败
获取有界队列数据
@GetMapping("get")
public Integer get(){
return QueueUtils.getBoundedQueueObject("queue:bound");
}
我们可以看到当获取数据的时候,容量+1,数据从redis中删除
其他用法与普通队列类似,就不再演示了
4. 延迟队列(延迟获取数据)
添加延迟数据
延迟队列的实现原理是将数据添加到另一个缓存队列中,当到达指定时间才会转移到普通队列中
@GetMapping("add")
public String add(){
QueueUtils.addDelayedQueueObject("queue:belay",1,10, TimeUnit.SECONDS);
return "ok";
}
获取延迟数据
必须达到指定时间后才能获取
@GetMapping("get")
public Integer get(){
return QueueUtils.getDelayedQueueObject("queue:belay");
}
删除延迟数据
@GetMapping("remove")
public String remove(){
QueueUtils.removeQueueObject("queue:belay",3);
return "ok";
}
清空延迟数据
@GetMapping("destroy")
public String destroy(){
QueueUtils.destroyDelayedQueue("queue:belay");
return "ok";
}
订阅消息使用方法同普通队列类似,第三个参数需要改为true
5. 优先队列(数据可插队)
插入优先队列的数据我们需要先实现比较接口
@Data
@Accessors(chain = true)
class Order implements Comparable<Order>{
private Long id;
@Override
public int compareTo(Order o) {
return Long.compare(getId(), o.id);
}
}
新增优先数据
@GetMapping("add")
public String add(){
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(1L));
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(6L));
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(2L));
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(5L));
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(22L));
QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(3L));
return "ok";
}
我们可以看到插入的数据是有序的
获取优先队列数据
@GetMapping("get")
public Integer get(){
return QueueUtils.getPriorityQueueObject("queue:priority");
}
删除优先队列数据
@GetMapping("remove")
public String remove(){
QueueUtils.removeQueueObject("queue:priority",3);
return "ok";
}
清空优先队列数据
@GetMapping("destroy")
public String destroy(){
QueueUtils.destroyDelayedQueue("queue:priority");
return "ok";
}
订阅消息使用方法同普通队列一样