基于SpringBoot实现MySQL与Redis的数据最终一致性

问题场景

在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。

解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside

  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。

Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

public class Like {
    private String postId;
    private int likeCount;

    // 构造函数、getter和setter方法
}

逻辑层

@Service
public class LikeService {
    private final LikeRepository likeRepository;
    private final RedisUtils redisUtils;

    public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {
        this.likeRepository = likeRepository;
        this.redisUtils = redisUtils;
    }

    public Like getLikeInfo(String postId) {
        String cacheKey = "like:" + postId;

        // 从缓存中获取点赞信息
        Like like = (Like) redisUtils.get(cacheKey);

        // 如果缓存中不存在,则从持久层(数据库)获取
        if (like == null) {
            like = likeRepository.findByPostId(postId);

            // 如果数据库中存在数据,则保存到缓存中
            if (like != null) {
                redisUtils.set(cacheKey, like);
            }
        }

        // 如果点赞信息为空,则初始化为0
        if (like == null) {
            like = new Like(postId, 0);
        }

        return like;
    }

    public void addLike(String postId) {
        String cacheKey = "like:" + postId;

        // 在持久层(数据库)新增点赞信息
        Like like = likeRepository.findByPostId(postId);

        if (like == null) {
            like = new Like(postId, 1);
        } else {
            like.setLikeCount(like.getLikeCount() + 1);
        }

        likeRepository.save(like);

        // 更新缓存中的数据
        redisUtils.set(cacheKey, like);
    }
}

canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
version: '2.4'

services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    restart: false
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "33060:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql

  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: false
    ports:
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - mysql
    environment:
      - canal.destinations=example
      - canal.instance.mysql.slaveId=1234
      - canal.instance.master.address=mysql:3306
      - canal.instance.dbUsername=root
      - canal.instance.dbPassword=root
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=false
      - canal.instance.gtidon=false
      - canal.instance.filter.regex=.*
      - canal.instance.filter.black.regex=mysql\.slave_.*
      
      
  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

CREATE TABLE `balance` (
  `id` varchar(50) NOT NULL COMMENT '主键',
  `account` varchar(50) NOT NULL COMMENT '账户',
  `amount` decimal(10,2) NOT NULL COMMENT '金额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

/**
 * @author: liu_pc
 * Date:        2023/8/25
 * Description: 余额信息变更Redis变成处理类
 * Version:     1.0
 */
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {

    private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);

    private final RedisUtils redisUtils;

    private final CanalConfig canalConfig;

    private final Executor executor;

    @Value("${canal.server.open}")
    private boolean open;

    @Autowired
    public BalanceRedisProcessorService(RedisUtils redisUtils,
                                        CanalConfig canalConfig,
                                        @Qualifier("ownThreadPoolExecutor") Executor executor) {
        this.redisUtils = redisUtils;
        this.canalConfig = canalConfig;
        this.executor = executor;
    }


    @PostConstruct
    public void init() {
        Map<String, String> mainMdcContext = Maps.newHashMap();
        mainMdcContext.put("canal-thread", "balance-redis-processor-service");
        MDC.setContextMap(mainMdcContext);
        executor.execute(this);
        logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");
    }


    @Override
    public void run() {
        CanalConnector canalConnector = canalConfig.canalConnector();
        canalConnector.connect();
        // 回滚到未进行ack的地方
        canalConnector.rollback();
        try {
            while (open) {
                // 获取数据 每次获取一百条改变数据
                Message message = canalConnector.getWithoutAck(100);
                //获取这条消息的id
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                    continue;
                }

                // 处理数据
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        CanalEntry.EventType eventType = rowChange.getEventType();

                        if (eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                String tableName = entry.getHeader().getTableName();

                                // 判断是否是 Balance 表的 amount 字段变更
                                if ("balance".equals(tableName)) {
                                    StringBuilder redisKey = new StringBuilder("balance:");
                                    for (CanalEntry.Column column : columns) {
                                        logger.info("Balance changed in 'balance' dataInfo: {}", column);
                                        if ("id".equals(column.getName())) {
                                            String changeId = column.getValue();
                                            logger.info("当前变更id为:{}", changeId);
                                            redisKey.append(changeId);
                                        }
                                        if ("amount".equals(column.getName())) {
                                            String changeValue = column.getValue();
                                            logger.info(changeValue);
                                            redisUtils.set(redisKey.toString(), changeValue);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                // 确认消费完成这条消息
                canalConnector.ack(message.getId());
                logger.info("消费成功");
            }
        } catch (Exception e) {
            logger.warn("canal-消费失败");
        } finally {
            // 关闭连接
            canalConnector.disconnect();
        }
    }
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

mdc_logback

同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树

公众号文章更新更及时,以及一些程序员周边相关更新。

感谢您阅读到这里,不胜感激。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/92017.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(java) 进程调度

目录 进程 首先我们要了解一下什么是进程&#xff1f; 那如何管理进程&#xff1f; PCB中比较重要的属性 进程调度 为什么要进行进程调度&#xff1f; 状态 优先级 上下文 拓展介绍一下寄存器 记账信息 进程 首先我们要了解一下什么是进程&#xff1f; 简单来说…

ubuntu 22.04 LTS openai triton 安装

第一种方法&#xff1a; pip install triton 第二种方法&#xff0c;安装最新的版本&#xff1a; pip install -U --index-url https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/Triton-Nightly/pypi/simple/ triton-nightly 第三种方法&#xff1a; git c…

优化器调整策略

损失函数的作用是衡量模型输出与真实标签的差异。当我们有了这个loss之后&#xff0c;我们就可以通过反向传播机制得到参数的梯度&#xff0c;那么我们如何利用这个梯度进行更新参数使得模型的loss逐渐的降低呢&#xff1f; 优化器的作用 Pytorch的优化器&#xff1a; 管理并…

echarts实现图表标签(label)可拖拽,以及保存拖拽后的位置

需求背景&#xff1a; 当echarts图表中像素点非常多&#xff0c;或者有像素点重合的时候&#xff0c;标签就会被覆盖或者重叠。为了解决这个问题&#xff0c;让用户体验更加友好&#xff0c;于是就实现了对label进行拖拽。用户可以把label拖拽到任何他想要的位置&#xff0c;并…

pandas由入门到精通-数据透视表

采集的数据存储后通常会分为多个文件或数据库,如何将这些文件按需拼接,或按键进行连接十分重要。这节将介绍数据索引的复杂操作如分层索引,stack,unstack,seet_index,reset_index等帮助重构数据,数据的拼接如merge,join,concat,combine_first等帮助连接数据,以及数据透视表…

【C++初阶】模拟实现list

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

Docker安装Jenkins实操记录

前置条件&#xff1a; 1、安装了docker 2、安装了java&#xff08;没有安装情况下&#xff0c;可运行&#xff1a;yum install -y java-1.8.0-openjdk-devel.x86_64&#xff09; 一、拉取镜像 1、docker pull jenkins/jenkins 2、mkdir -p /usr/local/jenkins 3、chmod 777 …

Ubuntu搭建CT_ICP里程计的环境暨CT-ICP部署

CT-ICP部署以及运行复现过程 0.下载资源&#xff0c;并按照github原网址的过程进行。1.查看所需要的各个部分的版本。2.安装clang编译器3.进行超级构建3.1标准进行3.2构建过程中遇到的问题 4.构建并安装CT-ICP库4.1标准进行4.2遇到的问题及解决办法 5.构建 CT-ICP 的 ROS 包装5…

python+mysql+前后端分离国内职位数据分析(源码+文档+指导)

系统阐述的是使用国内python职位数据分析系统的设计与实现&#xff0c;对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计&#xff0c;描述&#xff0c;实现和分析与测试方面来表明开发的过程。开发中使用了 Flask框架和MySql数据库技术搭建系统的整体…

深度学习算法模型转成算能科技平台xx.bmodel模型的方法步骤

目录 1 docker镜像下载 2 SDK下载 3 下载sophon-demo 4 修改docker镜像的脚本 5 创建个文件夹 6.source 7.转模型 1 docker镜像下载 可以在dockerhub看到镜像的相关信息 https://hub.docker.com/r/sophgo/tpuc_dev/tags 用下面的命令下载 docker pull sophgo/tpuc_d…

Vue2向Vue3过度Vuex状态管理工具快速入门

目录 1 Vuex概述1.是什么2.使用场景3.优势4.注意&#xff1a; 2 需求: 多组件共享数据1.创建项目2.创建三个组件, 目录如下3.源代码如下 3 vuex 的使用 - 创建仓库1.安装 vuex2.新建 store/index.js 专门存放 vuex3.创建仓库 store/index.js4 在 main.js 中导入挂载到 Vue 实例…

【微服务】04-Polly实现失败重试和限流熔断

文章目录 1. Polly实现失败重试1.1 Polly组件包1.2 Polly的能力1.3 Polly使用步骤1.4 适合失败重试的场景1.5 最佳实践 2.Polly实现熔断限流避免雪崩效应2.1 策略类型2.2 组合策略 1. Polly实现失败重试 1.1 Polly组件包 PollyPolly.Extensions.HttpMicrosoft.Extensions.Htt…

SMC_Interpolator2Dir反向插补运动

附加函数是&#xff1a; SMC_Interpolator2Dir_SlowTask 函数的位置&#xff1a; 输入&#xff1a; 运行 bExecute 【BOOL】 路径包 poqDataIn 指向SMC_OUTQUEUE的指针 停止 bSlow_Stop 停止BOOL 急停 bEmergency_Stop 紧急停止BOOL 单…

1. HBase中文学习手册之揭开HBase的神秘面纱

揭开Hbase的神秘面纱 1.1 欢迎使用 Apache Hbase1.1.1 什么是 Hbase?1.1.2 Hbase的前世今生1.1.3 HBase的技术选型&#xff1f;1.1.3.1 不适合使用 HBase的场景1.1.3.2 适合使用 HBase的场景 1.1.4 HBase的特点1.1.4.1 HBase的优点1.1.4.2 HBase的缺点 1.1.5 HBase设计架构 1.…

[JavaWeb]【九】web后端开发-SpringBootWeb案例(菜单)

目录 一、准备工作 1.1 需求 1.2 环境搭建 1.2.1 准备数据库&表 1.2.2 创建springboot工程 1.2.3 配置application.properties & 准备对应实体类 1.2.3.1 application.properties 1.2.3.2 实体类 1.2.3.2.1 Emp类 1.2.3.2.2 Dept类 1.2.4 准备对应的Mapper、…

Yolo系列-yolov2

YOLO-V2 更快&#xff01;更强&#xff01; YOLO-V2-BatchNormalization BatchNormalization&#xff08;批归一化&#xff09;是一个常用的深度神经网络优化技术&#xff0c;它可以将输入数据进行归一化处理&#xff0c;使得神经网络更容易进行学习。在YOLOv2中&#xff0c;B…

wxpython + cef 是优秀的 WebView 组件

CEF 即 (Chromium Embedded Framework)&#xff1b;cef 是优秀的 WebView 组件。 pip install wxpython4.2 wxPython-4.2.0-cp37-cp37m-win_amd64.whl (18.0 MB) Successfully installed wxpython-4.2.0 pip install cefpython3 cefpython3-66.1-py2.py3-none-win_amd64.whl …

C语言基础之——指针(上)

前言&#xff1a;小伙伴们又见面啦&#xff01;本期内容&#xff0c;博主将展开讲解有关C语言中指针的上半部分基础知识&#xff0c;一起学习起来叭&#xff01;&#xff01;&#xff01; 目录 一.什么是指针 二.指针类型 1.指针的解引用 2.指针-整数 三.野指针 1.野指针…

Qt --- QTimer

在Qt开发界面的时候&#xff0c;非常多的时候都得使用定时器&#xff0c;定时器具体可以干什么呢&#xff1f;比如&#xff1a;控制时钟、定时改变样式、改变进度等。。。说到这里&#xff0c;经常使用QQ&#xff0c;而不同的时段都会显示不同的背景&#xff0c;我认为如果用Qt…

文本编辑器Vim常用操作和技巧

文章目录 1. Vim常用操作1.1 Vim简介1.2 Vim工作模式1.3 插入命令1.4 定位命令1.5 删除命令1.6 复制和剪切命令1.7 替换和取消命令1.8 搜索和搜索替换命令1.9 保存和退出命令 2. Vim使用技巧 1. Vim常用操作 1.1 Vim简介 Vim是一个功能强大的全屏幕文本编辑器&#xff0c;是L…