flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[
    {
        "component": "oceanbase-ce",
        "access_url": "10.86.97.168:2881",
        "user": "root",
        "password": "pwd",
        "connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"
    },
    {
        "component": "obproxy-ce",
        "access_url": "10.86.97.168:2883",
        "user": "root@proxysys",
        "password": "Y6.B4s)pt",
        "connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"
    },
    {
        "component": "ocp-express",
        "access_url": "10.86.97.168:8180",
        "user": "admin",
        "password": "DSxF-{odkdX-bmL6fjrF2{3mLL",
        "connect_url": "http://10.86.97.168:8180"
    }
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)

常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;

2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;

3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_test
                MEMORY_SIZE = '2G',
                MAX_CPU = 1, MIN_CPU = 1,
                LOG_DISK_SIZE = '6G',
                MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;

4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');

5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';

6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)

7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';

8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (
  `dt` varchar(10) NOT NULL ,
  `uuid` varchar(30) DEFAULT NULL,
  `report_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。

SHOW MASTER STATUS;

SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

/**
 * 就当成MySQL使用就行。
 */
public class OceanBaseCDCLikeMySQLExample {
    public static void main(String[] args) throws Exception {

        List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.97.168")
                .port(2883)// oceanbase安装时obproxy-ce组件端口
                .databaseList("flink_test")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root@flink_test_tenant")
                .password("")// 记得修改为实际密码
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");

        // 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);
        // ②可能是数据库ip、port、账号、密码错误。
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1).print();

        env.execute("Print MySQL Snapshot + Binlog");

    }
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/947277.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JavaWeb开发(五)Servlet-ServletContext

1. ServletContext 1.1. ServletContext简介 1.1.1. ServletContext定义 ServletContext即Servlet上下文对象&#xff0c;该对象表示当前的web应用环境信息。 1.1.2. 获取ServletContext对象: &#xff08;1&#xff09;通过ServletConfig的getServletContext()方法可以得到…

ubuntu 如何使用vrf

在Ubuntu或其他Linux系统中&#xff0c;您使用ip命令和sysctl命令配置的网络和内核参数通常是临时的&#xff0c;这意味着在系统重启后这些配置会丢失。为了将这些配置持久化&#xff0c;您需要采取一些额外的步骤。 对于ip命令配置的网络接口和路由&#xff0c;您可以将这些配…

2024秋语法分析作业-B(满分25分)

特别注意&#xff1a;第17条产生式改为 17) Stmt → while ( Cond ) Stmt 【问题描述】 本次作业只测试一个含简单变量声明、赋值语句、输出语句、if语句和while语句的文法&#xff1a; 0) CompUnit → Block 1) Block → { BlockItemList } 2) BlockItemList → BlockItem…

SQL-leetcode-197. 上升的温度

197. 上升的温度 表&#xff1a; Weather ---------------------- | Column Name | Type | ---------------------- | id | int | | recordDate | date | | temperature | int | ---------------------- id 是该表具有唯一值的列。 没有具有相同 recordDate 的不同行。 该表包…

C#编写的金鱼趣味小应用 - 开源研究系列文章

今天逛网&#xff0c;在GitHub中文网上发现一个源码&#xff0c;里面有这个金鱼小应用&#xff0c;于是就下载下来&#xff0c;根据自己的C#架构模板进行了更改&#xff0c;最终形成了这个例子。 1、 项目目录&#xff1b; 2、 源码介绍&#xff1b; 1) 初始化&#xff1b; 将样…

通过无障碍服务(AccessibilityService)实现Android设备全局水印显示

一、无障碍功能简介 首先我们先来了解下无障碍功能的官方介绍&#xff1a; 无障碍服务仅应用于帮助残障用户使用 Android 设备和应用。它们在后台运行&#xff0c;并在触发 AccessibilityEvents 时接收系统的回调。此类事件表示用户界面中的某些状态转换&#xff0c;例如焦点已…

【Blackbox Exporter】prober.Handler源码详细分析

http.HandleFunc(path.Join(*routePrefix, "/probe"), func(w http.ResponseWriter, r *http.Request) {sc.Lock()conf : sc.Csc.Unlock()prober.Handler(w, r, conf, logger, rh, *timeoutOffset, nil, moduleUnknownCounter, allowedLevel)})我们了解到blackbox_ex…

SpringMVC核心、两种视图解析方法、过滤器拦截器 “ / “ 的意义

SpringMVC的执行流程 1. Spring MVC 的视图解析机制 Spring MVC 的核心职责之一是将数据绑定到视图并呈现给用户。它通过 视图解析器&#xff08;View Resolver&#xff09; 来将逻辑视图名称解析为具体的视图文件&#xff08;如 HTML、JSP&#xff09;。 核心流程 Controlle…

基于动力学的MPC控制器设计盲点解析

文章目录 Apollo MPC控制器的设计架构误差模型和离散化预测模型推导目标函数和约束设计优化求解优化OSQP求解器参考文献 Apollo MPC控制器的设计架构 误差模型和离散化 状态变量和控制变量 1、Apollo MPC控制器中状态变量主要有如下6个 matrix_state_ Matrix::Zero(basic_stat…

DC-DC 降压转换器设计提示和技巧

基本 DC-DC 降压转换器电路 在开始之前&#xff0c;我们先回顾一下DC-DC降压转换器的电路&#xff1a; 为了帮助您&#xff0c;我开发了降压设计中“什么影响什么”的矩阵&#xff1a; 主要的权衡是电感&#xff08;与 k 因子成反比&#xff0c;即峰峰值与平均电感电流之比&a…

Unity3D仿星露谷物语开发9之创建农场Scene

1、目标 绘制农场的场景。通过不同Sorting Layer控制物体的显示优先级&#xff0c;绘制Tilemap地图&#xff0c;添加Tilemap Collider碰撞器&#xff0c;同时添加Composite Collider碰撞器优化性能。 ps&#xff1a;绘制Tilemap的技巧&#xff1a;通过"Shift [" 可…

Linux 定时任务:轻松创建与精准执行

Linux 定时任务&#xff1a;轻松创建与精准执行 在 Linux 系统的运维与自动化管理领域&#xff0c;定时任务扮演着举足轻重的角色。它能够让系统在预设的时间点或周期性时段&#xff0c;自动执行特定的脚本、命令&#xff0c;极大地减轻了管理员的工作负担&#xff0c;提升工作…

Linux驱动开发:深入理解I2C时序(二)

在Linux驱动开发中,I2C时序的理解和正确处理是保证I2C设备正常工作和通信的关键。I2C协议的时序特性决定了数据的有效传输和设备间的协作。因此,掌握I2C的时序细节,以及如何在Linux内核中进行时序处理,能够让开发者更好地处理设备通信问题。 本文将继续深入探讨I2C通信协议…

国产编辑器EverEdit - 常用资源汇总

1 国产编辑器EverEdit-常用资源汇总 EverEdit是一款国产文本编辑器&#xff0c;历经超过15年的更新和维护&#xff0c;拥有不输业界顶级商业文本编辑器(EmEditor、UltraEdit)的实力&#xff0c;甚至在某些方面的功能更强(当然&#xff0c;各有千秋)&#xff0c;开发者对文本编辑…

解决uniapp H5页面限制输入框只能输数字问题

工作记录 最最近在做 uniapp 开发的移动端 H5 页面&#xff0c;有个需求是金额输入框只能输入数字&#xff0c;不能输入小数点和其他字符&#xff0c;经过各种尝试&#xff0c;发现其他字符可以通过正则过滤掉&#xff0c;但是输入小数点的话&#xff0c;因为没有触发 input 和…

面试准备备备备

职业技能 放到简历的黄金位置&#xff08;HR刷选简历的重要参考&#xff09; 基本准则&#xff1a;写在简历上的必须能聊&#xff0c;不然就别写 参考公式&#xff1a;职业技能 必要技术 其他技术 针对性的引导面试官&#xff08;让他问一些你想让他问的&#xff09; 寻找合…

npm install --global windows-build-tools --save 失败

注意以下点 为啥下载windows-build-tools&#xff0c;是因为node-sass4.14.1 一直下载不成功&#xff0c;提示python2 没有安装&#xff0c;最终要安装这个&#xff0c;但是安装这个又失败&#xff0c;主要有以下几个要注意的 1、node 版本 14.21.3 不能太高 2、管理员运行 …

Jenkins 中自动化部署 Spring Boot 项目

&#x1f468;&#x1f3fb;‍&#x1f4bb; 热爱摄影的程序员 &#x1f468;&#x1f3fb;‍&#x1f3a8; 喜欢编码的设计师 &#x1f9d5;&#x1f3fb; 擅长设计的剪辑师 &#x1f9d1;&#x1f3fb;‍&#x1f3eb; 一位高冷无情的全栈工程师 欢迎分享 / 收藏 / 赞 / 在看…

【DSP/matlab】fftshift 是什么意思?在信号处理中有什么作用?

文章目录 前言一、定义什么是 fftshift&#xff1f;fftshift 在信号处理中的作用&#xff1a; 前言 dsp_paper 一、定义 fftshift 是一个在信号处理和数字信号处理中常用的函数&#xff0c;特别是在使用快速傅里叶变换&#xff08;FFT&#xff09;时。这个函数的主要作用是将…

【PCIe 总线及设备入门学习专栏 4.2 -- PCI 总线的三种传输模式 】

文章目录 OverviewProgrammed I/O&#xff08;PIO&#xff09;Direct Memory Access (DMA)Peer-to-Peer 本文转自&#xff1a;https://blog.chinaaet.com/justlxy/p/5100053095 Overview 本文来简单地介绍一下PCI Spec规定的三种数据传输模型&#xff1a;Programmed I/O&…