Debezium-BinaryLogClient

文章目录

    • 概要
    • 核心流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

BinaryLogClient类,用于连接和监听 MySQL 服务器的二进制日志(binlog)

核心流程

技术名词解释

### GTID (Global Transaction Identifier) 理解

#### 定义
GTID(Global Transaction Identifier)是 MySQL 从 5.6 版本开始引入的一种全局事务标识符。每个 GTID 在整个 MySQL 集群中都是唯一的,用于唯一标识一个事务。

#### 格式
GTID 的格式通常为 `source_id:transaction_id`,其中:
- **source_id**:表示生成事务的 MySQL 实例的唯一标识符,通常是实例的 `server_id`。
- **transaction_id**:表示在该实例上执行的事务的顺序号。

例如,`3E11FA47-71CA-11E1-9E33-C80AA9429562:23` 表示在 `server_id` 为 `3E11FA47-71CA-11E1-9E33-C80AA9429562` 的 MySQL 实例上执行的第 23 个事务。

#### 主要用途
1. **事务跟踪**:
   - GTID 可以帮助跟踪事务在主从复制中的传播情况。每个事务在主库上生成后,会被分配一个唯一的 GTID,并在从库上应用时保留相同的 GTID。

2. **简化复制管理**:
   - 使用 GTID 可以简化复制配置和管理。例如,可以通过指定 GTID 范围来同步特定的事务,而不需要手动管理二进制日志文件和位置。

3. **故障恢复**:
   - 在主从切换或故障恢复时,GTID 可以确保从库不会重复应用同一个事务,从而避免数据不一致的问题。

4. **并行复制**:
   - GTID 支持并行复制,即多个线程可以同时应用不同的事务,提高复制性能。

#### 配置
启用 GTID 复制需要在 MySQL 配置文件中设置以下参数:
- `gtid_mode=ON`:启用 GTID 模式。
- `enforce_gtid_consistency=ON`:强制 GTID 一致性,确保所有事务都可以被正确地跟踪和应用。

#### 总结
GTID 是 MySQL 中用于唯一标识事务的全局标识符,有助于简化复制管理和故障恢复。通过 GTID,可以更方便地跟踪和管理事务在主从复制中的传播,确保数据的一致性和可靠性。

技术细节

/**
 * 建立与MySQL服务器的连接并初始化二进制日志复制所需设置。
 * 
 * 该方法首先检查当前客户端是否已连接,如果已连接,则抛出IllegalStateException。
 * 如果未连接,它将尝试使用指定的主机名和端口连接到MySQL服务器。
 * 连接成功后,接收服务器的欢迎包,并进行身份验证。
 * 如果未指定二进制日志文件名,则获取二进制日志文件名和位置。
 * 检查并确认支持的校验和类型,请求二进制日志流。
 * 最后,通知生命周期监听器连接成功,并启动保持活动线程(如果启用)。
 * 监听事件包。
 */
public void connect() throws IOException {
    if (this.connected) {
        throw new IllegalStateException("BinaryLogClient 已经连接");
    } else {
        GreetingPacket greetingPacket;
        IOException e;
        try {
            try {
                // 创建Socket连接
                Socket socket = this.socketFactory != null ? this.socketFactory.createSocket() : new Socket();
                socket.connect(new InetSocketAddress(this.hostname, this.port));
                this.channel = new PacketChannel(socket);
                if (this.channel.getInputStream().peek() == -1) {
                    throw new EOFException();
                }
            } catch (IOException var7) {
                e = var7;
                throw new IOException("连接到 MySQL " + this.hostname + ":" + this.port + " 失败。请确保其正在运行。", e);
            }

            // 接收欢迎包并进行身份验证
            greetingPacket = this.receiveGreeting();
            this.authenticate(greetingPacket.getScramble(), greetingPacket.getServerCollation());

            // 获取二进制日志文件名和位置
            if (this.binlogFilename == null) {
                this.fetchBinlogFilenameAndPosition();
            }

            // 调整二进制日志位置
            if (this.binlogPosition < 4L) {
                if (this.logger.isLoggable(Level.WARNING)) {
                    this.logger.warning("二进制日志位置从 " + this.binlogPosition + " 调整为 " + 4);
                }
                this.binlogPosition = 4L;
            }

            // 获取并确认支持的校验和类型
            ChecksumType checksumType = this.fetchBinlogChecksum();
            if (checksumType != ChecksumType.NONE) {
                this.confirmSupportOfChecksum(checksumType);
            }

            // 请求二进制日志流
            this.requestBinaryLogStream();
        } catch (IOException var10) {
            e = var10;
            if (this.channel != null && this.channel.isOpen()) {
                this.channel.close();
            }
            throw e;
        }

        // 设置连接状态并记录日志
        this.connected = true;
        if (this.logger.isLoggable(Level.INFO)) {
            this.logger.info("连接到 " + this.hostname + ":" + this.port + " at " + this.binlogFilename + "/" + this.binlogPosition + " (sid:" + this.serverId + ", cid:" + greetingPacket.getThreadId() + ")");
        }

        // 通知生命周期监听器连接成功
        synchronized(this.lifecycleListeners) {
            Iterator i$ = this.lifecycleListeners.iterator();
            while(i$.hasNext()) {
                LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
                lifecycleListener.onConnect(this);
            }
        }

        // 启动保持活动线程(如果启用)
        if (this.keepAlive && !this.isKeepAliveThreadRunning()) {
            this.spawnKeepAliveThread();
        }

        // 确保事件数据反序列化器
        this.ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
        synchronized(this.gtidSetAccessLock) {
            if (this.gtidSet != null) {
                this.ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
            }
        }

        // 监听事件包
        this.listenForEventPackets();
    }
}
/**
 * 监听事件数据包的方法
 * 该方法用于持续监听来自服务器的事件数据包,并进行相应的处理
 * 当检测到数据包时,会根据数据包的类型进行处理,包括错误处理和事件处理
 * 如果连接断开或者出现异常,将停止监听并进行相应的异常处理
 * 
 * @throws IOException 如果在读取数据流时发生I/O错误
 */
private void listenForEventPackets() throws IOException {
    // 获取输入流,用于读取服务器发送的数据
    ByteArrayInputStream inputStream = this.channel.getInputStream();

    // 无限循环,持续监听事件数据包
    label202:
    while(true) {
        try {
            // 检查输入流是否有数据可读
            if (inputStream.peek() != -1) {
                // 读取数据包长度
                int packetLength = inputStream.readInteger(3);
                // 跳过1字节的填充
                inputStream.skip(1L);
                // 读取标记字节,用于判断数据包类型
                int marker = inputStream.read();
                // 如果标记为255,表示接收到的是错误数据包
                if (marker == 255) {
                    // 解析错误数据包并抛出异常
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
                }

                // 解析事件数据包
                Event event;
                try {
                    // 根据数据包长度决定是否需要分块读取
                    event = this.eventDeserializer.nextEvent(packetLength == 16777215 ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
                } catch (Exception var20) {
                    // 处理解析异常
                    Exception e = var20;
                    Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    // 根据异常类型进行不同处理
                    if (!(cause instanceof EOFException) && !(cause instanceof SocketException)) {
                        if (!this.isConnected()) {
                            continue;
                        }

                        // 通知生命周期监听器解包失败
                        synchronized(this.lifecycleListeners) {
                            Iterator i$ = this.lifecycleListeners.iterator();

                            while(true) {
                                if (!i$.hasNext()) {
                                    continue label202;
                                }

                                LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
                                lifecycleListener.onEventDeserializationFailure(this, e);
                            }
                        }
                    }

                    throw e;
                }

                // 处理解析成功的事件
                if (this.isConnected()) {
                    this.notifyEventListeners(event);
                    this.updateClientBinlogFilenameAndPosition(event);
                    this.updateGtidSet(event);
                }
                continue;
            }
        } catch (Exception var21) {
            // 处理通信异常
            Exception e = var21;
            if (this.isConnected()) {
                synchronized(this.lifecycleListeners) {
                    Iterator i$ = this.lifecycleListeners.iterator();

                    while(i$.hasNext()) {
                        LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
                        lifecycleListener.onCommunicationFailure(this, e);
                    }
                }
            }
        } finally {
            // 确保在结束监听时断开连接
            if (this.isConnected()) {
                this.disconnectChannel();
            }

        }

        // 结束方法
        return;
    }
}
/**
 * 通知事件监听器
 * 当有事件发生时,此方法会被调用以通知所有注册的事件监听器
 * 如果事件的数据是EventDataWrapper类型,则会用外部事件数据替换事件数据
 * 
 * @param event 发生的事件,用于通知监听器
 */
private void notifyEventListeners(Event event) {
    // 检查事件数据是否为EventDataWrapper类型,如果是,则用外部事件数据替换事件数据
    if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
        event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper)event.getData()).getExternal());
    }

    // 同步eventListeners以确保线程安全
    synchronized(this.eventListeners) {
        // 遍历所有事件监听器
        Iterator i$ = this.eventListeners.iterator();

        while(i$.hasNext()) {
            EventListener eventListener = (EventListener)i$.next();

            try {
                // 通知事件监听器处理事件
                eventListener.onEvent(event);
            } catch (Exception var7) {
                Exception e = var7;
                // 如果日志级别为WARNING,记录异常信息
                if (this.logger.isLoggable(Level.WARNING)) {
                    this.logger.log(Level.WARNING, eventListener + " choked on " + event, e);
                }
            }
        }

    }
}
/**
 * 将事件添加到队列中以进行后续批量处理。
 * 
 * @param event 从二进制日志中读取的事件
 */
protected void enqueue(Event event) {
    // 检查事件是否为空,避免空指针异常
    if (event != null) {
        try {
            // 将事件放入队列中
            events.put(event);
        } catch (InterruptedException e) {
            // 处理中断异常,恢复中断状态并抛出连接异常
            Thread.interrupted();
            throw new ConnectException("在等待将事件添加到队列时被中断", e);
        }
    }
}
/**
 * 覆盖 poll 方法以从 MySQL 服务器获取并处理事件。
 * 该方法会持续轮询事件,处理这些事件,并返回处理后的记录列表。
 *
 * @return 处理后的 SourceRecord 列表
 */
@Override
public List<SourceRecord> poll() throws InterruptedException {
    logger.trace("从 MySQL 服务器 '{}' 轮询事件", serverName);
    while (running.get() && (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty())) {
        // 没有事件需要处理,因此暂停一段时间 ...
        metronome.pause();
    }
    logger.trace("准备从 MySQL 服务器 '{}' 处理 {} 个事件", events.size(), serverName);

    // 至少有一些记录需要处理 ...
    List<SourceRecord> records = new ArrayList<>(batchEvents.size());
    while (!batchEvents.isEmpty()) {
        Event event = batchEvents.poll();
        if (event == null) continue;

        // 更新源偏移信息 ...
        EventHeader eventHeader = event.getHeader();
        EventType eventType = eventHeader.getEventType();
        if (eventType == EventType.ROTATE) {
            EventData eventData = event.getData();
            RotateEventData rotateEventData;
            if (eventData instanceof EventDeserializer.EventDataWrapper) {
                rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
            } else {
                rotateEventData = (RotateEventData) eventData;
            }
            source.setBinlogFilename(rotateEventData.getBinlogFilename());
            source.setBinlogPosition(rotateEventData.getBinlogPosition());
            source.setRowInEvent(0);
        } else if (eventHeader instanceof EventHeaderV4) {
            EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
            long nextBinlogPosition = trackableEventHeader.getNextPosition();
            if (nextBinlogPosition > 0) {
                source.setBinlogPosition(nextBinlogPosition);
                source.setRowInEvent(0);
            }
        }
        
        if (!running.get()) break;

        // 如果有处理此事件的处理器,将事件转发给它 ...
        EventHandler handler = eventHandlers.get(eventType);
        if (handler != null) {
            handler.handle(event, source, records::add);
        }
    }
    logger.trace("完成从 MySQL 服务器 '{}' 处理 {} 个事件", serverName);

    if (!this.running.get()) {
        // 应该停止,因此返回已经处理的记录,以防止在 DB 历史已停止的情况下持久化记录 ...
        return null;
    }

    // 已经处理完所有事件,清空批处理队列并返回记录 ...
    assert batchEvents.isEmpty();
    return records;
}

小结

### Debezium 监听和处理 Binlog 事件的简要总结

1. **轮询事件**:
   - `poll` 方法通过 `logger.trace` 记录开始从 MySQL 服务器轮询事件。
   - 使用 `while` 循环不断检查是否有新的事件需要处理。如果没有事件,调用 `metronome.pause()` 暂停一段时间。

2. **准备事件**:
   - 当有事件可用时,记录准备处理的事件数量。
   - 创建一个 `List<SourceRecord>` 来存储处理后的记录。

3. **处理事件**:
   - 从 `batchEvents` 队列中取出事件并进行处理。
   - 根据事件类型更新源偏移信息:
     - 对于 `ROTATE` 事件,更新二进制日志文件名和位置。
     - 对于其他类型的事件,更新二进制日志位置。
   - 如果有相应的事件处理器,调用 `handler.handle` 方法处理事件并将结果添加到 `records` 列表中。

4. **停止处理**:
   - 如果 `running` 标志为 `false`,表示应该停止处理,返回 `null` 以防止在 DB 历史已停止的情况下持久化记录。

5. **返回结果**:
   - 清空 `batchEvents` 队列,确保所有事件都已处理完毕。
   - 返回处理后的 `records` 列表。

### 关键步骤总结
- **轮询和等待**:通过循环和暂停机制等待新事件。
- **事件处理**:根据事件类型更新偏移信息,并调用相应的处理器处理事件。
- **停止机制**:在需要停止时返回 `null`,避免不必要的记录持久化。
- **结果返回**:清空批处理队列并返回处理后的记录列表。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/919876.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式linux中QT信号与槽基本操作与实现

大家好,今天主要给大家分享一下,如何使用linux系统上的QT进行界面开发与实现。 第一:QT的信号与槽基本简介 在操作QT的时候,可以使用里面的信号与槽。所谓信号就是一个对象发出的信号,槽就是当这个对象发出这个信号时,对应连接的槽就发被执行或者触发。 进行信号与槽的连…

03 —— Webpack 自动生成 html 文件

HtmlWebpackPlugin | webpack 中文文档 | webpack中文文档 | webpack中文网 安装 npm install --save-dev html-webpack-plugin 下载html-webpack-plugin本地软件包 npm i html-webpack-plugin --save-dev 配置webpack.config.js让webpack拥有插件功能 const HtmlWebpack…

大模型时代的具身智能系列专题(十二)

Robert Platt(波士顿动力) Robert Platt是美国东北大学Helping Hands机器人实验室主任、计算机科学教授。在加入东北大学之前&#xff0c;Platt 曾是麻省理工学院的研究科学家和美国宇航局的机器人工程师。platt博士毕业于马萨诸塞大学阿默斯特分校计算机科学专业。Platt 的工…

【软件测试】设计测试用例的万能公式

文章目录 概念设计测试用例的万能公式常规思考逆向思维发散性思维万能公式水杯测试弱网测试如何进行弱网测试 安装卸载测试 概念 什么是测试用例&#xff1f; 测试⽤例&#xff08;Test Case&#xff09;是为了实施测试⽽向被测试的系统提供的⼀组集合&#xff0c;这组集合包…

uni-app Vue3语法实现微信小程序样式穿透uview-plus框架

1 问题描述 我在用 uni-app vue3 语法开发微信小程序时&#xff0c;在项目中使用了 uview-plus 这一开源 UI 框架。在使用 up-text 组件时&#xff0c;想要给它添加一些样式&#xff0c;之前了解到微信小程序存在样式隔离的问题&#xff0c;也在uview-plus官网-注意事项中找到…

C++(Qt)软件调试---内存分析工具Heob(26)

C(Qt)软件调试—内存分析工具Heob&#xff08;26&#xff09; 文章目录 C(Qt)软件调试---内存分析工具Heob&#xff08;26&#xff09;[toc]1、概述&#x1f41c;2、环境配置&#x1fab2;3、功能说明4、使用Heob分析qt 程序内存泄漏&#x1f9a7;5、使用Heob检测qt 程序野指针…

uni-app快速入门(八)--常用内置组件(上)

uni-app提供了一套基础组件&#xff0c;类似HTML里的标签元素&#xff0c;不推荐在uni-app中使用使用div等HTML标签。在uni-app中&#xff0c;对应<div>的标签是view&#xff0c;对应<span>的是text&#xff0c;对应<a>的是navigator&#xff0c;常用uni-app…

静态时序分析--时序约束

目录 1.时钟约束1.1创建时钟1.2.生成时钟1.3虚拟时钟1.4 最小时钟脉宽 2.I/O延时约束2.1设置输入延时2.2设置输出延时 3.I/O环境建模约束3.1输入驱动建模3.2输出负载建模 4.时序例外4.1多周期路径设置&#xff08;multicycle path&#xff09;4.2伪路径设置&#xff08;false_p…

解决IntelliJ IDEA的Plugins无法访问Marketplace去下载插件

勾选Auto-detect proxy setting并填入 https://plugins.jetbrains.com 代理URL&#xff0c;可以先做检查连接&#xff1a;

自存 sql常见语句和实际应用

关于连表 查询两个表 SELECT * FROM study_article JOIN study_article_review 查询的就是两个表相乘&#xff0c;结果为两个表的笛卡尔积 相这样 这种并不是我们想要的结果 通常会添加一些查询条件 SELECT * FROM study_articleJOIN study_article_review ON study_art…

目录背景缺少vscode右键打开选项

目录背景缺少vscode右键打开选项 1.打开右键管理 下载地址&#xff1a;https://wwyz.lanzoul.com/iZy9G2fl28uj 2.开始搜索框搜索vscode&#xff0c; 找到其源目录 3.目录背景里面&#xff0c; 加入vscode.exe 3.然后在目录背景下&#xff0c; 右键&#xff0c; code就可以打…

应用于各种小家电的快充协议芯片

前言 随着快充技术的广泛应用&#xff0c;以往小家电的慢充模式已经满足不了人们对充电速度的要求&#xff0c;因此商家纷纷对小家电应用了诱骗取电快充协议芯片 例如&#xff08;XSP16H)&#xff0c;有了快充的支持小家电的充电速度有了很大的提升&#xff0c;节省了很多的充电…

Java基础知识(五)

文章目录 ObjectObject 类的常见方法有哪些&#xff1f; 和 equals() 的区别hashCode() 有什么用&#xff1f;为什么要有 hashCode&#xff1f;为什么重写 equals() 时必须重写 hashCode() 方法&#xff1f; 参考链接 Object Object 类的常见方法有哪些&#xff1f; Object 类…

【spring 】Spring Cloud Gateway 的Filter学习

介绍和使用场景 Spring Cloud Gateway 是一个基于 Spring Framework 5 和 Project Reactor 的 API 网关&#xff0c;它旨在为微服务架构提供一种简单而有效的方式来处理请求路由、过滤、限流等功能。在 Spring Cloud Gateway 中&#xff0c;Filter 扮演着非常重要的角色&#…

[Docker#11] 容器编排 | .yml | up | 实验: 部署WordPress

目录 1. 什么是 Docker Compose 生活案例 2. 为什么要使用 Docker Compose Docker Compose 的安装 Docker Compose 的功能 使用步骤 核心功能 Docker Compose 使用场景 Docker Compose 文件&#xff08;docker-compose.yml&#xff09; 模仿示例 文件基本结构及常见…

学习虚幻C++开发日志——委托(持续更新中)

委托 官方文档&#xff1a;Delegates and Lamba Functions in Unreal Engine | 虚幻引擎 5.5 文档 | Epic Developer Community | Epic Developer Community 简单地说&#xff0c;委托就像是一个“函数指针”&#xff0c;但它更加安全和灵活。它允许程序在运行时动态地调用不…

【Linux】基础02

Linux编译和调试 VI编辑文件 vi : 进入文件编辑 是命令行模式 i &#xff1a;从光标处进入插入模式 dd : 删除光标所在行 n dd 删除指定行数 Esc &#xff1a; 退出插入模式 &#xff1a; 冒号进入末行模式 :wq : 保存退出 :q &#xff1a; 未修改文件可以退出 :q! …

前端:JavaScript (学习笔记)【1】

目录​​​​​​​ 一&#xff0c;介绍JavaScript 二&#xff0c;JavaScript的特点 1&#xff0c;脚本语言 2&#xff0c;基于对象的语言 3&#xff0c;事件驱动 4&#xff0c;简单性 5&#xff0c;安全性 6&#xff0c;跨平台性 7&#xff0c;JS 和java的区别 &…

安卓手机root+magisk安装证书+抓取https请求

先讲一下有这篇文章的背景吧&#xff0c;在使用安卓手机fiddler抓包时&#xff0c;即使信任了证书&#xff0c;并且手机也安装了证书&#xff0c;但是还是无法捕获https请求的问题&#xff0c;最开始不知道原因&#xff0c;后来慢慢了解到现在有的app为了防止抓包&#xff0c;把…

数字化那点事:一文读懂物联网

一、物联网是什么&#xff1f; 物联网&#xff08;Internet of Things&#xff0c;简称IoT&#xff09;是指通过网络将各种物理设备连接起来&#xff0c;使它们可以互相通信并进行数据交换的技术系统。通过在物理对象中嵌入传感器、处理器、通信模块等硬件&#xff0c;IoT将“…