Java17 --- redis7缓存双写一致性

一、缓存双写一致性

  1. 如果redis中有数据:需要和数据库中的值相同。
  2. 如果redis中没有数据:数据库中的值要是最新值,且准备回写redis。
  3. 只读缓存。
  4. 读写缓存:①、同步直写策略:写数据库后也同步写redis缓存,缓存和数据库中的数据一致,对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略。②、异步缓写策略:正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,如仓库、物流等功能。异常情况出现了,不得不将失败的动作重新修补,有可能需要借助kafka或者rabbitMQ等消息中间件,实现重试重写。
  5. 双检加锁策略:多个线程同时去查询数据库的这条数据,那么就可以第一个查询数据的请求上使用一个互斥锁来锁住它。其他线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。

1.1、数据库和缓存一致性的更新策略

目的:给缓存设置过期时间,定期清理缓存并回写,是保证最终一致性的解决方案。

可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。就是如果数据库写入成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,要以mysql的数据库写入库为准。

1.1.1、在停机的情况下

给出公告,服务升级,单线程,这样重量级的数据操作最好不要多线程。

1.1.2、先更新数据库,再更新缓存

1、情况1:①、先更新mysql的某商品的库存,当前商品的库存是100,更新为99。②、先更新mysql修改为99成功,然后更新redis。③、出现异常,更新redis失败了,导致MySQL里面的库存是99而redis里面还是100。所以会导致数据库里的数据和缓存redis里面数据不一致,读到redis脏数据。

2、情况2:在多线程环境下,A,B两个线程有快有慢。①、A更新mysql为100。②、B更新mysql为90。③、B先更新redis为90。④、A再更新redis为100。所以导致redis与mysql更新的数据不一致。

1.1.3、先更新缓存,再更新数据库

不推荐:业务上一般把mysql作为底单数据库,保证最后解释。

1.1.4、先删除缓存,再更新数据库

1、请求A进行写操作,删除redis缓存后,工作正在进行中,更新mysql……A还没有彻底更新完mysql,还没commit。

2、请求B开工查询,查询redis发现缓存不存在(被A从redis中删除了)。

3、请求B继续,去数据库查询得到了mysql中的旧值(A还没有更新完)。

4、请求B将旧值写回redis缓存。

5、请求A将新值写入mysql数据库。

这样依然会导致数据不一致的情况发生。

解决方法:采用延时双删策略,A线程删除redis缓存,然后sleep一段时间,这期间就是为了让B线程先从数据库读取数据,再把缺失的数据写入缓存,然后线程A再进行删除。所以,线程A sleep的时间,就需要大于线程B读取数据再写入缓存的时间。这样,其它线程读取数据时,会发现缓存缺失,所以会从数据库中读取最新值,因为这个方案会在第一次删除缓存后,延迟一段时间再次进行删除,所以叫做:延迟双删。

延时双删的不足:

  1. 这个删除该休眠多久呢?

线程A sleep的时间,需要大于线程B读取数据再写入缓存的时间。①、在业务程序运行的时候,统计下线程读数据和写缓存的操作时间,评估出项目的读数据业务逻辑的耗时,以此为基础,然后写数据的休眠时间则在读数据业务的耗时上加百毫秒就行。这样确保请求结束,写请求可以删除读请求造成的缓存脏读。②、新启动一个后台监控程序,如watchdog监控程序会加时。

1.1.5、先更新数据库,在删除缓存

缺点:缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。

解决方案:

  1. 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(如使用Kafaka/RabbitMQ)。
  2. 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。
  3. 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试。
  4. 当重试超过一定次数后,就需要向业务层发送保错信息了,通知运维人员。

总结:

1.2、Redis与Mysql数据双写一致性

1.2.1、canal

主要用途用于MySQL数据库增量日志数据的订阅,消费和解析,是阿里巴巴开发并开源的,采用Java语言开发。

主要功能:1、数据库镜像,2、数据库实时备份。3、索引构建和实时维护(拆分异构索引、倒排索引等)。4、业务cache刷新。5、带业务逻辑的增量数据处理。

工作原理:①、canal模拟MySQL  slave的交互协议,伪装自己为MySQL master发送dump协议。②、MySQL master收到dump请求,开始推送binary log给slave(即canal)

③、canal解析binary  log对象(原始为byte流)。

下载地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

1.2.2、Redis与Mysql数据双写一致性实现

mysql前置配置:

  • 、MySQL 5.7.36
  • 、当前主机二进制日志:SHOW  MASTER STATUS;
  • 、查看:SHOW VARIABLES LIKE 'log_bin';
  • 、开启MySQL的binlog写入功能,在mysql的ini文件中配置

log-bin=mysql-bin #开启binlog

binlog-format=ROW #开启ROW模式

server_id=1 #配置MySQL replction需要定义,不要和canal的slaveid重复

 

 

  • 重启mysql
  • 、再次查看:SHOW VARIABLES LIKE 'log_bin';

 

  • 授权canal连接MySQL

#先检查是否有canal

SELECT*FROM mysql.user

#没有就创建

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

FLUSH PRIVILEGES;

 

 

Canal服务端:

  • 、下载linux版本:

 

解压

 

配置文件

 

 

启动

 

查看日志

 

 

Java程序:

  • 、sql脚本:

CREATE TABLE `a_user`(

`id` BIGINT(20) NOT NULL AUTO_INCREMENT,

`userName` VARCHAR(100) NOT NULL,

PRIMARY  KEY(`id`)

)ENGINE=INNODB  AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

 

public class RedisCanalClientExample {

    public static final Integer _60SECONDS = 60;
    public static final String REDIS_IP_ADDR = "192.168.200.110";

    private static void redisInsert(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.del(columns.get(0).getValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args) {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //设置监控的数据库与表
            //connector.subscribe(".*\\..*");
            connector.subscribe("test1.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/721771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络(谢希仁第六版)| 课后习题与答案 | 物理层 | 题目知识点详细分析

计算机网络&#xff08;谢希仁第六版&#xff09;课后习题与答案 物理层 博客只对老师给的重点进行整理&#xff0c;完整的课后习题答案见Gitee下载&#xff1a;《计算机网络教程&#xff08;第6版&#xff09;&#xff08;微课版&#xff09;》习题答案 2-5 请画出数据流1 0 1…

Java基础学习-流程控制语句-顺序结构-分支结构-循环结构

目录 顺序结构&#xff1a; 分支结构&#xff1a; if语句&#xff1a; 第一种格式&#xff1a; if第二种格式&#xff1a; 案例练习 if第三种格式&#xff1a; switch语句&#xff1a; 格式&#xff1a; switch其他知识点&#xff1a; 循环结构&#xff1a; for循环…

软件测试面试题:性能测试关注哪些指标?

问题 在工作中&#xff0c;使用JMeter做压力测试时&#xff0c;需要关注其中的哪些指标&#xff1f; 性能测试关注哪些指标&#xff1f; 考察点 面试官想了解&#xff1a; 是否用过 JMeter 指标进行分析 技术点 涉及的技术点&#xff1a; JMeter 结果分析 回答 性能指…

基于 Vitis HLS 的单个乘法 DSP 映射研究

文章目录 1 自媒体账号2 引言3 整数乘法4 定点乘法5 浮点乘法6 总结 1 自媒体账号 目前运营的自媒体账号如下&#xff1a; 哔哩哔哩 【雪天鱼】: 雪天鱼个人主页-bilibili.com 如果觉得有所收获的话&#xff0c;可以点击我的主页 -> 充电 -> 自定义充电 支持一下&#…

2024加密软件排行榜|最新企业常用加密软件推荐

安秉网盾加密软件&#xff1a; 专注于企业级的透明加密解决方案&#xff0c;确保公司内部文件在公司环境外无法被访问。 审批机制灵活&#xff0c;支持多种审批方式&#xff0c;方便管理。 广泛应用于多个行业&#xff0c;拥有丰富的企业环境适配经验。 适合对内部数据安全有严…

设置角色运动的动画

(1) 打开Assets-UnityTechnologies-Animation-Animators&#xff0c;Create-Animation-Controller,命名为JohnLemon (2) 打开JohnLemon&#xff0c;出现下图 (3) 依次将Assets-UnityTechnologies-Animation-Animation中的JohnIdle和JohnWalk拖放到Base Layer窗口中 (4) 右击Idl…

分享由AI制定一个商城网站的开发计划及推荐的开发语言

商城网站开发计划 一、项目概述 本商城网站开发计划旨在创建一个功能齐全、用户友好的在线购物平台&#xff0c;为顾客提供商品浏览、搜索、购物车管理、订单跟踪、在线支付等服务。商城将支持多种商品分类&#xff0c;包括但不限于电子产品、家居用品、服饰鞋帽等。 二、开…

Nginx缓存之代理缓存配置

Nginx 的缓存功能是集成在代理模块中的&#xff0c;当启用缓存功能时&#xff0c;Nginx 将请求返回的响应数据持久化在服务器磁盘中&#xff0c;响应数据缓存的相关元数据、有效期及缓存内容等信息将被存储在定义的共享内存中。当收到客户端请求时&#xff0c;Nginx 会在共享内…

联邦学习周记|第四周

论文&#xff1a;Active Federated Learning 链接 将主动学习引入FL&#xff0c;每次随机抽几个Client拿来train&#xff0c;把置信值低的Client概率调大&#xff0c;就能少跑几次。 论文&#xff1a;Active learning based federated learning for waste and natural disast…

全新AI图像擦处理工具上线,手机电脑版资源合集下载

下载地址&#xff1a; 安卓手机版&#xff1a; 点击下载 苹果手机版&#xff1a; 点击下载 电脑版&#xff08;支持Mac和Windows&#xff09;&#xff1a; 点击下载 图像处理技术在当今迅速发展&#xff0c;为了满足广大用户的需求&#xff0c;我们推出了一款强大的图像优化…

京东健康·全球医疗AI创新大赛开启!32万奖金池等你来拿!

京东健康全球医疗AI创新大赛是由京东健康发起&#xff0c;以探索医疗行业前沿技术与创新应用为导向、携手产学研各界力量&#xff0c;通过AI创新促进医疗服务行业高质量发展的一场大赛。 本次大赛聚焦“睡眠监测智能算法”与“医疗大模型创新应用”两个课题方向&#xff0c;面…

【ARM-Linux篇】POSIX消息队列

System V消息队列POSIX 消息队列主 要 函 数#include <sys/msg.h> int msgget(key_t key, int oflag) int msgsnd(int msqid, const void * ptr, size_t length, int flag) ssize_t msgrcv (int msqid, void *ptr, size_t length, long type, int flag) int msgctl(int m…

HTML入门教程:深度解析HTML,开启你的前端技术之旅

一、引言 HTML&#xff08;HyperText Markup Language&#xff0c;超文本标记语言&#xff09;是前端开发的基础&#xff0c;它负责构建网页的结构和内容。作为前端技术栈的基石&#xff0c;HTML的掌握程度直接影响到网页的开发效率和用户体验。本教程将带你从零开始&#xff…

计算机网络——传输层重要协议(TCP、UDP)

一、常见名词解释 IP地址&#xff1a;IP地址主要用于标识网络主机、其他网络设备&#xff08;如路由器&#xff09;的网络地址&#xff0c;即IP地址用于定位主机的网络地址&#xff1b; IP地址是一个32位的二进制数&#xff0c;通常被分割为4个 8位⼆进制数&#xff08;也就是…

vite项目配置高德api定位功能

项目场景&#xff1a; 用vite项目集成了一个H5页面的小程序&#xff0c;需要调用高德的定位API&#xff0c;在浏览器中测试的时候&#xff0c;出现了一系列定位失败的情况。 问题1 Get ipLocation failed、Geolocation permission denied 本地http访问下&#xff0c;定位失败…

切割游戏介绍

简介 上大学时&#xff0c;在学校实验室里玩过一个貌似使用VC写的小游戏&#xff0c;一个小球在界面上四处游荡&#xff0c;玩家使用鼠标切割背景&#xff0c;将背景切割剩余到一定的百分比后&#xff0c;就胜利了&#xff0c;后边的背景图会全部展示出来。 使用qt的qml技术&a…

C++类与对象、类的6个默认成员函数、构造函数、析构函数等的介绍

文章目录 前言一、类的6个默认成员函数二、构造函数1. 概念2. 特性1. 无参构造函数2. 带参构造函数3. 编译器默认生成的无参构造函数 3. 构造函数的初始化4. 默认构造函数 三、析构函数1. 概念2. 特性3. 编译器默认生成的析构函数的作用4. 构造函数的使用 总结 前言 C类与对象…

Scikit-Learn支持向量机回归

Scikit-Learn支持向量机回归 1、支持向量机回归1.1、最大间隔与SVM的分类1.2、软间隔最大化1.3、支持向量机回归1.4、支持向量机回归的优缺点2、Scikit-Learn支持向量机回归2.1、Scikit-Learn支持向量机回归API2.2、支持向量机回归初体验2.3、支持向量机回归实践(加州房价预测…

PLC通过Profibus协议转Modbus协议网关接LED大屏通讯

一、背景 Modbus协议和Profibus协议是两种常用于工业控制系统的通信协议&#xff0c;它们在自动化领域中起着重要的作用。Modbus是一种串行通信协议&#xff0c;被广泛应用于各种设备之间的通信&#xff0c;如传感器、执行器、PLC等。而Profibus则是一种现场总线通信协议&…

随想录Day63 | 单调栈 42. 接雨水 84.柱状图中最大的矩形

随想录Day63 | 单调栈 42. 接雨水 84.柱状图中最大的矩形 42. 接雨水 题目链接 42 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 第一次提交 class Solution { public:int trap(vector<int>…