项目收获总结--Redis的知识收获

一、概述

最近几天公司项目开发上线完成,做个收获总结吧~ 今天记录Redis的收获和提升。
在这里插入图片描述

二、Redis异步队列

Redis做异步队列一般使用 list 结构作为队列,rpush 生产消息,lpop 消费消息。当 lpop 没有消息的时候,要适当sleep再重试。若不用 sleep,list 还有个指令叫 blpop,在没有消息的时候,它会阻塞队列直到消息到来。

若要生产一次消费多次的需求,可以使用 pub/sub 主题订阅者模式,可以实现 1:N 的消息队列。但是缺点在消费者下线的情况下,生产的消息会丢失,还得使用专业的消息队列如 RabbitMQ等。

若要redis实现延时队列,使用 sortedset,拿时间戳作为score,消息内容作为 key 调用 zadd 来生产消息,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。

三、Redis分布式锁

核心思路是:先用setnx来争抢锁,抢到之后,再用 expire 给锁加一个过期时间防止锁忘记释放。若在 setnx 之后执行expire之前进程意外崩溃或者要重启维护,导致锁永远得不到释放,就要采用 set 指令配合非常复杂的参数,可以同时把 setnx 和 expire 合成一条指令来用:

SET key value NX EX 10
3.1 Redis分布式锁的实现

pom.xml引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>

配置Redis:

 
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.password=123456
spring.redis.timeout=10000
 
# 设置jedis连接池
spring.redis.jedis.pool.max-active=50
spring.redis.jedis.pool.min-idle=20

配置RedisConfig属性:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
 
@Configuration
public class RedisConfig {
 
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws Exception {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 3.创建 序列化类
        GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(genericToStringSerializer);
//        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

RedisLock工具类:

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
@Slf4j
@Component
public class RedisLock {
    @Resource
    private RedisTemplate redisTemplate;
    
    private static Map<String, LockInfo> lockInfoMap = new ConcurrentHashMap<>();
    private static final Long SUCCESS = 1L;
    
    @Data
    public static class LockInfo {
        private String key;
        private String value;
        private int expireTime;
        //更新时间
        private long renewalTime;
        //更新间隔
        private long renewalInterval;
        public static LockInfo getLockInfo(String key, String value, int expireTime) {
            LockInfo lockInfo = new LockInfo();
            lockInfo.setKey(key);
            lockInfo.setValue(value);
            lockInfo.setExpireTime(expireTime);
            lockInfo.setRenewalTime(System.currentTimeMillis());
            lockInfo.setRenewalInterval(expireTime * 2000 / 3);
            return lockInfo;
        }
    }
    
    /**
     * Lua脚本
     * // 加锁
     * if
     *     redis.call('setNx',KEYS[1],ARGV[1])
     *   then
     *     if redis.call('get',KEYS[1])==ARGV[1]
     *     return redis.call('expire',KEYS[1],ARGV[2])
     *   else
     *     return 0
     *   end
     * end
     *
     * // 解锁
     *   redis.call('get', KEYS[1]) == ARGV[1]
     * then
     *   return redis.call('del', KEYS[1])
     * else
     *   return 0
     *
     *   //更新时间
     *   if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end
     */
     
    /**
     * 使用lua脚本加锁
     *
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @Desc 注意事项,redisConfig配置里面必须使用 genericToStringSerializer序列化,否则获取不了返回值
     */
    public boolean tryLock(String lockKey, String value, int expireTime) {
 
        String luaScript = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";
 
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        //Object result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey),value,expireTime + "");
        // Object result = redisTemplate.execute(redisScript, new StringRedisSerializer(), new StringRedisSerializer(), Collections.singletonList(lockKey), identity, expireTime);
 
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("已获取到{}对应的锁!", lockKey);
 
        if (expireTime >= 10) {
            lockInfoMap.put(lockKey + value, LockInfo.getLockInfo(lockKey, value, expireTime));
        }
 
        return (boolean) result;
    }
 
    /**
     * 使用lua脚本释放锁
     *
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock(String lockKey, String value) {
 
        lockInfoMap.remove(lockKey + value);
 
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, value);
        log.info("解锁成功:{}", result);
        return (boolean) result;
    }
 
 
    /**
     * 使用lua脚本更新redis锁的过期时间
     *
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean renewal(String lockKey, String value, int expireTime) {
 
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("更新redis锁的过期时间:{}", result);
        return (boolean) result;
    }
 
 
    /**
     * redisTemplate加锁
     *
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @return 成功返回true, 失败返回false
     */
    public boolean lock(String lockKey, String value, long expireTime) {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);
    }
 
    /**
     * redisTemplate解锁
     *
     * @param key
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock2(String key, String value) {
        Object currentValue = redisTemplate.opsForValue().get(key);
        boolean result = false;
        if (StringUtils.isNotEmpty(String.valueOf(currentValue)) && currentValue.equals(value)) {
            result = redisTemplate.opsForValue().getOperations().delete(key);
        }
        return result;
    }
 
 
    /**
     * 定时去检查redis锁的过期时间
     * @Param
     * @Return
     */
    @Scheduled(fixedRate = 5000L)
    @Async("redisExecutor")
    public void renewal() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, LockInfo> lockInfoEntry : lockInfoMap.entrySet()) {
            LockInfo lockInfo = lockInfoEntry.getValue();
            if (lockInfo.getRenewalTime() + lockInfo.getRenewalInterval() < now) {
                renewal(lockInfo.getKey(), lockInfo.getValue(), lockInfo.getExpireTime());
                lockInfo.setRenewalTime(now);
                log.info("lockInfo {}", JSON.toJSONString(lockInfo));
            }
        }
    }
 
    /**
     * 分布式锁设置单独线程池
     * @Param
     * @Return
     */
    @Bean("redisExecutor")
    public Executor redisExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("redis-renewal-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        return executor;
    }
}

开启定时任务:在启动类上加 @EnableScheduling 注解

@SpringBootApplication
@MapperScan(value = "com.example.recordlog.mapper")
@EnableAspectJAutoProxy(proxyTargetClass = true)
//开启定时任务
@EnableScheduling
public class RecordLogApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(RecordLogApplication.class, args);
    }
}

Controller测试接口:

 
 
import com.example.recordlog.tools.RedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.List;
import java.util.UUID;
 
@RestController
@RequestMapping("/api")
public class OperateController {

@Autowired
private RedisLock redisLock;

@RequestMapping(value = "/locked", method = {RequestMethod.GET})
public void lockedTest() {
        String key = String.format("data-mining:task_statistic:%d", System.currentTimeMillis());
        String requestId = UUID.randomUUID().toString();
 
        boolean locked = false;
        try {
            locked = redisLock.tryLock(key, requestId, 30);
            if (!locked) {
                return;
            }
            //执行业务逻辑
            System.out.println("---------------->>>执行业务逻辑");
        } finally {
            if (locked) {
                redisLock.unlock(key, requestId);
            }
        }
    }
}
3.2 锁过期问题

上述代码提到使用lua脚本处理锁更新,就顺带记录当Redis 分布式锁过期但任务未完成时的三种处理方案。

锁过期问题是:在使用Redis分布式锁时锁恰好到过期时间,但业务逻辑还没有处理完毕,这可能导致多个进程同时进入临界区,造成数据不一致或业务逻辑冲突、资源浪费等其他问题。
假设分布式锁使用的简单逻辑如下:

获取锁:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisLock {
    private Jedis jedis;
    private String lockKey;
    private int lockExpire;

    public RedisLock(Jedis jedis, String lockKey, int lockExpire) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockExpire = lockExpire;
    }

    public boolean tryLock(String requestId) {
        SetParams params = new SetParams();
        params.nx().px(lockExpire);
        String result = jedis.set(lockKey, requestId, params);
        return "OK".equals(result);
    }

    public void unlock(String requestId) {
        String script =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "return redis.call('del', KEYS[1]) " +
                "else return 0 end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    }
}

使用锁:


Jedis jedis = new Jedis("localhost");
RedisLock redisLock = new RedisLock(jedis, "my_lock", 5000);

String requestId = UUID.randomUUID().toString();
if (redisLock.tryLock(requestId)) {
    try {
        // 业务逻辑
        Thread.sleep(6000); // 模拟业务逻辑处理时间超过锁过期时间
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        redisLock.unlock(requestId);
    }
} else {
    System.out.println("获取锁失败");
}

解决方案:
(1)自动续期:当锁接近过期时,自动延长锁的过期时间,确保业务逻辑在锁持有期间不会被其他进程获取。使用 ScheduledExecutorService 定期检查并延长锁的过期时间,确保锁在业务逻辑处理完之前不会过期。提供简单代码思路:

public class RedisLockWithRenewal extends RedisLock {
    private ScheduledExecutorService scheduler;

    public RedisLockWithRenewal(Jedis jedis, String lockKey, int lockExpire) {
        super(jedis, lockKey, lockExpire);
        scheduler = Executors.newScheduledThreadPool(1);
    }

    @Override
    public boolean tryLock(String requestId) {
        boolean locked = super.tryLock(requestId);
        if (locked) {
            startAutoRenewal(requestId);
        }
        return locked;
    }

    private void startAutoRenewal(String requestId) {
        scheduler.scheduleAtFixedRate(() -> {
            String script =
                    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                    "else return 0 end";
            jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(lockExpire)));
        }, lockExpire / 3, lockExpire / 3, TimeUnit.MILLISECONDS);
    }

    @Override
    public void unlock(String requestId) {
        scheduler.shutdown();
        super.unlock(requestId);
    }
}

(2)锁重入机制:锁重入机制允许持有锁的线程可以再次获取锁而不被阻塞,这可以通过记录锁持有者的信息来实现。同一线程可以多次获取锁,直到所有业务逻辑执行完毕,最后一次调用 unlock 时才真正释放锁。

import java.util.concurrent.ConcurrentHashMap;

public class ReentrantRedisLock extends RedisLock {
    private ConcurrentHashMap<String, Integer> lockHolderMap = new ConcurrentHashMap<>();

    public ReentrantRedisLock(Jedis jedis, String lockKey, int lockExpire) {
        super(jedis, lockKey, lockExpire);
    }

    @Override
    public synchronized boolean tryLock(String requestId) {
        if (lockHolderMap.containsKey(requestId)) {
            lockHolderMap.put(requestId, lockHolderMap.get(requestId) + 1);
            return true;
        } else {
            boolean locked = super.tryLock(requestId);
            if (locked) {
                lockHolderMap.put(requestId, 1);
            }
            return locked;
        }
    }

    @Override
    public synchronized void unlock(String requestId) {
        if (lockHolderMap.containsKey(requestId)) {
            int count = lockHolderMap.get(requestId);
            if (count > 1) {
                lockHolderMap.put(requestId, count - 1);
            } else {
                lockHolderMap.remove(requestId);
                super.unlock(requestId);
            }
        }
    }
}

(3)使用 Redisson 库:Redisson是Redis客户端,提供分布式锁的实现,支持自动续期、锁重入等高级特性,简化分布式锁的使用。

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissonLockExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient = Redisson.create(config);

        RLock lock = redissonClient.getLock("my_lock");
        try {
            if (lock.tryLock(0, 10, TimeUnit.SECONDS)) {
                try {
                    // 业务逻辑
                    Thread.sleep(6000); // 模拟业务逻辑处理时间
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println("获取锁失败");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redissonClient.shutdown();
        }
    }
}
四、Redis不立刻删除已过期数据的原因

比较有印象的经历,曾经尝试监听redis key过期来实现延迟消息发送,后来发现这个延迟非常不稳定,明明key已经过期,但还是没有触发,后来了解到过期key不会立马删除…这个问题需要对Redis 内存管理有了解,下面通过四个问题来考虑吧!

4.1 Redis 给缓存数据设置过期时间有什么用?

内存是有限且珍贵的,如果不对缓存数据设置过期时间,那内存占用就会一直增长,最终可能会导致 OOM 问题。通过设置合理的过期时间,Redis 会自动删除暂时不需要的数据,为新的缓存数据腾出空间。

Redis 自带给缓存数据设置过期时间的功能,比如:

127.0.0.1:6379> expire key 60 # 数据在 60s 后过期
(integer) 1
127.0.0.1:6379> setex key 60 value # 数据在 60s 后过期 (setex:[set] + [ex]pire)
OK
127.0.0.1:6379> ttl key # 查看数据还有多久过期
(integer) 56

Redis 中除字符串类型是独有设置过期时间的命令 setex 外,其他类型都需要依靠 expire 命令来设置过期时间 。另外, persist 命令可以移除一个键的过期时间。

当然,除缓解内存的消耗,还有其他用途,很多时候,业务场景就是需要某个数据只在某一时间段内存在,比如短信验证码可能只在 1 分钟内有效,用户登录的 Token 可能只在 1 天内有效,秒杀系统等。

若用传统的数据库来处理的话,基本是SQL判断过期,更麻烦且性能很差。

4.2 Redis 如何判断数据是否过期?

Redis 通过过期字典(可看作是 hash 表)来保存数据过期的时间。过期字典的键指向 Redis 数据库中的某个 key(键),过期字典的值是一个 long long 类型的整数,这个整数保存 key 所指向的数据库键的过期时间(毫秒精度的 UNIX 时间戳)。
在这里插入图片描述
过期字典是存储在 redisDb 这个结构里的:

typedef struct redisDb {
    ...
    dict *dict;     //数据库键空间,保存着数据库中所有键值对
    dict *expires   // 过期字典,保存着键的过期时间
    ...
} redisDb;

在查询一个 key 的时候,Redis 首先检查该 key 是否存在于过期字典中(时间复杂度为 O(1)),如果不在就直接返回,在的话需要判断一下这个 key 是否过期,过期直接删除 key 然后返回 null。

4.3 Redis 过期 key 删除策略?

如果设置一批 key 只能存活 1 分钟,那1分钟后,Redis 对这批 key 进行删除,就需要遵循删除策略。常用的过期数据的删除策略有四种:

1)惰性删除:只会在取出/查询 key 的时候才对数据进行过期检查。这种方式对CPU最友好,但是可能会造成太多过期key没有被删除。
(2)定期删除:周期性地随机从设置过期时间的 key 中抽查一批,然后逐个检查这些 key 是否过期,过期就删除 key。相比于惰性删除,定期删除对内存更友好,对 CPU 不太友好。
(3)延迟队列:把设置过期时间的 key 放到一个延迟队列里,到期之后就删除 key。这种方式可以保证每个过期 key 都能被删除,但维护延迟队列太麻烦,队列本身也要占用资源。
(4)定时删除:每个设置过期时间的 key 都会在设置的时间到达时立即被删除。这种方法可以确保内存中不会有过期的键,但是它对 CPU 的压力最大,因为它需要为每个键都设置一个定时器。

Redis 采用的是 定期删除+惰性/懒汉式删除 结合的策略,这也是大部分缓存框架的选择。定期删除对内存更加友好,惰性删除对 CPU 更加友好。两者各有优势,结合起来使用既能兼顾 CPU 友好,又能兼顾内存友好。

Redis 的定期删除过程是随机的(周期性地随机从设置过期时间的 key 中抽查一批),也因此并不保证所有过期键都会被立即删除。这也就解释为什么有的 key 已经过期,但并没有被删除。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。
另外,定期删除还受到执行时间和过期 key 的比例的影响:

  • 执行时间已经超过阈值,那就中断本次定期删除循环,避免使用过多占用CPU 。
  • 如果这批过期 key比例超过一个比例,就会重复执行此删除流程,更积极地清理过期 key。相应地,若过期的 key 比例低于这个比例,就会中断这次定期删除循环,避免做过多的工作而获得很少的内存回收。

查看源码所知:Redis7.2版本的执行时间阈值是25ms,过期key比例设定值是 10%

#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we do extra efforts. */

每次随机抽查数量在expire.c中定义,Redis 7.2 版本为 20 ,即每次会随机选择 20 个已设置过期时间的 key 判断是否过期。

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */

那如何控制定期删除的执行频率?

在 Redis 中,定期删除的频率是由 hz 参数控制的。hz 默认为 10,代表每秒执行 10 次,也就是每秒钟进行 10 次尝试来查找并删除过期的 key

hz 的取值范围为 1~500。增大 hz 参数的值会提升定期删除的频率。如果想要更频繁地执行定期删除任务,可以适当增加 hz 的值,但这会加 CPU 的使用率。根据 Redis 官方建议,hz 的值不建议超过100,其实使用默认的 10 就已足够。

下面是 hz 参数的官方注释(Redis 7.2 版本redis.conf中)的重要信息。
在这里插入图片描述
类似的参数还有 “dynamic-hz”(redis.conf中),这个参数开启之后 Redis 就会在 hz 的基础上动态计算一个值。Redis 提供并默认启用使用自适应 hz 值的能力:

# 默认为 10
hz 10
# 默认开启
dynamic-hz yes

另外:hz 参数控制着定期删除过期 key 的定期任务之外,还控制一些其他定期任务例如关闭超时的客户端连接、更新统计信息等。

最后,定期删除不是把所有过期 key 都删除, 若key 数量非常庞大的话,挨个遍历检查是非常耗时的,会严重影响性能。Redis 设计这种策略的目的就是平衡内存和性能。
而 key 过期之后不立马删除是因为成本太高不太好办到,就算使用延迟队列作为删除策略,这样存在问题:

1)队列本身的开销可能很大:key 多的情况下,一个延迟队列可能无法容纳。
(2)维护延迟队列太麻烦:修改 key 的过期时间就需要调整期在延迟队列中的位置,并且,还需要引入并发控制。
4.4 如何防止大量 key 集中过期?

如果存在大量 key 集中过期的问题,可能会使 Redis 的请求延迟变高。可选方案:

1)尽量避免 key 集中过期,在设置键的过期时间时尽量随机一点。
(2)对过期的 key 开启 lazyfree 机制(修改 redis.conf 中的 lazyfree-lazy-expire参数即可),这样会在后台异步删除过期的 key,不会阻塞主线程的运行。
五、Redis bigkey处理

若key 对应的 value 所占用的内存比较大,那这个 key 就可以看作是 bigkey。
在这里插入图片描述

String 类型的 value 超过 1MB
复合类型(List、Hash、Set、Sorted Set 等)的 value 包含的元素超过 5000

bigkey会引发:客户端超时阻塞、网络阻塞、工作线程阻塞

1.使用 Redis 自带的 --bigkeys 参数来查找

# redis-cli -p 6379 --bigkeys

# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).

[00.00%] Biggest string found so far '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' with 4437 bytes
[00.00%] Biggest list   found so far '"my-list"' with 17 items

-------- summary -------

Sampled 5 keys in the keyspace!
Total key length in bytes is 264 (avg len 52.80)

Biggest   list found '"my-list"' has 17 items
Biggest string found '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' has 4437 bytes

1 lists with 17 items (20.00% of keys, avg size 17.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
4 strings with 4831 bytes (80.00% of keys, avg size 1207.75)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
0 sets with 0 members (00.00% of keys, avg size 0.00)
0 zsets with 0 members (00.00% of keys, avg size 0.00

2.使用 Redis 自带的 SCAN 命令
3.借助开源工具分析 RDB 文件
4.借助公有云的 Redis 分析服务

处理方案:

(1)分割 bigkey:将一个 bigkey 分割为多个小 key。例如,将一个含有上万字段数量的 Hash 按照一定策略(比如二次哈希)拆分为多个 Hash。
(2)手动清理:Redis 4.0+ 可以使用 UNLINK 命令来异步删除一个或多个指定的 key。Redis 4.0 以下可以考虑使用 SCAN 命令结合 DEL 命令来分批次删除。
(3)采用合适的数据结构:例如,文件二进制数据不使用 String 保存、使用 HyperLogLog 统计页面 UV、Bitmap 保存状态信息(0/1)。
(4)开启 lazy-free(惰性删除/延迟释放) :lazy-free 特性是 Redis 4.0 开始引入的,指的是让 Redis 采用异步方式延迟释放 key 使用的内存,将该操作交给单独的子线程处理,避免阻塞主线程。
六、Redis持久化机制

使用缓存的时候,需要对内存中的数据进行持久化,也就是将内存中的数据写入到硬盘中。主要是为之后重用数据(比如重启机器、机器故障之后恢复数据),或者是为做数据同步(比如 Redis 集群的主从节点通过 RDB 文件同步数据)。
Redis 支持持久化方式:

快照(snapshotting,RDB)
只追加文件(append-only file, AOF)
RDB和AOF混合持久化(Redis4.0新增,通过配置项 aof-use-rdb-preamble 开启,AOF 重写的时候就直接把 RDB 的内容写到 AOF 文件开头) 
6.1 RDB持久化

通过创建快照来获得存储在内存里面的数据在 某个时间点 上的副本,然后将快照复制到其他服务器从而创建具有相同数据的服务器副本(Redis 主从结构,主要用来提高 Redis 性能),也可将快照留存本地以便重启服务器时使用。

快照持久化是 Redis 默认采用的持久化方式,在 redis.conf 配置文件中配置:

save 900 1           #在900(15分钟)之后,如果至少有1个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 300 10          #在300(5分钟)之后,如果至少有10个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 60 10000        #在60(1分钟)之后,如果至少有10000个key发生变化,Redis就会自动触发bgsave命令创建快照。

Redis 提供两个命令来生成 RDB 快照文件:

save : 同步保存操作,会阻塞 Redis 主线程(Redis启动后是通过单线程的方式工作);
bgsave : fork 出一个子进程,子进程执行,不会阻塞 Redis 主线程,默认选项。
6.2 AOF持久化

AOF持久化的实时性比RDB好,通过 appendonly 参数开启:

appendonly yes

开启 AOF 持久化后每执行一条会更改 Redis 中的数据的命令,Redis 就会将该命令写入到 AOF 缓冲区 server.aof_buf 中,然后再写入到 AOF 文件中(此时还在系统内核缓存区未同步到磁盘,依然存在数据丢失的风险),最后再根据持久化方式( fsync策略)的配置来决定何时将系统内核缓存区的数据同步到硬盘中完成持久化保存。

AOF 文件的保存位置和 RDB 文件的位置相同,都通过 dir 参数设置,默认的文件名是 appendonly.aof。

6.3 AOF工作流程

AOF 持久化功能的实现可以简单分为 5 步:

1)命令追加(append):所有的写命令会追加到 AOF 缓冲区中。
(2)文件写入(write):将 AOF 缓冲区的数据写入到 AOF 文件中。这一步需要调用write函数(系统调用),write将数据写入系统内核缓冲区之后直接返回(延迟写)。注意此时并没有同步到磁盘。
(3)文件同步(fsync):AOF 缓冲区根据对应的持久化方式( fsync 策略)向硬盘做同步操作。这一步需要调用 fsync 函数(系统调用), fsync 针对单个文件操作,对其进行强制硬盘同步,fsync 将阻塞直到写入磁盘完成后返回,保证了数据持久化。
(4)文件重写(rewrite):随着 AOF 文件越来越大,需要定期对 AOF 文件进行重写,达到压缩的目的。
(5)重启加载(load):当 Redis 重启时,可以加载 AOF 文件进行数据恢复。

名词解释:

1)系统调用(syscall):Linux系统直接提供用于对文件和设备进行访问和控制的函数;
(2)write:写入系统内核缓冲区之后直接返回(仅是写到缓冲区),不会立即同步到硬盘。虽然提高效率,但也有数据丢失的风险。同步硬盘操作通常依赖于系统调度机制,Linux 内核通常为 30s 同步一次,具体值取决于写出的数据量和 I/O 缓冲区的状态。
(3)fsync:fsync用于强制刷新系统内核缓冲区(同步到到磁盘),确保写磁盘操作结束才会返回。

AOF 工作流程表示为:
在这里插入图片描述

6.4 AOF持久化方式

Redis的配置文件中存在三种不同的 AOF 持久化方式( fsync策略),主要区别在于 fsync 同步 AOF 文件的时机(刷盘)。

1)appendfsync always:主线程调用 write 执行写操作后,后台线程(aof_fsync线程)立即
    调用fsyn 函数同步AOF文件(刷盘),fsync完成后线程返回,这样会严重降低Redis的性能(write+fsync).2)appendfsync everysec:主线程调用write执行写操作后立即返回,由后台线程(aof_fsync线程)每秒钟
    调用fsync函数(系统调用)同步一次 AOF文件(write+fsync,fsync间隔为1秒)
    
(3)appendfsync no:主线程调用write执行写操作后立即返回,让操作系统决定何时进行同步,
    Linux下一般为30秒一次(write但不fsync,fsync的时机由操作系统决定)

因此要兼顾数据和写入性能,可以考虑 appendfsync everysec 策略 ,让Redis每秒同步一次 AOF 文件,Redis 性能受到的影响较小。而且这样即使出现系统崩溃,最多只会丢失一秒之内产生的数据。当硬盘忙于执行写入操作时,Redis 还会适当放慢速度以适应硬盘的最大写入速度。

ps:有个了解知识点,详情参考:《Redis 7.0 Multi Part AOF 的设计和实现》

从Redis7.0开始,Redis使用Multi Part AOF机制,将原来的单个AOF文件拆分成多个AOF文件。在Multi Part AOF中,AOF文件被分为三种类型,分别为:
(1)BASE:表示基础AOF文件,一般由子进程通过重写产生,该文件最多只有一个。
(2)INCR:表示增量AOF文件,一般会在AOFRW开始执行时被创建,该文件可能存在多个。
(3)HISTORY:表示历史AOF文件,它由BASE和INCR AOF整合,每次AOFRW成功完成时,本次 AOFRW 前对应的BASE和INCR AOF 将变为HISTORY,HISTORY类型的AOF会被Redis自动删除。

相关 issue:Redis 的 AOF 方式 #783

6.5 AOF日志记录

关系型数据库如 MySQL,通常都是执行命令之前记录日志,方便故障恢复,而 Redis AOF 持久化机制是在执行完命令之后再记录日志。
在这里插入图片描述
后记录日志的原因:

避免额外的检查开销,AOF记录日志不会对命令进行语法检查;
在命令执行完之后再记录,不会阻塞当前的命令执行。

当然,前面提到过风险:

如果刚执行完命令 Redis 就宕机会导致对应的修改丢失;
可能会阻塞后续其他命令的执行(AOF 记录日志是在 Redis 主线程中进行的)。
6.6 AOF重写

当AOF存储量太大时,Redis能够在后台自动重写AOF产生一个新AOF文件,这个新AOF文件和原本AOF文件所保存的数据库状态一样,但体积更小。

Redis 将AOF重写程序放到子进程里执行,避免大量的写入操作对Redis正常处理命令请求造成影响,这个重写操作是redis通过读取数据库中的键值对来实现的,与Java描述的重写不同。
在这里插入图片描述
AOF文件重写期间,Redis还会维护一个AO 重写缓冲区,该缓冲区会在子进程创建新 AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新AOF文件末尾,使得新AOF件保存的数据库状态与现有的数据库状态一致。最后,服务器用新AOF文件替换旧的AOF文件,来完成AOF文件重写操作。

开启AOF重写功能,可以调用BGREWRITEAOF命令手动执行,也可设置两个配置项,让程序自动决定触发时机:

auto-aof-rewrite-min-size:如果 AOF 文件大小小于该值,则不会触发 AOF 重写。默认值为 64 MB;
auto-aof-rewrite-percentage:执行 AOF 重写时,当前 AOF 大小(aof_current_size)和上一次重写时 AOF 大小(aof_base_size)的比值。如果当前 AOF 文件大小增加了这个百分比值,将触发 AOF 重写。
                            将此值设置为 0 将禁用自动 AOF 重写。默认值为 100

AOF重写机制优化改进可参考:《从 Redis7.0 发布看 Redis 的过去与未来》

6.7 AOF校验机制

AOF校验机制是Redis在启动时对AOF文件进行检查,判断文件是否完整,是否有损坏或者丢失的数据。通过使用校验和(checksum) 对整个 AOF 文件内容进行 CRC64 算法计算得出的数字来验证 AOF 文件。如果文件内容发生变化,那么校验和也会随之改变。因此,Redis 在启动时会比较计算出的校验和与文件末尾保存的校验和(计算的时候会把最后一行保存校验和的内容忽略),从而判断 AOF 文件是否完整。若发现文件有问题,Redis就会拒绝启动并提供相应的错误信息。AOF校验机制简单有效提高 Redis 数据的可靠性。

类似地,RDB 文件也有类似的校验机制来保证 RDB 文件的正确性。

6.8 如何选择 RDB 和 AOF
1)Redis保存的数据丢失一些也没什么影响的话,可以选择使用RDB。
(2)不建议单独使用AOF,因为随时创建一个RDB快照可以进行数据库备份、更快的重启以及解决AOF引擎错误。
(3)如果保存的数据要求安全性比较高的话,建议同时开启RDB和AOF两种持久化或者开启RDB和AOF混合持久化。

其实,了解计算机底层原理的都知道宕机数据丢失无法避免,要么是以性能很差的方式来解决。没有完美的方案只有均衡的方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/788522.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】进程(9):进程控制2(进程等待)

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;9&#xff09;进程控制2&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一. 为什么要进程等待二. 如何进行进程等待1.wait函数—…

Linux udp编程

我最近开了几个专栏&#xff0c;诚信互三&#xff01; > |||《算法专栏》&#xff1a;&#xff1a;刷题教程来自网站《代码随想录》。||| > |||《C专栏》&#xff1a;&#xff1a;记录我学习C的经历&#xff0c;看完你一定会有收获。||| > |||《Linux专栏》&#xff1…

洛谷 数学进制 7.9

P1100 高低位交换 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 代码一 #include<bits/stdc.h> using namespace std; typedef long long ll; #define IOS ios::sync_with_stdio(0),cin.tie(0),cout.tie(0)const ll N1e510; char a[N];int main() {IOS;ll a;int b[32]…

一、YOLO V10安装、使用、训练大全

YOLO V10安装、使用、训练大全 一、下载官方源码二、配置conda环境三、安装YOLOV10依赖四、使用官方YOLO V10模型1.下载模型2.使用模型2.1 图片案例 五、制作数据集1.数据集目录结构2.标注工具2.1 安装标注工具2.2 运行标注工具2.3 设置自动保存2.4 切换yolo模式2.5 开始标注2.…

Mosh|内连接、外连接、左连接、右连接(未完)

下图取自菜鸟教程&#xff0c;侵权删&#xff5e; 一、内连接&#xff1a;Inner Joins 模版&#xff1a;SELECT * FROM A JOIN B ON 条件 含义&#xff1a;返回A与B的交集&#xff0c;列为AB列之和 练习&#xff1a;将order_items表和products表连接&#xff0c;返回产品id和…

Qt:12.输入类控件(QSpinBox-整数值输入的小部件、QDateEdit、QTimeEdit、QDateTimeEdit- 日期和时间输入的控件)

目录 一、QSpinBox-整数值输入的小部件&#xff1a; 1.1QSpinBox介绍&#xff1a; 1.2属性介绍&#xff1a; 1.3通用属性介绍&#xff1a; 1.4信号介绍&#xff1a; 二、QDateEdit、QTimeEdit、QDateTimeEdit- 日期和时间输入的控件&#xff1a; 2.1QDateEdit、QTimeEdit…

文件操作和IO流(Java版)

前言 我们无时无刻不在操作文件。可以说&#xff0c;我们在电脑上能看到的图片、视频、音频、文档都是一个又一个的文件&#xff0c;我们需要从文件中读取我们需要的数据&#xff0c;将数据运算后也需要将结果写入文件中长期保存。可见文件的重要性&#xff0c;今天我们就来简…

Gemma2——Google 新开源大型语言模型完整应用指南

0.引言 Gemma 2以前代产品为基础&#xff0c;提供增强的性能和效率&#xff0c;以及一系列创新功能&#xff0c;使其在研究和实际应用中都具有特别的吸引力。Gemma 2 的与众不同之处在于&#xff0c;它能够提供与更大的专有模型相当的性能&#xff0c;但其软件包专为更广泛的可…

北斗防爆手持终端在化工厂的安全性能分析

北斗防爆手持终端在化工厂中的应用显著提升了安全性能&#xff0c;其卓越的防爆设计、高精度定位与监控功能、实时通信能力以及多功能集成特性&#xff0c;共同构筑了化工厂安全生产的坚实防线&#xff0c;确保了巡检人员与设备在复杂环境下的安全作业与高效管理。 北斗防爆手持…

[Linux][Shell][Shell基础] -- [Shebang][特殊符号][变量][父子Shell]详细讲解

目录 0.前置知识1.Shebang2.Linux特殊符号整理3.变量4.环境变量5.父子shell0.概念1.创建进程列表(创建子shell执行命令) 6.内置命令 vs 外置命令 0.前置知识 #用于注释shell脚本语⾔属于⼀种弱类型语⾔&#xff1a;⽆需声明变量类型&#xff0c;直接定义使⽤shell三剑客&#…

148. 排序链表

https://leetcode.cn/problems/sort-list/description/https://leetcode.cn/problems/sort-list/description/ 解题思路&#xff1a; 归并排序&#xff0c;先拿到链表长度&#xff0c;每次遍历到一半&#xff0c;进行分割&#xff0c;后序双指针合并。 /*** Definition for sin…

图论---匈牙利算法求二分图最大匹配的实现

开始编程前分析设计思路和程序的整体的框架&#xff0c;以及作为数学问题的性质&#xff1a; 程序流程图&#xff1a; 数学原理&#xff1a; 求解二分图最大匹配问题的算法&#xff0c;寻找一个边的子集&#xff0c;使得每个左部点都与右部点相连&#xff0c;并且没有两条边共享…

操作系统|day1.了解操作系统

文章目录 了解操作系统定义目的操作系统体系结构功能特征操作系统的区别(64位与32位)操作系统的地址内存管理缓存 了解操作系统 定义 操作系统是控制管理计算机系统的硬软件,分配调度资源的系统软件 目的 方便性,有效性(提高系统资源的利用率,提高系统的吞吐量) 操作系统体…

android13 固定U盘链接 SD卡链接 TF卡链接 硬盘链接

1.前言 有些客户使用的应用并不带有自动监听U盘 sd卡广播的代码,使用的代码是固定的地址,这样的话,就需要我们将系统的挂载目录固定了。 原始路径 /storage/3123-19FA 增加链接 /storage/upan_000 -> /storage/3123-19FA 2. 首先如果是应用本身监听的话,使用的是 /…

Linux Mac 安装Higress 平替 Spring Cloud Gateway

Linux Mac 安装Higress 平替 Spring Cloud Gateway Higress是什么?传统网关分类Higress定位下载安装包执行安装命令执行脚本 安装成功打开管理界面使用方法configure.shreset.shstartup.shshutdown.shstatus.shlogs.sh Higress官网 Higress是什么? Higress是基于阿里内部的…

CentOS 8升级gcc版本

1、查看gcc版本 gcc -v发现gcc版本为8.x.x&#xff0c;而跑某个项目的finetune需要gcc-9&#xff0c;之前搜索过很多更新gcc版本的方式&#xff0c;例如https://blog.csdn.net/xunye_dream/article/details/108918316?spm1001.2014.3001.5506&#xff0c;但执行指令 sudo yu…

S32V234平台开发(一)快速使用

快速使用 准备供电复位选择串口通信启动选择显示登陆系统 准备供电 s32v234可以使用两种电源供电 一种是左边电源端子&#xff0c;一种是右边电源适配器(12V 3A) 注意:不要同时使用两种电源同时供电 复位选择 Pressing POR RESET pulls active low EXT_POR signal on S32V2…

使用bypy丝滑传递百度网盘-服务器文件

前言 还在为百度网盘的数据集难以给服务器做同步而痛苦吗&#xff0c;bypy来拯救你了&#xff01;bypy是一个强大而灵活的百度网盘命令行客户端工具。它是基于Python开发的开源项目&#xff0c;为用户提供了一种通过命令行界面与百度网盘进行交互的方式。使用bypy&#xff0c;…

自动驾驶AVM环视算法--540度全景的算法实现和exe测试demo

参考&#xff1a;金书世界 540度全景影像是什么 540度全景影像是在360度全景影像基础上的升级功能&#xff0c;它增加了更多的摄像头来收集周围的图像数据。通常&#xff0c;这些摄像头分布在车辆的更多位置&#xff0c;例如车顶、车底等&#xff0c;以便更全面地捕捉车辆周围…

C++ | Leetcode C++题解之第226题翻转二叉树

题目&#xff1a; 题解&#xff1a; class Solution { public:TreeNode* invertTree(TreeNode* root) {if (root nullptr) {return nullptr;}TreeNode* left invertTree(root->left);TreeNode* right invertTree(root->right);root->left right;root->right …