业务服务:redisson

文章目录

  • 前言
  • 一、配置
    • 1. 添加依赖
    • 2. 配置文件/类
    • 3. 注入redission
    • 3. 封装工具类
  • 二、应用
    • 1. RedisUtils工具类的基本使用
  • 三、队列
    • 1. 工具类
    • 2. 普通队列
    • 3. 有界队列(限制数据量)
    • 4. 延迟队列(延迟获取数据)
    • 5. 优先队列(数据可插队)


前言

redission是一个开源的java redis的客户端,在其基础上进行了进一步扩展。这些扩展极大地丰富了Redis的应用场景,尤其是在构建分布式系统时。


一、配置

1. 添加依赖

<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${redisson.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-data-30</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-27</artifactId>
    <version>${redisson.version}</version>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>lock4j-redisson-spring-boot-starter</artifactId>
    <version>${lock4j.version}</version>
</dependency>

在这里插入图片描述

2. 配置文件/类

spring:
  redis:
    # 地址
    host: localhost
    # 端口,默认为6379
    port: 6379
    # 数据库索引
    database: 0
    # 密码(如没有密码请注释掉)
    password: asd60787533
    # 连接超时时间
    timeout: 10s
    # 是否开启ssl
    ssl: false

redisson:
  # redis key前缀
  keyPrefix: demo
  # 线程池数量
  threads: 4
  # Netty线程池数量
  nettyThreads: 8
  # 单节点配置
  singleServerConfig:
    # 客户端名称
    clientName: demo
    # 最小空闲连接数
    connectionMinimumIdleSize: 8
    # 连接池大小
    connectionPoolSize: 32
    # 连接空闲超时,单位:毫秒
    idleConnectionTimeout: 10000
    # 命令等待超时,单位:毫秒
    timeout: 3000
    # 发布和订阅连接池大小
    subscriptionConnectionPoolSize: 50
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {

    /**
     * redis缓存key前缀
     */
    private String keyPrefix;

    /**
     * 线程池数量,默认值 = 当前处理核数量 * 2
     */
    private int threads;

    /**
     * Netty线程池数量,默认值 = 当前处理核数量 * 2
     */
    private int nettyThreads;

    /**
     * 单机服务配置
     */
    private SingleServerConfig singleServerConfig;

    /**
     * 集群服务配置
     */
    private ClusterServersConfig clusterServersConfig;

    @Data
    @NoArgsConstructor
    public static class SingleServerConfig {

        /**
         * 客户端名称
         */
        private String clientName;

        /**
         * 最小空闲连接数
         */
        private int connectionMinimumIdleSize;

        /**
         * 连接池大小
         */
        private int connectionPoolSize;

        /**
         * 连接空闲超时,单位:毫秒
         */
        private int idleConnectionTimeout;

        /**
         * 命令等待超时,单位:毫秒
         */
        private int timeout;

        /**
         * 发布和订阅连接池大小
         */
        private int subscriptionConnectionPoolSize;

    }

    @Data
    @NoArgsConstructor
    public static class ClusterServersConfig {

        /**
         * 客户端名称
         */
        private String clientName;

        /**
         * master最小空闲连接数
         */
        private int masterConnectionMinimumIdleSize;

        /**
         * master连接池大小
         */
        private int masterConnectionPoolSize;

        /**
         * slave最小空闲连接数
         */
        private int slaveConnectionMinimumIdleSize;

        /**
         * slave连接池大小
         */
        private int slaveConnectionPoolSize;

        /**
         * 连接空闲超时,单位:毫秒
         */
        private int idleConnectionTimeout;

        /**
         * 命令等待超时,单位:毫秒
         */
        private int timeout;

        /**
         * 发布和订阅连接池大小
         */
        private int subscriptionConnectionPoolSize;

        /**
         * 读取模式
         */
        private ReadMode readMode;

        /**
         * 订阅模式
         */
        private SubscriptionMode subscriptionMode;

    }

}

3. 注入redission

@Slf4j
@Configuration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {

    @Autowired
    private RedissonProperties redissonProperties;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public RedissonAutoConfigurationCustomizer redissonCustomizer() {
        return config -> {
            config.setThreads(redissonProperties.getThreads())
                .setNettyThreads(redissonProperties.getNettyThreads())
                .setCodec(new JsonJacksonCodec(objectMapper));
            RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
            if (ObjectUtil.isNotNull(singleServerConfig)) {
                // 使用单机模式
                config.useSingleServer()
                    //设置redis key前缀
                    .setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
                    .setTimeout(singleServerConfig.getTimeout())
                    .setClientName(singleServerConfig.getClientName())
                    .setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
                    .setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
                    .setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
                    .setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
            }
            // 集群配置方式 参考下方注释
            RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
            if (ObjectUtil.isNotNull(clusterServersConfig)) {
                config.useClusterServers()
                    //设置redis key前缀
                    .setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
                    .setTimeout(clusterServersConfig.getTimeout())
                    .setClientName(clusterServersConfig.getClientName())
                    .setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
                    .setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
                    .setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
                    .setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
                    .setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
                    .setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
                    .setReadMode(clusterServersConfig.getReadMode())
                    .setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
            }
            log.info("初始化 redis 配置");
        };
    }
    /**
     * redis集群配置 yml
     *
     * --- # redis 集群配置(单机与集群只能开启一个另一个需要注释掉)
     * spring:
     *   redis:
     *     cluster:
     *       nodes:
     *         - 192.168.0.100:6379
     *         - 192.168.0.101:6379
     *         - 192.168.0.102:6379
     *     # 密码
     *     password:
     *     # 连接超时时间
     *     timeout: 10s
     *     # 是否开启ssl
     *     ssl: false
     *
     * redisson:
     *   # 线程池数量
     *   threads: 16
     *   # Netty线程池数量
     *   nettyThreads: 32
     *   # 集群配置
     *   clusterServersConfig:
     *     # 客户端名称
     *     clientName: ${ruoyi.name}
     *     # master最小空闲连接数
     *     masterConnectionMinimumIdleSize: 32
     *     # master连接池大小
     *     masterConnectionPoolSize: 64
     *     # slave最小空闲连接数
     *     slaveConnectionMinimumIdleSize: 32
     *     # slave连接池大小
     *     slaveConnectionPoolSize: 64
     *     # 连接空闲超时,单位:毫秒
     *     idleConnectionTimeout: 10000
     *     # 命令等待超时,单位:毫秒
     *     timeout: 3000
     *     # 发布和订阅连接池大小
     *     subscriptionConnectionPoolSize: 50
     *     # 读取模式
     *     readMode: "SLAVE"
     *     # 订阅模式
     *     subscriptionMode: "MASTER"
     */

}
public class KeyPrefixHandler implements NameMapper {

    private final String keyPrefix;

    public KeyPrefixHandler(String keyPrefix) {
        //前缀为空 则返回空前缀
        this.keyPrefix = StringUtils.isBlank(keyPrefix) ? "" : keyPrefix + ":";
    }

    /**
     * 增加前缀
     */
    @Override
    public String map(String name) {
        if (StringUtils.isBlank(name)) {
            return null;
        }
        if (StringUtils.isNotBlank(keyPrefix) && !name.startsWith(keyPrefix)) {
            return keyPrefix + name;
        }
        return name;
    }

    /**
     * 去除前缀
     */
    @Override
    public String unmap(String name) {
        if (StringUtils.isBlank(name)) {
            return null;
        }
        if (StringUtils.isNotBlank(keyPrefix) && name.startsWith(keyPrefix)) {
            return name.substring(keyPrefix.length());
        }
        return name;
    }
}

3. 封装工具类

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public class RedisUtils {

    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);

    /**
     * 限流
     *
     * @param key          限流key
     * @param rateType     限流类型
     * @param rate         速率
     * @param rateInterval 速率间隔
     * @return -1 表示失败
     */
    public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {
        RRateLimiter rateLimiter = CLIENT.getRateLimiter(key);
        rateLimiter.trySetRate(rateType, rate, rateInterval, RateIntervalUnit.SECONDS);
        if (rateLimiter.tryAcquire()) {
            return rateLimiter.availablePermits();
        } else {
            return -1L;
        }
    }

    /**
     * 获取客户端实例
     */
    public static RedissonClient getClient() {
        return CLIENT;
    }

    /**
     * 发布通道消息
     *
     * @param channelKey 通道key
     * @param msg        发送数据
     * @param consumer   自定义处理
     */
    public static <T> void publish(String channelKey, T msg, Consumer<T> consumer) {
        RTopic topic = CLIENT.getTopic(channelKey);
        topic.publish(msg);
        consumer.accept(msg);
    }

    public static <T> void publish(String channelKey, T msg) {
        RTopic topic = CLIENT.getTopic(channelKey);
        topic.publish(msg);
    }

    /**
     * 订阅通道接收消息
     *
     * @param channelKey 通道key
     * @param clazz      消息类型
     * @param consumer   自定义处理
     */
    public static <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {
        RTopic topic = CLIENT.getTopic(channelKey);
        topic.addListener(clazz, (channel, msg) -> consumer.accept(msg));
    }

    /**
     * 缓存基本的对象,Integer、String、实体类等
     *
     * @param key   缓存的键值
     * @param value 缓存的值
     */
    public static <T> void setCacheObject(final String key, final T value) {
        setCacheObject(key, value, false);
    }

    /**
     * 缓存基本的对象,保留当前对象 TTL 有效期
     *
     * @param key       缓存的键值
     * @param value     缓存的值
     * @param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)
     * @since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案
     */
    public static <T> void setCacheObject(final String key, final T value, final boolean isSaveTtl) {
        RBucket<T> bucket = CLIENT.getBucket(key);
        if (isSaveTtl) {
            try {
                bucket.setAndKeepTTL(value);
            } catch (Exception e) {
                long timeToLive = bucket.remainTimeToLive();
                setCacheObject(key, value, Duration.ofMillis(timeToLive));
            }
        } else {
            bucket.set(value);
        }
    }

    /**
     * 缓存基本的对象,Integer、String、实体类等
     *
     * @param key      缓存的键值
     * @param value    缓存的值
     * @param duration 时间
     */
    public static <T> void setCacheObject(final String key, final T value, final Duration duration) {
        RBatch batch = CLIENT.createBatch();
        RBucketAsync<T> bucket = batch.getBucket(key);
        bucket.setAsync(value);
        bucket.expireAsync(duration);
        batch.execute();
    }

    /**
     * 注册对象监听器
     * <p>
     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
     *
     * @param key      缓存的键值
     * @param listener 监听器配置
     */
    public static <T> void addObjectListener(final String key, final ObjectListener listener) {
        RBucket<T> result = CLIENT.getBucket(key);
        result.addListener(listener);
    }

    /**
     * 设置有效时间
     *
     * @param key     Redis键
     * @param timeout 超时时间
     * @return true=设置成功;false=设置失败
     */
    public static boolean expire(final String key, final long timeout) {
        return expire(key, Duration.ofSeconds(timeout));
    }

    /**
     * 设置有效时间
     *
     * @param key      Redis键
     * @param duration 超时时间
     * @return true=设置成功;false=设置失败
     */
    public static boolean expire(final String key, final Duration duration) {
        RBucket rBucket = CLIENT.getBucket(key);
        return rBucket.expire(duration);
    }

    /**
     * 获得缓存的基本对象。
     *
     * @param key 缓存键值
     * @return 缓存键值对应的数据
     */
    public static <T> T getCacheObject(final String key) {
        RBucket<T> rBucket = CLIENT.getBucket(key);
        return rBucket.get();
    }

    /**
     * 获得key剩余存活时间
     *
     * @param key 缓存键值
     * @return 剩余存活时间
     */
    public static <T> long getTimeToLive(final String key) {
        RBucket<T> rBucket = CLIENT.getBucket(key);
        return rBucket.remainTimeToLive();
    }

    /**
     * 删除单个对象
     *
     * @param key 缓存的键值
     */
    public static boolean deleteObject(final String key) {
        return CLIENT.getBucket(key).delete();
    }

    /**
     * 删除集合对象
     *
     * @param collection 多个对象
     */
    public static void deleteObject(final Collection collection) {
        RBatch batch = CLIENT.createBatch();
        collection.forEach(t -> {
            batch.getBucket(t.toString()).deleteAsync();
        });
        batch.execute();
    }

    /**
     * 检查缓存对象是否存在
     *
     * @param key 缓存的键值
     */
    public static boolean isExistsObject(final String key) {
        return CLIENT.getBucket(key).isExists();
    }

    /**
     * 缓存List数据
     *
     * @param key      缓存的键值
     * @param dataList 待缓存的List数据
     * @return 缓存的对象
     */
    public static <T> boolean setCacheList(final String key, final List<T> dataList) {
        RList<T> rList = CLIENT.getList(key);
        return rList.addAll(dataList);
    }

    /**
     * 注册List监听器
     * <p>
     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
     *
     * @param key      缓存的键值
     * @param listener 监听器配置
     */
    public static <T> void addListListener(final String key, final ObjectListener listener) {
        RList<T> rList = CLIENT.getList(key);
        rList.addListener(listener);
    }

    /**
     * 获得缓存的list对象
     *
     * @param key 缓存的键值
     * @return 缓存键值对应的数据
     */
    public static <T> List<T> getCacheList(final String key) {
        RList<T> rList = CLIENT.getList(key);
        return rList.readAll();
    }

    /**
     * 缓存Set
     *
     * @param key     缓存键值
     * @param dataSet 缓存的数据
     * @return 缓存数据的对象
     */
    public static <T> boolean setCacheSet(final String key, final Set<T> dataSet) {
        RSet<T> rSet = CLIENT.getSet(key);
        return rSet.addAll(dataSet);
    }

    /**
     * 注册Set监听器
     * <p>
     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
     *
     * @param key      缓存的键值
     * @param listener 监听器配置
     */
    public static <T> void addSetListener(final String key, final ObjectListener listener) {
        RSet<T> rSet = CLIENT.getSet(key);
        rSet.addListener(listener);
    }

    /**
     * 获得缓存的set
     *
     * @param key 缓存的key
     * @return set对象
     */
    public static <T> Set<T> getCacheSet(final String key) {
        RSet<T> rSet = CLIENT.getSet(key);
        return rSet.readAll();
    }

    /**
     * 缓存Map
     *
     * @param key     缓存的键值
     * @param dataMap 缓存的数据
     */
    public static <T> void setCacheMap(final String key, final Map<String, T> dataMap) {
        if (dataMap != null) {
            RMap<String, T> rMap = CLIENT.getMap(key);
            rMap.putAll(dataMap);
        }
    }

    /**
     * 注册Map监听器
     * <p>
     * key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置
     *
     * @param key      缓存的键值
     * @param listener 监听器配置
     */
    public static <T> void addMapListener(final String key, final ObjectListener listener) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        rMap.addListener(listener);
    }

    /**
     * 获得缓存的Map
     *
     * @param key 缓存的键值
     * @return map对象
     */
    public static <T> Map<String, T> getCacheMap(final String key) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        return rMap.getAll(rMap.keySet());
    }

    /**
     * 获得缓存Map的key列表
     *
     * @param key 缓存的键值
     * @return key列表
     */
    public static <T> Set<String> getCacheMapKeySet(final String key) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        return rMap.keySet();
    }

    /**
     * 往Hash中存入数据
     *
     * @param key   Redis键
     * @param hKey  Hash键
     * @param value 值
     */
    public static <T> void setCacheMapValue(final String key, final String hKey, final T value) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        rMap.put(hKey, value);
    }

    /**
     * 获取Hash中的数据
     *
     * @param key  Redis键
     * @param hKey Hash键
     * @return Hash中的对象
     */
    public static <T> T getCacheMapValue(final String key, final String hKey) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        return rMap.get(hKey);
    }

    /**
     * 删除Hash中的数据
     *
     * @param key  Redis键
     * @param hKey Hash键
     * @return Hash中的对象
     */
    public static <T> T delCacheMapValue(final String key, final String hKey) {
        RMap<String, T> rMap = CLIENT.getMap(key);
        return rMap.remove(hKey);
    }

    /**
     * 获取多个Hash中的数据
     *
     * @param key   Redis键
     * @param hKeys Hash键集合
     * @return Hash对象集合
     */
    public static <K, V> Map<K, V> getMultiCacheMapValue(final String key, final Set<K> hKeys) {
        RMap<K, V> rMap = CLIENT.getMap(key);
        return rMap.getAll(hKeys);
    }

    /**
     * 设置原子值
     *
     * @param key   Redis键
     * @param value 值
     */
    public static void setAtomicValue(String key, long value) {
        RAtomicLong atomic = CLIENT.getAtomicLong(key);
        atomic.set(value);
    }

    /**
     * 获取原子值
     *
     * @param key Redis键
     * @return 当前值
     */
    public static long getAtomicValue(String key) {
        RAtomicLong atomic = CLIENT.getAtomicLong(key);
        return atomic.get();
    }

    /**
     * 递增原子值
     *
     * @param key Redis键
     * @return 当前值
     */
    public static long incrAtomicValue(String key) {
        RAtomicLong atomic = CLIENT.getAtomicLong(key);
        return atomic.incrementAndGet();
    }

    /**
     * 递减原子值
     *
     * @param key Redis键
     * @return 当前值
     */
    public static long decrAtomicValue(String key) {
        RAtomicLong atomic = CLIENT.getAtomicLong(key);
        return atomic.decrementAndGet();
    }

    /**
     * 获得缓存的基本对象列表
     *
     * @param pattern 字符串前缀
     * @return 对象列表
     */
    public static Collection<String> keys(final String pattern) {
        Stream<String> stream = CLIENT.getKeys().getKeysStreamByPattern(pattern);
        return stream.collect(Collectors.toList());
    }

    /**
     * 删除缓存的基本对象列表
     *
     * @param pattern 字符串前缀
     */
    public static void deleteKeys(final String pattern) {
        CLIENT.getKeys().deleteByPattern(pattern);
    }

    /**
     * 检查redis中是否存在key
     *
     * @param key 键
     */
    public static Boolean hasKey(String key) {
        RKeys rKeys = CLIENT.getKeys();
        return rKeys.countExists(key) > 0;
    }
}

二、应用

1. RedisUtils工具类的基本使用

创建接口

@GetMapping("key")
public String getKey(String key){
    return RedisUtils.getCacheObject(key);
}

@GetMapping("setKey")
public String setKey(String key,String value){
    RedisUtils.setCacheObject(key,value);
    return "success";
}

设置key

在这里插入图片描述
在这里插入图片描述

获取key对应的值

在这里插入图片描述
其他方法的作用,可以自行测试。这里就不再演示使用

三、队列

redission也支持队列,下面封装了一些队列的相关方法。可以处理了一些简单的队列任务,如果业务复杂可以选择mq

1. 工具类

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {

    private static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);


    /**
     * 获取客户端实例
     */
    public static RedissonClient getClient() {
        return CLIENT;
    }

    /**
     * 添加普通队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     */
    public static <T> boolean addQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.offer(data);
    }

    /**
     * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.poll();
    }

    /**
     * 通用删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removeQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.remove(data);
    }

    /**
     * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.delete();
    }

    /**
     * 添加延迟队列数据 默认毫秒
     *
     * @param queueName 队列名
     * @param data      数据
     * @param time      延迟时间
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
    }

    /**
     * 添加延迟队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     * @param time      延迟时间
     * @param timeUnit  单位
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        delayedQueue.offer(data, time, timeUnit);
    }

    /**
     * 获取一个延迟队列数据 没有数据返回 null
     *
     * @param queueName 队列名
     */
    public static <T> T getDelayedQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.poll();
    }

    /**
     * 删除延迟队列数据
     */
    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.remove(data);
    }

    /**
     * 销毁延迟队列 所有阻塞监听 报错
     */
    public static <T> void destroyDelayedQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        delayedQueue.destroy();
    }

    /**
     * 添加优先队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     */
    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.offer(data);
    }

    /**
     * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.poll();
    }

    /**
     * 优先队列删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.remove(data);
    }

    /**
     * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyPriorityQueue(String queueName) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.delete();
    }

    /**
     * 尝试设置 有界队列 容量 用于限制数量
     *
     * @param queueName 队列名
     * @param capacity  容量
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.trySetCapacity(capacity);
    }

    /**
     * 尝试设置 有界队列 容量 用于限制数量
     *
     * @param queueName 队列名
     * @param capacity  容量
     * @param destroy   已存在是否销毁
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        if (boundedBlockingQueue.isExists() && destroy) {
            destroyQueue(queueName);
        }
        return boundedBlockingQueue.trySetCapacity(capacity);
    }

    /**
     * 添加有界队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     * @return 添加成功 true 已达到界限 false
     */
    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.offer(data);
    }

    /**
     * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.poll();
    }

    /**
     * 有界队列删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.remove(data);
    }

    /**
     * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyBoundedQueue(String queueName) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.delete();
    }

    /**
     * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
     */
    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        if (isDelayed) {
            // 订阅延迟队列
            CLIENT.getDelayedQueue(queue);
        }
        queue.subscribeOnElements(consumer);
    }

}

2. 普通队列

添加数据到队列

@GetMapping("add")
public String add(){
    QueueUtils.addQueueObject("queue:simple",1);
    QueueUtils.addQueueObject("queue:simple",2);
    QueueUtils.addQueueObject("queue:simple",3);

    return "ok";
}

在这里插入图片描述

消费队列数据

遵循先进先出,获取数据后就会删除。如果队列中没有数据,获取到的就为null

@GetMapping("get")
public Integer get(){
    return QueueUtils.getQueueObject("queue:simple");
}

在这里插入图片描述

移除队列数据

@GetMapping("remove")
public String remove(){
    QueueUtils.removeQueueObject("queue:simple",3);
    return "ok";
}

在这里插入图片描述

销毁队列

@GetMapping("destroy")
public String destroy(){
    QueueUtils.destroyQueue("queue:simple");
    return "ok";
}

在这里插入图片描述

订阅队列消息

  • 订阅的消息一般在项目启动的时候使用,只能订阅一次
  • 当监听到队列新增数据的时候会立即取出来进行消费
@PostConstruct
public void sub(){
    QueueUtils.subscribeBlockingQueue("queue:simple",(o)->{
        System.out.println("接收到消息:"+o);
    },false);
}

我们再次调用新增

在这里插入图片描述

3. 有界队列(限制数据量)

设置队列最大容量

有界队列在使用前必须设置容量

@GetMapping("set")
public String set(){
    boolean b = QueueUtils.trySetBoundedQueueCapacity("queue:bound", 10);
    return  "ok";
}

在这里插入图片描述

新增有界队列数据

@GetMapping("add")
public String add(){
    QueueUtils.addBoundedQueueObject("queue:bound",1);

    return "ok";
}

新增完毕后我们可以发现,我们直接设置的最大容量变成来了9。每次添加数据都会查询当前最大容量是否>0,如果大于0添加成功并且减一,否则添加失败

在这里插入图片描述
在这里插入图片描述

获取有界队列数据

@GetMapping("get")
public Integer get(){
    return QueueUtils.getBoundedQueueObject("queue:bound");
}

我们可以看到当获取数据的时候,容量+1,数据从redis中删除

在这里插入图片描述
在这里插入图片描述

其他用法与普通队列类似,就不再演示了

4. 延迟队列(延迟获取数据)

添加延迟数据

延迟队列的实现原理是将数据添加到另一个缓存队列中,当到达指定时间才会转移到普通队列中

@GetMapping("add")
public String add(){
    QueueUtils.addDelayedQueueObject("queue:belay",1,10, TimeUnit.SECONDS);
    return "ok";
}

获取延迟数据

必须达到指定时间后才能获取

@GetMapping("get")
public Integer get(){
    return QueueUtils.getDelayedQueueObject("queue:belay");
}

删除延迟数据

@GetMapping("remove")
public String remove(){
    QueueUtils.removeQueueObject("queue:belay",3);
    return "ok";
}

清空延迟数据

@GetMapping("destroy")
public String destroy(){
    QueueUtils.destroyDelayedQueue("queue:belay");
    return "ok";
}

订阅消息使用方法同普通队列类似,第三个参数需要改为true

5. 优先队列(数据可插队)

插入优先队列的数据我们需要先实现比较接口

@Data
@Accessors(chain = true)
class Order implements Comparable<Order>{

    private Long id;

    @Override
    public int compareTo(Order o) {
        return Long.compare(getId(), o.id);
    }
}

新增优先数据

@GetMapping("add")
public String add(){
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(1L));
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(6L));
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(2L));
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(5L));
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(22L));
   QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(3L));
   return "ok";
}

我们可以看到插入的数据是有序的

在这里插入图片描述

获取优先队列数据

@GetMapping("get")
public Integer get(){
    return QueueUtils.getPriorityQueueObject("queue:priority");
}

删除优先队列数据

@GetMapping("remove")
public String remove(){
    QueueUtils.removeQueueObject("queue:priority",3);
    return "ok";
}

清空优先队列数据

@GetMapping("destroy")
public String destroy(){
    QueueUtils.destroyDelayedQueue("queue:priority");
    return "ok";
}

订阅消息使用方法同普通队列一样

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/484577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Netty - 五种 I/O 多路复用机制 select、poll、epoll、kqueue、iocp(windows) 对比

文章目录 Preselect、poll、epoll、kqueue、iocp(windows) Pre 高性能网络编程 - select、 poll 、epoll 、libevent select、poll、epoll、kqueue、iocp(windows) 这里我将对比一下常见的多路复用技术&#xff1a;select、poll、epoll、kqueue 和 IOCP&#xff08;Windows&a…

分区表索引失效导致业务异常

业务无法正常进行&#xff0c;查看数据库后台进程&#xff0c;发现有大量阻塞 QL_ID WAIT_CLASS EVENT ------------- --------------- ------------------------- 1cpk7srb6cr0r User I/O db file scattered read 279knu21n06x6…

音视频开发之旅(78)- Docker使用和交互流程

目录 1.Docker是什么 2.DockerFile的使用 3.常用命令 4.Docker和Web服务的交互流程 5.资料 一、Docker是什么 Docker通过轻量级的容器化技术&#xff0c;使得应用程序及其依赖可以打包在一个可移植的容器中运行&#xff0c;确保应用在不同环境下的一致性和效率。 1.1 核心…

中断(NVIC)的使用--EXTI--TIM

目录 中断是什么 轮询 中断 中断调用情况 中断的分类 内部中断&#xff08;TIM、UART等&#xff09; tim.c tim.h 外部中断EXTI exti.c exti.h 中断是什么 在处理事件的时候有两种方式&#xff1a;轮询和中断。 轮询 顾名思义&#xff0c;就是每轮都询问一次。比如…

结构体类型详细讲解(附带枚举,联合)

前言&#xff1a; 如果你还对结构体不是很了解&#xff0c;那么本篇文章将会从 为什么存在结构体&#xff0c;结构体的优点&#xff0c;结构体的定义&#xff0c;结构体的使用与结构体的大小依次介绍&#xff0c;同样会附带枚举与联合体 目录 为什么存在结构体&#xff1a; 结构…

毕业设计:日志记录编写(3/17起更新中)

目录 3/171.配置阿里云python加速镜像&#xff1a;2. 安装python3.9版本3. 爬虫技术选择4. 数据抓取和整理5. 难点和挑战 3/241.数据库建表信息2.后续进度安排3. 数据处理和分析 3/17 当前周期目标&#xff1a;构建基本的python环境&#xff1a;运行爬虫程序 1.配置阿里云pytho…

【C++】如何用一个哈希表同时封装出unordered_set与unordered_map

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.哈希桶源码 2.哈希…

(三维重建学习)已有位姿放入colmap和3D Gaussian Splatting训练

这里写目录标题 一、colmap解算数据放入高斯1. 将稀疏重建的文件放入高斯2. 将稠密重建的文件放入高斯 二、vkitti数据放入高斯 一、colmap解算数据放入高斯 运行Colmap.bat文件之后&#xff0c;进行稀疏重建和稠密重建之后可以得到如下文件结构。 1. 将稀疏重建的文件放入高…

windows10 WSL启动Ubuntu虚拟机,安装DolphinScheduler

文章目录 1. 启动WSL与虚拟机2. 安装Docker与DolphinScheduler容器 1. 启动WSL与虚拟机 使用管理员权限运行命令&#xff1a; Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows-Subsystem-Linux重启后即可创建虚拟机 在Microsoft Store中搜索Ubuntu&…

Wear-Any-Way——可控虚拟试衣一键试穿,可自定义穿着方式

概述 Wear-Any-Way 是阿里巴巴最新推出的虚拟试衣技术&#xff0c;它不仅可以让用户在虚拟环境中试穿衣服&#xff0c;还可以根据需要自定义衣服的样式&#xff0c;比如卷起袖子、打开或拖动外套等。这种技术的引入旨在帮助消费者更好地了解衣服在不同穿着方式下的效果&#x…

一个python实现的kline-chart图表程序(二)

前面一中简单介绍了kline-chart的图表程序&#xff0c;实际上这个程序最主要的功能不是显示K线&#xff0c;因为显示K线的程序太多了&#xff0c;没必要专门重写&#xff0c;这个程序最主要的功能是根据需要显示包含K线在内的各种指标&#xff0c;自己算的指标&#xff0c;或是…

plSql 大批量数据导入到表中

主要2种思路&#xff0c;一为insert插入sql&#xff0c;二是借助plsql提供的工具 insert语句odbc importer/导入器 insert语句 把要插入的数据转为insert语句&#xff0c;直接复制到plsql的sql窗口&#xff0c;运行即可&#xff1b;或者在命令行窗口回车键&#xff0c;选择要执…

使用 RunwayML 对图像进行 Camera 操作

RunwayML 是一個功能強大的平台&#xff0c;可以讓您使用 AI 和机器学习来增强您的图像和视频。 它提供一系列预训练模型&#xff0c;可用于各种任务&#xff0c;包括图像编辑、风格化和特效。 在本文中&#xff0c;我们将介绍如何使用 RunwayML 对图像进行 Camera 操作。我们…

[AIGC] SQL中的数据添加和操作:数据类型介绍

SQL&#xff08;结构化查询语言&#xff09;作为一种强大的数据库查询和操作工具&#xff0c;它能够完成从简单查询到复杂数据操作的各种任务。在这篇文章中&#xff0c;我们主要讨论如何在SQL中添加&#xff08;插入&#xff09;数据&#xff0c;以及在数据操作过程中&#xf…

数据结构(五)——树森林

5.4 树和森林 5.4.1 树的存储结构 树的存储1&#xff1a;双亲表示法 用数组顺序存储各结点&#xff0c;每个结点中保存数据元素、指向双亲结点(父结点)的“指针” #define MAX_TREE_SIZE 100// 树的结点 typedef struct{ElemType data;int parent; }PTNode;// 树的类型 type…

学习或复习电路的game推荐:nandgame(NAND与非门游戏)、Turing_Complete(图灵完备)

https://www.nandgame.com/ 免费 https://store.steampowered.com/app/1444480/Turing_Complete/ 收费&#xff0c;70元。据说可以导出 Verilog &#xff01;

关于安卓调用文件浏览器(一)打开并复制

背景 最近在做一个硬件产品&#xff0c;安卓应用开发。PM抽风&#xff0c;要求从app打开文件浏览器&#xff0c;跳转到指定目录&#xff0c;然后可以实现文件复制粘贴操作。 思考 从应用开发的角度看&#xff0c;从app打开系统文件浏览器并且选择文件&#xff0c;这是很常见…

馆室一体化查档平台制度有哪些

馆室一体化查档平台制度是指图书馆或档案馆在数字化和信息化的背景下&#xff0c;建立起的集查阅、借阅、咨询、文献传递等多项功能于一体的平台制度。下面是一些常见的馆室一体化查档平台制度&#xff1a; 1. 馆藏管理制度&#xff1a;包括图书和档案的采购、编目、分类、整理…

那些王道书里的题目-----计算机网络篇

注&#xff1a;仅记录个人认为有启发的题目 p155 34.下列四个地址块中&#xff0c;与地址块 172.16.166.192/26 不重叠&#xff0c;且与172.16.166.192/26聚合后的地址块不会引入多余地址的是&#xff08;&#xff09; A.172.16.166.192/27 B.172.16.166.128/26 …

day06vue2学习

day06 路由的封装抽离 问题&#xff1a;所有的路由配置都堆在main.js中不太合适么&#xff1f;不好&#xff0c;会加大代码的复杂度 目标&#xff1a;将路由模块抽离出来。好处&#xff1a;差分模块&#xff0c;利于维护。 大致的做法就是&#xff0c;将路由相关的东西都提…