前言
最近因为刚入职公司开启自己的实习生涯,工作和毕设论文同步进行,导致有段时间没更新博客了,今天来分享一下最近学到的一些知识。
场景介绍
BOSS让我写一些接口,他提出这样一个需求,该接口的参数有多个,其中包含shopname参数,该参数要根据调用者传入的shopname再在后边拼接一个id,作为一个新的字段来作为参数去调用别的接口。而且要保证ID的唯一性。因为BOSS要我写的接口是我们公司内部用的系统的接口,并发量不会太大,但我想万一真有两个人近乎一起用了该接口,该怎样保证获取的id的唯一性。于是就有了下面的经历。
该场景需要从Redis中获取一个唯一id并修改它的值,因为要保整其获取的id的唯一性,又因有一定的并发量,所以采用先占位再获取的方法,防止id重复。
待优化代码
这段代码是我本来自己写的,后来感觉质量不是太高,总觉得能有优化的地方,但由于公司就我一个Python,没办法去问别人,于是就去问ChatGPT。源代码如下。
# 获取当前id_flag,采取先占位再获取的方法,防止id重复
def get_curentid(report_num:int) -> int:
def get_titleid_flag() -> int:
titleid_flag = int(r.get("titleid_flag"))
return titleid_flag
def set_titleid_flag(old_titleid_flag, new_titleid_flag: int) -> bool:
if old_titleid_flag == get_titleid_flag():
r.set("titleid_flag", new_titleid_flag)
return True
else:
return False
time_pull = time.time()
time_end = time_pull + 5
old_titleid_flag = get_titleid_flag()
while True:
if set_titleid_flag(old_titleid_flag, old_titleid_flag + report_num):
return old_titleid_flag
elif time.time() < time_end:
old_titleid_flag = get_titleid_flag()
continue
else:
return -1
代码解释:
该函数是用来当前 id_flag 的方法。采用了先占位再获取的策略,以防止 id 的重复使用。下面是简单的解释:
- get_curentid(report_num:int) -> int: 这是主函数,它接受一个整数参数 report_num,表示要修改的id值,并返回一个整数作为当前的 id_flag。
- get_titleid_flag() -> int: 这个函数从Redis中获取当前的 titleid_flag,并将其转换为整数后返回。
- set_titleid_flag(old_titleid_flag, new_titleid_flag: int) -> bool: 这个函数尝试将 titleid_flag 的值从旧值更新为新值。在更新前先查询旧值有没有变化,若无变化则更新。如果更新成功,则返回 True,否则返回 False。
- 在主函数中通过循环不断尝试更新 titleid_flag 的值,直到更新成功或者超时时间到达。超时时间为 5 秒。
- 如果成功更新了 titleid_flag 的值,则返回旧的 titleid_flag,表示成功获取到了 id_flag。如果超时未能成功更新,则返回 -1,表示获取失败。
总之,这个函数通过Redis中的一个标识值来确保获取到的 id_flag 是唯一的,并且采用了一定的重试机制来应对可能的竞争条件或者网络延迟导致的更新失败情况。
然后问了ChatGPT后,他给了我一下优化建议。
代码可优化方向:
因为对原子操作和分布式锁之前没了解过,所以特意去搜索了解了一下。
原子操作介绍:
分布式锁介绍
一些好的提高并发度和性能的算法或方案
之后又问了ChatGPT有没有什么好的方案,他给出了一下方案,总结一下就是;确保访问唯一性,减少访问频次,限制并发数量。具体回答如下:
令牌桶算法和漏桶算法
除此之外,还了解了两个新的关于限流的算法。
优化后的代码
基于上述信息,我对原本的代码进行了下列优化,利用Redis的原子操作来优化这段代码,使用 Redis 的 SETNX(SET if Not eXists)命令来实现分布式锁。SETNX 命令可以在 key 不存在的情况下设置 key 的值,如果 key 已经存在,则不进行任何操作。
import redis
import time
# 创建 Redis 客户端连接
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# 获取铺位评估报告当前 id_flag,采取先占位再获取的方法,防止id重复
def get_curentid(report_num: int) -> int:
# 设置锁的过期时间,防止锁未正常释放导致死锁
lock_timeout = 5 # 5秒
# 生成锁的键名
lock_key = "titleid_flag_lock"
# 初始化等待时间
wait_time = 0.1
# 循环尝试获取锁
while True:
# 使用 SETNX 命令尝试获取锁
lock_acquired = redis_client.setnx(lock_key, "locked")
if lock_acquired:
# 如果成功获取锁,则设置锁的过期时间
redis_client.expire(lock_key, lock_timeout)
# 获取当前 id_flag
current_id_flag = int(redis_client.get("titleid_flag") or 0)
new_id_flag = current_id_flag + report_num
redis_client.set("titleid_flag",new_id_flag)
# 释放锁
redis_client.delete(lock_key)
return current_id_flag
else:
# 如果获取锁失败,则等待一段时间后重试
time.sleep(wait_time)
# 等待时间指数增加
wait_time *= 2 # 指数增长,可以根据实际情况调整
return -1 # 获取失败时返回 -1
# 测试代码
report_num = 10
current_id = get_curentid(report_num)
print("Current ID Flag:", current_id)
在这个优化版本中,通过使用 Redis 的 SETNX 命令来获取分布式锁,避免了之前循环重试的方式,提高了效率。同时,在获取锁成功后,设置了锁的过期时间,以防止锁未正常释放导致的死锁问题。同时为了引入指数退避策略,可以在获取锁失败后进行等待时间的指数增加。这样可以减少频繁重试锁的获取,降低系统负载,提高效率。
但是由于只是在开发环境下测试,也没办法模拟高并发情况来对比两段代码的运行结果,理论上优化后的是由优于前者的,但本地测试感官上是差不多。没人带真难受,老代码是我入职第一天晚上自己要的,服务器宝塔面板是第二天自己主动要的,Redis密码是自己在配置文件里查的,遇到问题也是自己解决的,我这两排工位没一个同事,感觉赛博孤岛似的,