Debezium-MySqlConnectorTask

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

MySqlConnectorTask,用于读取MySQL的二进制日志并生成对应的数据变更事件

整体架构流程

技术名词解释

数据库模式(Database Schema)
数据库模式是指数据库中数据的组织结构和定义,它描述了数据库中所有对象(如表、视图、索引、存储过程等)的结构和关系。具体来说,数据库模式包括以下几个方面:
1  表结构:定义了数据库中各个表的名称、列的名称、数据类型、约束条件(如主键、外键、唯一性约束等)。
2  关系:描述了表与表之间的关系,如一对多、多对多等。
3  索引:定义了表上的索引,用于提高查询性能。
4  视图:定义了虚拟表,这些虚拟表基于SQL查询结果,可以简化复杂的查询操作。
5  存储过程和函数:定义了数据库中的存储过程和函数,用于执行特定的业务逻辑。
6  触发器:定义了在特定事件发生时自动执行的操作。

在 DatabaseHistory 接口中的应用
在 DatabaseHistory 接口中,数据库模式的变更记录和恢复功能主要用于以下场景:
    1  记录变更:当数据库模式发生变化时(如添加新表、修改表结构、删除表等),通过 record 方法记录这些变更。
    2  恢复:当需要恢复到某个历史点的数据库模式时,通过 recover 方法恢复到指定的历史状态。
通过这些功能,可以有效地管理和追踪数据库模式的变化,确保数据的一致性和完整性。

技术细节

@Override
    public void start(Map<String, String> props) {
        if (context == null) {
            throw new ConnectException("Unexpected null context");
        }

        // Validate the configuration ...
        final Configuration config = Configuration.from(props);
        if (!config.validate(MySqlConnectorConfig.ALL_FIELDS, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }

        // Create and configure the database history ...
        this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
        if (this.dbHistory == null) {
            throw new ConnectException("Unable to instantiate the database history class " +
                    config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
        }
        Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); // do not remove
                                                                                                                 // prefix
        this.dbHistory.configure(dbHistoryConfig); // validates
        this.dbHistory.start();
        this.running.set(true);

        // Read the configuration ...
        final String user = config.getString(MySqlConnectorConfig.USER);
        final String password = config.getString(MySqlConnectorConfig.PASSWORD);
        final String host = config.getString(MySqlConnectorConfig.HOSTNAME);
        final int port = config.getInteger(MySqlConnectorConfig.PORT);
        final String initialBinLogFilename = config.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);
        final long serverId = config.getLong(MySqlConnectorConfig.SERVER_ID);
        serverName = config.getString(MySqlConnectorConfig.SERVER_NAME.name(), host + ":" + port);
        final boolean keepAlive = config.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);
        final int maxQueueSize = config.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);
        final long timeoutInMilliseconds = config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
        final boolean includeSchemaChanges = config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
        final long pollIntervalMs = config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);
        maxBatchSize = config.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);
        metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);

        // Define the filter using the whitelists and blacklists for tables and database names ...
        Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
                                                        config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
                                                        config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
                                                        config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
        if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
            Predicate<TableId> isBuiltin = (id) -> {
                return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
            };
            tableFilter = tableFilter.and(isBuiltin.negate());
        }

        // Create the queue ...
        events = new LinkedBlockingDeque<>(maxQueueSize);
        batchEvents = new ArrayDeque<>(maxBatchSize);

        // Set up our handlers for specific kinds of events ...
        tables = new Tables();
        tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
        eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
        eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
        eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);

        // Set up the log reader ...
        client = new BinaryLogClient(host, port, user, password);
        client.setServerId(serverId);
        client.setKeepAlive(keepAlive);
        if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
        client.registerEventListener(this::enqueue);
        client.registerLifecycleListener(traceLifecycleListener());

        // Set up the event deserializer with additional types ...
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        client.setEventDeserializer(eventDeserializer);

        // Check if we've already processed some of the log for this database ...
        source.setServerName(serverName);
        // Get the offsets for our partition ...
        Map<String, ?> offsets = context.offsetStorageReader().offset(source.partition());
        if (offsets != null) {
            source.setOffset(offsets);
            // And set the client to start from that point ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
            // The event row number will be used when processing the first event ...
            logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}",
                        serverName, source.binlogFilename(), source.binlogPosition(), source.eventRowNumber());

            // We have to make our Tables reflect the state of the database at the above source partition (e.g., the location
            // in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the
            // TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements
            // to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string
            // of DDL. However, we should consume no further than offset we recovered above.
            try {
                logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);
                DdlParser ddlParser = new MySqlDdlParser();
                dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);
                tableConverters.loadTables();
                logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));
            } catch (Throwable t) {
                throw new ConnectException("Failure while recovering database schemas", t);
            }
        } else {
            // initializes this position, though it will be reset when we see the first event (should be a rotate event) ...
            client.setBinlogFilename(initialBinLogFilename);
            logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",
                        source.binlogFilename(), source.binlogPosition());
        }

        // Start the log reader, which starts background threads ...
        try {
            logger.debug("Connecting to MySQL server");
            client.connect(timeoutInMilliseconds);
            logger.debug("Successfully connected to MySQL server and beginning to read binlog");
        } catch (TimeoutException e) {
            double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);
            throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host
                    + ":" + port + " with user '" + user + "'", e);
        } catch (AuthenticationException e) {
            throw new ConnectException("Failed to authenticate to the MySQL database at " + host + ":" + port + " with user '" + user + "'",
                    e);
        } catch (Throwable e) {
            throw new ConnectException(
                    "Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);
        }
    }

 

  1. 验证配置:从传入的属性中创建配置对象并验证其有效性。
  2. 创建数据库历史记录:根据配置实例化 DatabaseHistory 对象并启动。
  3. 读取配置参数:从配置中读取各种必要的参数,如用户名、密码、主机、端口等。
  4. 定义表过滤器:根据白名单和黑名单定义表过滤器,忽略内置表。
  5. 创建队列:初始化事件队列和批处理队列。
  6. 设置事件处理器:为不同的事件类型设置处理器。
  7. 设置日志读取器:创建并配置 BinaryLogClient,注册事件监听器和生命周期监听器。
  8. 设置事件反序列化器:配置事件反序列化器以处理特定类型的事件。
  9. 恢复数据库状态:检查是否有已处理的日志,如果有则恢复数据库模式。
  10. 连接到 MySQL 服务器:尝试连接到 MySQL 服务器并开始读取二进制日志。

小结

/**
 * 该类负责配置和初始化MySQL连接器,包括设置数据库和表的过滤条件、创建事件队列、注册事件处理器、设置二进制日志客户端、恢复数据库模式等。
 * 主要功能包括:
 * - 应用数据库和表的黑白名单过滤条件。
 * - 配置是否忽略内置表。
 * - 创建事件队列和批处理事件队列。
 * - 注册不同类型的事件处理器。
 * - 初始化二进制日志客户端并设置相关参数。
 * - 检查并恢复已处理的日志位置。
 * - 连接到MySQL服务器并开始读取二进制日志。
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918923.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【WPF】Prism学习(二)

Prism Commands 1.命令&#xff08;Commanding&#xff09; 1.1. ViewModel的作用&#xff1a; ViewModel不仅提供在视图中显示或编辑的数据&#xff0c;还可能定义一个或多个用户可以执行的动作或操作。这些用户可以通过用户界面&#xff08;UI&#xff09;执行的动作或操作…

如何实现主备租户的无缝切换 | OceanBase应用实践

对于DBA而言&#xff0c;确保数据库的高可用性、容灾等能力是其日常工作中需要持续思考和关注的重要事项。一方面&#xff0c;可以利用数据库自身所具备的功能来实现这些目标&#xff1b;若数据库本身不提供相应功能&#xff0c;DBA则需寻找其他工具来增强数据库的高可用性和容…

壁仞科技上市前最后一波 校招 社招 内推

随着美国大选结束&#xff0c;国内GPU 产业得到空前的的发展空间&#xff0c;国内芯片相关股票一片飘红。 国内大型 GPU厂商壁仞科技&#xff0c;摩尔线程等正紧锣密鼓地加紧上市。 GPGPU 芯片赛道来到了史无前例的红利点&#xff0c;抓住机会&#x1f4aa; 壁仞科技正在火热…

前端监控之sourcemap精准定位和还原错误源码

一、概述 在前端开发中&#xff0c;监控和错误追踪是确保应用稳定性和用户体验的重要环节。 随着前端应用的复杂性增加&#xff0c;JavaScript错误监控变得尤为重要。在生产环境中&#xff0c;为了优化加载速度和性能&#xff0c;前端代码通常会被压缩和混淆。这虽然提升了性…

使用Web Push Notifications提升用户参与度和留存率

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用Web Push Notifications提升用户参与度和留存率 使用Web Push Notifications提升用户参与度和留存率 使用Web Push Notifica…

量化选股日常操作日记-11-ai眼镜-润欣科技

用 微信小程序 梦想兔企业智能风险分析助手 &#xff0c;选择AI眼镜板块&#xff0c;挖掘了几个合适的股&#xff0c;分析下来感觉 润欣科技 比较安全些适合观察&#xff0c;几块到十几块波动&#xff0c;企业基本面也没有特别大问题。就是现在价位在周期波动高位&#xff0c;下…

【WPF】Prism学习(五)

Prism Commands 1.错误处理&#xff08;Error Handling&#xff09; Prism 9 为所有的命令&#xff08;包含AsyncDelegateCommand&#xff09;提供了更好的错误处理。 避免用try/catch包装每一个方法根据不同遇到的异常类型来提供特定的逻辑处理可以在多个命令之间共享错误处…

Intern大模型训练营(八):Llamaindex RAG 实践

1. 基于 LlamaIndex 构建自己的 RAG 知识库 首先在Intern Studio中申请30% A100的开发机。 进入开发机后&#xff0c;创建新的conda环境&#xff0c;命名为 llamaindex&#xff0c;在命令行模式下运行&#xff1a; conda create -n llamaindex python3.10 复制完成后&#…

台式电脑没有声音怎么办?台式电脑没有声音解决详解

台式电脑一般来说都是没有内置扬声器的&#xff0c;需要连接耳机或者是音响才可以播放音乐。那么如果遇到台式电脑没有声音的问题&#xff0c;我们也需要确认这些设备硬件有没问题&#xff0c;知道原因才可以进行处理。下面本文将为你介绍台式电脑没有声音的可能原因和解决方法…

vue2项目中在线预览csv文件

简介 希望在项目中&#xff0c;在线预览.csv文件&#xff0c;本以为插件很多&#xff0c;结果都只是支持excel&#xff08;.xls、.xlsx&#xff09;一到.csv就歇菜。。。 关于文件预览 vue-office&#xff1a;文档、 查看在线演示demo&#xff0c;支持docx、.xlsx、pdf、ppt…

H.265流媒体播放器EasyPlayer.js视频流媒体播放器关于直播流播放完毕是否能监听到

EasyPlayer属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;无须安装任何插件&#xff0c;起播快、延迟低、兼容性强&#xff0c;使用非常便捷。 EasyPlayer.js播放器不仅支持H.264与H.265视频编码格式&#xff0c;也能支持WebS…

WordPress设置自动更新CSS版本号

WordPress 通常会在引用 CSS 文件时添加版本号参数&#xff08;?verx.x.x&#xff09;。如果版本号未更新&#xff0c;浏览器可能继续加载旧的文件。 解决方法&#xff1a;确保你在 functions.php 文件中正确加载了 CSS 文件&#xff0c;并动态更新版本号。例如在functions.p…

【Linux】监控系统Zabbix的安装与配置

文章目录 一、前期准备1、安装LAMP2、配置SELinux与防火墙3、测试Apache4、配置数据库5、创建zabbix数据库及应用 二、server端安装配置1、软件包安装2、配置数据库3、zabbix访问测试4、配置web界面 三、Agent端安装配置1、安装zabbix-agent2、配置3、启动zabbix-agent4、配置防…

Springboot基于GIS的旅游信息管理系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

HarmonyOs鸿蒙开发实战(17)=>沉浸式效果第二种方案一组件安全区方案

1.沉浸式效果的目的 开发应用沉浸式效果主要指通过调整状态栏、应用界面和导航条的显示效果来减少状态栏导航条等系统界面的突兀感&#xff0c;从而使用户获得最佳的UI体验。 2.组件安全区方案介绍 应用在默认情况下窗口背景绘制范围是全屏&#xff0c;但UI元素被限制在安全区内…

微知-DOCA ARGP参数模块的相关接口和用法(config单元、params单元,argp pipe line,回调)

文章目录 1. 背景2. 设置参数的主要流程2.1 初始化2.2 注册某个params的处理方式以及回调函数2.4 定义好前面的params以及init指定config地点后start处理argv 3. 其他4. DOCA ARGP包相关4.1 主要接口4.2 DOCA ARGP的2个rpm包4.2.1 doca-sdk-argp-2.9.0072-1.el8.x86_64.rpm4.2.…

Linux之vim模式下全选命令

在Linux系统中&#xff0c;使用Vim编辑器进行全选操作可以通过以下几种方式实现&#xff1a; 1.使用键盘快捷键 按下 ”ggVG”&#xff08;先按下”g”&#xff0c;再按下”g”&#xff0c;再按下”V”&#xff0c;最后按下”G”&#xff09;可以全选当前文件内容。其中 ”g…

SQL复杂数据类型处理

背景 数据处理中&#xff0c;经常碰到复杂数据类型&#xff0c;需要将他们进行解析才能利用。 复杂数据类型 1、MAP结构转为列 WITH tmp AS ( SELECT {"Users":{"4418":{"UserId":4418,"Score":0,"IsStudent":true},&q…

下一代以区域为导向的电子/电气架构

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 所有人的看法和评价都是暂时的&#xff0c;只有自己的经历是伴随一生的&#xff0c;几乎所有的担忧和畏惧…

CSS盒子的定位>(上篇)#定位属性#相对定位-附练习

一、定位属性 1.定位方式 position属性可以选择4种不同类型的定位方式。 语法格式&#xff1a;position&#xff1a;relation | absolute | fixed参数&#xff1a;①relative生成相对定位的元素&#xff0c;相对于其正常位置进行定位。 ②absolute生成绝对定位的…