Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:
  kafka:
      # Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}
public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {
    //日志记录
    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
    //redis操作工具类
    @Resource
    private RedisClient redisClient;
    //监听的队列名称为:canaltopic
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
        //转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.getIsDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/164096.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Bootloader——预编程流程

预编程目录 前言一、预编程步骤1.1 切换到扩展会话1.2 检查刷写前提条件整车ECU进入扩展会话(补充)1.3 停用故障码存储功能1.4 停止通信(一般报文或网络管理报文)前言 刷写过程定义了刷写前、刷写中、刷写后三个阶段。 一、预编程步骤 预编程步骤用来做刷写前的CAN网络准…

五个必知的速率限制策略,以最大化流量流动

速率限制是一种策略&#xff0c;我们在工作中常常使用&#xff0c;它定义了系统在设定的时间框架内可以处理的最大请求数量。 速率限制定义了系统在指定时间段内可以处理的最大请求数量。 Image.png 速率限制是一种策略&#xff0c;我们在工作中常常使用&#xff0c;它定义了…

我终于体会到了:代码竟然不可以运行,为什么呢?代码竟然可以运行,为什么呢?

废话不多说&#xff0c;直接上图 初看只当是段子&#xff0c;再看已是段中人 事情经过&#xff1a; 我在写动态顺序表的尾插函数时&#xff0c;写出了如下代码&#xff0c;可以跑&#xff0c;但是这段代码有一个bug暂时先不提 //动态顺序表的尾插 void SLPushBack(SL* psl, …

【C++历练之路】list的重要接口||底层逻辑的三个封装以及模拟实现

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; &#x1f354;前言&#xff1a; 在C的世界中&#xff0c;有一种数据结构&#xff0c;它不仅像一个神奇的瑰宝匣&#xff0c;还像一位能够在数据的海洋中航行的智慧舵手。这就是C中的list&#xff0c;一个引人入胜的工具…

数据结构【DS】树的性质

度为m的树 m叉树 至少有一个节点的度m 允许所有节点的度都<m 一定是非空树&#xff0c;至少有m1个节点 可以是空树 节点数 总度数 1m叉树&#xff1a; 高度为h的m叉树 节点数最少为&#xff1a;h具有n个结点的m叉树 最大高度&#xff1a;n度为m的树&#xff1a; 具有…

shell脚本学习笔记07

如何让shell实现 可选择性执行 的功能 用了while进行循环&#xff0c;是死循环&#xff0c;在循环时&#xff0c;使用case进行使用哪个脚本进行执行。使用clear进行每一次操作前的清屏&#xff0c;eof代表输入这个会显示目录。read用来读取输入的值&#xff0c;如果不输入值不会…

ScalableMap

问题引入 传统方案在处理线性地图元素时忽略了其结构性约束&#xff0c;建图距离太近 方法 简介 结构引导BEV特征提取 一种新的层次稀疏地图表示方法 设计渐进解码机制和基于此表示的监督策略 组件 结构引导BEV表征 通过车载摄像头捕捉的环绕视图图像&#xff0c;利用Res…

保险保险保险保险保险QAQ

该买保险啦&#xff01; 一、百万医疗险&#xff1a;事后报销医疗费用1、蓝医保 太平洋保险2、长相安 平安健康3、金医保 人寿保险4、好医保 人保健康 二、重疾险&#xff1a;确诊后一次性给付1、达尔文7号 国联人寿保险公司2、超级玛丽9号 君龙人寿3、守卫者6号 国联人寿保险公…

​软考-高级-系统架构设计师教程(清华第2版)【第7章 系统架构设计基础知识(263~285)-思维导图】​

软考-高级-系统架构设计师教程&#xff08;清华第2版&#xff09;【第7章 系统架构设计基础知识&#xff08;263~285&#xff09;-思维导图】 课本里章节里所有蓝色字体的思维导图

全链路监控--pinpoint

一、pinpoint架构原理 架构组成 Pinpoint Agent:和自己运行的应用关联起来的探针 Pinpoint Collector:收集各种性能数据 Pinpoint-Web: 将收集到的数据显成为 WEB网页显示 HBase Storage: 存储收集到的数据 工作原理 pinpoint的核心思想是在各个服务节点之间彼此调用时&a…

预定义宏指令

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h>int main() {printf("%s\n", __FILE__);printf("%d\n", __LINE__);printf("%s\n", __DATE__);printf("%s\n", __TIME__);return 0; } 运行结果&#xff1a;

记录将excel表无变形的弄进word里面来

之前关于这个问题记录过一篇文章&#xff1a; 将excel中的表快速复制粘贴进word中且不变形-CSDN博客 今天记录另外一种方法&#xff1a;举例表述&#xff0c;excel表如图&#xff1a; 按F12&#xff0c;出现“另存为...”对话框&#xff0c;选择“单个文件网页”&#xff0c;…

Java(二)(String的常见方法,ArrayList的常见方法)

String 创建string对象 package Helloworld;public class dome1 {public static void main(String[] args) {// 1.直接双引号得到字符串对象,封装字符串对象String name "lihao";System.out.println(name);// 2. new String 创建字符串对象,并调用构造器初始化字符…

汇编-指针

一个变量如果包含的是另一个变量的地址&#xff0c; 则该变量就称为指针(pointer) 。指针是操作数组和数据结构的极好工具&#xff0c;因为它包含的地址在运行时是可以修改的。 .data arrayB byte 10h, 20h, 30h, 40h ptrB dword arrayB ptrB1 dword OFFSET arrayBarray…

Linux中系统时间同步

在Windwos中&#xff0c;系统时间的设置很简单&#xff0c;界面操作&#xff0c;通俗易懂&#xff0c;而且设置后&#xff0c;重启&#xff0c;关机都没关系。系统时间会自动保存在BIOS时钟里面&#xff0c;启动计算机的时候&#xff0c;系统会自动在BIOS里面取硬件时间&#x…

庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

文章目录 PreOverview状态变量概述Position 访问方法 Pre 庖丁解牛&#xff1a;NIO核心概念与机制详解 01 接下来我们来看下缓冲区内部细节 Overview 接下来将介绍 NIO 中两个重要的缓冲区组件&#xff1a;状态变量和访问方法 (accessor) 状态变量是"内部统计机制&quo…

「Verilog学习笔记」根据状态转移表实现时序电路

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 分析 可得逻辑表达式为 可得逻辑表达式为 timescale 1ns/1nsmodule seq_circuit(input A ,input clk ,input rst_n,outpu…

小美的排列构造

美团2024届秋招笔试第一场编程真题 贪心问题&#xff0c;得到所有n全排列中相邻两数的和&#xff0c;这些和差距要尽可能小。 显然如果1和2排一起&#xff0c;或者让n和n-1相邻都是错误的。最好的方式是让相邻两数的和接近&#xff08;n1&#xff09;/2。 比如:n 1 n-1 2...…

分组表,分桶表

1&#xff0c;启动Hive服务 &#xff08;1&#xff09;启动HiveServer2服务 nohup hive --service metastore &&#xff08;2&#xff09;启动Metastore服务 nohup hive --service hiveserver2 &&#xff08;3&#xff09;查看进程信息 lsof -i:100002&#xff0c;…

生物信息基础:实用Git命令,掌握这些就够了

我发现有搞了几年生信的朋友还不会用Github管理代码&#xff0c;这不免令人意外。我一直强调基础知识的重要性&#xff0c;而这些知识又是可以在短时间内掌握的。Github管理平时写的代码&#xff0c;要用到Git命令。虽然官方Git命令非常多&#xff0c;但我们只要掌握常用的几个…