1.压力测试出的内存泄漏及解决(可跳过)
使用jmeter对查询产品分类列表接口进行压力测试,出现了堆外内存溢出异常。
我们设置的虚拟机堆内存100m,并不是堆外内存100m
产生堆外内存溢出:OutOfDirectMemoryError
原因是因为:
springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。
lettuce的bug导致netty堆外内存溢出,-Xmx300m, netty如果没有指定堆外内存,默认使用-Xmx300m作为堆外内存。
由于在高并发的时候,获取的数据量非常大,高并发一进来,由于数据在传输期间都要占内存,导致内存分配不足,堆外内存溢出。
解决方案:不能使用-Dio.netty.maxDirectoryMemory只去调大堆外内存(只是延缓了问题的出现)
1)、升级lettuce客户端, 2)、切换使用Jedis
lettuce,jedis操作redis的底层客户端,spring再次封装redisTemplate
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
2.高并发下缓存失效的问题
2.1.缓存穿透
缓存穿透: 指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是 数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不 存在的数据每次请求都要到存储层去查询,失去了缓存的意义
风险: 利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
解决: null结果缓存,并加入短暂过期时间
2.2.缓存雪崩
缓存雪崩: 缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时 压力过重雪崩。
解决: 原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这 样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。
2.3.缓存击穿
缓存穿透:
- 对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
- 如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
解决: 加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db
* 1.空结果缓存,解决缓存穿透
* 2.设置过期时间(加随机值),解决缓存雪崩
* 3.加锁,解决缓存击穿
前两个都好解决,对于加锁的问题,如果锁没加好,又会有新的问题
直接加synchronized锁
// 只要是同一把锁,就能锁住需要这个锁的所有线程
// 1.synchronized(this):,springboot所有组件(这里是CategoryServiceImpl)在容器中都是单例的
// TODO 本地锁,syncrhonized, JUC(Lock) 分布式情况下,想要锁住所有,必须使用分布式锁
synchronized (this) {
return getDataFromDb();
}
得到锁以后,应该再去缓存中确定一次,如果没有才需要继续查询。
String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
if (!StringUtils.isEmpty(catelogJSON)) {
// 如果缓存不为空,直接返回
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}
分布式下如何加锁
TODO 本地锁,syncrhonized, JUC(Lock) 分布式情况下,想要锁住所有,必须使用分布式锁
注意锁的时序问题
要保证查数据库和把结果放入缓存是原子操作,是在同一把锁内进行的,否则就会导致释放锁的时序问题,导致多次查询数据库。
开启多个服务的时候只能锁住本地服务
3.分布式锁的原理与使用
分布式锁演进-基本原理
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。 等待可以自旋的方式。
占坑在redis中通常使用set命令,set的介绍如下:
https://www.cnblogs.com/zhouj850/p/10949359.html
SET key value [EX seconds] [PX milliseconds] [NX|XX]
设置k-v值
EX过期时间,PX毫秒,NX指的是Not Exist,不存在才给里面放。
所以使用SETNX命令可以进行占坑操作。
分布式锁演进-阶段一
代码如下:
setnx占好了位,业务代码异常或者程序在页面过程 中宕机。没有执行删除锁逻辑,这就造成了死锁
放在finally里面,也不行,机器宕机也会导致锁无法释放。
解决:设置锁的自动过期,即使没有删除,会自动删除
分布式锁演进-阶段二
代码如下:
问题: 1、setnx设置好,正要去设置过期时间,宕机。又死锁了。
解决: 设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令
redis设置锁的过期时间的命令:
set lock 1111 EX 1000 NX
查询锁的过期时间命令:
ttl lock
设置过期时间,必须和加锁是同步的、原子的
分布式锁演进-阶段三
代码如下:
问题:
1、删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们 直接删除,有可能把别人正在持有的锁删除了。
解决: 占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。
分布式锁演进-阶段四
问题:
1、如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁
获取值对比+对比成功删除锁,一定要是原子操作。
解决: 删除锁必须保证原子性。使用redis+Lua脚本完成
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
**保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 **
所有代码如下:
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
// 1.占分布式锁,去redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("Lock", uuid, 300, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
System.out.println("获取分布式锁成功...");
try {
// 加锁成功。。。执行业务
// 设置过期时间,必须和加锁是同步的、原子的
//stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
dataFromDb = getDataFromDb();
} finally {
// 获取值对比+对比成功删除锁,一定要是原子操作。
// String lockValue = stringRedisTemplate.opsForValue().get("lock");
// if (uuid.equals(lockValue)) {
// // 删除自己的锁
// stringRedisTemplate.delete("lock");
// }
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
// 删除锁
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
}
return dataFromDb;
} else {
// 加锁失败。。。重试synchronized(), 重试自旋
// 休眠100ms重试
System.out.println("获取分布锁失败,,,");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return getCatalogJsonFromDbWithRedisLock(); // 自旋的方式
}
}
更难的事情,锁的自动续期
Redis简介-整合
分布式场景下,还需要用到读写锁、信号量等机制,使用上述方案只能实现简单的分布式锁,因此我们需要借助Redissson框架。完成我们所有的分布式功能。
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
参考文档:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0
<!--以后使用redisson作为所有分布式锁,分布式对象功能框架-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
将redisson作为分布式锁等功能框架
-
1)、引入依赖
-
2)、配置redisson
Redisson的配置
@Configuration
public class MyRedissonConfig {
/**
* 所有对redissson的使用都是通过RedissonClient对象
* @Bean给容器中放一个组件对象
* **/
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
// 创建配置(单节点配置)
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 根据config创建出redissonClient示例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
Redisson-lock锁
简介
可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock
接口(和本地锁一样的使用方式)。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RLock lock = redisson.getLock(“anyLock”);
// 最常见的使用方法
lock.lock();
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
测试
Redisson的优点:
1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删
2、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除。
lock.lock(100, TimeUnit.SECONDS);
//10秒自动解锁,自动解锁的时间一定要大于业务的执行时间。
问题,lock.lock(10, TimeUnit.SECONDS)
,在锁时间到了以后,不会自动续期。
1.如果我们传递了锁的超时时间,发送给redis执行脚本,进行占锁,默认超时就是我们指定时间
2.如果我们未指定锁的超时时间,就使用30*1000【LockWatchDogTimeout看门狗的默认时间】
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续期满30s
internalLockLeaseTime【看门狗时间】/ 1/3, 10s
测试代码
@ResponseBody
@GetMapping("/lock")
public String lock() {
// 1.获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
// Redisson的优点:
// 1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删
// 2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除。
// lock.lock(); // 阻塞式等待,默认加的锁都是30s
lock.lock(100, TimeUnit.SECONDS); //10秒自动解锁,自动解锁的时间一定要大于业务的执行时间。
// 问题,lock.lock(10, TimeUnit.SECONDS),在锁时间到了以后,不会自动续期。
// 1.如果我们传递了锁的超时时间,发送给redis执行脚本,进行占锁,默认超时就是我们指定时间
// 2.如果我们未指定锁的超时时间,就使用30*1000【LockWatchDogTimeout看门狗的默认时间】
// 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续期满30s
// internalLockLeaseTime【看门狗时间】/ 1/3, 10s
// 最佳实战,
// 1).lock.lock(10,TimeUnit.SECONDS), 省掉了续期操作,手动解锁
try{
System.out.println("加锁成功,执行业务" + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
} finally {
// 3.解锁 假设代码没有运行,redisson会不会出现死锁
System.out.println("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
lock.lock(10,TimeUnit.SECONDS), 省掉了续期操作,手动解锁
读写锁(ReadWriteLock)
简介:
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
大家都知道,如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
测试
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁, 独享锁),读锁是一个共享锁
写锁没释放读就必须等待
- 读 + 读 相当于无锁,并发读,只会在redis记录好,所有当前的读锁,他们都会同时加锁成功
- 写 + 读,等待写锁释放
- 写 + 写,阻塞方式
- 读 + 写,有读锁,写也需要等待
只要有写的存在,都必须等待
测试代码如下:
@GetMapping("/write")
@ResponseBody
public String writeValue() {
String s = "";
RReadWriteLock lock = redisson.getReadWriteLock("rw_lock");
RLock rLock = lock.writeLock();
try {
rLock.lock();
// 改数据加写锁,读数据加读锁
s = UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue() {
// 读写锁还是同一把读写锁
RReadWriteLock lock = redisson.getReadWriteLock("rw_lock");
// 加读锁
RLock rLock = lock.readLock();
// 加读锁
String s = "";
try {
rLock.lock();
s = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
闭锁(CountDownLatch)
简介
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
测试
/**
* 放假,锁门
* 5个半全部走完,我们可以锁大门
* **/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假了...";
}
@GetMapping("/gogogo/{id}")
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); // 计数减1
return id + "班的人都走了....";
}
信号量(Semaphore)
简介
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
测试
/**
* 车库停车
* 3车位
* 信号量可以用来做分布式限流
* **/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
// park.acquire(); // 获取一个信号,获取一个值,占一个车位
boolean b = park.tryAcquire();
if (b) {
// 执行业务
} else {
}
return "ok";
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release(); // 释放一个车位
return "ok";
}