Canal深入调研
1.canal的设计
1.1 Canal的设计理念
canal的组件化设计非常好,有点类似于tomcat的设计。使用组合设计,依赖倒置,面向接口的设计。
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
1.2 canal的组件
canal server 这个代表了我们部署的一个canal 应用。
canal instance 这个代表了一个canal server中的多个 mysql instance ,从这一点说明一个canal server可以搜集多个库的数据,在canal中叫 destionation。
每个canal instance 有多个组件构成。在conf/spring/default-instance.xml中配置了这些组件。他其实是使用了spring的容器来进行这些组件管理的。
1.3 instance 包含的组件
这里是一个cannal Instance工作所包含的大组件。截取自 conf/spring/default-instance.xml
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser">
<ref local="eventParser" />
</property>
<property name="eventSink">
<ref local="eventSink" />
</property>
<property name="eventStore">
<ref local="eventStore" />
</property>
<property name="metaManager">
<ref local="metaManager" />
</property>
<property name="alarmHandler">
<ref local="alarmHandler" />
</property>
</bean>
1.eventParser 最基本的组件,类似于mysql从库的dump线程,负责从master中获取bin_log,整个parser过程大致可分为6步:
- Connection获取上一次解析成功的位置(如果第一次启动,则获取初始制定的位置或者是当前数据库的binlog位点)
- Connection建立连接,发生BINLOG_DUMP命令
- Mysql开始推送Binary Log
- 接收到的Binary Log通过Binlog parser进行协议解析,补充一些特定信息
- 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
- 存储成功后,定时记录Binary Log位置
2.eventSink 数据的归集,使用设置的filter对bin log进行过滤
说明:
数据过滤:支持通配符的过滤模式,表名,字段内容等
数据路由/分发:解决1:n (1个parser对应多个store的模式)
数据归并:解决n:1 (多个parser对应1个store)
数据加工:在进入store之前进行额外的处理,比如join
1 数据1:n业务 :
为了合理的利用数据库资源,一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注。
2 数据n:1业务:
同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.
3.目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:
定义了3个cursor
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
实现说明:
- Put/Get/Ack cursor用于递增,采用long型存储
- buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高)
4.metaManager 用来存储一些原数据,比如消费到的游标,当前活动的server等信息
1.4 各个组件目前支持的类型
canal采用了spring bean container的方式来组装一个canal instance ,目的是为了能够更加灵活。
1.eventParser 目前只有三种
1.1. MysqlEventParser 用于解析mysql的日志
1.2. GroupEventParser 多个eventParser的集合,理论上是对应了分表的情况,可以通过这个合并到一起
1.3. RdsLocalBinlogEventParser 基于rds的binlog 的复制
2.eventSink 目前只有EntryEventSink 就是基于mysql的binlog数据对象的处理操作
3.eventStore 目前只有一种 MemoryEventStoreWithBuffer,内部使用了一个Ringbuffer 也就是说canal解析的数据都是存在内存中的,并没有到zookeeper当中。
4.metaManager 这个比较多,其实根据元数据存放的位置可以分为三大类,memory,file,zookeeper
canal通过这些组件的选取可以达到不同使用场景的效果,比如单机的话,一般使用file来存储metadata就行了,HA的话一般使用zookeeper来存储metadata。
2. Instance设计
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
-
manager方式: 和你自己的内部web console/manager系统进行对接。(alibaba内部使用方式)
-
spring方式:基于spring xml + properties进行定义,构建spring配置.
-
spring/memory-instance.xml 所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析。特点:速度最快,依赖最少
-
spring/file-instance.xml 所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.支持单机持久化
-
spring/default-instance.xml 所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享. 支持HA
-
spring/group-instance.xml 主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
-
3. Server设计
server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现:
- Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
- Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。
4.增量订阅/消费设计
具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto - void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处: - get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
- get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:
- 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
- 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
- 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
- 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
5.数据格式
canal采用protobuff(protobuf是google团队开发的用于高效存储和读取结构化数据的工具):
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [发生的变更]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为文本]
上面就是Server跟client交互meesage的消息格式
canal-message example:
比如数据库中的表:
mysql> select * from person;
+----+------+------+------+
| id | name | age | sex |
+----+------+------+------+
| 1 | zzh | 10 | m |
| 3 | zzh3 | 12 | f |
| 4 | zzh4 | 5 | m |
+----+------+------+------+
3 rows in set (0.00 sec)
更新一条数据(update person set age=15 where id=4):
****************************************************
* Batch Id: [2] ,count : [3] , memsize : [165] , Time : 2016-09-07 15:54:18
* Start : [mysql-bin.000003:6354:1473234846000(2016-09-07 15:54:06)]
* End : [mysql-bin.000003:6550:1473234846000(2016-09-07 15:54:06)]
****************************************************
================> binlog[mysql-bin.000003:6354] , executeTime : 1473234846000 , delay : 12225ms
BEGIN ----> Thread id: 67
----------------> binlog[mysql-bin.000003:6486] , name[canal_test,person] , eventType : UPDATE , executeTime : 1473234846000 , delay : 12225ms
id : 4 type=int(11)
name : zzh4 type=varchar(100)
age : 15 type=int(11) update=true
sex : m type=char(1)
----------------
END ----> transaction id: 308
================> binlog[mysql-bin.000003:6550] , executeTime : 1473234846000 , delay : 12240ms
6.Canal的工作过程
6.1 启动时去MySQL 进行dump操作的binlog 位置确定
在启动一个canal instance 的时候,首先启动一个eventParser 线程来进行数据的dump 当他去master拉取binlog的时候需要binlog的位置,这个位置的确定是按照如下的顺序来确定的。
- 在启动的时候判断是否使用zookeeper,如果是zookeeper,看能否拿到 cursor (也就是binlog的信息),如果能够拿到,把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog。
- 通过1拿不到的话(一般是zookeeper当中没1提到的信息,比如第一次搭建的时候,或者因为某些原因zk中的数据被删除了),就去配置文件配置当中的去拿,把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog。
- 通过2依然没有拿到的话,就去mysql 中执行一个sql show master status 这个语句会显示当前mysql binlog最后位置的信息,也就是刚写入的binlog所在的位置信息。把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog。
后面的eventParser的操作就会以内存中(MemoryLogPositionManager)存储的binlog位置去master进行dump操作了。
mysql的show master status
操作如下:
mysql>
mysql> show master status\G
*************************** 1. row ***************************
File: mysql-bin.000028
Position: 635762367
Binlog_Do_DB:
Binlog_Ignore_DB:
Executed_Gtid_Set: 18db0532-6a08-11e8-a13e-52540042a113:1-2784514,
318556ef-4e47-11e6-81b6-52540097a9a8:1-30002,
ac5a3780-63ad-11e8-a9ac-52540042a113:1-5,
be44d87c-4f25-11e6-a0a8-525400de9ffd:1-156349782
1 row in set (0.00 sec)
6.2 数据在dump回来之后进行的归集/过滤(sink)和存储(store)
sink操作是可以支撑将多个eventParser的数据进行过滤filter
filter使用的是instance.properties中配置的filter,当然这个filter也可以由canal的client端在进行subscribe的时候进行设置。如果在client端进行了设置,那么服务端配置文件instance.properties的配置都会失效.
sink 之后将过滤后的数据存储到eventStore当中去。
目前eventStore的实现只有一个MemoryEventStoreWithBuffer,也就是基于内存的Ringbuffer(循环缓冲区),使用这个store有一个特点,这个ringbuffer是基于内存的,大小是有限制的(bufferSize = 16 * 1024 也就是16M),所以,当canal的客户端消费比较慢的时候,ringbuffer中存满了就会阻塞sink操作,那么正读取mysql binlog的eventParser线程也会受阻。这种设计其实也是有道理的。 因为canal的操作是pull 模型,不是producer push的模型,所以他没必要存储太多数据,这样就可以避免了数据存储和持久化管理的一些问题。使数据管理的复杂度大大降低。上面这些整个是canal的parser 线程的工作流程,主要对应的就是将数据从mysql搞下来,做一些基本的归集和过滤,然后存储到内存中。
6.3 binlog的消费者
binlog的主要消费者就是canal的client端。使用的协议是基于tcp的google.protobuf,当然tcp的模式是io多路复用,也就是nio。当我们的client发起请求之后,canal的server端就会从eventStore中将数据传输给客户端。根据客户端的ack机制,将binlog的元数据信息定期同步到zookeeper当中。
7.Canal的HA机制
Canal的HA机制类似于Hadoop组件的HA,都是依托于zk服务,通过zk来进行failover机器的状态切换。
canal的HA分为两部分,canal server和canal client分别有对应的ha实现:
- canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
- canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)
Canal Server:
过程:
- canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
- 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
- canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制.
HA配置架构图如下所示:
8.Canal的断点续传实现
数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. (下次你重启client时,会从这最后一个位点继续进行消费)
- 使用default-instance.xml后,默认是通过zk管理位点信息的,所以 meta.data在这里无效的
- meta.data针对的模式是file-instance.xml,启动client消费之后会在对应的destionation目录下记录。
- instance.properties 中的位点与zk 位点 关系(以 default-instance.xml为例)
- 当zk中位点中不存在时,canal启动的位点以 instance.properties中为准,若没有,则通过
show master status
取最新的位点信息 - 当 zk 中存在位点信息时,以zk中的记录为准.
所以在正常数据同步的情况下,zk的位点和instance.properties中配置的位点是不同的。canal 也是通过其来实现断点续传的能力
9.腾讯云MySQL能否支持bin-log读取
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
这几个配置是官网的quick start中给出的mysql应打开的配置,然后根据配置去MySQL数据库中查看配置项是否打开。
MySQL [(none)]> show variables like '%log_bin%';
+---------------------------------+--------------------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql_root/log/20131/mysql-bin |
| log_bin_index | /data/mysql_root/log/20131/mysql-bin.index |
| log_bin_trust_function_creators | ON |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------------------+
6 rows in set (0.00 sec)
MySQL [(none)]> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
从腾讯云的测试MySQL上看log_bin服务已经打开,并且binlog_format也是配置成Row模式,部署了单机canal也能正常启动,destination目录下的h2.mv.db文件中也有数据录入,所以能正常使用。
-
Server 充当了什么角色?
server代表了一个canal的运行进程,对应了一个jvm,instance在server上运行。
-
Instance 是线程还是进程?
Instance是进程,instance的配置其实是在canal server的配置文件中的,在启动server的时候,会把配置的destination全部启动,他们都是运行在同一个server上的。
-
Kafka 的输出的字段说明?
Canal的ServerMode选择Kafka时,输出的Json信息解释
{
// data是update/insert操作具体更新的列及值
"data": [{
"id": "7",
"name": "kris",
"age": "28"
}],
// database是具体操作的数据库
"database": "school",
// es是执行耗时
"es": 1572421295000,
"id": 2,
// isDdl标识是否是ddl语句
"isDdl": false,
// mysqlType是字段mysql类型
"mysqlType": {
"id": "int(11)",
"name": "varchar(255)",
"age": "int(11)"
},
// old是旧数据列表, 用于update, size和data的size一一对应
"old": null,
// pkNames表示Mysql中的主键
"pkNames": null,
// sql是执行的具体的sql语句,一般DDL会产生。
"sql": "",
// sqlType表示字段在java中类型
"sqlType": {
"id": 4,
"name": 12,
"age": 4
},
// table是具体的表名
"table": "student",
// ts是同步时间
"ts": 1572421295689,
// 执行的SQL操作类型
"type": "INSERT"
}
- 如何 bootstrap?
Canal的bootstrap需要使用canal的Adapter的手动ETL功能,仅是server端的话是不支持这种功能的