SaaS 电商设计 (五) 私有化部署-实现 binlog 中间件适配

一、 背景

  具体的中间件私有化背景在上文 SaaS` 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysqlES 数据同步的过程中,常用的技术方案有这样几种.

二、 设计主体

2.1 N种方案

方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例)
在这里插入图片描述

如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入.

方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致
在这里插入图片描述

在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入.

方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制
在这里插入图片描述

  • 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点.

  • 业务开发的同时需要同步关心数据的同步
    在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的.

    • 散落在各个业务代码角落的维护成本
      方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升.
      基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离.

    方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.)

在这里插入图片描述

对于第三种方式来说的话,接下来引入了第二个讨论的点.

  • 私有化支持
    就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.
  • 同步目标逻辑的不同支持
    在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美.

2.2 方案4 coding落地

2.2.1 类图

在这里插入图片描述
核心步骤:

step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC .
step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现.
step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler.

2.2.2 核心实现

BinlogHandlerAdapter 完成 binlog client 接口定义.

package com.baixiu.middleware.binlog.adapter;

import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.mq.model.CommonMessage;

/**
 * binlog 适配器接口
 * 适配中间件list:canal,jingwei,drc等。
 * function1:完成不同中间件解析能力
 * function2:完成不同中间件handlerMsg能力
 * @author baixiu
 * @date 2023年12月11日
 */
public interface BinlogHandlerAdapter {


    /**
     * 反序列MQMsg To binlogMsg
     * @param mqMsg mqMsg
     * @return
     */
    BinlogData deserializationMQMsg(CommonMessage mqMsg);

    /**
     * 反序列MQMsg To binlogMsg
     * @param mqMsg mqMsg
     * @return
     */
    void handleBinLogData(BinlogData binLogData) throws Exception;


}

CanalBinlogHandlerAdapter 完成 canal 解析

package com.baixiu.middleware.binlog.adapter;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.baixiu.middleware.binlog.consts.CommonConsts;
import com.baixiu.middleware.binlog.core.AbstractBinlogHandler;
import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter;
import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum;
import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel;
import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import com.baixiu.middleware.mq.model.CommonMessage;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * canal binlog handler adapter
 * 当property配置的clientType=canal时进行注入bean
 * canal client 用以解析 mq -starter 发送过来的消费消息
 * @author baixiu
 * @date 创建时间 2023/12/11 8:39 PM
 */
@Slf4j
public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{

    @Autowired
    private BinlogTableHandlerRouter binlogTableHandlerRouter;

    @Override
    public BinlogData deserializationMQMsg(CommonMessage mqMsg) {
        FlatMessage flatMessage = JSON.parseObject(mqMsg.getText(),FlatMessage.class);

        BinlogData binLogData=new BinlogData ();
        if(flatMessage!=null){
            binLogData.setBinlogDataObject(flatMessage);
        }
        return binLogData;
    }

    @Override
    public void handleBinLogData(BinlogData binLogData) throws Exception {

        if(binLogData==null || binLogData.getBinlogDataObject()==null){
            return;
        }

        FlatMessage flatMessage= (FlatMessage) binLogData.getBinlogDataObject ();
        List<Map<String, String>> rowDatas = flatMessage.getData();
        List<Map<String, String>> oldDatas = flatMessage.getOld();
        String tableName = flatMessage.getTable();
        AbstractBinlogHandler handler = binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);


        for (int i = 0; i < rowDatas.size(); i++) {
            Map<String, String> rowData = rowDatas.get(i);
            Map<String, String> oldData = new HashMap<>(i,0.75f);
            if (oldDatas != null && oldDatas.size() == rowDatas.size()) {
                oldData = oldDatas.get(i);
            }
            Map<String, String> fieldsMaps = Maps.newHashMapWithExpectedSize(20);
            BinlogDataToDiffModel binlogDataToDiffModel = transRowDataToAllBinlogData(handler, rowData, oldData
                    , fieldsMaps, flatMessage.getType());

            switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {
                case INSERT:
                    log.info("Canal.handleMessage.binlogTransConfigToMap.INSERT.{}"
                            , JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());
                    break;
                case UPDATE:
                    log.info("Canal.handleMessage.binlogTransConfigToMap.UPDATE.{}"
                            ,JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());
                    break;
                case DELETE:
                    Map<String, String> delMap = getBeforeColumnsFromBinlogData(handler, oldData);
                    log.info("Canal.handleMessage.binlogTransConfigToMap.DELETE");
                    handler.delete(delMap);
                    break;
                default:
                    log.info("CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{}"
                            ,JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    break;
            }
        }
    }


    public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, Map<String, String> afterColumns
            , Map<String, String> beforeColumns, Map<String, String> fieldsMap, String type) {

        try {

            String[] updateFields = binlogData.getUpdateFields();
            String[] keyFields = binlogData.getFields();

            List<BinlogTableRowDiffModel> changeList = new ArrayList<> ();

            for (String key : afterColumns.keySet()) {

                if (keyFields.length == 1 && ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {
                    fieldsMap.put(key, afterColumns.get(key));
                } else if (ArrayUtils.contains(keyFields, key)) {
                    fieldsMap.put(key, afterColumns.get(key));
                }

                if (beforeColumns != null && !beforeColumns.isEmpty() && beforeColumns.get(key) != null) {
                    BinlogTableRowDiffModel bean = new BinlogTableRowDiffModel();
                    bean.setField(key);
                    bean.setAfter(afterColumns.get(key));
                    bean.setBefore(beforeColumns.get(key));
                    if (updateFields.length == 1 && ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {
                        changeList.add(bean);
                    } else if (ArrayUtils.contains(updateFields, key)) {
                        changeList.add(bean);
                    }
                }
            }
            BinlogDataToDiffModel data = new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));
            log.info("transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{}"
                    ,JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));
            return data;
        } catch (Exception e) {
            log.error("handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}", JSON.toJSONString(binlogData), e);
        }

        return null;
    }


    /**
     * 删除操作
     * 不同的表需要从binlogData中获取的信息不同,这里抽取
     *
     * @return
     */
    private Map<String, String> getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, Map<String, String> beforeColumns) {

        Map<String, String> keys = new HashMap<>();
        if (beforeColumns != null && !beforeColumns.isEmpty()) {
            String[] keyFields = binlogData.getFields();
            for (String key : beforeColumns.keySet()) {
                // 找出关心的字段值
                if (ArrayUtils.contains(keyFields, key)) {
                    keys.put(key, beforeColumns.get(key));
                }
            }
        }
        return keys;
    }
}

AbstractBinlogHandler 抽象binloghandler 处理类.

package com.baixiu.middleware.binlog.core;

import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import java.util.List;
import java.util.Map;

/**
 * @author baixiu
 * @date 创建时间 2023/12/12 11:31 AM
 */
public interface AbstractBinlogHandler {

    /**
     * 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中
     * @return 监控字段
     */
    String[] getFields();

    /**
     * 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中
     * @return 更新字段
     */
    String[] getUpdateFields();

    /**
     * 新增时触发
     * @param fieldValues 唯一字段,用于确定一条数据
     * @param changeList 字段的值发生变化的
     * @throws Exception 业务exception
     */
    void insert(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;

    /**
     * 数据修改时触发
     * @param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值
     * @param changeList  字段的值发生变化的
     * @throws Exception 业务exception
     */
    void update(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;

    /**
     * 删除时触发
     * @param fieldValues 唯一字段,用于确定一条数据
     * @throws Exception 业务exception
     */
    void delete(Map<String, String> fieldValues) throws Exception;

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/247744.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

17--异常处理

1、异常概述 1.1 什么是异常 异常&#xff1a;指的是程序在执行过程中&#xff0c;出现的非正常情况&#xff0c;如果不处理最终会导致JVM的非正常停止。 异常指的并不是语法错误和逻辑错误。语法错了&#xff0c;编译不通过&#xff0c;不会产生字节码文件&#xff0c;根本运…

01.前言

前言 1.什么是前端开发 前端开发是创建 Web 页面或 app 等前端界面呈现给用户的过程核心技术&#xff1a;HTML&#xff0c;CSS&#xff0c;JavaScript 以及衍生出的各种技术&#xff0c;框架等 2.前端开发应用场景 3.前端职业路线 4.什么是CS架构与BS架构 介绍 应用软件&a…

SuperMap iClient3D for Cesium 实现鼠标移动选中模型并显示模型对应字段

SuperMap iClient3D for cesium 实现鼠标移动选中模型并显示模型对应字段 一、实现思路二、数据制作1. 计算出模型中心点并保存到属性表中2. 计算出模型顶部高程3. 模型数据切缓存4. 发布三维服务. 三、代码编写 作者&#xff1a;xkf 一、实现思路 将模型属性数据存储到前端&a…

c++面经总结

C基础语法 C和c的区别 c中new和delete是对内存分配的运算符&#xff0c;取代了c中的malloc和free 标准c中的字符串类取代了标准c函数库头文件中的字符数组处理函数(c中没有字符串类型). 在c中&#xff0c;允许有相同的函数名&#xff0c;不过他们的参数类型不能完全相同&…

JAVA:深入探讨Java 8 Stream的强大功能与用法

1、简述 Java 8引入了Stream API&#xff0c;为处理集合数据提供了一种更为强大和灵活的方式。Stream是一种抽象的数据结构&#xff0c;它允许你以一种声明性的方式处理数据集合。与传统的集合操作不同&#xff0c;Stream并不是一个存储数据的数据结构&#xff0c;而是在源数据…

Leetcode—78.子集【中等】

2023每日刷题&#xff08;五十九&#xff09; Leetcode—78.子集 算法思想 实现代码 class Solution { public:vector<vector<int>> subsets(vector<int>& nums) {int len nums.size();vector<int> path;vector<vector<int>> ans;f…

20231215给AIO-3399J适配Rockchip的原始Andoroid10的挖掘机开发板01

20231215给AIO-3399J适配Rockchip的原始Andoroid10的挖掘机开发板01 2023/12/15 10:49 【请严重注意&#xff1a;】如果刷不适配的SDK&#xff0c;可能会引起您的开发板【硬件发生物理】损坏&#xff01; 如果您按照本步骤刷机引起的一切后果&#xff0c;请自行承担责任&#x…

状态的一致性和FlinkSQL

状态一致性 一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次&#xff0c;但是结果只有一个。 三个级别&#xff1a; 最多一次&#xff1a;1次或0次&#xff0c;有可能丢数据至少一次&#xff1a;1次或n次&#xff0c;出错可能会重试 输入端只要可以做到数据重…

vue3 添加编辑页使用 cron 表达式生成

示例效果图 1、添加组件 <template><div class"v3c"><ul class"v3c-tab"><li class"v3c-tab-item" :class"{ v3c-active: tabActive 1 }" click"onHandleTab(1)">秒</li><li class&qu…

【Linux系统编程二十一】:(进程通信3)--消息队列/信号量(system v标准的内核数据结构的设计模式)

【Linux系统编程二十】&#xff1a;消息队列/信号量(system v标准的内核数据结构的设计模式&#xff09; 一.消息队列二.system v标准的内核数据结构的设计三.四个概念(互斥/临界)四.信号量1.多线程并发访问2.计数器3.原子的4.总结 一.消息队列 一个叫做a进程啊&#xff0c;一个…

乐益达教育网页

目录 一、网页效果 二、html代码 三、CSS代码 四、JS代码 一、网页效果 二、html代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, in…

Milesight VPN server.js 任意文件读取漏洞(CVE-2023-23907)

0x01 产品简介 MilesightVPN 是一款软件&#xff0c;一个 Milesight 产品的 VPN 通道设置过程更加完善&#xff0c;并可通过网络服务器界面连接状态。 0x02 漏洞概述 MilesightVPN server.js接口处存在文件读取漏洞&#xff0c;攻击者可通过该漏洞读取系统重要文件&#xff…

LeetCode-42. 接雨水【栈 数组 双指针 动态规划 单调栈】

LeetCode-42. 接雨水【栈 数组 双指针 动态规划 单调栈】 题目描述&#xff1a;解题思路一&#xff1a;单调栈&#xff0c;维护一个单调递减栈。每当遇到当前元素大于栈顶元素就出栈&#xff0c;在出栈时更新答案。当遇到出栈的情况&#xff0c;若单调栈栈左边有一个元素则必有…

Java医院信息化建设云HIS系统源码

云HIS提供标准化、信息化、可共享的医疗信息管理系统&#xff0c;实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程&#xff0c;提升患者满意度、基层首诊率&#xff0c;通过信息共享、辅助诊疗等手段&#xff0c;提高基层医生的服务能力构建和…

Ubuntu20.04 下编译安装 ffmpeg 和 ffplay

Ubuntu20.04 下编译安装 ffmpeg 和 ffplay 一、下载源码包二、安装依赖库三、编译四、添加环境变量五、验证是否成功六、问题 一、下载源码包 1.1 官方下载链接&#xff1a;http://ffmpeg.org/download.html 最新版本为6.1&#xff0c;点击 Download Source Code下载即可 &…

【深度学习】强化学习(二)马尔可夫决策过程

文章目录 一、强化学习问题1、交互的对象2、强化学习的基本要素3、策略&#xff08;Policy&#xff09;4、马尔可夫决策过程1. 基本元素2. 交互过程的表示3. 马尔可夫过程&#xff08;Markov Process&#xff09;4. 马尔可夫决策过程&#xff08;MDP&#xff09;5. 轨迹的概率计…

监控pod 容器外网请求网络带宽,过滤掉内网、基于k8spacket开发、prometheus开发export

首先安装k8spacket 安装k8spacket遇到问题&#xff0c;下载插件一直能不能下载成功&#xff0c;pod不能启动。所有手动下载处理。 helm repo add k8spacket https://k8spacket.github.io/k8spacket-helm-chart helm pull k8spacket/k8spacket打开values.yaml 文件 手动下载插…

Axure元件库的介绍以及个人简介和登录界面案例展示

目录 一. 元件介绍 二. 基本元件的使用 2.1 形状元件 2.2 图片元件 2.3 占位符 2.4 文本 2.5 线段元件 2.6 热区文件 三. 表单元件的使用 3.1 文本框 3.2 文本域 3.3 下拉列表 3.4 列表框 3.5 复选框 3.6 单选按钮 四. 菜单与表格元件的使用 4.1 树 4.2 表格…

【CSS】用 CSS 写一个渐变色边框的输入框

Using_CSS_gradients MDN 多渐变色输入框&#xff0c;群友问了下&#xff0c;就试着写了下&#xff0c;看了看 css 渐变色 MDN 文档&#xff0c;其实很简单&#xff0c;代码记录下&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta ch…

2024美赛备战-美赛必备技能(matlab 和SPSS入门必备)

( 一 )Matlab 1.数值计算和符号计算功能 Matlab 以矩阵作为数据操作的基本单位&#xff0c;它的指令表达式与数学、工程中 常用的符号、表达式十分相似&#xff0c;故用Matlab 来解算问题要比用C、FORTRAN 等 语 言完成相同的事情简捷得多&#xff0c;使学者易于学习和掌握…