持续学习&持续更新中…
守破离
【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【14】缓存与分布式锁
- 缓存
- 本地缓存
- 分布式缓存-本地模式在分布式下的问题
- 分布式缓存
- 整合 redis 作为缓存
- JMeter测试出OutOfDirectMemoryError【堆外内存溢出】
- 高并发读下缓存失效问题
- 缓存穿透
- 缓存雪崩
- 缓存击穿
- 总结解决高并发下缓存失效问题
- 本地锁与分布式锁
- 分布式下的本地锁
- 锁时序问题
- 分布式锁
- 分布式锁演进
- 分布式锁演进-基本原理
- 分布式锁演进-阶段一
- 分布式锁演进-阶段二
- 分布式锁演进-阶段三
- 分布式锁演进-阶段四
- 分布式锁演进-阶段五-最终形态
- 使用Redis实现分布式锁总结
- Redisson—分布式锁
- 使用
- Redisson可重入锁
- Redisson读写锁
- Redisson信号量
- Redisson闭锁
- 数据库与缓存数据一致性问题
- 双写模式
- 失效模式
- 缓存数据一致性-解决方案
- 缓存数据一致性-解决-分布式读写锁
- 缓存数据一致性-解决-Canal
- 我们系统的一致性解决方案
- SpringCache
- 简介
- 基础概念
- 注解
- 表达式语法
- 整合
- 使用
- Spring-Cache的不足
- 参考
缓存
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作(持久化)
哪些数据适合放入缓存?
- 即时性、数据一致性要求不高的
- 访问量大且更新频率不高的数据(读多,写少)
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据 更新频率 来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。
data = cache.load(id);//从缓存加载数据
If(data == null){
data = db.load(id);//从数据库加载数据
cache.put(id,data);//保存到 cache 中
}
return data;
注意:在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没 有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的 数据永久不一致 问题。
本地缓存
- 比如将Map或者ehcache作为本地缓存
- 如果项目是单机部署的,不是分布式,也不考虑缓存大小,那么使用本地缓存没有问题
分布式缓存-本地模式在分布式下的问题
- 每个微服务发现自己的缓存中没有上架,都会去查询数据库
- 如果某次请求负载均衡到了第一台机器,并修改了数据,那么1号机器的缓存会被修改,但是2号3号机器的缓存不会修改
分布式缓存
- redis中间件的好处就是可以集群化
- 理论上可以无限扩容redis
- 使用中间件作为缓存就打破了本地缓存的 容量限制
整合 redis 作为缓存
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
host: 192.168.56.10
port: 6379
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallProductApplicationTests {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate(){
//hello world
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
//保存
ops.set("hello","world_"+ UUID.randomUUID().toString());
//查询
String hello = ops.get("hello");
System.out.println("之前保存的数据是:"+hello);
}
}
修改业务:
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型;【序列化与反序列化】JSON跨语言,跨平台兼容。
//1、加入缓存逻辑,缓存中存的数据是json字符串。
String catalogJSON = redisTemplate.opsForValue().get(RedisConstant.CATEGORY_KEY);
if (StringUtils.isEmpty(catalogJSON)) {
//2、缓存中没有,查询数据库
System.out.println("缓存不命中....将要查询数据库...");
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();
return catalogJsonFromDb;
}
System.out.println("缓存命中....直接返回....");
//转为我们指定的对象。
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
System.out.println("查询了数据库.....");
Map<String, List<Catelog2Vo>> catgoryData = selectDb();
//查到的数据再放入缓存,将对象转为json放在缓存中
String s = JSON.toJSONString(catgoryData);
redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s);
// 1天过期
// redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s, 1, TimeUnit.DAYS);
return catgoryData;
}
JMeter测试出OutOfDirectMemoryError【堆外内存溢出】
- springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。
- lettuce的bug导致netty堆外内存溢出;
- netty如果没有指定堆外内存,默认使用
-Xmx300m
(自己配置的微服务内存),可以通过-Dio.netty.maxDirectMemory
进行设置
解决方案:
- 不能只使用
-Dio.netty.maxDirectMemory
去调大堆外内存。长久的运行后,堆外内存还会溢出 - 升级lettuce客户端。(不推荐)
- 切换使用jedis。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
高并发读下缓存失效问题
缓存穿透
- 缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数 据库也无此记录,我们没有将这次查询的 null 写入缓存,这将导致这个不存在的数据每次 请求都要到存储层去查询,失去了缓存的意义。
- 在流量大时,可能 DB 就挂掉了,要是有人利用不存在的 key 频繁攻击我们的应用,这就是 漏洞。
- 解决: 缓存空结果、并且设置短的过期时间。
缓存雪崩
雪崩:缓存的数据key大面积同时失效
- 缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失 效,请求全部转发到 DB,DB 瞬时压力过重雪崩。
- 解决: 原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的 重复率就会降低,就很难引发集体失效的事件。
缓存击穿
瞄着一个靶子疯狂射击的时候,这个靶子就会被打穿
也叫做热点key
- 对于一些设置了过期时间的 key,如果这些 key 可能会在某些时间点被超高并发地访问, 是一种非常“热点”的数据。
- 这个时候,需要考虑一个问题:如果这个 key 在大量请求同时进来前正好失效,那么所 有对这个 key 的数据查询都落到 db,我们称为缓存击穿。
- 解决: 加锁
总结解决高并发下缓存失效问题
- 空结果缓存:解决缓存穿透
- 设置过期时间(加随机值):解决缓存雪崩
- 加锁:解决缓存击穿
本地锁与分布式锁
分布式下的本地锁
有多少个商品服务,就会有多少把锁,就会访问多少次数据库
IDEA开启多个商品服务:可以多复制几个商品服务交给网关进行压力测试本地锁
本地锁:
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithLocalLock() {
//只要是同一把锁,就能锁住需要这个锁的所有线程
//synchronized (this):SpringBoot所有的组件在容器中都是单例的。
//TODO 本地锁:synchronized,JUC(Lock),在分布式情况下,想要锁住所有,必须使用分布式锁
synchronized (this) {
//得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
return getCatalogJsonFromDB();
}
}
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型;【序列化与反序列化】JSON跨语言,跨平台兼容。
/**TODO 写博客
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、加锁:解决缓存击穿
*/
//1、加入缓存逻辑,缓存中存的数据是json字符串。
String catalogJSON = getCatalogJSON();
if (StringUtils.isEmpty(catalogJSON)) {
//2、缓存中没有,查询数据库
//保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。在同一把锁内完成
System.out.println("缓存不命中....将要查询数据库...");
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDbWithLocalLock();
return catalogJsonFromDb;
}
System.out.println("缓存命中....直接返回....");
//转为我们指定的对象。
Map<String, List<Catelog2Vo>> result = getJSONData(catalogJSON);
return result;
}
锁时序问题
1号线程查到数据并释放了锁,在1号线程将数据放入缓存期间(把结果放到 Redis缓存是一次 网络交互,需要时间),2号线程发现缓存中没有数据,会再查询一次数据库
保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。在同一把锁内完成
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
String catalogJSON = redisTemplate.opsForValue().get(RedisConstant.CATEGORY_KEY);
if (!StringUtils.isEmpty(catalogJSON)) {
//1、缓存不为null直接返回
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}
System.out.println("查询了数据库.....");
//2、封装数据
Map<String, List<Catelog2Vo>> catgoryData = loadDB();
//3、查到的数据再放入缓存,将对象转为json放在缓存中
String s = JSON.toJSONString(catgoryData);
redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s, 1, TimeUnit.DAYS);
// redisTemplate.opsForValue().set(RedisConstant.CATEGORY_KEY, s);
return catgoryData;
}
分布式锁
分布式锁演进
分布式锁演进-基本原理
分布式锁演进-阶段一
分布式锁演进-阶段二
分布式锁演进-阶段三
分布式锁演进-阶段四
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1、占分布式锁。去redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
// 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getCatalogJsonFromDB();
//获取值对比+对比成功删除
String lockValue = redisTemplate.opsForValue().get("lock");
if(uuid.equals(lockValue)){
//删除我自己的锁
redisTemplate.delete("lock");//删除锁
}
return dataFromDb;
} else {
//加锁失败...重试。synchronized ()
//休眠100ms重试
System.out.println("获取分布式锁失败...等待重试");
try {
Thread.sleep(100);
} catch (Exception e) {
}
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
解释:
String lockValue = redisTemplate.opsForValue().get("lock");
这段代码是网络传输交互,需要一定的时间- Redis接收到请求,在查该线程的锁的时候,该线程的锁还没有过期,于是返回了该线程的UUID
- 但是在网络返回途中,该线程设置的锁过期了;别人占到了这个锁,并设置了锁,当然别人的UUID会被set到
lock
里 - 那么此时,该线程由于get到的UUID是正确的,那么就可以正确的equals,那么就会执行删除逻辑
redisTemplate.delete("lock");
,就会把别人的锁给删除了
分布式锁演进-阶段五-最终形态
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1、占分布式锁。去redis占坑
String uuid = UUID.randomUUID().toString();
// Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid);
// 加锁和设置过期时间保证原子性
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
//加锁成功... 执行业务
//2、设置过期时间,必须和加锁是同步的,原子的
// redisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
try {
dataFromDb = getCatalogJsonFromDB();
} finally {
// TODO 写博客 lua脚本解锁 原子操作
// Redis分布式锁,核心:
// - 加锁和过期时间保证原子性
// - 解锁保证原子性
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
Long delResult = redisTemplate.execute(
new DefaultRedisScript<Long>(script, Long.class)
, Arrays.asList("lock")
, uuid
);
}
return dataFromDb;
} else {
//加锁失败...重试。synchronized ()
//休眠200ms重试
System.out.println("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
}
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
使用Redis实现分布式锁总结
- 加锁和过期时间保证原子性
- 解锁(判断+删除)保证原子性
Redisson—分布式锁
Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间 的协作。
官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
使用
<!-- 以后使用redisson作为所有分布式锁,分布式对象等功能框架-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
@Configuration
public class MyRedissonConfig {
/**
* 所有对Redisson的使用都是通过RedissonClient对象
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson(@Value("${spring.redis.host}") String url) {
//1、创建配置
//Redis url should start with redis:// or rediss://
// rediss:两个s代表安全连接
Config config = new Config();
config.useSingleServer().setAddress("redis://"+url+":6379");
//2、根据Config创建出RedissonClient示例
return Redisson.create(config);
}
}
Redisson的所有操作都是发送Lua脚本,所以都保证了原子性:
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisson() {
//锁的名字: 锁的粒度,越细越快。[锁的一个名字就是一个锁,不同名字就是不同的锁]
//锁的粒度:具体缓存的是某个数据,11-号商品; product-11-lock product-12-lock
// 加锁和设置过期时间保证原子性
// Redisson的所有操作都是发送Lua脚本,所以都保证了原子性
RLock lock = redisson.getLock(RedisConstant.CATEGORY_KEY);
// lock.lock();
lock.lock(10, TimeUnit.SECONDS);
System.out.println("获取分布式锁成功...");
//加锁成功... 执行业务
Map<String, List<Catelog2Vo>> dataFromDb;
try {
dataFromDb = getCatalogJsonFromDB();
} finally {
//删除锁
lock.unlock();
}
return dataFromDb;
}
Redisson可重入锁
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
//2、加锁,不会有死锁问题,而且还会自动续期
lock.lock(); //阻塞式等待,知道获取到锁。Redisson有一个看门狗,可以给锁自动续期,默认加的锁都是30s时间。
//1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
// lock.lock(10,TimeUnit.SECONDS); //10秒自动解锁,无需unlock。自动解锁时间一定要大于业务的执行时间。
//问题:lock.lock(10,TimeUnit.SECONDS); 在锁时间到了以后,不会自动续期。
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
//2、如果我们未指定锁的超时时间,就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间】;
// 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期,续成30s
// internalLockLeaseTime【看门狗时间】 / 3,10s
//最佳实战
//1)、lock.lock(30,TimeUnit.SECONDS);省掉了整个续期操作。手动解锁
try {
// 业务代码
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000); //模拟超长任务
} catch (Exception e) {
e.printStackTrace();
} finally {
//3、解锁 假设解锁代码没有运行,redisson会不会出现死锁 答案:不会出现死锁
System.out.println("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
lock.lock(); 加锁,不会有死锁问题,而且还会自动续期
- 阻塞式等待,直到获取到锁。Redisson有一个看门狗,可以给锁自动续期,默认加的锁都是30s时间。
- 锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长导致锁自动过期会被删掉
- 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
lock.lock(10,TimeUnit.SECONDS); 10秒自动解锁,无需unlock。自动解锁时间一定要大于业务的执行时间。
- 在锁时间到了以后,即使业务没有执行完成,也不会自动续期。
- 如果我们传递了锁的超时时间,就发送给redis执行Lua脚本,进行占锁,默认超时时间就是我们指定的时间
- 如果我们未指定锁的超时时间(
lock.lock()
),就使用30 * 1000【LockWatchdogTimeout
看门狗的默认时间就是30秒】;只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间30s】,每隔10s【internalLockLeaseTime / 3:三分之一的看门狗时间】都会自动再次续期,续成30s
最佳实战:lock.lock(30,TimeUnit.SECONDS);
省掉了看门狗的整个续期操作。业务时间给大一点然后手动解锁
Redisson读写锁
改数据加写锁,读数据加读锁
//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁,大家都能用,加了跟没加一样
//写锁没释放读就必须等待
// 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
// 写 + 读: 等待写锁释放
// 写 + 写: 阻塞方式
// 读 + 写: 有读锁。写也需要等待。
// 只要有写的存在,都必须等待
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = lock.writeLock();
//1、改数据加写锁,读数据加读锁
rLock.lock();
try {
System.out.println("写锁加锁成功..." + Thread.currentThread().getId());
s = UUID.randomUUID().toString();
Thread.sleep(30000);
stringRedisTemplate.opsForValue().set("writeValue", s);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("写锁释放" + Thread.currentThread().getId());
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//加读锁
RLock rLock = lock.readLock();
rLock.lock();
try {
System.out.println("读锁加锁成功" + Thread.currentThread().getId());
s = stringRedisTemplate.opsForValue().get("writeValue");
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("读锁释放" + Thread.currentThread().getId());
}
return s;
}
读写锁可以保证一定能读到最新数据
- 写锁是一个排他锁(互斥锁、独享锁)。
- 读锁是一个共享锁,大家都能用,加了跟没加一样
- 写锁没释放读就必须等待
只要有写的存在,都(写/读)必须等待;有读锁。写也要等待。
- 读 + 读: 相当于无锁,并发读,只会在redis中记录好所有当前的读锁。他们都会同时加锁成功
- 写 + 读: 等待写锁释放才能读
- 写 + 写: 阻塞方式
- 读 + 写: 有读锁。写也需要等待。
Redisson信号量
信号量也可以用作分布式限流
/**
* 车库停车,
* 3车位[事先在redis中放key:park,value:3]
* 信号量也可以用作分布式限流;
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
// park.acquire();//获取一个信号,获取一个值,占一个车位
boolean b = park.tryAcquire();
if (b) { //信号量也可以用作分布式限流
//执行业务
} else {
return "error";
}
return "ok=>" + b;
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release();//释放一个车位
return "ok";
}
Redisson闭锁
/**
* 放假,锁门
* 1班没人了,2班没人了......
* 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假了...";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();//计数减一;
return id + "班的人都走了...";
}
数据库与缓存数据一致性问题
通过加锁解决了缓存击穿,确保了读取缓存没问题之后
有一个新的问题:那就是:缓存里面的数据如何和数据库保持一致,也就是缓存数据一致性
缓存数据一致性的原因都是:数据库的最后一次更新没有放到Redis缓存中,导致数据库和缓存内容不一致
我们只需要确保 最终一致性,放在缓存中的数据,允许读到的最新数据有延迟
双写模式
-
将 {写数据库和写缓存} 放在一起加分布式锁,可以解决这一问题
-
如果给缓存设置了过期时间,我们甚至可以不用管这个问题,当然,这取决于 我们对数据实时性查看的容忍性
失效模式
缓存数据一致性-解决方案
- 无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
- 1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小(用户点击相比于电脑还是比较慢的),不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
- 2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
- 3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
- 4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据,就不用加读写锁了);
总结:
- 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
- 我们不应该过度设计,增加系统的复杂性
- 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
缓存数据一致性-解决-分布式读写锁
分布式读写锁。读数据等待写数据整个操作完成
缓存数据一致性-解决-Canal
Canal是阿里开源的一个中间件,相当与一个数据库的从服务器
我们系统的一致性解决方案
1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新
2、读写数据的时候,加上分布式的读写锁。 经常写,经常读会有极大的影响;偶尔写,经常读影响不大(读写锁的读锁相当于无锁)
SpringCache
简介
-
Spring 从 3.1 开始定义了 org.springframework.cache.Cache 和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术; 并支持使用 JCache(JSR-107)注解简化我们开发;
-
Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接 口 下 Spring 提 供 了 各 种 xxxCache 的 实 现 ; 如 RedisCache , EhCacheCache , ConcurrentMapCache 等;
-
每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已经被调用过;
-
如果有就直接从缓存中获取方法调用后的结果
-
如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。
基础概念
注解
开启缓存功能 @EnableCaching
只需要使用注解就能完成缓存操作
@Cacheable
: Triggers cache population.:触发将数据保存到缓存的操作@CacheEvict
: Triggers cache eviction.:触发将数据从缓存删除的操作 【失效模式】@CachePut
: Updates the cache without interfering with the method execution.:不影响方法执行更新缓存 【双写模式】@Caching
: Regroups multiple cache operations to be applied on a method.:组合以上多个操作@CacheConfig
: Shares some common cache-related settings at class-level.:在类级别共享缓存的相同配置
表达式语法
整合
引入依赖:spring-boot-starter-cache、spring-boot-starter-data-redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
默认配置:
- 如果缓存中有,方法不用调用。
- key默认自动生成 category::SimpleKey [] ==> 缓存的名字::SimpleKey [] (自主生成的key值)
- 缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
- 默认ttl时间 -1;代表永不过期
自定义配置类:
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching
public class MyCacheConfig {
/**
* 配置文件中的东西没有用上;
*
* 1、原来和配置文件绑定的配置类是这样子的
* @ConfigurationProperties(prefix = "spring.cache")
* public class CacheProperties{ }
* 2、要让他生效
* @EnableConfigurationProperties(CacheProperties.class)
*
*/
//1、
// @Autowired
// CacheProperties cacheProperties;
//2、
// Spring会自动注入方法参数进去
// 缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
// 默认ttl时间 -1;代表永不过期
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
// config = config.entryTtl();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
//将配置文件中的所有配置都生效
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
application.properties:
spring.cache.type=redis
#如果配置了前缀就用我们配置的,如果没有就默认使用缓存的名字作为前缀
# 不推荐指定前缀,不指定前缀可以让分区作为前缀
#spring.cache.redis.key-prefix=gulimall_cache_
spring.cache.redis.use-key-prefix=true
#是否缓存空值[可以用于防止缓存穿透]
spring.cache.redis.cache-null-values=true
#存活时间以毫秒为单位
spring.cache.redis.time-to-live=3600000
使用
// @Cacheable(value = {"category"}, key = "#root.method.name")
@Cacheable(value = {"category"}, key = "#root.method.name", sync = true)
// @Cacheable(value = {"category"}, key = "'listLevel1Categorys'", sync = true)
// @Cacheable(value = {"category"})
@Override
public List<CategoryEntity> listLevel1Categorys() {
System.out.println("listLevel1Categorys查询了数据库.....");
List<CategoryEntity> categoryEntities = this.list(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
return categoryEntities;
// return null; // 测试缓存空结果
}
@Override
// 指定生成的缓存使用的key:key属性指定接受一个SpEL表达式
// @Cacheable(value = "category", key = "#root.methodName")
@Cacheable(value = "category", key = "#root.methodName", sync = true)
public Map<String, List<Catelog2Vo>> getCatalogJson() {
System.out.println("查询了数据库.....");
//封装数据
Map<String, List<Catelog2Vo>> catgoryData = loadDB();
return catgoryData;
}
/**
* 级联更新所有关联的数据
* @CacheEvict:失效模式
* 1、同时进行多种缓存操作 @Caching
* 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true)
* 3、存储同一类型的数据,都可以指定成同一个分区(这个分区只是Spring逻辑划分的,Redis物理存储并没有)。
* 这样做的好处就是当我们修改了这个类型的数据,我们就直接可以将分区里跟这个类型有关的所有数据都删除掉
* 分区名默认就是缓存的前缀(不在application.properties中指定前缀)
*/
// @Caching(evict = {
// @CacheEvict(value = "category", key = "'listLevel1Categorys'"),
// @CacheEvict(value = "category", key = "'getCatalogJson'")
// })
// 或者
@CacheEvict(value = "category",allEntries = true) //缓存失效模式 //allEntries:删除所有category的缓存
// @CachePut //缓存双写模式 : 方法没有返回值不支持双写模式
@Transactional
@Override
public void updateCascade(CategoryEntity category) {
this.updateById(category);
//TODO 当category表中的某些字段被其他表使用,并且该字段发生了更新,那么也需要修改那些表的信息
if (!StringUtils.isEmpty(category.getName())) { // 分类名发生了变化
categoryBrandRelationService.updateCategory(category.getCatId(), category.getName());
}
}
每一个需要缓存的数据我们都来指定要放到那个名字的缓存。【缓存的分区(推荐按照业务类型分)】
存储同一类型的数据,都可以指定成同一个分区(这个分区只是Spring逻辑划分的,Redis物理存储并没有)。这样做的好处就是当我们修改了这个类型的数据,我们就直接可以将分区里跟这个类型有关的所有数据都删除掉
分区名默认就是缓存的前缀(不推荐在application.properties中指定前缀)
Spring-Cache的不足
读模式:Spring-Cache全部考虑到了
- 缓存穿透:查询一个缓存和数据库都为null的数据。解决:缓存空数据;cache-null-values=true
- 缓存击穿:大量并发进来同时查询一个正好过期的数据。
- 解决:加锁;?默认是无加锁的;
- sync = true(加本地锁【本地锁:有多少服务就有多少锁就有多少数据库操作被放行,这样问题不大,不会有超多请求查询数据库】,可以解决击穿)
- 缓存雪崩:大量的key同时过期。
- 解决:加随机时间。
- 我们只需要加上过期时间就行
- (存储的时间不一样,那么过期的时间自然也就不一样):time-to-live=3600000
写模式:Spring-Cache并没有考虑,我们自己可以考虑:
- 读写加锁。(适用于读多写少的系统)
- 引入Canal,感知到 MySQL 的更新去更新数据库
- 加上过期时间,一段时间后缓存与数据库最终一致即可(看自己对与系统数据实时性观察的容忍性了)
- 读多写多,直接去数据库查询就行,别用缓存
总结:
- 常规数据(读多写少,即时性,一致性要求不高的数据);完全可以使用Spring-Cache;
- 读模式:Spring-Cache全部解决了问题;
- 写模式(只要缓存的数据有过期时间就足够了)
- 特殊数据(比如实时性要求高/经常写/…):特殊设计
参考
雷丰阳: Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目.
本文完,感谢您的关注支持!