【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【14】缓存与分布式锁


持续学习&持续更新中…

守破离


【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【14】缓存与分布式锁

  • 缓存
    • 本地缓存
    • 分布式缓存-本地模式在分布式下的问题
    • 分布式缓存
    • 整合 redis 作为缓存
    • JMeter测试出OutOfDirectMemoryError【堆外内存溢出】
  • 高并发读下缓存失效问题
    • 缓存穿透
    • 缓存雪崩
    • 缓存击穿
    • 总结解决高并发下缓存失效问题
  • 本地锁与分布式锁
    • 分布式下的本地锁
    • 锁时序问题
    • 分布式锁
  • 分布式锁演进
    • 分布式锁演进-基本原理
    • 分布式锁演进-阶段一
    • 分布式锁演进-阶段二
    • 分布式锁演进-阶段三
    • 分布式锁演进-阶段四
    • 分布式锁演进-阶段五-最终形态
    • 使用Redis实现分布式锁总结
  • Redisson—分布式锁
    • 使用
    • Redisson可重入锁
    • Redisson读写锁
    • Redisson信号量
    • Redisson闭锁
  • 数据库与缓存数据一致性问题
    • 双写模式
    • 失效模式
    • 缓存数据一致性-解决方案
    • 缓存数据一致性-解决-分布式读写锁
    • 缓存数据一致性-解决-Canal
    • 我们系统的一致性解决方案
  • SpringCache
    • 简介
    • 基础概念
    • 注解
    • 表达式语法
    • 整合
    • 使用
    • Spring-Cache的不足
  • 参考

缓存

为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作(持久化)

哪些数据适合放入缓存?

  • 即时性、数据一致性要求不高的
  • 访问量大且更新频率不高的数据(读多,写少)

举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据 更新频率 来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。

在这里插入图片描述

data = cache.load(id);//从缓存加载数据 
If(data == null){ 
	data = db.load(id);//从数据库加载数据 
	cache.put(id,data);//保存到  cache 中 
} 
return data;

注意:在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没 有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的 数据永久不一致 问题。

本地缓存

在这里插入图片描述

  • 比如将Map或者ehcache作为本地缓存
  • 如果项目是单机部署的,不是分布式,也不考虑缓存大小,那么使用本地缓存没有问题

分布式缓存-本地模式在分布式下的问题

在这里插入图片描述

  • 每个微服务发现自己的缓存中没有上架,都会去查询数据库
  • 如果某次请求负载均衡到了第一台机器,并修改了数据,那么1号机器的缓存会被修改,但是2号3号机器的缓存不会修改

分布式缓存

在这里插入图片描述

  • redis中间件的好处就是可以集群化
  • 理论上可以无限扩容redis
  • 使用中间件作为缓存就打破了本地缓存的 容量限制

整合 redis 作为缓存

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring:
  redis:
    host: 192.168.56.10
    port: 6379
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallProductApplicationTests {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Test
    public void testStringRedisTemplate(){
        //hello   world
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();

        //保存
        ops.set("hello","world_"+ UUID.randomUUID().toString());

        //查询
        String hello = ops.get("hello");
        System.out.println("之前保存的数据是:"+hello);
    }
}

修改业务:

    public Map<String, List<Catelog2Vo>> getCatalogJson() {
        //给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型;【序列化与反序列化】JSON跨语言,跨平台兼容。
        //1、加入缓存逻辑,缓存中存的数据是json字符串。
        String catalogJSON = redisTemplate.opsForValue().get(RedisConstant.CATEGORY_KEY);
        if (StringUtils.isEmpty(catalogJSON)) {
            //2、缓存中没有,查询数据库
            System.out.println("缓存不命中....将要查询数据库...");
            Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();
            return catalogJsonFromDb;
        }

        System.out.println("缓存命中....直接返回....");
        //转为我们指定的对象。
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
        return result;
    }
    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
        System.out.println("查询了数据库.....");
        Map<String, List<Catelog2Vo>> catgoryData = selectDb();
        
        //查到的数据再放入缓存,将对象转为json放在缓存中
        String s = JSON.toJSONString(catgoryData);
        redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s);
//        1天过期        
//        redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s, 1, TimeUnit.DAYS);

        return catgoryData;
    }

JMeter测试出OutOfDirectMemoryError【堆外内存溢出】

  • springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。
  • lettuce的bug导致netty堆外内存溢出;
  • netty如果没有指定堆外内存,默认使用-Xmx300m(自己配置的微服务内存),可以通过-Dio.netty.maxDirectMemory进行设置

解决方案:

  • 不能只使用-Dio.netty.maxDirectMemory去调大堆外内存。长久的运行后,堆外内存还会溢出
  • 升级lettuce客户端。(不推荐)
  • 切换使用jedis。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

高并发读下缓存失效问题

缓存穿透

在这里插入图片描述

  • 缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数 据库也无此记录,我们没有将这次查询的 null 写入缓存,这将导致这个不存在的数据每次 请求都要到存储层去查询,失去了缓存的意义。
  • 在流量大时,可能 DB 就挂掉了,要是有人利用不存在的 key 频繁攻击我们的应用,这就是 漏洞。
  • 解决: 缓存空结果、并且设置短的过期时间。

缓存雪崩

雪崩:缓存的数据key大面积同时失效

在这里插入图片描述

  • 缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失 效,请求全部转发到 DB,DB 瞬时压力过重雪崩。
  • 解决: 原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的 重复率就会降低,就很难引发集体失效的事件。

缓存击穿

瞄着一个靶子疯狂射击的时候,这个靶子就会被打穿
也叫做热点key

在这里插入图片描述

  • 对于一些设置了过期时间的 key,如果这些 key 可能会在某些时间点被超高并发地访问, 是一种非常“热点”的数据。
  • 这个时候,需要考虑一个问题:如果这个 key 在大量请求同时进来前正好失效,那么所 有对这个 key 的数据查询都落到 db,我们称为缓存击穿。
  • 解决: 加锁

总结解决高并发下缓存失效问题

  • 空结果缓存:解决缓存穿透
  • 设置过期时间(加随机值):解决缓存雪崩
  • 加锁:解决缓存击穿

本地锁与分布式锁

分布式下的本地锁

有多少个商品服务,就会有多少把锁,就会访问多少次数据库

在这里插入图片描述

IDEA开启多个商品服务:可以多复制几个商品服务交给网关进行压力测试本地锁

在这里插入图片描述

在这里插入图片描述

本地锁:

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithLocalLock() {
        //只要是同一把锁,就能锁住需要这个锁的所有线程
        //synchronized (this):SpringBoot所有的组件在容器中都是单例的。
        //TODO 本地锁:synchronized,JUC(Lock),在分布式情况下,想要锁住所有,必须使用分布式锁
        synchronized (this) {
            //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
            return getCatalogJsonFromDB();
        }
    }
    public Map<String, List<Catelog2Vo>> getCatalogJson() {
        //给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型;【序列化与反序列化】JSON跨语言,跨平台兼容。
        /**TODO 写博客
         * 1、空结果缓存:解决缓存穿透
         * 2、设置过期时间(加随机值):解决缓存雪崩
         * 3、加锁:解决缓存击穿
         */
        //1、加入缓存逻辑,缓存中存的数据是json字符串。
        String catalogJSON = getCatalogJSON();
        if (StringUtils.isEmpty(catalogJSON)) {
            //2、缓存中没有,查询数据库
            //保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。在同一把锁内完成
            System.out.println("缓存不命中....将要查询数据库...");
			Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDbWithLocalLock();
            return catalogJsonFromDb;
        }

        System.out.println("缓存命中....直接返回....");
        //转为我们指定的对象。
        Map<String, List<Catelog2Vo>> result = getJSONData(catalogJSON);
        return result;
    }

锁时序问题

在这里插入图片描述

1号线程查到数据并释放了锁,在1号线程将数据放入缓存期间(把结果放到 Redis缓存是一次 网络交互,需要时间),2号线程发现缓存中没有数据,会再查询一次数据库

在这里插入图片描述

保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。在同一把锁内完成

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
        String catalogJSON = redisTemplate.opsForValue().get(RedisConstant.CATEGORY_KEY);
        if (!StringUtils.isEmpty(catalogJSON)) {
            //1、缓存不为null直接返回
            Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
            return result;
        }

        System.out.println("查询了数据库.....");
        //2、封装数据
        Map<String, List<Catelog2Vo>> catgoryData = loadDB();
        
        //3、查到的数据再放入缓存,将对象转为json放在缓存中
        String s = JSON.toJSONString(catgoryData);
        redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s, 1, TimeUnit.DAYS);
//        redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s);

        return catgoryData;
    }

分布式锁

在这里插入图片描述

分布式锁演进

分布式锁演进-基本原理

在这里插入图片描述

在这里插入图片描述

分布式锁演进-阶段一

在这里插入图片描述

在这里插入图片描述

分布式锁演进-阶段二

在这里插入图片描述

在这里插入图片描述

分布式锁演进-阶段三

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

分布式锁演进-阶段四

在这里插入图片描述

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
        //1、占分布式锁。去redis占坑
        String uuid = UUID.randomUUID().toString();
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            System.out.println("获取分布式锁成功...");
            // 执行业务
            Map<String, List<Catelog2Vo>> dataFromDb = getCatalogJsonFromDB();
            //获取值对比+对比成功删除
            String lockValue = redisTemplate.opsForValue().get("lock");
            if(uuid.equals(lockValue)){
                //删除我自己的锁
                redisTemplate.delete("lock");//删除锁
            }
            return dataFromDb;
        } else {
            //加锁失败...重试。synchronized ()
            //休眠100ms重试
            System.out.println("获取分布式锁失败...等待重试");
            try {
                Thread.sleep(100);
            } catch (Exception e) {
            }
            return getCatalogJsonFromDbWithRedisLock();//自旋的方式
        }
    }

解释:

  • String lockValue = redisTemplate.opsForValue().get("lock");这段代码是网络传输交互,需要一定的时间
  • Redis接收到请求,在查该线程的锁的时候,该线程的锁还没有过期,于是返回了该线程的UUID
  • 但是在网络返回途中,该线程设置的锁过期了;别人占到了这个锁,并设置了锁,当然别人的UUID会被set到lock
  • 那么此时,该线程由于get到的UUID是正确的,那么就可以正确的equals,那么就会执行删除逻辑redisTemplate.delete("lock");,就会把别人的锁给删除了

分布式锁演进-阶段五-最终形态

在这里插入图片描述

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
        //1、占分布式锁。去redis占坑
        String uuid = UUID.randomUUID().toString();
//        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid);
//        加锁和设置过期时间保证原子性
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);

        if (lock) {
            System.out.println("获取分布式锁成功...");
            //加锁成功... 执行业务
            //2、设置过期时间,必须和加锁是同步的,原子的
//            redisTemplate.expire("lock",30,TimeUnit.SECONDS);

            Map<String, List<Catelog2Vo>> dataFromDb;
            try {
                dataFromDb = getCatalogJsonFromDB();
            } finally {
//           TODO 写博客     lua脚本解锁    原子操作
//                Redis分布式锁,核心:
//                                  - 加锁和过期时间保证原子性
//                                  - 解锁保证原子性
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                //删除锁
                Long delResult = redisTemplate.execute(
                        new DefaultRedisScript<Long>(script, Long.class)
                        , Arrays.asList("lock")
                        , uuid
                );
            }
            return dataFromDb;
        } else {
            //加锁失败...重试。synchronized ()
            //休眠200ms重试
            System.out.println("获取分布式锁失败...等待重试");
            try {
                Thread.sleep(200);
            } catch (Exception e) {
            }
            return getCatalogJsonFromDbWithRedisLock();//自旋的方式
        }
    }

使用Redis实现分布式锁总结

  • 加锁和过期时间保证原子性
  • 解锁(判断+删除)保证原子性

Redisson—分布式锁

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间 的协作。

官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

在这里插入图片描述

使用

<!-- 以后使用redisson作为所有分布式锁,分布式对象等功能框架-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>
@Configuration
public class MyRedissonConfig {
    /**
     * 所有对Redisson的使用都是通过RedissonClient对象
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson(@Value("${spring.redis.host}") String url) {
        //1、创建配置
        //Redis url should start with redis:// or rediss://
//        rediss:两个s代表安全连接
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url+":6379");
        //2、根据Config创建出RedissonClient示例
        return Redisson.create(config);
    }
}

Redisson的所有操作都是发送Lua脚本,所以都保证了原子性:

    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisson() {

        //锁的名字: 锁的粒度,越细越快。[锁的一个名字就是一个锁,不同名字就是不同的锁]
        //锁的粒度:具体缓存的是某个数据,11-号商品;  product-11-lock   product-12-lock

//        加锁和设置过期时间保证原子性
//        Redisson的所有操作都是发送Lua脚本,所以都保证了原子性
        RLock lock = redisson.getLock(RedisConstant.CATEGORY_KEY);
//        lock.lock();
        lock.lock(10, TimeUnit.SECONDS);
        System.out.println("获取分布式锁成功...");
        //加锁成功... 执行业务
        Map<String, List<Catelog2Vo>> dataFromDb;
        try {
            dataFromDb = getCatalogJsonFromDB();
        } finally {
            //删除锁
            lock.unlock();
        }
        return dataFromDb;
    }

Redisson可重入锁

在这里插入图片描述

在这里插入图片描述

    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        //1、获取一把锁,只要锁的名字一样,就是同一把锁
        RLock lock = redisson.getLock("my-lock");

        //2、加锁,不会有死锁问题,而且还会自动续期
        lock.lock(); //阻塞式等待,知道获取到锁。Redisson有一个看门狗,可以给锁自动续期,默认加的锁都是30s时间。
        //1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
        //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

//        lock.lock(10,TimeUnit.SECONDS); //10秒自动解锁,无需unlock。自动解锁时间一定要大于业务的执行时间。
        //问题:lock.lock(10,TimeUnit.SECONDS); 在锁时间到了以后,不会自动续期。
        //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
        //2、如果我们未指定锁的超时时间,就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间】;
        //    只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期,续成30s
        //    internalLockLeaseTime【看门狗时间】 / 3,10s

        //最佳实战
        //1)、lock.lock(30,TimeUnit.SECONDS);省掉了整个续期操作。手动解锁
        try {
//            业务代码
            System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
            Thread.sleep(30000); //模拟超长任务
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //3、解锁  假设解锁代码没有运行,redisson会不会出现死锁   答案:不会出现死锁
            System.out.println("释放锁..." + Thread.currentThread().getId());
            lock.unlock();
        }

        return "hello";
    }

lock.lock(); 加锁,不会有死锁问题,而且还会自动续期

  • 阻塞式等待,直到获取到锁。Redisson有一个看门狗,可以给锁自动续期,默认加的锁都是30s时间。
  • 锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长导致锁自动过期会被删掉
  • 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

lock.lock(10,TimeUnit.SECONDS); 10秒自动解锁,无需unlock。自动解锁时间一定要大于业务的执行时间

  • 在锁时间到了以后,即使业务没有执行完成,也不会自动续期。
  • 如果我们传递了锁的超时时间,就发送给redis执行Lua脚本,进行占锁,默认超时时间就是我们指定的时间
  • 如果我们未指定锁的超时时间(lock.lock()),就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间就是30秒】;只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间30s】,每隔10s【internalLockLeaseTime / 3:三分之一的看门狗时间】都会自动再次续期,续成30s

最佳实战lock.lock(30,TimeUnit.SECONDS);省掉了看门狗的整个续期操作。业务时间给大一点然后手动解锁

Redisson读写锁

改数据加写锁,读数据加读锁

在这里插入图片描述

在这里插入图片描述

    //保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁,大家都能用,加了跟没加一样
    //写锁没释放读就必须等待
    // 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
    // 写 + 读: 等待写锁释放
    // 写 + 写: 阻塞方式
    // 读 + 写: 有读锁。写也需要等待。
    // 只要有写的存在,都必须等待
    @GetMapping("/write")
    @ResponseBody
    public String writeValue() {
        RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
        String s = "";
        RLock rLock = lock.writeLock();
        //1、改数据加写锁,读数据加读锁
        rLock.lock();
        try {
            System.out.println("写锁加锁成功..." + Thread.currentThread().getId());
            s = UUID.randomUUID().toString();
            Thread.sleep(30000);
            stringRedisTemplate.opsForValue().set("writeValue", s);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
            System.out.println("写锁释放" + Thread.currentThread().getId());
        }

        return s;
    }

    @GetMapping("/read")
    @ResponseBody
    public String readValue() {
        RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
        String s = "";
        //加读锁
        RLock rLock = lock.readLock();
        rLock.lock();
        try {
            System.out.println("读锁加锁成功" + Thread.currentThread().getId());
            s = stringRedisTemplate.opsForValue().get("writeValue");
            Thread.sleep(30000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
            System.out.println("读锁释放" + Thread.currentThread().getId());
        }

        return s;
    }

读写锁可以保证一定能读到最新数据

  • 写锁是一个排他锁(互斥锁、独享锁)。
  • 读锁是一个共享锁,大家都能用,加了跟没加一样
  • 写锁没释放读就必须等待

只要有写的存在,都(写/读)必须等待;有读锁。写也要等待。

  • 读 + 读: 相当于无锁,并发读,只会在redis中记录好所有当前的读锁。他们都会同时加锁成功
  • 写 + 读: 等待写锁释放才能读
  • 写 + 写: 阻塞方式
  • 读 + 写: 有读锁。写也需要等待。

Redisson信号量

信号量也可以用作分布式限流

在这里插入图片描述

    /**
     * 车库停车,
     * 3车位[事先在redis中放key:park,value:3]
     * 信号量也可以用作分布式限流;
     */
    @GetMapping("/park")
    @ResponseBody
    public String park() throws InterruptedException {
        RSemaphore park = redisson.getSemaphore("park");
//        park.acquire();//获取一个信号,获取一个值,占一个车位
        boolean b = park.tryAcquire();
        if (b) { //信号量也可以用作分布式限流
            //执行业务
        } else {
            return "error";
        }

        return "ok=>" + b;
    }

    @GetMapping("/go")
    @ResponseBody
    public String go() throws InterruptedException {
        RSemaphore park = redisson.getSemaphore("park");
        park.release();//释放一个车位

        return "ok";
    }

Redisson闭锁

在这里插入图片描述

    /**
     * 放假,锁门
     * 1班没人了,2班没人了......
     * 5个班全部走完,我们可以锁大门
     */
    @GetMapping("/lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.trySetCount(5);
        door.await(); //等待闭锁都完成

        return "放假了...";
    }

    @GetMapping("/gogogo/{id}")
    @ResponseBody
    public String gogogo(@PathVariable("id") Long id) {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.countDown();//计数减一;

        return id + "班的人都走了...";
    }

数据库与缓存数据一致性问题

通过加锁解决了缓存击穿,确保了读取缓存没问题之后

有一个新的问题:那就是:缓存里面的数据如何和数据库保持一致,也就是缓存数据一致性

缓存数据一致性的原因都是:数据库的最后一次更新没有放到Redis缓存中,导致数据库和缓存内容不一致

我们只需要确保 最终一致性,放在缓存中的数据,允许读到的最新数据有延迟

双写模式

在这里插入图片描述

  • 将 {写数据库和写缓存} 放在一起加分布式锁,可以解决这一问题

  • 如果给缓存设置了过期时间,我们甚至可以不用管这个问题,当然,这取决于 我们对数据实时性查看的容忍性

失效模式

在这里插入图片描述

缓存数据一致性-解决方案

  • 无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
  • 1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小(用户点击相比于电脑还是比较慢的),不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
  • 2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
  • 3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
  • 4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据,就不用加读写锁了);

总结:

  • 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
  • 我们不应该过度设计,增加系统的复杂性
  • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

缓存数据一致性-解决-分布式读写锁

分布式读写锁。读数据等待写数据整个操作完成

缓存数据一致性-解决-Canal

Canal是阿里开源的一个中间件,相当与一个数据库的从服务器

在这里插入图片描述

我们系统的一致性解决方案

1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新

2、读写数据的时候,加上分布式的读写锁。 经常写,经常读会有极大的影响;偶尔写,经常读影响不大(读写锁的读锁相当于无锁)

SpringCache

简介

  • Spring 从 3.1 开始定义了 org.springframework.cache.Cache 和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术; 并支持使用 JCache(JSR-107)注解简化我们开发;

  • Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接 口 下 Spring 提 供 了 各 种 xxxCache 的 实 现 ; 如 RedisCache , EhCacheCache , ConcurrentMapCache 等;

  • 每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已经被调用过;

  • 如果有就直接从缓存中获取方法调用后的结果

  • 如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。

基础概念

在这里插入图片描述

注解

在这里插入图片描述

在这里插入图片描述

开启缓存功能 @EnableCaching

只需要使用注解就能完成缓存操作

  • @Cacheable: Triggers cache population.:触发将数据保存到缓存的操作
  • @CacheEvict: Triggers cache eviction.:触发将数据从缓存删除的操作 【失效模式】
  • @CachePut: Updates the cache without interfering with the method execution.:不影响方法执行更新缓存 【双写模式】
  • @Caching: Regroups multiple cache operations to be applied on a method.:组合以上多个操作
  • @CacheConfig: Shares some common cache-related settings at class-level.:在类级别共享缓存的相同配置

表达式语法

在这里插入图片描述

整合

引入依赖:spring-boot-starter-cache、spring-boot-starter-data-redis

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-cache</artifactId>
 </dependency>

默认配置:

  • 如果缓存中有,方法不用调用。
  • key默认自动生成 category::SimpleKey [] ==> 缓存的名字::SimpleKey [] (自主生成的key值)
  • 缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
  • 默认ttl时间 -1;代表永不过期

在这里插入图片描述

自定义配置类:

@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching
public class MyCacheConfig {

    /**
     * 配置文件中的东西没有用上;
     *
     * 1、原来和配置文件绑定的配置类是这样子的
     *      @ConfigurationProperties(prefix = "spring.cache")
     *      public class CacheProperties{  }
     * 2、要让他生效
     *      @EnableConfigurationProperties(CacheProperties.class)
     *
     */
//1、
//    @Autowired
//    CacheProperties cacheProperties;
//2、
//    Spring会自动注入方法参数进去
//  缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
//  默认ttl时间 -1;代表永不过期

    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
//        config = config.entryTtl();
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));

        CacheProperties.Redis redisProperties = cacheProperties.getRedis();

        //将配置文件中的所有配置都生效
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }
}

application.properties:

spring.cache.type=redis
#如果配置了前缀就用我们配置的,如果没有就默认使用缓存的名字作为前缀
# 不推荐指定前缀,不指定前缀可以让分区作为前缀
#spring.cache.redis.key-prefix=gulimall_cache_
spring.cache.redis.use-key-prefix=true
#是否缓存空值[可以用于防止缓存穿透]
spring.cache.redis.cache-null-values=true
#存活时间以毫秒为单位
spring.cache.redis.time-to-live=3600000

使用

//    @Cacheable(value = {"category"}, key = "#root.method.name")
    @Cacheable(value = {"category"}, key = "#root.method.name", sync = true)
//    @Cacheable(value = {"category"}, key = "'listLevel1Categorys'", sync = true)
//    @Cacheable(value = {"category"})
    @Override
    public List<CategoryEntity> listLevel1Categorys() {
        System.out.println("listLevel1Categorys查询了数据库.....");

        List<CategoryEntity> categoryEntities = this.list(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
        return categoryEntities;
//        return null;   // 测试缓存空结果
    }
    @Override
// 指定生成的缓存使用的key:key属性指定接受一个SpEL表达式
//    @Cacheable(value = "category", key = "#root.methodName")
    @Cacheable(value = "category", key = "#root.methodName", sync = true)
    public Map<String, List<Catelog2Vo>> getCatalogJson() {
        System.out.println("查询了数据库.....");
        //封装数据
        Map<String, List<Catelog2Vo>> catgoryData = loadDB();
        return catgoryData;
    }
    /**
     * 级联更新所有关联的数据
     * @CacheEvict:失效模式
     * 1、同时进行多种缓存操作  @Caching
     * 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true)
     * 3、存储同一类型的数据,都可以指定成同一个分区(这个分区只是Spring逻辑划分的,Redis物理存储并没有)。
     * 这样做的好处就是当我们修改了这个类型的数据,我们就直接可以将分区里跟这个类型有关的所有数据都删除掉
     * 分区名默认就是缓存的前缀(不在application.properties中指定前缀)
     */
//    @Caching(evict = {
//            @CacheEvict(value = "category", key = "'listLevel1Categorys'"),
//            @CacheEvict(value = "category", key = "'getCatalogJson'")
//    })
//  或者
    @CacheEvict(value = "category",allEntries = true) //缓存失效模式 //allEntries:删除所有category的缓存
//    @CachePut //缓存双写模式 : 方法没有返回值不支持双写模式
    @Transactional
    @Override
    public void updateCascade(CategoryEntity category) {
        this.updateById(category);

        //TODO 当category表中的某些字段被其他表使用,并且该字段发生了更新,那么也需要修改那些表的信息
        if (!StringUtils.isEmpty(category.getName())) { // 分类名发生了变化
            categoryBrandRelationService.updateCategory(category.getCatId(), category.getName());
        }
    }

每一个需要缓存的数据我们都来指定要放到那个名字的缓存。【缓存的分区(推荐按照业务类型分)】

存储同一类型的数据,都可以指定成同一个分区(这个分区只是Spring逻辑划分的,Redis物理存储并没有)。这样做的好处就是当我们修改了这个类型的数据,我们就直接可以将分区里跟这个类型有关的所有数据都删除掉

分区名默认就是缓存的前缀(不推荐在application.properties中指定前缀)

Spring-Cache的不足

读模式:Spring-Cache全部考虑到了

  • 缓存穿透:查询一个缓存和数据库都为null的数据。解决:缓存空数据;cache-null-values=true
  • 缓存击穿:大量并发进来同时查询一个正好过期的数据。
    • 解决:加锁;?默认是无加锁的;
    • sync = true(加本地锁【本地锁:有多少服务就有多少锁就有多少数据库操作被放行,这样问题不大,不会有超多请求查询数据库】,可以解决击穿)
  • 缓存雪崩:大量的key同时过期。
    • 解决:加随机时间。
    • 我们只需要加上过期时间就行
    • (存储的时间不一样,那么过期的时间自然也就不一样):time-to-live=3600000

写模式:Spring-Cache并没有考虑,我们自己可以考虑:

  • 读写加锁。(适用于读多写少的系统)
  • 引入Canal,感知到 MySQL 的更新去更新数据库
  • 加上过期时间,一段时间后缓存与数据库最终一致即可(看自己对与系统数据实时性观察的容忍性了)
  • 读多写多,直接去数据库查询就行,别用缓存

总结:

  • 常规数据(读多写少,即时性,一致性要求不高的数据);完全可以使用Spring-Cache;
  • 读模式:Spring-Cache全部解决了问题;
  • 写模式(只要缓存的数据有过期时间就足够了)
  • 特殊数据(比如实时性要求高/经常写/…):特殊设计

参考

雷丰阳: Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目.


本文完,感谢您的关注支持!


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/749095.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

操纵系统的特征调度算法

操纵系统的特征 调度算法是操作系统用来决定各个进程/作业在CPU上执行顺序的方法。最常见的调度算法有&#xff1a;FCFS、SJF、HRRN、RR、HPF和MFQ。这集先介绍前三个 先来先服务 FCFS 根据作业到达的先后顺序调度&#xff0c;CPU会一直运行直到作业结束&#xff0c;所以这个…

iptables(12)实际应用举例:策略路由、iptables转发、TPROXY

简介 前面的文章中我们已经介绍过iptables的基本原理,表、链的基本操作,匹配条件、扩展模块、自定义链以及网络防火墙、NAT等基本配置及原理。 这篇文章将以实际应用出发,列举一个iptables的综合配置使用案例,将我们前面所涉及到的功能集合起来,形成一个完整的配置范例。…

安全:Linux重要安全配置之关闭常规ssh链接-开启密钥方式链接-防入侵非常重要以及有效的一项操作

https://doc.youyacao.com/88/2154 安全&#xff1a;Linux重要安全配置之关闭常规ssh链接-开启密钥方式链接-防入侵非常重要以及有效的一项操作 问题背景 优雅草官方的服务器长期被各类牛鬼蛇神来搞事情&#xff0c;之前其中有一台测试服由于属于管理和维护&#xff0c;安全…

【直播倒计时】面向AI领域的开发工程师:TinyML在国产FPGA的边缘AI加速应用

TinyML是机器学习中的一个新兴领域&#xff0c;专注于开发可在低功耗、内存受限的设备上运行的算法和模型。其核心目标是将先进的机器学习算法和模型移植到体积小巧、能耗极低的嵌入式设备中&#xff0c;使这些设备具备边缘智能&#xff0c;能够在没有外部服务器支持的情况下进…

【面试题】Spring面试题

目录 Spring Framework 中有多少个模块&#xff0c;它们分别是什么&#xff1f;Spring框架的设计目标、设计理念&#xff1f;核心是什么&#xff1f;Spring框架中都用到了哪些设计模式&#xff1f;Spring的核心机制是什么&#xff1f;什么是Spring IOC容器&#xff1f;什么是依…

Micrometer+ZipKin分布式链路追踪

目录 背景MicrometerMicrometer与ZipKin之间的关系专业术语分布式链路追踪原理 ZipKin安装下载 MicrometerZipKin 案例演示相关文献 背景 一个系统页面上的按钮点击到结果反馈&#xff0c;在微服务框架里&#xff0c;是由N个服务组成返回结果&#xff0c;中间可能经过a->b-…

基于pytorch实现的 MobileViT 的图像识别(迁移学习)

1、介绍 MobileViT 轻量级的分类识别网络&#xff0c;结合了CNN卷积和Transformer 混合的网络架构 关于更多介绍可以自行百度&#xff0c;本文通过pytorchpython进行实现 更多基础的图像分类网络&#xff0c;参考&#xff1a;图像分类_听风吹等浪起的博客-CSDN博客 2、相关代…

算法力扣刷题记录六【203移除链表元素】

前言 链表篇&#xff0c;开始。 记录六&#xff1a;力扣【203移除链表元素】 一、数据结构——链表 来源【代码随想录】&#xff0c;总结&#xff1a; &#xff08;1&#xff09;线性结构。内存地址不连续&#xff0c;通过指针指向串联一起。 &#xff08;2&#xff09;链表类…

《化工管理》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答 问&#xff1a;《化工管理》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的第一批认定学术期刊。 问&#xff1a;《化工管理》级别&#xff1f; 答&#xff1a;国家级。主办单位&#xff1a;中国石油和化学工业联合会 主管单位&…

API-其他事件

学习目标&#xff1a; 掌握其他事件 学习内容&#xff1a; 页面加载事件元素滚动事件页面尺寸事件 页面加载事件&#xff1a; 加载外部资源&#xff08;如图片、外联CSS和JavaScript等&#xff09;加载完毕时触发的事件。 为什么要学&#xff1f;&#xff1f; 有些时候需要等…

华为认证hcna题库背诵技巧有哪些?hcna和hcia有什么区别?

大家都知道华为认证hcna是有题库供考生刷题备考的&#xff0c;但题库中海量的题目想要在短时间背诵下来可并不是一件容易的事情&#xff0c;这就需要我们掌握一定的技巧才行。华为认证hcna题库背诵技巧有哪些? hcna和hcna这二者又有什么区别呢?今天的文章将为大家进行详细解…

IMU坐标系与自定义坐标系转化

1.首先示例图为例&#xff1a; 虚线黑色角度为IMU的坐标系&#xff1b;实线为自定义坐标系&#xff1b; 矫正&#xff1a;&#xff08;默认angleyaw为IMU采的数据角度&#xff09; angleyaw_pt angleyaw-25;if(-180<angleyaw&&angleyaw<-155) // 角度跳变问…

防火墙GRE over IPSec配置

一、基础知识 1、GRE隧道 GRE隧道是一种网络通信协议&#xff0c;使用通用路由封装&#xff08;GRE&#xff09;技术&#xff0c;能够将一种网络协议下的数据报文封装在另一种网络协议中&#xff0c;从而实现在另一个网络层协议中的传输。 GRE隧道的基本概念和工作方式 基本…

怎样实现聊天弹幕效果?

可以使用HTML、CSS和JavaScript的组合。以下是一个简单的步骤和示例代码&#xff0c;说明如何创建一个基本的弹幕效果&#xff1a; HTML结构&#xff1a; 创建一个用于显示弹幕的容器和输入弹幕的表单。 <!DOCTYPE html> <html lang"en"> <hea…

android 通过gradle去除aar的重复资源图片

背景&#xff1a;项目中引入了aar包&#xff0c;结果导致资源出问题了&#xff0c;于是需要对下面aar包进行重复资源去除操作 操作具体如下&#xff1a; 目录&#xff1a;app/build.gradle 末尾配置 apply from: "${project.rootDir}/scripts/excludewidgetAar.gradle&qu…

20240626(周三)AH股行情总结:沪指午后大反弹,港股震荡走高,AIGC、短剧概念走强,低价可转债触底反弹

内容提要 上证指数午后大反弹&#xff0c;创业板指涨近2%。港股震荡走高&#xff0c;恒生科技指数涨近1%。AIGC概念领涨&#xff0c;ST股、贵金属板块领跌。低价可转债集体大涨&#xff0c;广汇转债涨20%触发临停&#xff0c;广汇汽车今日上演地天板。 周三&#xff0c;A股午…

Django项目部署:uwsgi+daphne+nginx+vue部署

一、项目情况 项目根目录&#xff1a;/mnt/www/alert 虚拟环境目录&#xff1a;/mnt/www/venv/alert 激活虚拟环境&#xff1a;source /mnt/www/venv/alert/bin/activate 二、具体配置 1、uwsgi启动配置 根目录下&#xff1a;新增 uwsgi.ini 注意&#xff1a;使用9801端…

NSSCTF-Web题目17(反序列化)

目录 [SWPUCTF 2021 新生赛]pop 1、题目 2、知识点 3、思路 [NISACTF 2022]popchains 4、题目 5、知识点 6、思路 [SWPUCTF 2021 新生赛]pop 1、题目 2、知识点 php反序列化&#xff0c;代码审计 3、思路 打开题目 出现代码&#xff0c;接下来我们逐步对代码进行分析…

模型情景制作-冰镇啤酒

夏日炎炎&#xff0c;当我们在真实世界中开一瓶冰镇啤酒的时候&#xff0c;我们也可以为模型世界中的人物添加一些冰镇啤酒。 下面介绍一种快速酒瓶制造方法&#xff0c;您只需要很少工具&#xff1a; 截取尽量直的流道&#xff08;传说中的板件零件架&#xff09;,将其夹在您的…

惠普笔记本双指触摸不滚屏

查看笔记本型号 一般在笔记本背面很小的字那里 进入惠普官网 笔记本、台式机、打印机、墨盒与硒鼓 | 中国惠普 (hp.com) 选择“支持”>“解决问题”>“软件与驱动程序” 选择笔记本 输入型号&#xff0c;选择操作系统 下载驱动进行完整 重启之后进行测试