Debezium-EmbeddedEngine

提示:一个嵌入式的Kafka Connect源连接器的工作机制

文章目录

  • 前言
  • 一、控制流图
  • 二、代码分析
    • 1.构造函数
    • 2.完成回调
    • 3.连接器回调
    • 4.RUN
  • 总结


前言

工作机制:

* 独立运行:嵌入式连接器在应用程序进程中独立运行,不需要Kafka、Kafka Connect或  Zookeeper进程

* 数据传递:应用程序设置连接器并提供一个消费者函数,连接器将所有包含数据库变更事件的SourceRecord传递给该函数。

* 责任转移:应用程序负责故障恢复、可扩展性和持久性。

* 默认存储:连接器的数据库模式历史和偏移量默认存储在内存中,重启后会丢失。

* 执行与停止:连接器设计为提交给Executor或ExecutorService单线程执行,可以通过调用stop()方法或中断线程来停止。


提示:以下是本篇文章正文内容

一、控制流图

控制流图

二、代码分析

1.构造函数

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,
                           CompletionCallback completionCallback, ConnectorCallback connectorCallback) {
        this.config = config;
        this.consumer = consumer;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
            if (!success) logger.error(msg, error);
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        assert this.config != null;
        assert this.consumer != null;
        assert this.classLoader != null;
        assert this.clock != null;
        keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        Configuration valueConverterConfig = config;
        if (valueConverter instanceof JsonConverter) {
            // Make sure that the JSON converter is configured to NOT enable schemas ...
            valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();
        }
        valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);

        // Create the worker config, adding extra fields that are required for validation of a worker config
        // but that are not used within the embedded engine (since the source records are never serialized) ...
        Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);
        embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        workerConfig = new EmbeddedConfig(embeddedConfig);
    }

构造函数

2.完成回调

/**
 * A callback function to be notified when the connector completes.
 */
public interface CompletionCallback {
    /**
     * Handle the completion of the embedded connector engine.
     * 
     * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error
     *            that prevented startup or premature termination.
     * @param message the completion message; never null
     * @param error the error, or null if there was no exception
     */
    void handle(boolean success, String message, Throwable error);
}

这段代码定义了一个接口 CompletionCallback,其中包含一个方法 handle。该方法用于处理嵌入式连接器引擎的完成状态。
参数:
success: 布尔值,表示连接器是否正常完成。如果为 true,表示连接器正常完成;如果为 false,表示连接器启动失败或提前终止。
message: 完成消息,不能为空。
error: 异常对象,如果没有异常则为 null。 

完成回调

3.连接器回调

    /**
     * Callback function which informs users about the various stages a connector goes through during startup
     */
    public interface ConnectorCallback {

        /**
         * Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
         * completed successfully
         */
        default void connectorStarted() {
            // nothing by default
        }

        /**
         * Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
         * completed successfully
         */
        default void connectorStopped() {
            // nothing by default
        }

        /**
         * Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
         * completed successfully
         */
        default void taskStarted() {
            // nothing by default
        }

        /**
         * Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
         * completed successfully
         */
        default void taskStopped() {
            // nothing by default
        }
    }

这段代码定义了一个接口 ConnectorCallback,其中包含了四个默认方法:connectorStarted、connectorStopped、taskStarted 和 taskStopped。这些方法用于在连接器和任务的不同生命周期阶段通知用户。
connectorStarted:当连接器成功启动时调用。
connectorStopped:当连接器成功停止时调用。
taskStarted:当连接器任务成功启动时调用。
taskStopped:当连接器任务成功停止时调用。
这些方法的默认实现为空,子类可以根据需要重写这些方法来添加自定义的回调逻辑。


连接回调

4 RUN方法核心流程

run()

核心流程参考上图

注:debezium-0.6版本 

总结

EmbeddedEngine方法和成员变量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/919236.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网络安全】SSL(二):Keyless SSL技术细节

未经许可,不得转载。 文章目录 TLS双重目标握手过程是什么?TLS 中的握手类型TLS 术语表RSA 握手协议临时 Diffie-Hellman 握手Diffie-Hellman 握手过程保护密钥服务器其他安全考虑性能提升场景分析持久连接精简握手会话恢复的问题Keyless SSL 的会话恢复功能会话票据恢复会话…

vue2侧边导航栏路由

<template><div><!-- :default-active"$route.path" 和index对应其路径 --><el-menu:default-active"active"class"el-menu-vertical-demo"background-color"#545c64"text-color"#fff"active-text-col…

ChatGPT Search VS Kimi探索版:AI搜索哪家强?!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…

交换机配置从IP(Switch Configuration from IP)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 本人主要分享计算机核心技…

【Redis】基于Redis实现秒杀功能

业务的流程大概就是&#xff0c;先判断优惠卷是否过期&#xff0c;然后判断是否有库存&#xff0c;最好进行扣减库存&#xff0c;加入全局唯一id&#xff0c;然后生成订单。 一、超卖问题 真是的场景下可能会有超卖问题&#xff0c;比如开200个线程进行抢购&#xff0c;抢100个…

STL——vector(1)

博客ID&#xff1a;LanFuRenC系列专栏&#xff1a;C语言重点部分 C语言注意点 C基础 Linux 数据结构 C注意点 今日好题 声明等级&#xff1a;黑色->蓝色->红色 欢迎新粉加入&#xff0c;会一直努力提供更优质的编程博客&#xff0c;希望大家三连支持一下啦 目录 尾…

【东莞石碣】戴尔R740服务器维修raid硬盘问题

1&#xff1a;石碣某塑料工厂下午报修一台戴尔R740服务器硬盘故障&#xff0c;催的还比较着急。 2&#xff1a;工程师经过跟用户确认故障的问题以及故障服务器型号和故障硬盘型号&#xff0c;产品和配件确认好后&#xff0c;公司仓库确认有该款硬盘现货&#xff0c;DELL 12T S…

使用 .NET 创建新的 WPF 应用

本教程介绍如何使用 Visual Studio 创建新的 Windows Presentation Foundation &#xff08;WPF&#xff09; 应用。 使用 Visual Studio&#xff0c;可以向窗口添加控件以设计应用的 UI&#xff0c;并处理这些控件中的输入事件以与用户交互。 在本教程结束时&#xff0c;你有一…

Shell基础(5)

声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团…

嵌入式:STM32的启动(Startup)文件解析

相关阅读 嵌入式https://blog.csdn.net/weixin_45791458/category_12768532.html?spm1001.2014.3001.5482 启动文件(Startup File)是嵌入式系统开发中的核心组件之一&#xff0c;它用于初始化系统并为主程序的运行做好准备。在大多数情况下&#xff0c;启动文件是用汇编语言编…

CH03_反射

第3章&#xff1a;反射 本章目标 掌握反射的原理 熟悉反射的基本运用 本章内容 反射是什么 C# 编译运行过程 首先我们在VS点击编译的时候&#xff0c;就会将C#源代码编译成程序集 程序集以可执行文件 (.exe) 或动态链接库文件 (.dll) 的形式实现 程序集中包含有Microsoft …

HAL_UARTEx_ReceiveToIdle_DMA和HAL_UART_Receive_DMA的区别

功能 HAL_UART_Receive_DMA 仅仅是开启的串口的DMA接收&#xff0c;若是想使用空闲中断 DMA接收则需要开启串口的空闲中断&#xff1b; 而HAL_UARTEx_ReceiveToIdle_DMA函数中则包含了开启串口空闲中断&#xff1b; HAL_UART_Receive_DMA 的接收类型是HAL_UART_RECEPTION_ST…

MyBlog(三) -- APP的应用

文章目录 前言一、APP是什么?二、创建APP三、使用APP1. 注册app2. 添加路由3. 运行过程4. 完善视图函数5. 结果展示 总结 前言 前面我们已经学习了如何创建一个新的项目,并且配置好了项目的启动文件,成功将项目启动! 那么接下来我们的主要任务就是需要完善这个项目中应该包含…

tdengine学习笔记-整体架构及docker安装

官方文档&#xff1a;用 Docker 快速体验 TDengine | TDengine 文档 | 涛思数据 整体架构 TDENGINE是分布式&#xff0c;高可靠&#xff0c;支持水平扩展的架构设计 TDengine分布式架构的逻辑结构图如下 一个完整的 TDengine 系统是运行在一到多个物理节点上的&#xff0c;包含…

【支持向量机(SVM)】:相关概念及API使用

文章目录 1 SVM相关概念1.1 SVM引入1.1.1 SVM思想1.1.2 SVM分类1.1.3 线性可分、线性和非线性的区分 1.2 SVM概念1.3 支持向量概念1.4 软间隔和硬间隔1.5 惩罚系数C1.6 核函数 2 SVM API使用2.1 LinearSVC API 说明2.2 鸢尾花数据集案例2.3 惩罚参数C的影响 1 SVM相关概念 1.1…

git 基础之 merge 和 rebase 的比较

在团队软件开发过程中&#xff0c;代码合并是一个基本且频繁执行的任务。 Git 提供了多种合并代码的策略&#xff0c;其中最常用的是 merge 和 rebase。 尽管二者的终极目标是相同的——整合代码变更——它们的方法和推荐的使用场景却有所区别。本文将详细介绍和比较这两种策…

sagemaker中使用pytorch框架的DLC训练和部署cifar图像分类任务

参考资料 https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-python-sdk/pytorch_cnn_cifar10/pytorch_local_mode_cifar10.ipynbhttps://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html 获取训练数据 # s3://zhaojiew-sa…

jmeter常用配置元件介绍总结之配置元件

系列文章目录 1.windows、linux安装jmeter及设置中文显示 2.jmeter常用配置元件介绍总结之安装插件 3.jmeter常用配置元件介绍总结之线程组 4.jmeter常用配置元件介绍总结之函数助手 5.jmeter常用配置元件介绍总结之取样器 6.jmeter常用配置元件介绍总结之jsr223执行pytho…

vite+vue3+ts编译vue组件后,编译产物中d.ts文件为空

一、前言 使用vue3vitets实现一个UI组件库&#xff0c;为了生成类型文件便于其他项目引用该组件库。根据推荐使用了vite-plugin-dts插件进行ts文件的生成 二、版本 组件版本vue ^3.5.12 vite ^5.4.10 vite-plugin-dts ^4.3.0 typescript ~5.6.2 三、问题描述 使用vitevi…

红外相机和RGB相机外参标定 - 无需标定板方案

1. 动机 在之前的文章中红外相机和RGB相机标定&#xff1a;实现两种模态数据融合_红外相机标定-CSDN博客 &#xff0c;介绍了如何利用标定板实现外参标定&#xff1b;但实测下来发现2个问题&#xff1a; &#xff08;1&#xff09;红外标定板尺寸问题&#xff0c;由于标定板小…