Redis和数据库的一致性(Canal+MQ)

想要保证缓存与数据库的双写一致,一共有4种方式,即4种同步策略:

  1. 先更新缓存,再更新数据库;
  2. 先更新数据库,再更新缓存;
  3. 先删除缓存,再更新数据库;
  4. 先更新数据库,再删除缓存

首先说好结论,这4种同步策略无论是哪一种,都无法保证数据库和redis的强一致性,只能保证最终一致性,如要保证强一致,那么只能通过加锁来实现,那么就会造成性能问题,即CAP理论中的AP(强一致)和CP(高可用性)进行取舍,绝大多数场景是确保高可用(CP)。 

更新缓存还是删除缓存 

下面,我们来分析一下,应该采用更新缓存还是删除缓存的方式。

1 更新缓存

优点:每次数据变化都及时更新缓存,所以查询时不容易出现未命中的情况。

缺点:更新缓存的消耗比较大。如果数据需要经过复杂的计算再写入缓存,那么频繁的更新缓存,就会影响服务器的性能。如果是写入数据频繁的业务场景,那么可能频繁的更新缓存时,却没有业务读取该数据。

2 删除缓存

优点:操作简单,无论更新操作是否复杂,都是将缓存中的数据直接删除。

缺点:删除缓存后,下一次查询缓存会出现未命中,这时需要重新读取一次数据库。从上面的比较来看,一般情况下,删除缓存是更优的方案。

先操作数据库还是更新缓存

💡 1.先更新数据库再删除缓存

  • 线程A更新数据库成功,线程A删除缓存失败;
  • 线程B读取缓存成功,由于缓存删除失败,所以线程B读取到的是缓存中旧的数据。
  • 最后线程A删除缓存成功,有别的线程访问缓存同样的数据,与数据库中的数据是一样。
  • 最终,缓存和数据库的数据是一致的,但是会有一些线程读到旧的数据。

 

1.2正常情况下没有出现失败场景

在并发场景下,也许会有些许线程像线程b一样读的是旧数据,但在删除缓存后,最终缓存与数据库的数据是一致的,并且都是最新的数据。但线程B在这个过程里读到了旧的数据,可能还有其他线程也像线程B一样,在这两步之间读到了缓存中旧的数据,但因为这两步的执行速度会比较快,所以影响不大。对于这两步之后,其他进程再读取缓存数据的时候,就不会出现类似于进程B的问题了。

2.先删除缓存再更新数据库

  • 线程A删除缓存成功,线程A更新数据库失败;
  • 线程B从缓存中读取数据;由于缓存被删,进程B无法从缓存中得到数据,进而从数据库读取数据;此时数据库中的数据更新失败,线程B从数据库成功获取旧的数据,然后将数据更新到了缓存。
  • 最终,如果没有异步重试的话缓存和数据库的数据是一致的,但仍然是旧的数据。

2.2正常情况下没有出现失败场景

 

进程A的两步操作均成功,但由于存在并发,在这两步之间,进程B访问了缓存。最终结果是,缓存中存储了旧的数据,而数据库中存储了新的数据,二者数据不一致。

这种方式的解决方案也就是在第2步更新数据库后,延迟一会再删一次Redis,也就是延迟双删,这样就可以保证最终数据一致性。

最终结论:

经过对比你会发现,先更新数据库、再删除缓存是影响更小的方案。如果第二步出现失败的情况,则可以采用重试机制解决问题。

最终解决方案

利用(MQ)消息队列和Canal中间件进行删除的补偿

Canal目前在大型企业中热度下降,使用flinkcdc是目前的趋势,而目前主流CDC(变更数据获取)是flink cdc 而flinkcdc插件是基于flink平台(大数据平台)此处只需要简单理解Canal作用并简单实现即可。目前企业中常见的数据同步方案就是CDC中间件+MQ的方案,大型公司一般是有大数据业务,所以使用大数据平台和kafka,此处使用的是Canal+Rabbitmq

Canal安装与部署

Mysql前置准备

在服务中找到Mysql配置文件对应目录

其中一共需要注意四个配置项

server-id=1	                     #master端的ID号【必须是唯一的】;
log-bin=D:\MySQL\binlog\mysql-bin.log	  #同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog_format=row	               #行级,记录每次操作后每行记录的变化。
binlog-do-db=db_xiaomi	           #指定库,缩小监控的范围。

 

1.查看端口号配置对应主要用于集群环境下区分id

2.创建binlog文件存放目录

3.数据的保存格式(一共有三种)

4.指定需要监控的库名(如果该项不指定配置,那么默认所有数据库开启binlog)

设置好后启动服务。

启动后看到在binlog文件目录中看到log文件

💡Mysql的binlog日志三种格式

Canal默认选择的是ROW

  • STATEMENT:基于SQL语句的复制(statement-based replication, SBR)
  • ROW:基于行的复制(row-based replication, RBR)
  • MIXED:混合模式复制(mixed-based replication, MBR)

官网下载地址:Release v1.1.7 · alibaba/canal · GitHub

修改Mysql示例配置文件

修改连接数据库授权的用户和密码

配置好后在bin目录中执行启动命令文件

到此完成

JAVA项目整合Canal

引入依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>

测试demo

public static void main(String[] args) throws InvalidProtocolBufferException {
    CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");
    while (true) {
        //2.获取连接
        canalConnector.connect();
        //3.指定要监控的数据库
        canalConnector.subscribe("db.xiaomi.*");
        //4.获取 Message
        Message message = canalConnector.get(100);
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries.size() <= 0) {
            System.out.println("没有数据,休息一会");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            for (CanalEntry.Entry entry : entries) {
                // 获取表名
                String tableName = entry.getHeader().getTableName();
                //  Entry 类型
                CanalEntry.EntryType entryType = entry.getEntryType();
                //  判断 entryType 是否为 ROWDATA
                if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    //  序列化数据
                    ByteString storeValue = entry.getStoreValue();
                    //  反序列化
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                    // 获取事件类型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    // 获取具体的数据
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    // 遍历并打印数据
                    for (CanalEntry.RowData rowData : rowDatasList) {
                        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                        Map<String, Object> bMap = new HashMap<>();
                        for (CanalEntry.Column column : beforeColumnsList) {
                            bMap.put(column.getName(), column.getValue());
                        }
                        Map<String, Object> afMap = new HashMap<>();
                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                        for (CanalEntry.Column column : afterColumnsList) {
                            afMap.put(column.getName(), column.getValue());
                        }
                        System.out.println("表名:" + tableName + ",操作类型:" + eventType);
                        System.out.println("改前:" + bMap);
                        System.out.println("改后:" + afMap);
                    }
                }
            }
        }
    }
}

JAVA项目整合RabbitMQ

最终解决思路

即先更新数据库,然后在删除缓存,更新数据库后通过Canal拉去MySQL的binlog日志,将更新消息放入MQ,由MQ异步执行删除操作。

业务思路:

在下订后将库存数据库更新,根据商品的id值进行更新缓存和es;

业务思路图

具体实现

那么Canal类的代码如上图所示,因为他是监听功能,那么就要一直启动保持运行,目前是将Canal类放在页面访问服务实例中,那么在SpringBoot的Application启动时应该也要将Canal启动。Spring提供两种方式实现CommandLineRunner接口@PostConstruct注解来实现。

此处以实现CommandLineRunner接口为例,只适合类似于初始化一些数据

💡此处不适合使用上面的类,应为其中使用了while(true)中写了个死循环一直运行,这样就会导致启动类启动后执行这个类而导致一直阻塞在这里。如果要使用那么应该是在一个单独的服务模块中就可以这样使用。

/**Canal监听类
 * @author 12547
 * @version 1.0
 * @Date 2024/3/19 20:44
 */
@Component
public class CanalRunner implements CommandLineRunner {

  
    @Override
    public void run(String... args) throws Exception {
        System.out.println(">>>>>>>此处并不适用Canal的运行方式<<<<<<<<<<");
        System.out.println(">>>>>>>应该是单独起一个线程<<<<<<<<<<");
        
    }
}

可以看到在启动SpringBoot实例后,执行了该方法。

💡bug解决

但在编写其他测试类的时候发现其一直阻塞在这里而不执行测试代码,推测其一直阻塞线程(因为有死循环)。

解决方案

将其改为异步形式执行。将其改为@Async异步执行。

@Async的使用

应为Async用到线程池相关,所以先自定义一个用于异步的线程池

/** 自定义线程池 bean 用于Async异步调用
 * @author 12547
 * @version 1.0
 * @Date 2024/3/20 15:45
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * 自定义线程
     * @return
     */
    @Bean("asyncPoll")
    public ThreadPoolTaskExecutor asyncOperationExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(8);
        // 设置最大线程数
        executor.setMaxPoolSize(20);
        // 设置队列大小
        executor.setQueueCapacity(Integer.MAX_VALUE);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置线程名前缀+分组名称
        executor.setThreadNamePrefix("AsyncOperationThread-");
        executor.setThreadGroupName("AsyncOperationGroup");
        // 所有任务结束后关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 初始化
        executor.initialize();
        return executor;
    }

}

将Canal的运行代码改为异步执行

在启动类后通过调用使其异步执行即可。并经过测试后不再影响测试类的使用。

那么在Canal监听类中,当监听到数据变化后,将变化发送给MQ消息

消费者监听类

/**异步数据更新Redis类
 * @author 12547
 * @version 1.0
 * @Date 2024/3/20 15:49
 */
@Component
public class RedisDataListenerService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Redis数据更新消费者监听方法
     */
    @RabbitListener(queues = MqConstants.QUEUE_NAME)
    public void updateRedisDataByAsync(Map<String,Object> msg){
        System.out.println("监听到数据变化:");
        System.out.println("数据变化商品id:"+msg.get("id"));  //正常情况Redis应该每个商品id一个key  TODO 需要改造详情缓存查询将List<phone>改为单独的一个phone对象
        redisTemplate.opsForHash().putAll(msg.get("id").toString(),msg);
        System.out.println(msg.get("id").toString());
        System.out.println(cacheService.getHashCache(msg.get("id").toString(), "num"));
    }

}

与Redis实现数据同步基本demo到这差不多了已经,后续可以结合项目进一步优化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/938003.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CNCF云原生生态版图-分类指南(一)- 观测和分析

CNCF云原生生态版图-分类指南&#xff08;一&#xff09;- 观测和分析 CNCF云原生生态版图-分类指南一、观测和分析&#xff08;Observability and Analysis&#xff09;&#xff08;一&#xff09;可观测性&#xff08;Observablility&#xff09;1. 是什么&#xff1f;2. 解决…

JVM运行时数据区内部结构

VM内部结构 对于jvm来说他的内部结构主要分成三个部分&#xff0c;分别是类加载阶段&#xff0c;运行时数据区&#xff0c;以及垃圾回收区域&#xff0c;类加载我们放到之后来总结&#xff0c;今天先复习一下类运行区域 首先这个区域主要是分成如下几个部分 下面举个例子来解释…

C语言学习day22:URLDownloadToFile函数/开发文件下载工具

简言&#xff1a; 在之前我们去下载某个东西都是用的迅雷之类的软件&#xff0c;但是现在&#xff0c;只要提供一个地址&#xff0c;或者一个链接&#xff0c;我们自己去做一个工具去下载。这就是我们这篇的主要内容。 也就是我们的winAPI&#xff1a;URLDownloadToFile函数 …

购物车案例--分模块存储数据,发送请求数据渲染,底部总计数量和价格

shift鼠标右键&#xff0c;打开powershell&#xff0c;新建项目 自定义 只有一个页面&#xff0c;不涉及路由&#xff0c;勾选vuex,css,babel 无需保存预设 回车项目开始创建 项目用vscode打开 将src里的内容全部清空 将第七天的课程准备代码复制粘贴到src中 刷新页面&…

SQL server学习06-查询数据表中的数据(中)

目录 一&#xff0c;聚合函数 1&#xff0c;常用聚合函数 2&#xff0c;具体使用 二&#xff0c;GROP BY子句分组 1&#xff0c;基础语法 2&#xff0c;具体使用 3&#xff0c;加上HAVING对组进行筛选 4&#xff0c;使WHERE记录查询条件 汇总查询&#xff1a;在对数…

YOLOv5-7.0训练过程中出现报错Example: export GIT_PYTHON_REFRESH=quiet

出现报错&#xff1a; This initial message can be silenced or aggravated in the future by setting the $GIT_PYTHON_REFRESH environment variable. Use one of the following values: - quiet|q|silence|s|silent|none|n|0: for no message or exception - warn…

从0到1实现vue3+vite++elementuiPlus+ts的后台管理系统(一)

前言&#xff1a;从这篇文章开始实现vue3vite的后台管理系统&#xff0c;记录下自己搭建后台系统图的过程。 这篇文章完成项目的初始化和基本配置&#xff0c;这一步可以直接跟着vue3官网进行。整个系列只有前端部分&#xff0c;不涉及后端。 vue3官网&#xff1a;https://cn.…

Spring Boot教程之二十五: 使用 Tomcat 部署项目

Spring Boot – 使用 Tomcat 部署项目 Spring Boot 是一个基于微服务的框架&#xff0c;在其中创建可用于生产的应用程序只需很少的时间。Spring Boot 建立在 Spring 之上&#xff0c;包含 Spring 的所有功能。如今&#xff0c;它正成为开发人员的最爱&#xff0c;因为它是一个…

java中操作线程

文章目录 前言创建与运行线程1. 创建线程①、方法1(直接new)②、方法2(使用Runnable配合Thread进行new操作)③、方法3&#xff08;FutureTask对象实现&#xff09;④、线程创建原理特别注意&#xff01; 2查看与杀死线程①、 Windows下 &#xff1a;②、 Java下 &#xff1a; 3…

【redis】redix在Linux下的环境配置和redis的全局命令

˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN 如…

百度智能云千帆AppBuilder升级,百度AI搜索组件上线,RAG支持无限容量向量存储!

百度智能云千帆 AppBuilder 发版升级&#xff01; 进一步降低开发门槛&#xff0c;落地大模型到应用的最后一公里。在千帆 AppBuilder 最新升级的 V1.1版本中&#xff0c;企业级 RAG 和 Agent 能力再度提升&#xff0c;同时组件生态与应用集成分发更加优化。 • 企业级 RAG&am…

uniappp配置导航栏自定义按钮(解决首次加载图标失败问题)

1.引入iconfont的图标&#xff0c;只保留这两个文件 2.App.vue引入到全局中 import "./static/fonts/iconfont.css"3.pages.json中配置text为图标对应的unicode {"path": "pages/invite/invite","style": {"h5": {"…

融云分享基于 Rust 的鸿蒙 SDK 开发实践

12 月 5 日&#xff0c;以“同心聚力&#xff0c;共建共享鸿蒙新生态”为主题的“鸿蒙生态伙伴 SDK 开发者论坛”在京举行。 融云凭借对鸿蒙生态的率先适配和创新贡献&#xff0c;荣获华为鸿蒙生态“HarmonyOS NEXT SDK 星河奖”。 本次论坛邀请了多位行业领导者参与&#xff…

iperf3 带宽性能测量工具

随笔记录 目录 1 背景介绍 2. Iperf3 2.1 基本框架介绍 2.2 Iperf3 发送TCP包 2.3 IPerf 发送UDP 包 1 背景介绍 基于测试USER DMA 压力测试需求。 2. Iperf3 2.1 基本框架介绍 Iperf3 发送数据包 TCP/UDP 包 1. 查看网卡配置信息 2. 此处因共用一张板卡&#xff0…

Java-27 深入浅出 Spring - 实现简易Ioc-03 在上节的业务下手动实现IoC

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 大数据篇正在更新&#xff01;https://blog.csdn.net/w776341482/category_12713819.html 目前已经更新到了&#xff1a; MyBatis&#xff…

vue使用pdfh5.js插件,显示pdf文件白屏

pdfh5&#xff0c;展示文件白屏&#xff0c;无报错 实现效果图解决方法(降版本)排查问题过程发现问题查找问题根源1、代码写错了&#xff1f;2、预览文件流的问题&#xff1f;3、pdfh5插件更新了&#xff0c;我的依赖包没更新&#xff1f;4、真相大白 彩蛋 实现效果图 解决方法…

【机器学习算法】——决策树之集成学习:Bagging、Adaboost、Xgboost、RandomForest、XGBoost

集成学习 **集成学习(Ensemble learning)**是机器学习中近年来的一大热门领域。其中的集成方法是用多种学习方法的组合来获取比原方法更优的结果。 使用于组合的算法是弱学习算法&#xff0c;即分类正确率仅比随机猜测略高的学习算法&#xff0c;但是组合之后的效果仍可能高于…

C/S软件授权注册系统(Winform+WebApi+.NET8+EFCore版)

适用软件&#xff1a;C/S系统、Winform桌面应用软件。 运行平台&#xff1a;Windows .NETCore&#xff0c;.NET8 开发工具&#xff1a;Visual Studio 2022&#xff0c;C#语言 数据库&#xff1a;Microsoft SQLServer 2012&#xff0c;Oracle 21c&#xff0c;MySQL8&#xf…

Big Model weekly | 第49期

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; 01 Magnetic Preference Optimization: Achieving Last-iterate Convergence for Language Models Alignment 自我对弈方法在多个领域增强模型能力方面展现出了显著的成功。在基于人类反馈的强化学习&#xff0…

如何建设金融数据中心

目录 总则 概述 要求 基本原则 数据中心治理 概述 战略管控 战略规划 战略实施 延伸阅读 总则 概述 本文以描述金融数据中心的治理域内容为基础,从金融数据中心建设、运营及安全保障的角度出 发,逐一描述场地环境、网络通信、运行管理和风险管控等能力域的具体…