Redis学习(十)|使用消息队列的重试机制实现 MySQL 和 Redis 的数据一致性

文章目录

  • 介绍
  • 原理
  • 整体方案
  • 实现步骤
  • 示例代码
  • 总结
  • 其他:Kafka 重试策略配置
    • 1. 生产者重试策略配置
    • 2. 消费者重试策略配置

介绍

在分布式系统中,保持 MySQL 和 Redis 之间的数据一致性是至关重要的。为了确保数据的一致性,我们通常采取先更新数据库,再删除缓存的方案。然而,在实际应用中,由于网络问题、服务故障等原因,可能会导致数据库更新成功而缓存删除失败,进而导致数据不一致。为了解决这个问题,我们可以引入消息队列的重试机制,以确保缓存删除成功。

原理

重试机制是一种容错机制,用于在消息发送失败或者处理失败时进行重试。通过将数据更新操作封装成消息,并发送到消息队列中,在消费者处理消息时进行重试,可以提高系统的可靠性和稳定性。我们将使用消息队列的重试机制来实现 MySQL 和 Redis 的数据一致性。

整体方案

整体方案如下:

  1. 应用程序首先将数据更新操作发送到消息队列中。
  2. 消费者从消息队列中获取消息,并根据消息中的数据删除 Redis 中的缓存数据。
  3. 如果应用删除缓存失败,可以从消息队列中重新读取数据,然后再次删除缓存,这个就是重试机制。当然,如果重试超过的一定次数,还是没有成功,就需要向业务层发送报错信息了。
  4. 如果删除缓存成功,就要把数据从消息队列中移除,避免重复操作,否则就继续重试。

实现步骤

以下是使用消息队列的重试机制实现 MySQL 和 Redis 数据一致性的基本步骤:

  1. 将数据更新操作封装成消息:在应用程序中,将数据更新操作封装成消息,并发送到消息队列中。消息中应包含数据更新操作的类型(如插入、更新或删除)以及相关的数据。
  2. 消费者消息处理和失败重试:消费者从消息队列中获取消息,并根据消息中的数据更新操作来删除 Redis 中的缓存数据。消息消费失败后自动进行重试。可以根据重试次数和重试间隔来配置重试机制,例如指数退避策略。
  3. 消息确认机制:消费者在成功处理消息后,需要发送确认消息给消息队列,告知消息队列可以删除或标记消息为已处理。这样可以确保消息在成功处理后不会被重新处理,避免重复处理的情况。

示例代码

以下是一个简单的示例代码,演示了如何使用 Java 实现消息队列的重试机制,以确保 MySQL 和 Redis 的数据一致性:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MySQLToKafka {

    private static final String TOPIC_NAME = "mysql_updates";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "cache-deletion-group";

    public static void main(String[] args) {
        // 模拟 MySQL 更新后发送消息到 Kafka
        sendMySQLUpdateToKafka("data_update");

        // 模拟从 Kafka 拉取消息删除 Redis 缓存
        pullMessageFromKafkaAndDeleteCache();
    }

    // 发送 MySQL 更新消息到 Kafka
    private static void sendMySQLUpdateToKafka(String message) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);

        try {
            producer.send(record).get();
            System.out.println("Message sent to Kafka successfully: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    // 从 Kafka 拉取消息并删除 Redis 缓存
    private static void pullMessageFromKafkaAndDeleteCache() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    if (deleteCacheFromRedis(record.value())) {
                        System.out.println("Cache deleted from Redis successfully.");
                        // 处理消息成功后手动提交偏移量
                        consumer.commitAsync();
                    } else {
                        System.out.println("Cache deletion from Redis failed. Kafka will retry.");
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    // 模拟删除 Redis 缓存
    private static boolean deleteCacheFromRedis(String message) {
        // 这里省略删除 Redis 缓存的逻辑,直接模拟删除成功和失败
        // 模拟删除成功的概率为 0.7
        if (Math.random() < 0.7) {
            return true;
        }
        return false;
    }
}

以上代码完成了以下几个功能:

  1. 发送 MySQL 更新消息到 Kafka: sendMySQLUpdateToKafka 方法模拟了 MySQL 更新后发送消息到 Kafka 的过程。它使用 Kafka 生产者将消息发送到指定的 Kafka 主题中。
  2. 从 Kafka 拉取消息并删除 Redis 缓存: pullMessageFromKafkaAndDeleteCache 方法模拟了从 Kafka 拉取消息并删除 Redis 缓存的过程。它使用 Kafka 消费者订阅指定的 Kafka 主题,并轮询获取消息。对于每条消息,它尝试删除对应的 Redis 缓存。如果删除失败,就会打印一条消息表示失败,然后 Kafka 将自动重试该消息。只有在成功删除缓存后,才会手动提交偏移量。
  3. 模拟删除 Redis 缓存: deleteCacheFromRedis 方法用于模拟删除 Redis 缓存的过程。它返回一个布尔值,表示缓存删除操作的成功或失败。在实际应用中,这个方法应该被替换为真正的删除 Redis 缓存的逻辑。

通过这样的流程,模拟了一个简单的数据更新、消息发送、消息消费、缓存删除的完整流程,并且在处理消息失败时利用 Kafka 的自动重试机制进行了处理。

总结

通过引入消息队列的重试机制,可以有效地实现 MySQL 和 Redis 的数据一致性。使用重试机制,可以确保数据在 MySQL 更新后 Redis 的对应缓存能够成功删除,从而保持数据的一致性。这种方法适用于需要处理大量数据更新和异步消息传输的场景,同时也提高了系统的可靠性和稳定性。

其他:Kafka 重试策略配置

1. 生产者重试策略配置

对于 Kafka 生产者,可以通过配置以下参数来定义重试策略:

  • retries: 设置生产者在发生可重试的异常时重试的最大次数。默认值为 2147483647(即最大整数)。
  • retry.backoff.ms: 设置生产者在重试之间等待的时间。默认值为 100 毫秒。

示例配置:

# 设置最大重试次数为 3 次
retries=3
# 设置重试之间的等待时间为 500 毫秒
retry.backoff.ms=500

2. 消费者重试策略配置

对于 Kafka 消费者,可以通过以下参数来定义重试策略:

  • enable.auto.commit: 指定消费者是否自动提交偏移量。默认为 true,表示自动提交。
  • auto.commit.interval.ms: 如果启用了自动提交偏移量,可以通过该参数设置自动提交的间隔时间。默认值为 5000 毫秒。
  • max.poll.interval.ms: 设置消费者在拉取消息之间的最大时间间隔。如果消费者在此间隔内没有发送心跳,将被认为失败,并且将其分区重新分配给其他消费者。默认值为 300000 毫秒(5 分钟)。
  • max.poll.records: 设置消费者在单次调用 poll 方法中拉取的最大记录数。默认值为 500 条。

示例配置:

# 禁用自动提交偏移量
enable.auto.commit=false
# 设置自动提交偏移量的间隔时间为 1000 毫秒
auto.commit.interval.ms=1000
# 设置拉取消息之间的最大时间间隔为 10 秒
max.poll.interval.ms=10000
# 设置单次 poll 方法拉取的最大记录数为 100 条
max.poll.records=100

通过以上配置,可以定制 Kafka 生产者和消费者的重试策略,以适应不同的业务需求和性能要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/600796.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

红外与可见光图像融合评价指标(cddfusion中的代码Evaluator.py)

一、Evaluator.py全部代码&#xff08;可正常调用&#xff09; import numpy as np import cv2 import sklearn.metrics as skm from scipy.signal import convolve2d import math from skimage.metrics import structural_similarity as ssimdef image_read_cv2(path, modeRGB…

minio上传文件失败如何解决

1. 做了什么操作 通过接口上传excel文件&#xff0c;返回响应值 2. 错误如图 2. 如何解决 根据错误描述定位到了部署minio的地方minio通过docker部署&#xff0c;找到docker - compose发现配置文件中minio有两个端口&#xff0c;一个是用于api的&#xff0c;一个是用于管理界面…

AI模型:windows本地运行下载安装ollama运行Google CodeGemma可离线运行数据模型【自留记录】

AI模型&#xff1a;windows本地运行下载安装ollama运行Google CodeGemma可离线运行数据模型【自留记录】 CodeGemma 没法直接运行&#xff0c;需要中间软件。下载安装ollama后&#xff0c;使用ollama运行CodeGemma。 类似 前端本地需要安装 node.js 才可能跑vue、react项目 1…

QX-mini51学习---(2)点亮LED

目录 1什么是ed 2led工作参数 3本节相关原理图分析 4本节相关c 5实践 1什么是ed 半导体发光二极管&#xff0c;将电能转化为光能&#xff0c;耗电低&#xff0c;寿命长&#xff0c;抗震动 长正短负&#xff0c;贴片是绿点处是负极 2led工作参数 3本节相关原理图分析 当…

工业网关设备的种类、功能及其在各种工业场景中的应用-天拓四方

在快速发展的工业信息化时代&#xff0c;工业网关设备作为连接工业设备与云平台的桥梁&#xff0c;发挥着至关重要的作用。本文将详细介绍工业网关设备的种类、功能以及其在各种工业场景中的应用&#xff0c;帮助广大读者更深入地了解这一重要设备。 一、工业网关设备的种类 …

【Linux 基础 IO】文件系统

文章目录 1.初步理解文件2.C语言环境下的文件操作2.1 C库中 fopen、fwrite 的讲解2.2 C文件操作的实例 3.系统调用接口的讲解 1.初步理解文件 &#x1f427;① 打开文件&#xff1a; 本质是进程打开文件&#xff0c;只有程序运行起来文件才被打开&#xff1b; &#x1f427;②文…

Fizzler库+C#:从微博抓取热点的最简单方法

概述 在这篇技术文章中&#xff0c;我们将深入研究如何利用Fizzler库结合C#语言&#xff0c;以实现从微博平台抓取热点信息的功能。微博作为中国乃至全球范围内具有重要影响力的社交媒体平台之一&#xff0c;在互联网信息传播中扮演着举足轻重的角色。通过Fizzler这一强大的.N…

【探索Java编程:从入门到入狱】Day4

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收…

电费自动抄表是什么?什么叫电费自动抄表?

1.电费自动抄表&#xff1a;简述 电费自动抄表是一种现代化电力工程管理方法&#xff0c;根据远程系统收集解决电度表数据&#xff0c;取代了传统的人工抄水表方法。这项技术提高了效率&#xff0c;降低了不正确&#xff0c;并且为消费者和电力公司提供了更多服务项目概率。 …

基于51单片机ESP8266wifi控制机器人—送餐、快递

基于51单片机wifi控制机器人 &#xff08;程序&#xff0b;原理图&#xff0b;PCB&#xff0b;设计报告&#xff09; ​功能介绍 具体功能&#xff1a; 1.L298N驱动电机&#xff0c;机器人行走&#xff1b; 2.装备红外线感应检测到周围环境&#xff0c;进行行程判断&#xf…

Windows环境编译 VVenC 源码生成 Visual Studio 工程

VVenC介绍 Fraunhofer通用视频编码器(VVenC)的开发是为了提供一种公开可用的、快速和有效的VVC编码器实现。VVenC软件基于VTM&#xff0c;其优化包括软件重新设计以减轻性能瓶颈、广泛的SIMD优化、改进的编码器搜索算法和基本的多线程支持以利用并行。此外&#xff0c;VVenC支…

124.反转链表(力扣)

题目描述 代码解决&#xff08;思路1&#xff1a;双指针&#xff09; class Solution { public:ListNode* reverseList(ListNode* head) {ListNode*temp;//保存cur下一个节点ListNode*curhead;ListNode*preNULL;while(cur){tempcur->next;// 保存一下 cur的下一个节点&#…

uniapp 监听APP切换前台、后台插件 Ba-Lifecycle

监听APP切换前台、后台 Ba-Lifecycle 简介&#xff08;下载地址&#xff09; Ba-Lifecycle 是一款uniapp监听APP切换前台、后台的插件&#xff0c;简单易用。 截图展示 也可关注博客&#xff0c;实时更新最新插件&#xff1a; uniapp 常用原生插件大全 使用方法 在 script…

Spring事件

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Spring⛺️稳中求进&#xff0c;晒太阳 Spring事件 简洁 Spring Event&#xff08;Application Event&#xff09;就是一个观察者模式&#xff0c;一个bean处理完任务后希望通知其他Bean的…

数据交换和异步请求(JSONAjax))

目录 一.JSON介绍1.JSON的特点2.JSON的结构3.JSON的值JSON示例4.JSON与字符串对象转换5.注意事项 二.JSON在Java中的使用1.Javabean to json2.List to json3.Map to JSONTypeToken底层解析 三.Ajax介绍1.介绍2.Ajax经典应用场景 四.Ajax原理示意图1. 传统web应用2.Ajax方法 五.…

突然断电,瀚高数据库启动失败

服务器临时断电后&#xff0c;数据库启动不起来 ps -ef|grep postgres 进到数据库的data目录下看下ls 看下 查看临时文件&#xff1a; ls -la /tmp 把这两个5866的文件改个名字张老师 加个bak就行 改完了pg_ctl start起一下

618挑选家用洗地机,需要注意哪些事项?有哪些家用洗地机值得买?

近年来&#xff0c;智能清洁家电越来越受到消费者的欢迎&#xff0c;洗地机作为清洁家电的新宠&#xff0c;凭借其集扫地、拖地、杀菌清洗于一体的强大功能&#xff0c;成为市场上的热销产品。那么&#xff0c;这类洗地机真的好用吗&#xff1f;怎么挑选到好用的家用的洗地机呢…

风电厂数字孪生3D数据可视化交互展示构筑智慧化电厂管理体系

随着智慧电厂成为未来电力企业发展的必然趋势&#xff0c;深圳华锐视点紧跟时代步伐&#xff0c;引领技术革新&#xff0c;推出了能源3D可视化智慧管理系统。该系统以企业现有的数字化、信息化建设为基础&#xff0c;融合云平台、大数据、物联网、移动互联、机器人、VR虚拟现实…

BUUCTF [极客大挑战 2019]EasySQL 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; [极客大挑战 2019]EasySQL 1 密文&#xff1a; 解题思路&#xff1a; 1、根据题目提示&#xff0c;并且网站也存在输入框&#xff0c;尝试进行SQL注入。 首先&#xff0c;判断提交方式&#xff0c;随机输入数据…

EtherCAT开发_4_分布时钟知识点摘抄笔记1

分布时钟 (DC&#xff0c;Distributed Cl ock) 可以使所有EtherCAT设备使用相同的系统时间&#xff0c;从而控制各设备任务的同步执行。从站设备可以根据同步的系统时间产生同步信号&#xff0c;用于中断控制或触发数字量输入输出。支持分布式时钟的从站称为 DC 从站。分布时钟…