一、缓存双写一致性
- 如果redis中有数据:需要和数据库中的值相同。
- 如果redis中没有数据:数据库中的值要是最新值,且准备回写redis。
- 只读缓存。
- 读写缓存:①、同步直写策略:写数据库后也同步写redis缓存,缓存和数据库中的数据一致,对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略。②、异步缓写策略:正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,如仓库、物流等功能。异常情况出现了,不得不将失败的动作重新修补,有可能需要借助kafka或者rabbitMQ等消息中间件,实现重试重写。
- 双检加锁策略:多个线程同时去查询数据库的这条数据,那么就可以第一个查询数据的请求上使用一个互斥锁来锁住它。其他线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。
1.1、数据库和缓存一致性的更新策略
目的:给缓存设置过期时间,定期清理缓存并回写,是保证最终一致性的解决方案。
可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。就是如果数据库写入成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,要以mysql的数据库写入库为准。
1.1.1、在停机的情况下
给出公告,服务升级,单线程,这样重量级的数据操作最好不要多线程。
1.1.2、先更新数据库,再更新缓存
1、情况1:①、先更新mysql的某商品的库存,当前商品的库存是100,更新为99。②、先更新mysql修改为99成功,然后更新redis。③、出现异常,更新redis失败了,导致MySQL里面的库存是99而redis里面还是100。所以会导致数据库里的数据和缓存redis里面数据不一致,读到redis脏数据。
2、情况2:在多线程环境下,A,B两个线程有快有慢。①、A更新mysql为100。②、B更新mysql为90。③、B先更新redis为90。④、A再更新redis为100。所以导致redis与mysql更新的数据不一致。
1.1.3、先更新缓存,再更新数据库
不推荐:业务上一般把mysql作为底单数据库,保证最后解释。
1.1.4、先删除缓存,再更新数据库
1、请求A进行写操作,删除redis缓存后,工作正在进行中,更新mysql……A还没有彻底更新完mysql,还没commit。
2、请求B开工查询,查询redis发现缓存不存在(被A从redis中删除了)。
3、请求B继续,去数据库查询得到了mysql中的旧值(A还没有更新完)。
4、请求B将旧值写回redis缓存。
5、请求A将新值写入mysql数据库。
这样依然会导致数据不一致的情况发生。
解决方法:采用延时双删策略,A线程删除redis缓存,然后sleep一段时间,这期间就是为了让B线程先从数据库读取数据,再把缺失的数据写入缓存,然后线程A再进行删除。所以,线程A sleep的时间,就需要大于线程B读取数据再写入缓存的时间。这样,其它线程读取数据时,会发现缓存缺失,所以会从数据库中读取最新值,因为这个方案会在第一次删除缓存后,延迟一段时间再次进行删除,所以叫做:延迟双删。
延时双删的不足:
- 这个删除该休眠多久呢?
线程A sleep的时间,需要大于线程B读取数据再写入缓存的时间。①、在业务程序运行的时候,统计下线程读数据和写缓存的操作时间,评估出项目的读数据业务逻辑的耗时,以此为基础,然后写数据的休眠时间则在读数据业务的耗时上加百毫秒就行。这样确保请求结束,写请求可以删除读请求造成的缓存脏读。②、新启动一个后台监控程序,如watchdog监控程序会加时。
1.1.5、先更新数据库,在删除缓存
缺点:缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。
解决方案:
- 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(如使用Kafaka/RabbitMQ)。
- 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。
- 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试。
- 当重试超过一定次数后,就需要向业务层发送保错信息了,通知运维人员。
总结:
1.2、Redis与Mysql数据双写一致性
1.2.1、canal
主要用途用于MySQL数据库增量日志数据的订阅,消费和解析,是阿里巴巴开发并开源的,采用Java语言开发。
主要功能:1、数据库镜像,2、数据库实时备份。3、索引构建和实时维护(拆分异构索引、倒排索引等)。4、业务cache刷新。5、带业务逻辑的增量数据处理。
工作原理:①、canal模拟MySQL slave的交互协议,伪装自己为MySQL master发送dump协议。②、MySQL master收到dump请求,开始推送binary log给slave(即canal)
③、canal解析binary log对象(原始为byte流)。
下载地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
1.2.2、Redis与Mysql数据双写一致性实现
mysql前置配置:
- 、MySQL 5.7.36
- 、当前主机二进制日志:SHOW MASTER STATUS;
- 、查看:SHOW VARIABLES LIKE 'log_bin';
- 、开启MySQL的binlog写入功能,在mysql的ini文件中配置
log-bin=mysql-bin #开启binlog
binlog-format=ROW #开启ROW模式
server_id=1 #配置MySQL replction需要定义,不要和canal的slaveid重复
- 重启mysql
- 、再次查看:SHOW VARIABLES LIKE 'log_bin';
- 授权canal连接MySQL
#先检查是否有canal
SELECT*FROM mysql.user
#没有就创建
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
Canal服务端:
- 、下载linux版本:
解压
配置文件
启动
查看日志
Java程序:
- 、sql脚本:
CREATE TABLE `a_user`(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`userName` VARCHAR(100) NOT NULL,
PRIMARY KEY(`id`)
)ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
public class RedisCanalClientExample {
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.200.110";
private static void redisInsert(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.del(columns.get(0).getValue());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args) {
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
"example",
"",
"");
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//设置监控的数据库与表
//connector.subscribe(".*\\..*");
connector.subscribe("test1.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}