1. 背景
笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形:
在业务系统中,对某类对象打了标签,现在需要对这类对象进行过滤并分页查询,支持使用这类对象的属性和所打标签进行过滤。
这就出现了一个问题:标签和打标信息是存储在大数据平台的数据库中的,而业务数据是存储在业务系统数据库中的,这是两个数据库实例。
如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为大数据平台的模块,显然是不能将打标数据分散到各个业务系统数据库中的,所以我们就想到了使用CDC,将相关的打标数据同步到需要这些数据的业务库中。
2. 实践
首先构造出下图所示的计算管道:
整个计算管道就3个节点:
-
MySQL_BinLog(源)1节点,从源端MySQL上用BinLog模式增量拉取数据。它的主要配置如下:
主要是选择数据源,选择表,下面的输出项会根据选择的表模式自动生成。 -
1_1映射1节点,主要是为了使用侧输出功能,筛选过滤出目标库所需的部分打标数据。因为数据标签功能对在平台中定义了数据源的,有唯一标识的数据都能进行打标,所以当前目标数据源需要的打标数据只是其中一部分。这里之所以使用1_1映射节点的侧输出功能,而不是使用过滤节点,主要是因为考虑到后续可能有其它源也需要同步打标数据,用侧输出有更好的扩展性。
因为这里不需要按键分区,所以它的表达式是一个常数就行。然后定义了输出标签,右侧的表达式定义了筛选条件。这里输入数据行直接作为输出数据,无需处理,所以输出行一键使用输入行填充即可。 -
MySQL(目标)1节点,将“规则的打标数据”下沉到目标数据库。
选择目标数据源和表。这里的数据写入方式“全操作”指的是UpdteOrInsertOrDelete,另外还有
- 插入
- 更新
- 插入或更新
- 插入或删除
- 更新或删除
点击下图中的1按钮在开发环境调试。如果中心集群中当前工作空间专属的Flink集群没有运行,平台会自动启动这个Flink集群。
点击上图中的2按钮提交到开发环境,然后去部署。
下图是调试运行界面。
在生产环境,点击相应版本的计算管道进行部署。
选择集群,输入/选择集群标签,如果在指定的集群资源上没有指定标签的Flink集群,平台也会自动启动相应标签名称的集群,并部署Flink任务。