文章目录
- 前言
- 一、springboot整合redis
- 1、jedis
- 1.1、单点模式
- 1.2、哨兵模式
- 1.3 集群模式
- 1.4、关于jedis线程不安全的验证
- 2、lettuce(推荐)
- 2.1、单点模式
- 2.2、哨兵模式
- 2.3、集群模式
- 3、RedisTemplate config
- 二、redis常用知识点
- 1、缓存数据一致性
- 2、缓存雪崩
- 3、缓存击穿
- 4、缓存穿透
- 5、慢查询
- 6、pipelining
前言
Redis 是一个高性能的键值存储数据库,在许多应用场景中广泛使用,以下是有关 Redis 实践与扩展的一些内容。
一、springboot整合redis
1、jedis
yml
spring:
redis:
host: localhost # Redis服务器地址
port: 6379 # Redis服务器端口
password: your_password # 如果有密码的话
timeout: 5000ms # 连接超时时间
1.1、单点模式
public RedisConnectionFactory standalone() {
RedisStandaloneConfiguration standalone = new RedisStandaloneConfiguration();
standalone.setDatabase(this.redisProperties.getDatabase());
standalone.setHostName(this.redisProperties.getHost());
standalone.setPort(this.redisProperties.getPort());
standalone.setPassword(RedisPassword.of(this.redisProperties.getPassword()));
JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb = (JedisClientConfiguration.JedisPoolingClientConfigurationBuilder)JedisClientConfiguration.builder();
this.initJedisPoolConfig(jpb);
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(standalone, jpb.build());
return jedisConnectionFactory;
}
private void initJedisPoolConfig(JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
RedisProperties.Pool pool = this.redisProperties.getJedis().getPool();
CustomRedisProperties.Pool customPool = this.customRedisProperties.getJedis().getPool();
jedisPoolConfig.setMaxIdle(pool.getMaxIdle());
jedisPoolConfig.setMinIdle(pool.getMinIdle());
jedisPoolConfig.setMaxTotal(pool.getMaxActive());
jedisPoolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());
jedisPoolConfig.setTestOnBorrow(customPool.getTestOnBorrow());
jedisPoolConfig.setTestOnReturn(customPool.getTestOnReturn());
jedisPoolConfig.setTestWhileIdle(customPool.getTestWhileIdle());
jpb.poolConfig(jedisPoolConfig);
if (!Objects.isNull(pool.getEnabled()) && pool.getEnabled()) {
jpb.and().usePooling();
}
}
1.2、哨兵模式
yml
spring:
redis:
password: your_redis_password
sentinel:
# 主节点名称
master: mymaster
# 哨兵节点地址列表
nodes:
- 192.168.1.101:26379
- 192.168.1.102:26379
- 192.168.1.103:26379
jedis:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 5000ms
config
public RedisConnectionFactory sentinel() {
List<String> redisNodes = this.redisProperties.getSentinel().getNodes();
String master = this.redisProperties.getSentinel().getMaster();
RedisSentinelConfiguration sentinel = new RedisSentinelConfiguration();
for (String redisHost : redisNodes) {
String[] item = redisHost.split(":");
String ip = item[0];
String port = item[1];
sentinel.addSentinel(new RedisNode(ip, Integer.parseInt(port)));
}
sentinel.setMaster(master);
sentinel.setDatabase(this.redisProperties.getDatabase());
if (!StringUtils.isEmpty(this.redisProperties.getPassword())) {
sentinel.setPassword(RedisPassword.of(this.redisProperties.getPassword()));
}
JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb = (JedisClientConfiguration.JedisPoolingClientConfigurationBuilder)JedisClientConfiguration.builder();
this.initJedisPoolConfig(jpb);
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(sentinel, jpb.build());
return jedisConnectionFactory;
}
private void initJedisPoolConfig(JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
RedisProperties.Pool pool = this.redisProperties.getJedis().getPool();
CustomRedisProperties.Pool customPool = this.customRedisProperties.getJedis().getPool();
jedisPoolConfig.setMaxIdle(pool.getMaxIdle());
jedisPoolConfig.setMinIdle(pool.getMinIdle());
jedisPoolConfig.setMaxTotal(pool.getMaxActive());
jedisPoolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());
jedisPoolConfig.setTestOnBorrow(customPool.getTestOnBorrow());
jedisPoolConfig.setTestOnReturn(customPool.getTestOnReturn());
jedisPoolConfig.setTestWhileIdle(customPool.getTestWhileIdle());
jpb.poolConfig(jedisPoolConfig);
if (!Objects.isNull(pool.getEnabled()) && pool.getEnabled()) {
jpb.and().usePooling();
}
}
1.3 集群模式
yml
spring:
redis:
password: your_redis_password
sentinel:
master: mymaster # 主节点名称
nodes:
- 192.168.1.101:26379
- 192.168.1.102:26379
- 192.168.1.103:26379 # 哨兵节点地址列表
jedis:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 5000ms
config
public RedisConnectionFactory cluster() {
RedisClusterConfiguration cluster = new RedisClusterConfiguration(this.redisProperties.getCluster().getNodes());
if (this.redisProperties.getCluster().getMaxRedirects() != null) {
cluster.setMaxRedirects(this.redisProperties.getCluster().getMaxRedirects());
}
cluster.setUsername(this.redisProperties.getUsername());
cluster.setPassword(RedisPassword.of(this.redisProperties.getPassword()));
JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb = (JedisClientConfiguration.JedisPoolingClientConfigurationBuilder) JedisClientConfiguration.builder();
this.initJedisPoolConfig(jpb);
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(cluster, jpb.build());
return jedisConnectionFactory;
}
private void initJedisPoolConfig(JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jpb) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
RedisProperties.Pool pool = this.redisProperties.getJedis().getPool();
CustomRedisProperties.Pool customPool = this.customRedisProperties.getJedis().getPool();
jedisPoolConfig.setMaxIdle(pool.getMaxIdle());
jedisPoolConfig.setMinIdle(pool.getMinIdle());
jedisPoolConfig.setMaxTotal(pool.getMaxActive());
jedisPoolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());
jedisPoolConfig.setTestOnBorrow(customPool.getTestOnBorrow());
jedisPoolConfig.setTestOnReturn(customPool.getTestOnReturn());
jedisPoolConfig.setTestWhileIdle(customPool.getTestWhileIdle());
jpb.poolConfig(jedisPoolConfig);
if (!Objects.isNull(pool.getEnabled()) && pool.getEnabled()) {
jpb.and().usePooling();
}
}
1.4、关于jedis线程不安全的验证
public class Test {
public static void main(String[] args) {
//连接本地的 Redis 服务
Jedis jedis = new Jedis("localhost");
//查看服务是否运行
System.out.println("服务正在运行: "+jedis.ping());
for (int i = 0; i < 2; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 10; j++) {
jedis.set("a" + finalI, String.valueOf(finalI));
System.out.println("a" + finalI + " = " + jedis.get("a" + finalI));
}
}).start();
}
}
}
运行结果
服务正在运行: PONG
a1 = OK
a1 = OK
a0 = 1
a0 = OK
a1 = 1
a1 = OK
a1 = 1
a0 = OK
a0 = OK
a1 = 0
假设jedis是线程安全的,那么在代码中,输出语句:“System.out.println(“a” + finalI + " = " + jedis.get(“a” + finalI));”的预期输出结果应该是“a0 = 0”或“a1 = 1”。但是实际上控制台中可以看到“a0 = OK”、“a1 = OK” 的错误输出。
jedis的请求流和响应流都是全局变量,当不同的线程在set和get的时候,有可能会出现线程A的set()的响应流,被线程B的get()作为返回了,所以出现了“a0 = OK”,“a1 = OK”的情况
2、lettuce(推荐)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.1、单点模式
yml
spring:
redis:
host: localhost # Redis服务器地址
port: 6379 # Redis服务器端口
password: your_password # 如果有密码的话
timeout: 5000ms # 连接超时时间
lettuce:
pool:
max-active: 8 # 连接池最大活跃连接数
max-idle: 8 # 连接池最大空闲连接数
min-idle: 0 # 连接池最小空闲连接数
max-wait: -1ms # 获取连接的最大等待时间
config
@Configuration
public class RedisConfig {
@Bean
public LettuceConnectionFactory redisConnectionFactory(@Value("${spring.redis.host}") String host,
@Value("${spring.redis.port}") int port,
@Value("${spring.redis.password}") String password) {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(host, port));
}
}
2.2、哨兵模式
yml
spring:
redis:
password: your_redis_password
sentinel:
master: mymaster # 主节点名称
nodes:
- 192.168.1.101:26379
- 192.168.1.102:26379
- 192.168.1.103:26379 # 哨兵节点地址列表
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 5000ms
config
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
.master("mymaster")
.sentinels(getSentinelNodes());
return new LettuceConnectionFactory(sentinelConfig);
}
private Set<RedisNode> getSentinelNodes() {
return Arrays.stream(redisProperties.getSentinel().getNodes())
.map(address -> address.split(":"))
.map(parts -> new RedisNode(parts[0], Integer.parseInt(parts[1])))
.collect(Collectors.toSet());
}
2.3、集群模式
yml
spring:
redis:
cluster:
nodes:
- 192.168.1.11:6379
- 192.168.1.12:6379
- 192.168.1.13:6379
- 192.168.1.14:6379
- 192.168.1.15:6379
- 192.168.1.16:6379 # 集群节点地址列表
max-redirects: 3 # 重定向的最大次数
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 5000ms
config
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
List<String> nodeStrings = redisProperties.getCluster().getNodes();
Set<RedisNode> nodes = nodeStrings.stream()
.map(address -> address.split(":"))
.map(parts -> new RedisNode(parts[0], Integer.parseInt(parts[1])))
.collect(Collectors.toSet());
return new LettuceConnectionFactory(new RedisClusterConfiguration(nodes));
}
3、RedisTemplate config
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 使用 Jackson 序列化器处理键值对中的对象
GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();
template.setValueSerializer(jacksonSerializer);
template.setHashValueSerializer(jacksonSerializer);
// 使用 String 序列化器处理键
StringRedisSerializer stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
template.afterPropertiesSet();
return template;
}
二、redis常用知识点
1、缓存数据一致性
针对读多写少的⾼并发场景,我们可以使⽤缓存来提升查询速度
查询缓存逻辑
- 我们查询DB之前,先去查询Redis,如果Redis存在,直接返回;
- 如果Redis不存在,从DB查询,且回写到Redis
假设场景如下 - 线程A请求缓存,没有缓存,从DB拿到1。
- 线程B将1更新为2,并且删除缓存,DB的值为2
- 线程A更新缓存,redis为1
我们发现,redis数据为1 ,db数据为2,出现了数据一致性问题。
如何解决一致性问题:
- 延迟双删:更新DB后,等待一段时间,再进行Redis删除!确保其他的线程拿到的都是最新数据!
- 分布式锁:避免并发
- 最终一致性:设置有效期
2、缓存雪崩
缓存雪崩就是Redis的大量热点数据同时过期(失效),因为设置了相同的过期时间,刚好这个时候Redis请求的并发量又很大,就会导致所有的请求落到数据库。
解决办法
- 保证Redis的高可用,防止由于Redis不可用导致全部打到DB
- 加互斥锁或者使用队列,针对同一个key只允许一个线程到数据库查询
- 缓存定时预先更新,避免同时失效
- 通过加随机数,使key在不同的时间过期,或者永不过期
3、缓存击穿
缓存击穿(Cache Breakdown)是指在分布式系统中,当一个热点数据项的缓存过期或失效时,由于该数据项受到极高的并发访问请求,大量的请求会直接穿透到后端数据库,造成数据库瞬时压力剧增,可能影响数据库性能甚至导致其崩溃的现象。比如电商网站的商品秒杀活动等
解决办法
- 互斥锁结合双重检查:第一次检查无锁,检查不存在则加锁进行二次检查,如果还是不存在,则查询DB,然后放进缓存,这样就保证了那一瞬间只有一个线程查询数据库
- 热点数据永不过期或定期异步更新:对于那些非常热门的数据项,可以选择不设置过期时间,或者采用后台任务定期异步更新这些数据项的缓存副本。
4、缓存穿透
缓存穿透(Cache Penetration)是指在分布式系统中,某些恶意请求或者错误的业务逻辑导致查询了大量不存在的数据项。每次这样的请求都会直接打到数据库,造成不必要的压力,并且由于数据不存在,缓存也无法起到应有的作用。
解决办法
- 布隆过滤器:很大程度避免无效查询
- 权限验证:参数验证、黑白名单
- 监控预警系统:监控是否有恶意攻击
布隆过滤器的原理分析
-
初始化
当创建一个布隆过滤器时,首先会初始化一个全为0的位数组(bit array),这个数组的长度通常远大于预期要存储的元素数量。 -
添加元素
- 哈希运算:每当向布隆过滤器中添加一个新的元素时,该元素会被多个独立设计的哈希函数处理。每个哈希函数都会根据输入元素计算出一个索引值。
- 设置位:对于每一个由哈希函数产生的索引值,在位数组中对应的位将被设置为1。如果同一个位置已经被其他元素置为1,则保持不变。
例如,假设我们有三个不同的哈希函数,并且我们要添加一个字符串 “example” 到布隆过滤器中,那么这三个哈希函数可能会分别产生索引值 5、17 和 34。接下来,我们会将位数组中第5位、第17位以及第34位都设为1。
- 查询元素
- 哈希运算:当查询某个元素是否存在于布隆过滤器中时,同样使用相同的哈希函数对该元素进行哈希运算,得到一系列索引值。
- 检查位:然后检查这些索引值所对应的位是否全部为1。如果是,则表示该元素很可能存在于集合中;如果任何一个索引值对应的位置是0,则可以肯定地说该元素不在集合中。
需要注意的是,由于可能存在多个不同元素映射到相同的位置,即使所有指定的位都是1,也不能百分之百确定该元素确实存在于集合中。这种情况下,存在一定的误报率(False Positive Rate),即原本不存在于集合中的元素也被认为可能存在。
-
删除操作
传统意义上的布隆过滤器不支持直接删除元素的操作,因为一旦将某一位从1重置回0,就可能会影响到那些也依赖于此位的其他元素的正确性。不过,有一种变种叫做计数布隆过滤器(Counting Bloom Filter),它可以记录每个位被多少个元素引用,从而允许一定程度上的删除操作。 -
误判率与参数选择
- 误判率:随着插入到布隆过滤器中的元素增多,位数组中更多的位被设置为1,导致后续插入或查询时发生冲突的可能性增加,进而提高了误判的概率。因此,在实际应用中需要权衡误判率和资源消耗之间的关系。
- 参数选择:为了最小化误判率并优化性能,开发者需要精心挑选位数组的大小m、哈希函数的数量k以及预计插入的元素数量n。理论上,可以通过数学公式来推导最优的m和k值,以达到预期的误判率。
5、慢查询
在Redis中,慢查询日志用于记录执行时间超过预设阈值的命令,这些信息可以帮助开发和运维人员定位性能问题。每条慢查询日志包含标识ID、发生时间戳、命令耗时及详细信息。需要注意的是,慢查询只记录命令执行的时间,并不包括命令排队和网络传输时间,因此客户端执行命令的时间可能会大于命令实际执行的时间
-
配置慢查询参数
Redis通过两个主要配置参数来管理慢查询功能:- slowlog-log-slower-than:这个参数定义了什么情况下被认为是“慢”的查询。它的单位是微秒(1秒 = 1000毫秒 = 1000000微秒),默认值为10000微秒(即10毫秒)。如果设置为0,则表示记录所有命令;若设置为负数,则不会记录任何命令。
- slowlog-max-len:该参数指定了慢查询日志最多可以存储多少条记录。实际上,Redis使用了一个列表来存储慢查询日志,slowlog-max-len就是列表的最大长度。它是一个先进先出队列,当新的命令满足慢查询条件时会被插入到列表中,而当列表已满时,最早的记录将会被移除以腾出空间给新的记录
-
查看与管理慢查询日志
- SLOWLOG GET [count]:此命令用来获取指定数量的最新慢查询记录。如果不提供count参数,默认会返回所有的慢查询记录。
- SLOWLOG LEN:该命令用于查询当前保存在内存中的慢查询日志的数量。
- SLOWLOG RESET:这个命令用于清空现有的慢查询日志记录。
-
实际应用中的注意事项
在实际的应用场景中,使用Redis慢查询日志时还需要注意以下几点: -
调整阈值:对于高并发量的环境,建议根据实际情况调整slowlog-log-slower-than的值,例如将其设为1毫秒,以便更敏感地捕捉潜在的问题。
-
增加日志容量:线上环境中应适当增大slowlog-max-len,以确保更多的慢查询能够被记录下来供后续分析。同时,考虑到日志数据的重要性,还可以考虑定期将慢查询日志持久化到外部存储系统中。
-
避免误判:由于Redis采用单线程模型处理请求,一个长时间运行的命令会导致其他命令的级联阻塞。因此,当遇到客户端请求超时的情况时,应该检查对应时间点是否有慢查询的存在,从而判断是否是由慢查询引起的延迟。
6、pipelining
客户端发送一个请求给服务端----->客户端等待服务器响应—>服务器收到请求并处理---->返回客户端
例如,五个命令序列是这样的:
SET X 0
INCR X
INCR X
INCR X
INCR X
客户端到服务端是需要时间的,包括网络传输,连接等等,我们将这个时间称为RTT时间(Round Trip Time)。如果每次都做一次这样的操作,那会很大影响到影响我们的性能。
Redis出现了一个pipelining管道技术,目的就是不要每次执行一次指令都经过一次RTT时间,可以使多个指令只需要经历一次RTT时间,这样子性能一般能提升5到10倍。
验证:
1、不用pipelining
public long noPipelining() {
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
redisTemplate.opsForValue().set("tes" + i, i);
}
long end = System.currentTimeMillis();
return end - start;
}
测试10W个key,耗时40S左右
@RequestMapping(value = "/noPipelining" , method = RequestMethod.GET)
@ResponseBody
public long noPipelining() {
return productService.noPipelining();
}
2、用pipelining
public long pipelining() {
long start = System.currentTimeMillis();
redisTemplate.executePipelined((RedisCallback<Object>)
connection ->
{
for (int i = 0; i <100000 ; i++) {
connection.stringCommands().set(("key"+i).getBytes(),Strin
g.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
return null;
});
long end = System.currentTimeMillis();
return end - start;
}
测试10W个key,耗时1S左右
@RequestMapping(value = "/pipelining" ,method = RequestMethod.GET)
@ResponseBody
public long pipelining() {
return productService.pipelining();
}
我们发现性能提升了将近40倍,但是也不是任意使用pipelining技术,需要注意一下几点
- 原子性:Pipelining 并不能保证命令执行的原子性。也就是说,在 Pipelining 执行期间,其他客户端仍可以向 Redis 发送命令,而且如果某个命令失败了,也不会影响到其他命令的成功执行。因此,如果命令之间存在依赖关系,则不适合使用 Pipelining。
- 错误处理:如果在 Pipelining 过程中发生了错误,Redis 不支持回滚操作。这意味着开发者需要自行设计逻辑来处理可能出现的问题。
- 命令数量限制:虽然理论上 Pipelining 可以包含任意数量的命令,但实际上由于输入缓存区大小有限制(例如,默认最大为 1GB),所以不应该无限制地增加 Pipelining 中的命令数量。
- 避免高延迟命令:为了确保良好的性能表现,应当避免在 Pipelining 内部包含那些执行时间较长的命令,因为这可能会阻塞整个队列。