1. 无锁场景
下面是一个扣减库存逻辑, 由于查库存和扣减库存两个操作不是原子的,明显存在并发超卖问题
// 假设初始库存200
@GetMapping("/stock")
public String stock(@RequestParam(value = "name", defaultValue = "World") String name) {
String key = "product:101";
Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));
if (stock > 0) {
stock = stock - 1;
redisTemplate.opsForValue().set(key, stock.toString());
System.out.println("成功扣减库存, 还剩" + stock);
} else {
throw new RuntimeException("缺货");
}
return "200";
}
压测结果: 1000人抢200库存商品, 卖出731件,存在超卖问题
2. 单机环境,加synchronized锁
private static Object STOCK_LOCK = new Object();
// 假设初始库存200
@GetMapping("/stock")
public String stock(@RequestParam(value = "name", defaultValue = "World") String name) {
String key = "product:101";
synchronized (STOCK_LOCK) {
Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));
if (stock > 0) {
stock = stock - 1;
redisTemplate.opsForValue().set(key, stock.toString());
System.out.println("成功扣减库存, 还剩" + stock);
return "200";
}
}
throw new RuntimeException("缺货");
}
压测结果:1000人抢200库存商品, 卖出200件,用例成功
3. 分布式环境,加synchronized锁
准备:这里启动两个节点, 用nginx负载均衡
压测结果:1000人抢200库存商品, 卖出310件,存在超卖问题
4. 分布式环境,redis setnx分布式锁
基础版
主要代码逻辑:
- 用setIfAbsent(setnx封装)加锁,同时设置超时时间,锁力度到具体商品
- 获取锁后执行减库存逻辑
- 执行成功释放锁
代码:
// 假设初始库存200
@GetMapping("/stock2")
public String stock2(@RequestParam(value = "name", defaultValue = "World") String name) {
String key = "product:101";
String lockKey = "lock:" + key;
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS);
if (result) {
try {
Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));
if (stock > 0) {
stock = stock - 1;
redisTemplate.opsForValue().set(key, stock.toString());
System.out.println("成功扣减库存, 还剩" + stock);
return "200";
}
} finally {
redisTemplate.delete(lockKey);
}
}
throw new RuntimeException("缺货");
}
压测结果:1000人抢200库存商品, 卖出182件,剩余库存18件,业务正常
在低并发,服务器理想情况下, 业务正常,但是还存在一些问题
问题1
现在写死的锁过期时间30秒,但是在服务器压力大时, 接口耗时不稳定, 可能超过过期时间, 锁自动失效, 可能导致超卖
解决:锁续命, 开启一个后台线程, 如果业务没执行完,给锁延长过期时间.
问题2
A线程业务执行完, 准备释放锁时, 肯能刚好锁自动过期,这时候B线程进来抢占到锁正在执行业务,A线程开始删除锁, 此时其他线程都可能去拿到锁,保证不了同步
解决: 释放锁时,判断只有加锁线程才有资格去删除锁
@GetMapping("/stock3")
public String stock3(@RequestParam(value = "name", defaultValue = "World") String name) {
String key = "product:101";
String lockKey = "lock:" + key;
String clientId = UUID.randomUUID().toString();
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
if (result) {
try {
Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));
if (stock > 0) {
stock = stock - 1;
redisTemplate.opsForValue().set(key, stock.toString());
System.out.println("成功扣减库存, 还剩" + stock);
return "200";
}
} finally {
// 只能删除自己加的锁, 不让其他线程删
if (clientId.equals(redisTemplate.opsForValue().get(lockKey))) {
/* ... */
redisTemplate.delete(lockKey);
}
}
}
throw new RuntimeException("缺货");
}
问题3
但是问题2还没彻底解决, 因为比较clientId和删除锁这两个操作不是原子的, 如果中间卡顿,卡顿期间锁刚好自动过期,其他线程占有锁, 这里再执行删除锁就会误删别人锁.
解决: 可用lua脚本执行批量命令,保证原子性
Redisson分布式锁
Redisson是专门处理分布式场景使用Redis的组件, 里面就封装了锁续命,只删自己加的锁,lua脚本,锁重入等功能.
示例:
@Bean
public Redisson redisson(RedisProperties redisProperties) {
// 此为单机模式
Config config = new Config();
config.useClusterServers().setNodeAddresses(redisProperties.getCluster().getNodes()
.stream().map(node -> "redis://" + node).collect(Collectors.toList()));
return (Redisson) Redisson.create(config);
}
@Autowired
private Redisson redisson;
// 假设初始库存200
@GetMapping("/stock4")
public String stock4(@RequestParam(value = "name", defaultValue = "World") String name) {
String key = "product:101";
String lockKey = "lock:" + key;
RLock rLock = redisson.getLock(lockKey);
// 尝试加锁, 加锁失败会间歇阻塞再次加锁, 直至成功
rLock.lock();
try {
Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));
if (stock > 0) {
stock = stock - 1;
redisTemplate.opsForValue().set(key, stock.toString());
System.out.println("成功扣减库存, 还剩" + stock);
return "200";
}
} finally {
rLock.unlock();
}
throw new RuntimeException("缺货");
}
压测结果:1000人抢200库存商品, 卖出200件,用例成功