标题
😀😀😀创作不易,各位看官点赞收藏.
文章目录
- 标题
- Redis 基础笔记
- 1、安装及环境搭建
- 2、Redis 数据类型
- 2.1、String
- 2.2、List
- 2.3、Hash
- 2.4、Set
- 2.5、Zset
- 2.6、BitMap
- 2.7、HyperLogLog
- 2.8、Geospatial
- 2.9、Stream
- 3、Redis 持久化
- 3.1、RDB
- 3.2、AOF
- 4、Redis 事务
- 5、Redis 管道
- 6、Redis 发布与订阅
- 7、Redis 主从复制
- 7.1、Redis 主从复制
- 7.2、Redis 哨兵模式
- 8、Redis 集群
- 8.1、Redis 集群理论
- 8.2、Redis 集群搭建
- 8.2.1、Redis 三主三从集群搭建
- 8.2.2、Redis 集群扩容缩容
- 9、 Spring Boot 整合 Redis
- 9.1、整合单机版
- 9.2、整合集群版
- 10、应用问题
- 11、分布式锁
Redis 基础笔记
redis:是一个 NoSql (Not Only Sql)非关系型数据库,不依赖业务逻辑方式储存,以简单的key-value方式进行存储。可以配合关系型数据库进行缓存数据,数据操作主要是在内存中而且一些特殊场景比关系型数据库优越,也需要相互依赖。
1、安装及环境搭建
redis 官网:https://redis.io/
redis 命令文档:http://doc.redisfans.com/
c语言环境:redis编译需要c语言环境。
yum install gcc # 安装gcc环境
gcc --version # 检查gcc的版本
yum install -y gcc-c++
解压编译 redis:
tar -zxvf redis-7.0.4.tar.gz # 解压
# 解压好进入redis目录
make && make install # 编译
安装成功后,默认在
/usr/local/bin
安装 redis 的相关内容:
- redis-benchmark:性能测试工具。
- redis-check-aof:修复有问题的 AOF 文件。
- redis-check-dump:修复有问题的 dump.rdb 文件。
- redis-sentinel:redis 集群搭建。
- redis-server:redis 服务启动命令。
- redis-cli:客户端,操作入口。
进入目录
/usr/local/bin
,执行 redis-server 启动 redis:
修改redis配置文件:(在redis解压目录下)
- 注释掉 bind 127.0.0.1 这一行(解决只能特定网段连接的限制)
- 将 protected-mode 属性改为 no (关闭保护模式,不然会阻止远程访问)
- 将 daemonize 属性改为 yes (这样启动时就在后台启动)
- 添加 requirepass 123456即可 当然这步完全看心情
- 将 port XXX 修改成自己的端口(默认是6379)
启动redis:
redis-server /opt/redis/redis-7.0.4/redis.conf
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N5wZ6Hqy-1690513503273)(https://s2.loli.net/2022/08/11/KFrWXSNiMjQmEza.png)]
开启访问端口:
firewall-cmd --zone=public --add-port=6379/tcp --permanent # 开放端口
firewall-cmd --reload # 重启防火墙
firewall-cmd --list-ports # 查看开放的端口号
远程连接 redis:配置好上面的信息然后,可以下载一个
Another Redis Desktop Manager
可视化的 redis 管理工具。
redis-cli 客户端连接:
redis-cli -h 连接的ip地址 -p 端口号 -a 连接密码
# 获取修改 redis 配置文件信息命令
config get 配置名称 # 例如 config get port
config set 配置名称 value # 但是这个 redis 重启后这个配置就会失效
2、Redis 数据类型
注意:在 redis 中命令是不区分大小写的,但是 key 名称是区分大小写的。
key 操作命令:
keys 正则表达 # 查看当前库中满足正则表达式的所有key
exists key # 判断某个key是否存在
type key # 查看 key 的类型
del key # 删除 key
unlink key # 非阻塞删除,现将 key 从 keyspace 中删除,后续异步再执行真正的删除
expire key time(单位秒) # 设置key的过期时间
ttl key # 查看可以会有多少秒过期,-1表示永久不会过期,-2表示已过期
get key # 获取某个key的值
move key dbIndex[0-15] # 将 key 移动到指定数据库中
# redis默认16个数据库,下标从0开始(默认初始使用0号数据库)。
select dbid # 切换数据库,例如select 10,这些库的密码都是同一个密码
dbsize # 查询当前数据库的key数量
flushdb # 清空当前库
flushall # 清空所有库
help @类型名称 # 获取对应类型的命令的帮助
2.1、String
String:是 redis 最基本的数据类型,一个 key 对应一个 value。String 是二进制安全的,简单字符串、复杂的 xml/json 的字符串、二进制图像或者音频的字符串、以及可以是数字的字符串,但是一个 key 的 value 值最大是 512MB。
set key value
# 常用参数
set key value ex 秒 px 毫秒 [nx|xx] [get] exat 以unix时间戳单位秒 pxat 以unix时间戳单位毫秒 keepttl(保留过期时间)
# nx:如果这个 key 不存在值设置值,如果存在就不设置。
# xx:key 存在还是会重新设置值
# get:如果key值存在,设置值后返回旧的值
# keepttl:重新设置key值需要保存之前的过期时间
get key # 获取 key 对应的值
append key value # 如果key存在,就向 key 的 value 追加,如果 key 不存在就新添加一个 key
strlen key # 获取value的长度
setnx key value # 如果key不存在就新增一个key,如果存在不会覆盖也不会新增key
# 对于纯数字类型的自增和自减,在redis中,自增和自减是一个原子操作,不会被线程调度打断。
incr key # 自增1
decr key # 自减1
increby key step # 自增step
decrby key step # 自减step
# 其它命令
mset k1 v1 k2 v2 k3 v3。。。。 # 设置多个 key
mget k1 k2 k3。。。 # 获取多个 key 的值
# 设置多个key,和 setnx 类似,所有的 key 都不存在是才会设置成功,有一个 key 存在所有的 key 都会设置失败。(有点原子性效果)
msetnx k1 v1 k2 v2 k3 v3
getrange key start end # 截取 value 的值,start、end是下标,类似substring。前后都包含
setrange key index value # 设置 key 从 index 下标使用 value 开始覆盖对应下标的值
setex key time(秒) value # 在设置key的时候设置过期时间
getset key value # 设置key新值的同时返回旧值
String 类型应用场景:
- 缓存数据,例如缓存用户登录信息数据。
- 计数器,记录网站访问量、点赞数量、api 调用次数等等。
- 验证码,存储用户登录验证码设置一个过期时间。
- 限流设置,记录每个 API 对应人访问量,以此来做限定。
- 分布式锁,可以使用 setnx + expire 来实现分布式锁。
- 分布式 session,对应分布式系统,可以使用 redis 对 session 进行一个集中管理。
String 底层数据结构:
底层是一个动态的字符串,可以修改的字符串。类似 Java 的 ArrayList,采用分配冗余空间减少内存的频繁分配。有一个 len 的阈值,当操作这个阈值就会进行扩容,容量小于 1MB 时会双倍扩容,大于 1MB 之后每次扩容只会增加 1MB,最大容量是 512MB。
2.2、List
List:redis 列表是简单的字符串列表,按照插入顺序进行排序,可以在头和尾部添加数据。它底层就是一个循环链表,在两端操作数据性能较好,但是通过下标检索元素性能较差。
# 常见命令
lpush/rpush key v1 v2 ... # 向 key 的左边/右边添加多个值,如果key不存在就新增key
lpop/rpop key [count] # 从左边/右边去除元素,没有指定count就是一个,指定 count 是几个,就移除几个
lrange key start end # 获取列表中的元素,从start开始,到end结束,0 -1表示获取全部元素
rpoplpush key1 key2 # 从 key1 的右边去除一个元素,然后加入到 key2 的左边
blpop key [timeout] # 阻塞行为,在移除元素时其它客户端会进行阻塞,如果 list 没有元素了就会一直阻塞,timeout 就是设置阻塞时间
# 其它命令
lindex key index # 获取 key 对应 index 下标的元素
llen key # 获取 list 的长度
lrem key n value # 从左边删除 n 个值为 value 的元素
ltrim key start end # 截取下标中 start 到 end 的元素,然后重新给列表赋值
lset key index value # 将下标 index 的元素设置为新的元素
linsert key before/after v1 newvalue # 在元素 v1 的前面/后面新增一个 newvalue 元素,如果存在多个 v1 会在第一个左右进行操作
List 实际应用:
- lpush + lpop:可以作为一个栈,先进后出。
- lpush + rpop:可以作为一个队列,先进先出。
- lpush + brpop:可以用于消息队列,但是存在消息数据丢失问题。(服务把消息取出来,但是在处理时服务出现服务失败,这样消息就会丢失)
- 订阅号功能,例如微信订阅号功能,可以使用 list 去存储用户关注订阅号发布的文章 id。
底层数据结构:
底层使用的是快速链表。在元素比较少的情况下,会使用一段连续空间存储,ziplist(压缩链表)。在元素较多的情况下才会改成快速链表,普通链表需要一部分内存空间去存放指针,redis 将压缩链表和快速链表结合起来了,将多个压缩链表使用快速链表的方式链连接起来,这样就不会出来空间冗余。
2.3、Hash
hash:是一个键值对集合,一个 String 类型的 filed 和 value 的映射表,适合存储对象类似 java 中的 map。用户的 id 作为 key,存储 value 就是用户的信息。
hset key filed value [filed value] # 添加 hash 集合,filed - value 一一对应
hget key filed # 获取 filed 对应的 value
hexists key filed # 判断 key 的 filed 是否存在,1存在,0不存在
hkeys key # 查看 key 中所有的 filed
hvals key # 查看 key 中 filed 对应的所有value
hincrby key filed value # 给filed值加上value(都要为数字)
hsetnx key filed value # 添加一个key中不存在的filed,如果存在就添加不成功
hgetall key # 获取 hash 的所有 filed 和 value
hdel key filed # 删除指定 key 的 filed 属性
hlen key # 获取 key 中 filed 的数量
hash 应用场景:
- 购物车场景,可以将用户购物车信息以 hash 存储。
- 抢购优惠卷。
2.4、Set
Set:是一个元素不可重复的列表,是一个 String 类型的无序集合,底层是一个 value 为 null 的 hash 表,查询、添加、删除的复杂度都是 O(1)。
# 常用命令
sadd key v1 v2 v3 。。。 # 添加一个或多个元素,元素不能重复
smembers key # 查看集合的所有元素
sismember key v # 判断v元素是否存在,1表示存在,0表示不存在
srem key v1 v2 ... # 删除集合中的元素
scard key # 获取集合元素的个数
# 其它命令
srandmember key n # 从集合中随机取出n个值,但是不会从集合中删除
spop key n # 随机从集合中移除n个元素并返回移除的元素
smove key1 key2 v1 # 将集合 key1 中的v1元素移到 key2 集合中,原集合中元素会移除
# 集合运算
sinter key1 [key2 key3..] # 返回key1集合与后面集合的 交集
sunion key1 [key2 key3..] # key1与其他集合的 并集
sdiff key2 [key2 key3..] # key1与其他集合的 差集 (在key1中有,在其他集合中没有)
sintercard num k1 k2 k3... [limit] # 统计 num 个集合的交集集合中不重复个数,limit 显示多少个
Set 应用场景:
- 抽奖小程序,spop 随机抽取中将号码。
- 朋友圈点赞功能,key 消息记录id,然后每个人点赞都把对应用户 id 加到 set 中去。
- 共同联系人,可以采用 set 集合的集合运算找出两个人的共同好友,也可以找出两个人之间可能认识的人。
2.5、Zset
Zset:对 set 进行一个增强,每一个元素都关联了一个 score (评分)。集合根据评分来排序集合,集合中成员是唯一但是评分可以不唯一。
# 常用命令
zadd key score1 value1 score2 value2... # 新增集合中元素,并携带score
zrange key start end [withscores] # 获取集合中start到end下标的元素,如果后面添加withscores会将元素的score也会返回到集合中,默认返回升序排列
zrevrange key start end [withscores] # 相当于是倒叙输出元素
# 如果有 () 表示不包含边界值
zrangebyscore key [(]min max[)] [limit] i1 i2 # 获取到评分在min和max之间的元素,limit 相当于是从下标 i1 开始向后移动i2个元素
zrevrangebyscore key [(]min max[)] [limit] i1 i2 # 将评分在这个区间的元素,按照从大到小进行排序
zsocre key 元素 # 获取元素对应分数
zcount key min max # 统计在这个区间元素的个数
zrem key v1 v2 ... # 从集合中删除元素
zincrby key incr value # 把 value 值对应的评分增加 incr
zrank key value # 返回value在集合中排名,从0开始
zrevrank key value # 分数倒序排列,value值排第几
# 7.0以后版本有的命令
zmpop num1 [key...] min count num2 # 把num1个set集合中,每个集合移除num2个最小分数的元素
ZSet 应用场景:
- 延时队列,将 socre 设置成需要执行的时间戳,按照时间戳进行排序后在循环去消费第一个消息,可以达到延时执行的效果。(没有ACK机制,可能出现消息丢失)
- 排行榜,将排列时间作为 key,排行作品 id 作为 member,数量作为分数进行插入,然后通过 zrevrange 和 zrange 取对应的值。
- 限流,可以使用一种滑动窗口策略,将用户 id 作为 key,访问时间戳作为 member 和 score,我们只需要统计用户 id 在指定时间戳中的个数,就可以得到对应访问频率,然后与最大数进行比较。
2.6、BitMap
Bitmap:可以实现对位的操作,底层是字符串本质是个数组,数组由多个二进制位组成。可以看做成一个以 bit 为单位的数组,只能存储 0 和 1,数组的下标称为偏移量。
setbit key offset 0|1 # 设置一个key的bitmap,offset是偏移量,值只能是0或1(如果偏移量过大,整个初始化过程会比较慢,所以一般在初 一个数字),下标从0开始
getbit key offset # 获取位对应偏移量的值,没有设置对应偏移量的值就返回0
strlen key # 返回位图占用的字节数,每8位一个字节,例如,占用了8为,就返回1,占用了9位就返回2
bitcount key [s,e] # 统计key中值为1的个数,可以指定一个偏移量范围,0 -1表示全部
bitop and|or|not|xor dest key1 key2 .... # 将这些集合进行求 和|或|非|异或 操作,然后将结果放在dest集合中返回记录条数
应用场景:
- 签到场景,偏移量:今天是一年的第几天 % 今年天数,key:年份:用户id。
- 统计用户活跃数,将用户 id 作为偏移量,来记录今天是否登录系统。
- 实现布隆过滤器。
2.7、HyperLogLog
HyperLogLog:用来做基数统计(集合中不重复元素个数)的算法,再输入元素数量很大时,计算基数时所需要的的空间总是固定的并且很小只需要 12KB 左右,它只计算出集合中元素不重复个数并不会记录这些元素,存在 0.81% 的计算误差。
pfadd key e1 e2 .... # 添加元素,如果基数发生变化返回1,否则返回0
pfcount key # 计算集合中不重复元素个数,即集合基数
pfmerge newkey k1 k2 ... # 将k1 k2 ... 的 HyperLogLog 合并成新的一个 HyperLogLog
应用场景:
- 统计网站实际访问数,同一个访问 IP 记录成一个。
- 统计网站在线人数,实时 UV 数。
- 统计用户每天搜索关键词个数,只能记录个数不能记录具体关键词。
2.8、Geospatial
Geospatial:提供经纬度设置、查询、范围查询、距离查询,经纬度 hash 等操作,它是 zset 类型。
geoadd key 经度1 纬度1 name1 经度2 纬度2 name2 。。。 # 添加地理位置,name是地理名称
# 经度范围-180到180,纬度范围-85到85,如果超出给定范围会返回一个错误,重复名称不能添加
geopos key name # 返回name对应的经纬度信息
geodist key name1 name2 [m|km|ft|mi] # 返回这两个地理位置的直线距离,默认单位是米,也可以指定单位千米、英尺、英里
# withcoord:把经纬度也返回;withhash:经纬度已hash编码形式返回;withdist:返回距离;count num:返回num条记录
georadius key 经度 纬度 radius [m|km|ft|mi] [withdist] [withcoord] [withhash] [count num] # 查询在给定经纬度为中心,radius为半径中的元素
geohash key name # 将地点的经纬度值使用hash编码返回一个映射
georadiusbymembers key name 。。。。# 与georadius类似,只不过是以指定name为中心
2.9、Stream
实现消息队列,支持消息持久化、生成全局唯一消息 ID、支持 ack 确认消息模式、支持消费组模模式,是一种用于消息队列的数据类型。
名称 | 解释 |
---|---|
Message Content | 消息内容,它底层是一个消息链表,将所有需要消费的消息串联起来,每个消息都有一个唯一的 ID 与之对应。 |
consumer group | 消费组,通过 xgroup create 创建,同一个消费组可以有多个消费者 |
last_delivered_id | 游标,每个消费组有一个游标,任意一个消费者读取了消息都会使游标往前移动。 |
consumer | 消费者 |
pending_ids[ ] | 是一个数组,用于存放消费者读取了消息,但是没有返回 ack 确认码的消息 ID,保证消息不被丢失 |
# 在消息队列尾部添加一条消息,* 表示由redis生成i消息id,也可以自己指定id,但是必须比前一个消息id大,后面的消息内容是hash结构
xadd key *|id filed1 value1 filed2 value2 ....
# 获取消息,- 表示最小,+表示最大,count num:表示返回多少条消息
xrange key start(-) end(+) [count num]
# 反向获取消息,注意下标位置反转
xrevrange key end(+) start(-) [count num]
# 删除一条消息,根据消息id删除
xdel key id
# 统计消息条数
xlen key
# 限制队列中消息的个数,将id小的删除,最多只保留num个消息
xtrim key maxlen num
# 限制消息最小id,比指定id小的消息全部删除
xtrim key minid id
# 读取消息(默认:非阻塞读取),从指定消息队列key1、key2.。中读取num条消息,$表示当前最新消息,00表示最旧的数据,也可以根据指定消息id获取数据
xread count num streams key1 key2.. [$] [00] [id1 id2 ...]
# 阻塞读取,block表示阻塞读取,millisecond表示阻塞时间,如果是0表示一直阻塞,
xread count num [block millisecode] streams key1 key2.. [$] [00] [id1 id2 ...]
# 创建消费组,指定消费哪一个队列以及指定消费组名称,$表示从头开始消费,0表示从尾部开始消费
xgroup create key groupName $|0
# 创建消费者并读取消息使用groupName组中的consumerName读取num条消息,从key1、key2.。。消息队列中读取,>表示从尚未被消费的消息开始读取并且移动到下一位
xreadgroup group groupName consumerName [count num] streams key1 key2... >
注意:在 Stream 中的消息队列,如果被任意消费组中的任意一个消费者消费后,这个组的其它消费者都不能再对这条 消息进行消费,游标位置会移动到下一位,但是其它消费组中的消费者依然可以读取消息。
消费者 ACK 机制:每个消费组读取消息后都会把消息 id 备份到 pending_ids[ ] 数组中以防止客户端消息处理失败导致消息丢失,只有当客户端执行 xack 命令时才会将消息进行擦除。
# 查看消费组中消息读取但是未 ack 确认的消息,可以指定查看的条数以及指定消费组未 ack 的消息 id
xpending key groupName [start end count consumerName]
# 向消息队列发送消息被消费的 ack,指定消费组中信息id对应消息被消费
xack key groupName id1 id2.。。。
# 打印stream的信息
xinfo stream key
3、Redis 持久化
Redis 的数据是保存在内存中,一旦服务器宕机数据就会全部丢失,这就需要将数据进行持久化到磁盘中。在 Redis 中有两种持久化方式,RDB (Redis DataBase)、AOF (Append Of File)。
3.1、RDB
RDB:在指定的时间间隔内,将内存数据的快照写到磁盘中,恢复时将磁盘中的文件读取到内存,可以在配置文件中修改 RDB 的持久化策略,在 Redis 中默认使用 RDB 持久化方式
# 以下是 Redis 默认的 RDB 持久化策略,也可以自定义
# Redis6.2之前
save 900 1 # 900秒之内发生了一次key变化进行持久化
save 300 10 # 300秒之内发生了10次key变化进行持久化
save 60 10000 # 60秒之内发生了10000次key变化进行持久化
# Redis6.2之后,将持久化频率做了改变
save 3600 1 300 100 60 10000 # 1小时、5分钟、1分钟
# 禁用 rdb 持久化,只需要配置成空串即可
save ""
自动触发 RBD:主要是通过配置文件去修改持久化策略以及 rdb 文件名称和保存路径,修改配置文件后重启服务。
# 设置频率
save 5 1
# 指定 rdb 文件保存路径,这个路径需要提前创建
dir /opt/data
# 指定文件名称
dbfilename 名称
# 其它配置
stop-writes-on-basave-error yes # 当持久化出现错误时,redis就停止写操作
rdbcompression yes # 对于持久化的文件是否进行使用LZF算法进行压缩
rdbchecksun yes # 使用CRC64算法进行数据检查
手动触发 RDB:使用 save 和 bgsave 命令来手动进行数据持久化。
# 这个命令会阻塞 redis 线程,只有当持久化成功后才会继续缓存数据,(生产中不能使用)
save
# 会异步进行持久化操作, 这个就会 fork 子进程去持久化数据
bgsave
# 获取上一次持久化时间,返回时间戳
lastsave
持久化流程:Redis 会单独创建一个子线程 (fork) 进行持久化,先将数据写到一个临时文件等持久化完成以后用这个临时文件去替换上次持久化完成的文件。
优点:
- 适合数据备份以及一些大规模数据恢复工作。
- RDB 文件在内存中加载速度比 AOF 快很多。
缺点:
- 在一定时间时间做一次备份,这时 Redis 意外宕机就可能出现丢失当前时间与最新一次持久化期间的数据。
- 如果 redis 中数据量大,在 fork 子进程时会导致占用的 CPU 资源过多,内存数据会拷贝一份需要2倍数据膨胀,可能导致服务性能降低。
修复 RDB 文件:在 REB 文件迁移的过程中可能存在文件损坏,可以通过 redis 提供的修复命令去修复对应文件。
# 在 /usr/local/bin 目录下使用
redis-check-rdb 修复文件路径
3.2、AOF
AOF:以日志的形式记录每一个写操作,将 Redis 执行过程中的所有写指令记录下来,只许追加到文件上不能修改文件,Redis重启会读取日志文件重新构建数据,就是将日志中的执行从前到后执行一次来恢复数据工作。
# aof默认是不开启的,需要在配置文件中手动开启,而 RDB 默认开启的
appendonly yes # 开启aof持久化
appendfilename "appendonly.aof" # 持久化保存文件,保存路劲在启动目录下
注意:如果 RDB 和 AOP 同时开启,redis 默认使用 AO F持久化,启动 redis 就会默认使用 appendonly.aof 进行数据初始化。
AOF 工作流程:
- 当执行写命令时并不是直接写入 AOF 文件,而是先写到 AOF 命令缓冲区,当命令达到一定数量然后在 IO 到磁盘文件上,这样避免了频繁 IO 操作。
- AOF 缓冲区会根据同步文件三种写回策略将命令同步到磁盘 AOF 文件。
- 避免写入的 AOF 文件过大,AOF 又会根据规则将命令进行合并(AOF 重写),从而达到 AOF 文件压缩目的。
三种写回策略:
- Always:同步写回,每个写命令执行完后立刻将日志写到磁盘文件上。(命令不丢失,IO 频繁)
- everysec:每秒写回,写命令执行完会把日志写到 AOF 缓存区,然后每隔 1s 将缓存区的命令写到磁盘。(默认)
- no:操作系统写回,写命令执行完后会将日志写到 AOF 缓存区,由操作系统决定何时将缓冲区内容写到磁盘。
优点:更好保护数据不丢失、性能高、可做紧急恢复。
缺点:相比较 RDB 数据集文件要大,恢复速度比 RDB 慢一些。
Redis 7 Muti Part AOF 设计:在 Redis7 之前的 AOF 文件有且只有一个,但是在 Redis7 之后底层做了改变,采用多个文件来进行 AOF 持久化,Redis 重写后文件名称就可能发生改变。
- Base:表示基础 AOF,由子进程重写产生,这个文件只有一个。
- Incr:表示增量 AOF,在进行重写时被创建,可能存在多个,这个文件用来记录数据。
- manifest:清单文件,管理、跟踪 AOF 文件。
AOF 文件异常修复:在进行 AOF 持久化时可能存在AOF文件损坏,这样备份文件在初始化数据时就会出错。通过 Redis 提供的
redis-check-aof --fix
进行文件修复,在/usr/local/bin
目录下使用
# 重启redis就可以修复文件了
redis-check-aof --fix aof文件保存路径
AOF 压缩重写:AOF 采用文件追加方式文件可能会越来越大,这时就增加了压缩重写,当文件大小达到一个阈值时就会将文件进行重写,只保留恢复数据的最小指令集。
# 重写就是将之前多个操作重写成一个操作,但是最后的结果是一样的
set k1 v1
set k2 v2
# 就会将上面的重写成一个操作
set k1 v1 k2 v2
# redis 配置文件
no-appendfsync-on-rewrite yes # (不阻塞同步)不写入 aof 文件而是写入缓存,相当于不重写请求不会阻塞,但是在这期间服务器宕机缓存数据会丢失
no-appendfsync-on-rewrite no # 进行重写,重写时请求可能出现阻塞(阻塞同步)
auto-aof-rewrite-min-size 64mb # 阈值为 64mb
auto-aof-rewrite-percentage 100 # 当操作阈值的100%就会进行重写,相当于是文件大小操作128mb就会重写
# AOF 没有到达重写阈值可以手动进行重写操作
bgrewriteaof
4、Redis 事务
Redis事务:是一个单独的隔离操作,事务中的所有命令都会序列化、按照顺序执行,事务执行过程中不会被其他客户端发来的命令打断。串联多个命令防止别的命令插队。在 Redis 中,事务有三个基本命令,multi、exec、discard。
- multi:开始事务,进行命令组队,接下来的所有命令都会按照顺序进行排队。
- exec:执行命令,将排队的命令按照排队顺序执行。
- discard:回滚操作,将刚才执行的命令取消并恢复原来的数据,在exec执行前执行。
事务处理几种情况:
- 所有命令按照排列顺序正常执行。
- 放弃事务执行,在 exec 命令之前,使用 discard 命令会取消所有队列中的命令。
- 命令在排队时 (exec 之前) 执行失败 (语法错误),在执行 exec 时会提示错误,所有命令都不会执行成功。
- 在使用 exec 命令执行时,可能出现命令执行错误,对于错误命令会执行不成功,但是其它可以执行成功的命令可以成功。(部分原子性)
watch 命令:在执行multi命令之前,先执行
watch key1 key2 ...
,如果在事务执行前这些key的值被其它操作修改,那么这个事务就会被打断。在事务中,如果几个操作去操作同一条数据可能会发生数据不同步的问题,Redis 使用乐观锁机制来防止事务冲突问题。
注意:unwatch
命令可以取消对所有key的监视,客户端断开连接,客户端对 Redis 的所有 key 的监控也会取消。
5、Redis 管道
Redis 是一种 C/S 架构,每次请求都会建立 TCP 连接,为了减少客户端与服务器端的连接,Redis 管道可以将命令进行打包进行批量操作,只请求一次并且只会获取一次返回结果,这样减少连接资源消耗提高性能。pipeline 管道实现原理就是一个队列,先进入命令先执行。
注意:
- pipeline 管道是非原子性的,支持不同数据类型的命令。
- pipeline 组装命令不能太多,太多命令可能使客户端阻塞太久,而导致服务端被迫发送一个队列等待的响应,应该将多个命令拆分成小段进行执行。
- pipeline 与 事务都是一些列命令,但是事务是将命令一条一条发送,执行 exec 时才会真正执行命令,而管道是一次将所有命令发送。事务会阻塞其它命令执行,而管道不会阻塞。
6、Redis 发布与订阅
Redis 的发布与订阅是一种消息的通讯模式,发送者发送消息,订阅者接收消息。Redis 客户端可以订阅任意数量的频道。如果发布者发布的频道被其他客户端订阅了频道,那么在发布的时候订阅的客户端就会接收到消息。
subscribe 频道名1 频道名2 。。。。 # 订阅者订阅多个频道
publish 频道名 message # 发布者通过频道发布消息
7、Redis 主从复制
主从复制:主机更新数据后会根据配置和策略,自动同步到备机上(master/slaver)。master 以写为主,slaver 以读为主。(只能一主多从,不能多主,但是可以配置集群)
- 读写分离:这样应用可以将读和写操作分开,减少redis服务压力提高性能。
- 容灾快速恢复:如果主机或者某一个从机发生故障,可以根据策略从其它从机上读取数据
7.1、Redis 主从复制
info replication # 查看 redis 服务信息
# 手动配置主从关系,重启 redis 失效,一般配置到 redis.conf 中
replicaof 主库ip 主库端口 # 在从机上使用命令,指定从机对应的主机
slaveof 主库ip 主库端口 # 修改主机对应的从机
slaveof no one # 去除从机性质,改成主机
配置文件(配从机不配主机):只需要配置从机配置文件,它就会去连接主机,然后同步主机数据。
slaveof 主机ip port # 从机配置文件上连接主机
masterauth xxxx # 主机的密码
注意:主机上可以进行读写操作,但是从机上只能读取不能写入,写入就会报错。当从机宕机后,主机会感知到会移除这个从机。如果主机宕机后,从机会感知但是不会移除主机只是主机状态变成下线状态,当主机重新上线从机依然会连接到主机。
薪火相传:一个主机下可以有多个从机,一个从机下也可以有多个从机。主机每次同步数据时只会同步给主机下的从机,然后再由从机同步给它下面的从机。(中间服务器也不能写操作)
反客为主:当主机服务宕机以后,在这个主机下的所有从机都会升级为主机,而从机下的从机不会发生变化。( )
# 当主机宕机,需要手动使用命令将从机变成主机
slaveof no one
主从复制执行流程:
- 从机启动后会向主机发送一个同步数据命令,同步主机全部数据(全量复制),从机自身的数据全部会被覆盖掉。
- 主机收到从机同步命令时,会触发 RDB 持久化操作,在持久化完成后将 RDB 快照文件发送给从机,从机就会根据文件进行初始化。
- 主机会默认 10s 给从机发送心跳检测,保持通信。
- 在主机上进行写命令是,会自动同步给从机。
- 从机下线后重连,重连都会全量进行复制主机数据。
注意:主从架构有致命缺点,主机同步数据到从机上可能出现网络延迟情况。如果主机宕机,那么这个缓存系统就不能进行写操作,只能从机读取数据。
7.2、Redis 哨兵模式
哨兵模式:反客为主的自动版,能够自动监控主机是否故障,如果故障了就会根据投票数高的从机转换成主机。哨兵机器不存放数据,只是一个监控者。
- 主从监控:监控主从 Redis 是否正常运行。
- 消息通知:哨兵可以将故障转移的结果发送给客户端。
- 故障转移:如果主机宕机,可以根据投票策略将主机下票数高的从机升级成主机。
- 配置中心:客户端通过连接哨兵获取当前 Redis 服务的主节点地址。
配置哨兵:在监控主机上也需要配置密码,不然故障转移旧主机不能连接到新主机。
# 配置哨兵,在安装目录下配置 sentinel.conf 文件中配置哨兵
daemonize yes # 后台启动
port 26379 # 端口
# mymaster是给监控对象起的名称,x是确认客观下线至少有几个哨兵同意迁移才能迁移(一个主机可以有多个哨兵,一个哨兵可以监听多个主机)
sentinel monitor mymaster 监视的主机ip 端口 x
sentinel auth-pass 监视名称 xxxx(监视对象的密码)
# 主节点在 milliseconds 没有回答哨兵,哨兵认为主机下线,默认 30s
sentinel down-after-milliseconds <master-name> <milliseconds>
# 故障转移超时时间,操作这个时间转移失败,默认3分钟
sentinel failover-timeout mymaster 180000
# 启动哨兵
redis-sentinel 哨兵配置文件路径
# 或者
redis-server 哨兵配置文件路径 --sentinel
主观下线(SD):是指某个哨兵认为 Redis 服务不再正常工作,但是没有与其它哨兵达成共识。
客观下线(OD):在 sentinel 集群中,可能出现网络抖动导致某个哨兵不能及时收到心跳包,可能误以为主机下线。X 参数就是指定在指定集群中认为主机下线的哨兵数至少达到 X 才会认为主机下线,才会进行故障转移。
注意:启动了 Sentinel 哨兵后,在哨兵的配置文件中会自动生成对应的配置。
主机下线从机上位:模拟主机下线,从机是否能够升级成主机。
- 主机下线通过投票从从机选取新的主机,并且新主机和从机数据不会丢失。
- 启用 sentinel 模式后,主机记录其它从机信息,旧主机重新上线后,会作为从机跟在新主机的后面(旧主机需要配置密码),新主机的配置文件也会发生一定变化。
- 发生主机切换,哨兵的配置文件被重写对应配置。
哨兵运行流程:
- 先某个 sentinel 哨兵对主机进行了主观下线,任务主机异常。
- 然后与 Sentinel 集群中的其它哨兵进行达成共识,判断是否达到客观下线的值。
- 如果达到客观下线的值,在 Sentinel 集群中根据 RSTF 选举算法进行故障转移的哨兵的选举。
- 选举出来的哨兵领导者从 slaver 中选举出行的主机,并进行故障转移。
- 在故障转移的过程中,可能出现数据丢失的情况。
新 master 选举原理:
- 配置从机优先级,在从机的 redis.conf 文件中。
# 设置从机优先级,数字越小优先级越高,默认是100
replica-priority 100
- 选择偏移量大的从机,数据上如果和主机同步率越高,越先被选择。
- 在每次启动 Redis 服务,都会产生一个 UUID (40位),UUID 越小越先被选择。
8、Redis 集群
在实际开发中Redis可能出现内存容量不够以及在并发操作下性能提高的需求,另外在薪火相传、反客为主、主机宕机哨兵选择新主机情况下导致 ip 地址的变化。在 Redis3.0 之前是通过代理主机的方式解决,3.0之后通过无中心化的集群配置来解决提到的问题。
代理主机:所有的应用请求都请求到代理主机上,然后代理主机来分发请求到各个 Redis 主机上,然后得到的数据返回给应用。
无中心化集群:每一个主机都可以作为请求的入口,当请求来了后会到请求主机上查询数据,如果有这条数据就返回,没有就会去其他主机上查询,主机与主机之间是相通的。
Redis 集群实现了 Redis 的水平扩容,启动 N 个 Redis 节点,那么整个数据都会分布存储存在这 N 个节点中,每一个节点储存总数据的 1/N 。即使某个 Redis 节点失效并无法进行通讯,集群中的其它节点依然可以处理请求,多个 Redis 节点之间共享数据的程序集。
8.1、Redis 集群理论
在Redis集群中应该至少有三个节点,并且按照每一个主机在不同的服务器(不同ip)上,每个从机不和自己主机在同一服务器上,这样就保证了当主机宕机以后从机能快速接替主机继续进行工作。
slot (插槽):在集群搭建成功后集群中会有16384个插槽,集群中每一个节点会负责一部分插槽。在每次向集群中插入 key 时,会根据
CRC16(key) mod 16384
去计算 key 对应的插槽,然后找到对应处理主机进行处理,理论上 Redis 集群节点可以达到16384个,每一个节点管理一个槽位,但是官方建议集群节点不超过1000个。
分片:使用 Redis 集群时我们会把数据分散储存到多台 Redis 机器上,集群中每一个节点都是一个片区。
- 写操作:根据 key 进行
CRC16(key) mod 16384
算出存储在哪个节点上(哪个分片)。 - 读操作:会根据 key 的 确定性哈希函数找到 key 在写操作时写入的片区,然后去这个节点上去找。
分片算法:
哈希取余算法:hash(key) mod 节点数量
,直接通过 key 去取余节点数量,这种方式简单粗暴但是存在一定问题。
- 如果某一个节点宕机,那么取余的节点数量需要手动去修改,不然后映射到宕机主机上。
- 对于节点扩容、缩容不方便,节点增加和删除可能出现 key 与原来的映射不一致问题,取余结果会重新洗牌。
一致性哈希算法:将集群中各个节点 IP 映射到 hash 环上,每个节点就能确定在 hash 环上的位置。在 key 映射时根据同一个 hash 算法也确定在 hash 环上的位置,然后再顺时针查找在 hash 环上的节点位置,然后就近落在这个节点上。
- 一致性问题解决了节点宕机导致需要重新计算 hash 值,即使某个节点出现问题,它也可以继续顺时针向下找下一个节点。
- 当节点比较少时,可能出现数据倾斜问题,当两个节点相距很近,数据可能大量会在某一个节点上处理。
哈希槽分区算法:在数据与节点之间添加一层槽,用于管理数据与节点之间的关系,相当于多个 key 存放在槽里面,多个槽对应一个具体节点。先使用 CRC16(key) mod 16384
查找到对应的槽,然后再根据槽位找到对应节点。
为什么插槽数数量是16384个?
- 因为在集群节点心跳包传输中,插槽如果是16位(65535),传输插槽需要的数据为8kb(16384),而14位需要的只有2kb。
- Redis 集群节点数量基本不可能超过1000个,16384个槽位够用。
8.2、Redis 集群搭建
8.2.1、Redis 三主三从集群搭建
redis 配置文件:
# 开启集群
cluster-enabled yes
# 节点配置名
cluster-config-file nodes-6379.conf
# 节点失联时间,超过这个时间自动主从切换
cluster-node-timeout 15000
注意:在 redis 配置文件中需要去掉主从以及哨兵的配置信息。
启动所有 redis 服务,进行 redis 安装路径下的 src 目录下使用命令:
# 确保 redis 的 nodes-6379.conf 文件都生成了
# 将所有的 redis 合成一个集群,只需要使用下面命令
redis-cli --cluster create --cluster-replicas 1 各个主机的真实ip地址(使用空格隔开127.0.0.1:6379) -a 密码
# redis-cli --cluster create --cluster-replicas 1 192.168.32.135:6379 192.168.32.136:6379 192.168.32.137:6379 192.168.32.138:6379 192.168.32.139:6379 192.168.32.140:6379 -a liuhongjun
# 1表示一台主机有一台从机,它会自动分配
注意事项:
- 集群配置了密码需要在启动命令后添加 -a 参数指定连接密码。
- 集群启动需要清除 Redis 中所有数据,包括 RDB、AOF、node配置文件。
测试集群:
# 因为是集群,所以可以通过任何一个主机连接到服务上,-c是集群连接
redis-cli -c -h ip -p port -a password
# 连接后使用命令
cluster nodes # 查看集群中所有节点信息,有myself字样的服务是当前连接
集群常用操作:
# 在集群中不能使用一次添加多个key的命令,例如mset k1 v1 k2 v2....,需要使用分组方式来进行批量添加
mset k1{name} v1 k2{name} v2 # name是分组名,会根据name计算slot值,然后进行处理
cluster keyslot key # 计算这个key的插槽值,不存在的key也可以计算
cluster countkeysinslot 插槽值 # 查看当前主机范围slot下key的数量
cluster getkeysinslot 插槽值 n # 返回插槽值下n个key,也只能返回自己主机上的keys
# (配置文件中)集群某一节点不可用,系统是否还继续使用,yes():不可用,no:可用
cluster-require-full-coverage yes|no
故障恢复:集群中某个主机节点宕机,那么它的从机马上替换主机成为新的主机,继续处理请求,就主机修复后启动会作为新主机的从机。(15秒超时自动断开连接),如果需要继续保持之前的从属关系,需要在旧主机上执行
cluster failover
命令。
如果集群中某一段插槽节点的主机从机都宕机了, 可以通过在 redis.conf 配置cluster-require-full-coverage
来设置,如果值是yes 表示某一节点宕机,那么整个集群将不可用,如果值为 no,表示宕机的节点的插槽不可用不可写入,但是其它插槽依然可以继续使用。
8.2.2、Redis 集群扩容缩容
扩容:在某些高并发峰值时,需要增加节点保证系统可用,但是新加入节点没有对应插槽,这就需要将之前的槽位重新计算。
- 新建一个 Redis 节点并且启动服务。
- 节点加入到集群中。
# 需要一个集群中存在节点作为介绍人
redis-cli -a 密码 --cluster add-node 新节点ip:端口 介绍人ip:端口
# redis-cli -a liuhongjun --cluster add-node 192.168.32.141:6379 192.168.32.135:6379
- 当前新加入的节点还没有分配插槽,需要其它主节点余一点插槽给新节点,并重新计算。
# 由一个已分配槽位节点重新分配槽位
redis-cli -a 密码 --cluster reshard 已分配插槽节点ip:端口
# redis-cli -a liuhongjun --cluster reshard 192.168.32.136:6379
- 新增从机节点到集群中。
# 新增从机节点到集群中,并同步某个主机数据
redis-cli -a 密码 --cluster add-node 新节点ip:端口 介绍人ip:端口 --cluster-slave --cluster-master-id 同步主机节点id
# redis-cli -a liuhongjun --cluster add-node 192.168.32.142:6379 192.168.32.135:6379 --cluster-slave --cluster-master-id 12131b05f5014a88e1199c1c6ef60b30ea3375c0
缩容:当流量顶峰过去,可以减少机器节约成本。
- 先删除主机下的从节点。
redis-cli -a 密码 --cluster del-node 节点ip:端口 节点ID
# redis-cli -a liuhongjun --cluster del-node 192.168.32.142:6379 25a9accc9cf8d24265382db3e3ef102ae3268332
# 检查 redis 集群状态
redis-cli -a liuhongjun --cluster check 某一节点ip:端口
# redis-cli -a liuhongjun --cluster check 192.168.32.135:6379
- 将主机插槽重新分配,插槽重新分配后数据也会迁移,移除插槽后这个主机会变成目标主机的从机,然后需要再执行删除从机操作。
redis-cli -a 密码 --cluster reshard 已分配插槽节点ip:端口
# redis-cli -a liuhongjun --cluster reshard 192.168.32.136:6379
# 删除从机
redis-cli -a 密码 --cluster del-node 节点ip:端口 节点ID
# redis-cli -a liuhgonjun --cluster del-node 192.168.32.141:6379 12131b05f5014a88e1199c1c6ef60b30ea3375c0
9、 Spring Boot 整合 Redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.6.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
9.1、整合单机版
修改 yaml 配置文件:
spring:
redis:
host: 101.43.29.221
port: 6001
password: XXXX
database: 0 # 库下标,0-15
timeout: 1800 # 连接超时时间
lettuce:
pool:
max-active: 20 # 最大连接数
max-wait: -1 # 最大阻塞时间,-1没有限制
max-idle: 5 # 最大空闲连接
min-idle: 0 # 最小空闲连接
创建 Redis 序列化配置类:配置这个类后会对 key 和 value 进行序列化,如果不序列化可能出现中文乱码问题。
@Configuration
@EnableCaching // 开启缓存
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
redisTemplate.setConnectionFactory(factory);
// key序列化
redisTemplate.setKeySerializer(stringRedisSerializer);
// value序列化
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hashmap序列化
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
return redisTemplate;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory){
RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
// 解决缓存转换异常
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 配置序列化,解决乱码问题,过期时间600秒
RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
return RedisCacheManager.builder(factory)
.cacheDefaults(cacheConfiguration)
.build();
}
}
测试:通过 RedisTemplate 就可以对 Redis 进行操作
@RestController
public class RedisController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/")
public String test1(){
redisTemplate.opsForValue().set("user:10001","10001");
String login = (String) redisTemplate.opsForValue().get("user:10001");
System.out.println(login);
return login;
}
}
9.2、整合集群版
编写 yaml 配置:
spring:
cache:
redis:
time-to-live: 10000
redis:
timeout: 5000
database: 0
cluster:
nodes: 各个节点ip:port (127.0.0.1::6379),多个主机使用英文逗号隔开
max-redirects: 3
lettuce:
pool:
max-active: 20 # 最大连接数
max-wait: -1 # 最大阻塞时间,-1没有限制
max-idle: 5 # 最大空闲连接
min-idle: 0 # 最小空闲连接
password: XXXX
主机宕机:当 key 对应插槽节点出现宕机,然后从机上位充当主机,但是 java 客户端不能动态感知集群中节点变化,就出现节点连接失败问题,需要在配置中打开 lettuce 的动态感知配置。
lettuce:
cluster:
refresh:
# 支持集群动态感应刷新
adaptive: true
# 定时刷新
period: 2000
封装 RedisTemplateService 工具类:
@Service
@Slf4j
public class RedisTemplateService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(@NonNull String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
log.error("exception when expire key {}. ", key, e);
return false;
}
}
/**
* 根据key获取过期时间
*
* @param key 键 不能为 null
* @return 时间(秒) -1 代表为永久有效 -2 代表已失效
*/
public long getExpire(@NonNull String key) {
Long expire = redisTemplate.getExpire(key, TimeUnit.SECONDS);
Optional<Long> optional = Optional.ofNullable(expire);
return optional.orElse(-2L);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
log.error("exception when set key {}. ", key, e);
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
log.error("exception when set key {}. ", key, e);
return false;
}
}
/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta <= 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta <= 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hGet(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hMGet(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hMSet(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
log.error("exception when hash set key {}. ", key, e);
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hMSet(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
log.error("exception when hash set key {}. ", key, e);
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hSet(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
log.error("exception when hash set key {}, item {} ", key, item, e);
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hSet(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
log.error("exception when hash set key {}, item {} ", key, item, e);
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hDel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
/**
* 根据key获取Set中的所有值
*
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
return redisTemplate.opsForSet().remove(key, values);
} catch (Exception e) {
return 0;
}
}
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
return redisTemplate.opsForList().remove(key, count, value);
} catch (Exception e) {
return 0;
}
}
10、应用问题
缓存穿透:是用户查询一条在redis中不存在的数据,这是就会向数据库中去查询,然而也没有查询到就会一直请求一直请求导致数据库服务压力变大。缓存穿透一般是遭受黑客攻击,黑客会一直去查询一条不存在的数据,导致我们的服务不可用。
解决方案:
- 对空值进行缓存:对于返回空值的数据也进行缓存,将空值的过期时间设置很短。
- 设置课访问的白名单:使用bitmap类型定义一个白名单,名单id作为bitmap的偏移量,先查询是否在bitmap中,如果在就将请求放行,如果不在就将请求进行拦截。
- 布隆过滤器:它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。
缓存击穿:一个经常被请求的key,在缓存时间过期时,后端数据库没有及时将缓存更新,这时突然大量的请求去请求这个key,导致在redis中无法获取到数据,会直接去请求数据库,数据库压力增大性能下降。
解决方案:
- 使用锁:在缓存失效时(取出来的值为空),不要立即去数据库中查询。
缓存雪崩:redis中的大量key集中过期或者redis宕机,导致大量请求直接从数据库中请求数据,数据库服务压力增大。
解决方案:
- 给key设置过期时间设置成间隔过期。
- 构建多级缓存结构。
- 搭建redis集群,保证redis的高可用。
- 在请求层面,对缓存业务添加限流和服务降级。
- 使用锁和队列,保证没有大量的线程对数据库进行读写,同时避免大量请求落在底层的存储系统上。
11、分布式锁
随着业务的发展,从单一的服务架构演变成分布式的服务架构。由于分布式的多线程、多进程分布在不同的服务器上,这使单机下的锁策略失效。为了解决这个问题就需要一个跨JVM的互斥锁来控制资源的访问。
- 基于数据库实现分布式锁。
- 基于缓存实现分布式锁。
- 基于Zookeeper实现分布式锁。
基于Redis缓存实现分布式锁:
# 分布式锁基于下面的命令
setnx key_lock value # setnx增加key时,如果key存在就不会添加成功,不存在才能添加成功,key_lock是锁的名称
# 在这个过程中可能存在业务逻辑中一直没有把锁删除,所以需要给锁加一个过期时间
set key_lock value nx ex time # nx 表示不能重复设置,ex设置过期时间time,单位秒
Java实现分布式锁:
@GetMapping("/lock")
public String lock(){
// 获取锁
boolean lock = redisTemplateService.setNx("lock", "1", 10);
// 如果是true就获取了锁
if (lock){
// 键num+1,
redisTemplateService.incr("num",1);
// 删除缓存中的锁
redisTemplateService.del("lock");
System.out.println("增加成功");
return "增加成功";
}else {
// 没有获取到锁,等待并重新请求锁
this.lock();
}
return "增加成功";
}
UUID防止lock误删:在获取到锁以后,但是在业务逻辑代码中如果处理的时间超过了lock设置的过期时间,那么lock就会自动过期,其他请求就会重新设置锁,并进入请求。当之前的业务逻辑代码处理完后就会去删除lock,就会把其他请求的lock删除,导致另外的请求也会进入,这样就可以使用UUID作为key值,再删除之前判断是否是自己的key,如果是自己的key就删除不是就不删除。
@GetMapping("/lock")
public String lock(){
String uuid = UUID.randomUUID().toString();
// 获取锁
boolean lock = redisTemplateService.setNx("lock", uuid, 10);
// 如果是true就获取了锁
if (lock){
// 键num+1,
redisTemplateService.incr("num",1);
String value = (String) redisTemplateService.get("lock");
if (uuid.equals(value)){
// 删除缓存中的锁
redisTemplateService.del("lock");
}
return "增加成功";
}else {
// 没有获取到锁,等待并重新请求锁
this.lock();
}
return "增加成功";
}
控制资源的访问。
- 基于数据库实现分布式锁。
- 基于缓存实现分布式锁。
- 基于Zookeeper实现分布式锁。