canal server初始化源码分析

CanalLauncher类是canal server端启动的入口类,跟随代码进行深入。

在开始之前,我们可以先了解下,

canal 配置方式
  1. ManagerCanalInstanceGenerator: 基于manager管理的配置方式,实时感知配置并进行server重启
  2. SpringCanalInstanceGenerator:基于本地spring xml的配置方式,对于多instance的时候,不便于扩展,一个instance一个xml配置
canal 配置文件
  • canal.properties  (系统根配置文件)
  • instance.properties  (instance级别的配置文件,每个instance一份)
入口代码
   public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            // 设置全局的异常捕获
            setGlobalUncaughtExceptionHandler();

            // 支持rocketmq client 配置日志路径
            System.setProperty("rocketmq.client.logUseSlf4j","true");

            // 加载canal 配置
            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
             // 初始化启动类
            final CanalStarter canalStater = new CanalStarter(properties);
            // canal.admin.manager 配置地址
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                // 获取admin 用户名和密码
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                if (StringUtils.isEmpty(passwd)) {
                    throw new IllegalArgumentException(
                        "canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
                }
                // 设置默认端口号
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                // 自否自动注册
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                //集群地址
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                //注册名称
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                if (StringUtils.isEmpty(name)) {
                    // 以本地机器为默认名字
                    name = AddressUtils.getHostName();
                }
                //注册到zk 或者admin中server IP 不配置就是本地IP
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                // 初始化配置客户端
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                // 通过http方式加载远程配置 通过admin配置的信息
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local  本地配置优先级更高
                managerProperties.putAll(properties);
                // auto.scan.interval  instance自动扫描的间隔时间,单位秒
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;
                    // 定时异步线程去加载远程配置
                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                //通过md5的方式验证重新验证配置是否有变更,有变更返回最新数据,并重新加载
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // 远程配置canal.properties修改重新加载整个应用
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();

                                    lastCanalConfig = newCanalConfig;
                                }
                            }

                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }

                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }

            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

上面代码中,对于配置的加载有两部分,当配置了canal.admin.manager 后台管理地址,会先去验证配置的用户名和密码,去获取相应的server配置(通过http的方式进行获取)

    public PlainCanal findServer(String md5) {
        if (StringUtils.isEmpty(md5)) {
            md5 = "";
        }
        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);
        return queryConfig(url);
    }

当通过admin的方式进行加载配置,会每隔默认5s通过http的方式获取admin端的配置,会对整体配置进行md5值进行比较,有变更会对server进行重启。

跟随CanalStarter.start() 方法进入到启动方法,在CanalLauncher中已加载到server相关的配置。

   1.  根据配置canal.serverMode获取服务端模式,有tcp、相关的mq( kafka, rocketMQ, rabbitMQ, pulsarMQ)方式,除了tcp模式,需要初始化mq相关配置

        // 服务端模式 可通过配置进行变更 canal.serverMode
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (!"tcp".equalsIgnoreCase(serverMode)) {
            ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
            // /plugin 和 /canal/plugin"
            canalMQProducer = loader
                .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
            if (canalMQProducer != null) {
                canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);
                // 初始化mq信息 由canal.serverMode 决定使用哪种mq
                canalMQProducer.init(properties);
            }
        }

        if (canalMQProducer != null) {
            MQProperties mqProperties = canalMQProducer.getMqProperties();
            // disable netty
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            if (mqProperties.isFlatMessage()) {
                // 设置为raw避免ByteString->Entry的二次解析
                System.setProperty("canal.instance.memory.rawEntry", "false");
            }
        }

2. 初始化canal调度控制器CanalController,并进行start,CanalController主要是初始化各个instance配置

3.  非tcp的方式,进行启动mq。

        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            // 当前server上部署的instance列表
            String destinations = CanalController.getDestinations(properties);
            // 启动mq
            canalMQStarter.start(destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }

4. 初始化CanalAdminController:提供canal admin的管理操作

        // start canalAdmin
        String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
        if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
            String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
            String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
            CanalAdminController canalAdmin = new CanalAdminController(this);
            canalAdmin.setUser(user);
            canalAdmin.setPasswd(passwd);
            String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

            logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
                port,
                user,
                passwd,
                ip);

            CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
            canalAdminWithNetty.setCanalAdmin(canalAdmin);
            canalAdminWithNetty.setPort(Integer.parseInt(port));
            canalAdminWithNetty.setIp(ip);
            canalAdminWithNetty.start();
            this.canalAdmin = canalAdminWithNetty;
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330722.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

零售EDI:Babylist EDI 项目案例

Babylist 与各种不同的品牌和零售商合作&#xff0c;包括婴儿用品、玩具、衣物和其他相关产品的制造商。用户可以在 Babylist 上浏览各种不同的产品&#xff0c;并根据自己的需求和喜好选择适合的项目。本文将为大家介绍对接Babylist 的EDI项目案例。 Babylist EDI 需求 传输协…

经典计算机网络面试题

1.说说HTTP常用的状态码及其含义&#xff1f; 状态码 类别 1xx 信息性状态码 2xx 成功状态码 3xx 重定向状态码 4xx 客户端错误状态码 5xx 服务端错误状态码 日常开发中的状态码&#xff1a; 状态码 含义 101 切换请求协议 200 请求成功 301 永久性重定向&…

线上党建展厅有哪些功能,如何搭建一个成功的线上党建展厅

引言&#xff1a; 随着互联网的飞速发展&#xff0c;线上党建展厅成为党建宣传的新平工具&#xff0c;它提供了一个方便的党建学习、交流和展示的空间。那么线上党建展厅有哪些功能&#xff0c;如何搭建一个成功的线上党建展厅呢&#xff1f; 一、线上党建展厅有哪些功能 1.组…

Wheeltec小车的开发实录(0)

配置静态ip&#xff08;可以联网&#xff09; 首先在你正常链接网络的时候打开“Connection Information”(我的是wifi&#xff0c;而且是手机热点&#xff0c;所以我手机就相当于一台路由器) 查看路由ip 观察到Default Route 是192.168.***.225这就是我手机的地址&#xff0…

node.js(express.js)+mysql实现登录功能

文章目录 前言实现步骤 实现步骤一、检测登录表单的数据是否合法&#xff08;3&#xff09;新建schema/user.js&#xff08;4&#xff09;在routes/use.js中引入schema/user.js中的方法reg_login_schema&#xff0c;代码如下&#xff1a; 二、根据用户名查询用户的数据三、判断…

嵌入式-Stm32-江科大基于标准库的GPIO4个小实验

文章目录 一 、硬件介绍二 、实验&#xff1a;LED闪烁、LED流水灯、蜂鸣器提示2.1 需求1&#xff1a;面包板上的LED以1s为周期进行闪烁。亮0.5s,灭0.5s.....2.2 需求2: 8个LED实现流水灯 三、硬件介绍-按键开关、光敏电阻四、 实验 按键控制LED、光敏传感器控制蜂鸣器4.1 需求1…

Go 语言中高效切片拼接和 GO 1.22 提供的新方法

Table Contents 切片拼接的必要性基本拼接方法及其局限性使用 append 函数高效拼接的策略控制容量和避免副作用利用 Go 1.22 的新特性切片动态扩容的深入理解内存重新分配与数据迁移性能优化策略结论在 Go 语言中,切片拼接是一项常见的操作,但如果处理不当,可能会导致性能问…

【计算机网络】【新加坡南洋理工大学】【Computer Control Network】【广域网和局域网简介】【中英对照(自译)】

一、说明 仅供学习使用。 二、广域网&#xff08;WAN&#xff09;和局域网&#xff08;LAN&#xff09;简介

基于springboot+vue的在线拍卖系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

Android: alarm定时很短时,比如500ms,测试执行mPowerManager.forceSuspend()后,系统不会suspend

参考文档&#xff1a; https://blog.csdn.net/weixin_35691921/article/details/124961404 Android: alarm定时很短时&#xff0c;比如500ms&#xff0c;然后执行mPowerManager.forceSuspend()后&#xff0c;系统不会suspend&#xff0c;原因分析&#xff1a; static int ala…

软件测试|sqlalchemy relationship

简介 SQLAlchemy是一个流行的Python ORM&#xff08;对象关系映射&#xff09;库&#xff0c;它允许我们以面向对象的方式管理数据库。在SQLAlchemy中&#xff0c;relationship是一个重要的功能&#xff0c;用于建立表之间的关系。在本文中&#xff0c;我们将详细探讨relation…

渗透测试之Kali如何利用CVE-2019-0708漏洞渗透Win7

环境: 1.攻击者IP:192.168.1.10 系统: KALI2022(vmware 16.0) 2.靶机IP:192.168.1.8 系统:Windows 7 6.1.7601 Service Pack 1 Build 7601 已开启远程协助RDP服务开启了3389端口 问题描述: KALI 如何利用CVE-2019-0708漏洞渗透Win7 解决方案: 1.打开kali,msf搜索…

[足式机器人]Part2 Dr. CAN学习笔记- Kalman Filter卡尔曼滤波器Ch05

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - Kalman Filter卡尔曼滤波器 Ch05 1. Recursive Algirithm 递归算法2. Data Fusion 数据融合Covarince Matrix协方差矩阵State Space状态空间方程 Observation观测器3. Step by step : Deriatio…

在自定义数据集上训练 YOLOv8 进行目标检测

这是目标检测中令人惊叹的 AI 模型之一。在这种情况下&#xff0c;您无需克隆存储库、设置要求并配置模型&#xff0c;就像在 YOLOv5 及其之前的版本中所做的那样。 在 YOLOv8 中&#xff0c;不需要执行这些手动任务。您只需安装 Ultralytics 即可&#xff0c;我将向您展示如何…

mybatis----小细节

1、起别名 在MyBatis中&#xff0c;<typeAliases>元素用于定义类型别名&#xff0c;它可以将Java类名映射为一个更简短的别名&#xff0c;这样在映射文件中可以直接使用别名而不需要完整的类名。 下面是一个示例&#xff1a; 在mybatis核心配置文件中配置typeAliases标…

[go语言]输入输出

目录 知识结构 输入 1.Scan ​编辑 2.Scanf 3.Scanln 4.os.Stdin --标准输入&#xff0c;从键盘输入 输出 1.Print 2.Printf 3.Println 知识结构 输入 为了展示集中输入的区别&#xff0c;将直接进行代码演示。 三者区别的结论&#xff1a;Scanf格式化输入&#x…

“深度剖析Nginx的高级部署“

深度剖析Nginx的高级部署与优化技巧 引言1. Nginx基础概念1.1 Nginx简介1.1.1 什么是Nginx&#xff1f;1.1.2 Nginx的特点与优势 2. Nginx部署安装MySQL运行java 总结 引言 在现代网络架构中&#xff0c;Nginx作为一款高性能的开源Web服务器&#xff0c;广泛应用于反向代理、负…

个人网站制作 Part 8 添加电子邮件通知与社交媒体集成 | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 添加电子邮件通知&#x1f528;使用Nodemailer&#x1f527;步骤 1: 安装Nodemailer &#x1f680; 社交媒体集成&#x1f528;使用社交媒体API&#x1f527;步骤 2: 集成…

Springcloud+Vue智慧工地源码 AI智能识别

智慧工地解决方案 一、现状描述 建筑工程建设具有明显的生产规模大宗性与生产场所固定性的特点。建筑企业70%左右的工作都发生在施工现场&#xff0c;施工阶段的现场管理对工程成本、进度、质量及安全等至关重要。同时随着工程建设规模不断扩大&#xff0c;工艺流程纷繁复杂&…

通讯录(C语言版)(静态通讯录)

✨欢迎来到脑子不好的小菜鸟的文章✨ &#x1f388;创作不易&#xff0c;麻烦点点赞哦&#x1f388; 所属专栏&#xff1a;项目 我的主页&#xff1a;脑子不好的小菜鸟 文章特点&#xff1a;关键点和步骤讲解放在 代码相应位置 引言&#xff1a; 1.菜单 通讯录也如同游戏&…