Streamsets-JDBC模式使用更新时间字段数据同步

StreamSets的开源地址:https://github.com/streamsets/datacollector-oss
Streamsets官网地址:https://streamsets.com/
Streamsets文档地址:https://docs.streamsets.com/portal/datacollector/3.16.x/help/index.html

我又来写Streamsets了,各种原因好久没研究Cassandra了。
本次分享主要介绍Streamsets的JDBC模式、为什么使用时间字段同步数据、遇到的问题和解决方案。
解决方案并不是最完美但也是基于当前条件下最优解,如有疑问,欢迎热烈讨论。
提供的脚本毫无保留,可直接使用。

Streamsets在3.22.2之后就闭源了,更高阶的特性已包装为平台产品。

结合周边讨论和网上的资料来看,Streamsets的活跃度不高,在网上搜的资料太少啦,又随着项目闭源,活跃度更低了,归其原因我分析Streamsets是一个大而全的数据同步工具,整合了市面上基本所有的数据源,但是每个公司不可能用到里面所有的数据源,真正能用到大部分数据源的公司,规模肯定大到不会依赖这种外部的工具,自己手写同步的自由度和效率要更好。

Streamsets对于我们的优势在于开箱即用,相比于手搓代码来实现业务细节,Streamsets将数据同步的每个阶段独立开来,将业务变动最大的数据清洗部分以处理器的形式开放出来,数据的转换和转换的实时配置并生效,直观的监控指标。

版本为Streamsets的3.16.0的离线版本,部署到内网时的最新版本为3.16.0,所以方案和问题的解决方案均以3.16.0为基础。

JDBC模式介绍:

JDBC模式的增量模式只支持新增的数据和不需要修改的数据,且官方建议的offsetColumn为PrimaryKey,如:ID。

Incremental mode
When the JDBC Query Consumer performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. By default, JDBC Query Consumer uses incremental mode.

SELECT * FROM <table_name> WHERE <primaryKey> > ${OFFSET} ORDER BY <primaryKey>

这样支持的场景为不断的增量数据,无法捕获数据的更新。
但是正常的业务系统一般不存在只新增不更新的场景。
全量同步模式每次加载所有的数据,当表的数据量较大时,同步所需的时间和延迟不能接受。

修改为通过update_time来捕获数据变化:

SELECT * FROM user WHERE update_time > ${OFFSET} ORDER BY update_time

在配置管道时将OffsetColumn指定为update_time,业务系统使用mybatis-plus在数据新增和更新时补充创建时间和更新时间。数据库的时间精度为秒。

使用update_time的好处是对于开发者和运维人员可读性更好,在进行历史数据的同步和数据对接时更方便。
该方案看似非常合理,业务侧只要控制好update_time的逻辑,每次数据变化时update_time是不断滚动向前的,滚动查询不断的进行数据同步。
但是too young too simple。
按照Streamsets的处理逻辑,在两种场景下会丢数据。
分别是当单次同步的数据量超过maxBatchSize时,概率性丢数据和并发写入数据库时概率性丢数据。
这两种丢数据的场景是不可控的,时间不可控,完全看运气。但是不确定往往是最可怕的。

为什么会丢数据?

第一种场景:单次同步的数据量超过maxBatchSize
Offset的更新逻辑和jdbc-protolib源码中的逻辑:
origin会当根据sql查询的数据读取不超过配置的maxBatchSize的数量,并将最新的update_time赋值给offset。

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {
	  // ...
      try (Connection connection = dataSource.getConnection()) {
        if (null == resultSet || resultSet.isClosed()) {
          // 执行查询sql语句
          resultSet = statement.executeQuery(preparedQuery);
        }
        // 超过maxBatchSize的数据不发送到下一阶段,留到下次操作时处理。
        while (continueReading(rowCount, batchSize) && (haveNext = resultSet.next())) {
          final Record record = processRow(resultSet, rowCount);
          if (null != record) {
         	// 记录下数据
              batchMaker.addRecord(record);
            }
           // 更新offset
          if (isIncrementalMode) {
            nextSourceOffset = resultSet.getString(offsetColumn);
          } else {
            nextSourceOffset = initialOffset;
          }
          // 后续收尾工作
      }
    }
    return nextSourceOffset;
  }

结合Streamsets的Offset的更新逻辑和jdbc-protolib源码中的逻辑,当一秒内出现多条数据时,会因为精度问题导致数据丢失。

第二种场景:数据并发写入数据库时。
业务侧代码使用mybatis-plus作为ORM来处理数据的读写,当有大数据量写入数据时,如:Excel导入或高并发的数据写入。
mybatis-plus的内置处理逻辑为分批次提交,每次提交1000,所以单个线程写入的qps为1000。
以Excel导入为例,如果批量保存方法没有加@Transaction注解,会大大增加数据丢失的概率。

原因为结合mybatis的处理+没加@Transaction注解导致1000个insert语句一次性发给数据库,这1000条sql语句是以非事务的方式执行,每条数据都是一个完整的事务,执行完毕自动提交,立即可见。
这时当Streamsets触发查询操作时,时机恰好出现在一秒内的前半段,而一秒内的后半段还在数据写入,导致后半段的数据丢失。
场景分析

解决方案:

如果你拿到的是Streamsets的安装包,那第一种场景无法通过配置和升级的方式解决,因为使用的方式和增量模式的设计初衷不符。
有一份折中方案,但不保熟:
1.能力范围内update_time的精度越细越好,越细会有一定的性能损耗,但丢数据的概率大大降低。
2.评估每次同步的数据量大小,maxBatchSize的大小要大于单次同步的数据量。注意内存大小,小心OOM,(插一句:oracle的批量更新会存在连接泄露,需注意。如果有源码顺手改之。)
可以下载一份Streamsets的源码,改之。
代码如下:

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {
	  // ...
      try (Connection connection = dataSource.getConnection()) {
        if (null == resultSet || resultSet.isClosed()) {
          // 执行查询sql语句
          resultSet = statement.executeQuery(preparedQuery);
        }
        while ((haveNext = resultSet.next())) {
        if(continueReading(rowCount, batchSize)){
          final Record record = processRow(resultSet, rowCount);
          if (null != record) {
         	// 记录下数据
              batchMaker.addRecord(record);
            }
           // 更新offset
          if (isIncrementalMode) {
            nextSourceOffset = resultSet.getString(offsetColumn);
          } else {
            nextSourceOffset = initialOffset;
          }
          } else {
          // 当超过maxBatchSize时,继续查找最后一秒的数据。
          	if(!nextSourceOffset.equals(initialOffset) && nextSourceOffset.equals(resultSet.getString(offsetColumn))){
          	if(null != record) batchMaker.addRecord(record);
          }
          // 后续收尾工作
      }
    }
    return nextSourceOffset;
  }

第二种场景出现的原因是在同一秒内同时出现写入和查询操作,查询时无法取出应取出的数据。
解决的思路为错峰,通过配置手段将查询动作和写入动作错开。

// oracle
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < SYSDATE - INTERVAL '1' SECOND order by update_time;
// mysql
select * from user where update_time < '${offset}' and update_time < DATE_SUB(now(), INTERVAL 1 SECOND) order by update_time;
// dm
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < CURRENT_TIMESTAMP- INTERVAL '1' SECOND order by update_time;
// kingbase
select * from user where update_time < '${offset}' and update_time < current_timestamp - INTERVAL '1' SECOND order by update_time;

需要特别注意:因为数据库中存储的时间有可能为业务服务的时间,要保证数据库和业务服务的时区和时间要保持一致。

通道示意图:

新版Streamsets的布局,我的不长这样。streamsets-jdbc示意图
源:无特殊配置
Jython处理器:根据源传过来的数据查询目标表,对数据进行标记。
流选择器:根据数据的标记分发数据,标记为insert的走新增通道,标记为update的走修改通道。
目标:一个配置为INSERT,另一个配置为UPDATE。
Jython脚本:

import java.sql.DriverManager as DriverManager
import java.lang.Class as Class
import time

url = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
Class.forName("com.mysql.jdbc.Driver")
username = "root"
password = "passwd"
batch_size = 1000

primary_key = "id"
table_name = "t_target"
ids = []
db_ids = set()
records = sdc.records
conn = None
stmt = None
rs = None
if len(records) != 0:
  try:
    conn = DriverManager.getConnection(url,username,password)
    if conn is not None:
      stmt = conn.createStatement()
      start_time = time.time()
      for record in records:
        id = record.value[primary_key]
        ids.append(id)
      num_batches = len(ids) // batch_size + (1 if len(ids) % batch_size != 0 else 0)
      for i in range(num_batches):
        start_index = i * batch_size
        end_index = min((i+1) * batch_size,len(ids))
        batch_ids = ids[start_index,end_index]
        sql = "select ' + primary_key + ' from " + table_name + " where '+ primary_key +' in ('"
        for j,id in enumerate(batch_ids):
          if j != 0:
            sql += "','"
          sql += str(id)
        sql += "')"
        rs = stmt.executeQuery(sql)
        while rs.next():
          id = rs.getString(primary_key)
          db_ids.add(id)
      end_time = time.time()
      sdc.log.info('from '+ table_name + 'query:' + str(len(ids)) + 'rows cost:'+str(end_time - start_time) + 's')
      for record in records:
        id = record.value[primary_key]
        if id in db_ids:
          record.value['insert_or_update'] = 'update'
		else:
          record.value['insert_or_update'] = 'insert'
        sdc.output.write(record)
  except Exception as e:
    raise RuntimeError(e)
  finally:
    if rs:
      rs.close()
    if stmt:
      stmt.close()
    if conn:
      conn.close()
else:
  sdc.log.trace('no more data')

结语:

截止到此,也算一套完整的解决方案。拷贝之后可直接食用。
后面有时间会分享一些定位时发现的问题和小技巧。

  • 国产化数据库达梦和人大金仓的适配。
  • 国产化服务器加密环境的打包和部署方案。
  • 为Streamsets减负,轻量化安装包。
  • JDBC模式的性能优化小技巧。
  • 穿插一些Streamsets组件的实现原理。
  • Streamsets CDC模式的配置。
  • 手写一份Streamsets的Stage,用以支撑国产化的需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/669054.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安全测试扫描利器-Burpsuite

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

全自动打包封箱机:解析其在产品质量与安全保障方面的作用

在当今快节奏的生产环境中&#xff0c;全自动打包封箱机以其高效、精准的特点&#xff0c;正逐渐成为生产线上的得力助手。它不仅提升了生产效率&#xff0c;更在产品质量与安全保障方面发挥着举足轻重的作用。星派将详细解析全自动打包封箱机在产品质量与安全保障方面的作用。…

自监督表示学习和神经音频合成实现语音修复

关键词&#xff1a;语音修复、自监督模型、语音合成、语音增强、神经声码器 语音和/或音频修复的目标是增强局部受损的语音和/或音频信号。早期的工作基于信号处理技术&#xff0c;例如线性预测编码、正弦波建模或图模型。最近&#xff0c;语音/音频修复开始使用深度神经网络&a…

Qt | QSplitter(分离器或分隔符)、QSplitterHandle 类(分界线)

​01、一、QSplitter 类(分离器) 1、QSplitter 类继承自 QFrame 类,也就是说该类是一个带有边框的可视部件。 2、QSplitter 类实现分离器,分离器用于分离两个部件,用户可通过拖动部件之间的分界线来调整子部件的大小。 3、QSplitter 的原理(见上图):QSplitter 的实现原理…

AWS中国峰会2024 半日游

亚马逊云科技中国峰会于2024年5月29-30日在上海举办 今年就去了半天&#xff0c;去年也是去过的&#xff0c;不过今年的活动个人感觉比去年略微凌乱了一点。 今年的峰会方向和去年一致&#xff0c;均是AI方向的各项内容&#xff08;基础架构、安全、服务、游戏、驾驶、各行各…

浅谈防勒索病毒的关键

主机加固能否做好防勒索病毒的工作&#xff0c;一直是网络安全领域的重要议题。随着信息技术的飞速发展&#xff0c;勒索病毒等网络威胁层出不穷&#xff0c;对企业和个人数据安全构成了严重威胁。因此&#xff0c;如何通过主机加固提升安全防护能力&#xff0c;防止勒索病毒的…

大模型管理工具Ollama搭建及整合springboot

目录 一、Ollama介绍 1.1 什么是Ollama 1.2 Ollama特点与优势 二、Ollama本地部署 2.1 版本选择 2.2 下载安装包 2.3 执行安装 2.4 Ollama常用命令 三、使用Ollama部署千问大模型 3.1 千问大模型介绍 3.2 部署过程 四、springboot接入Ollama 4.1 引入Ollama依赖 4…

音频信号分析与实践

音频信号分析与实践课程,方便理解音频信号原理和过程 1.音频信号采集与播放 两种采样模式和标准的采样流程 人说话的声音一般在2kHz一下&#xff1a; 采样频率的影响&#xff1a;采样率要大于等于信号特征频率的2倍&#xff1b;一般保证信号完整&#xff0c;需要使用10倍以上的…

python找出100~999之间的水仙花数字

水仙花数字&#xff1a;个位&#xff0c;十位&#xff0c;百位的立方之和等于这个数本身 例如&#xff1a;153 1^35^33^3 for i in range(100, 1000):bw i // 100sw i % 100 // 10gw i % 10if bw ** 3 sw ** 3 gw ** 3 i:print(i)

SpringBoot读取json文件

使用SpringBoot读取json文件作为接口&#xff0c;前端Vue可以通过跨域访问接口数据 一、创建SpringBoot 文件 创建一个 SpringBoot 文件&#xff0c;文件结构目录如下&#xff1a; 二、在pom.xml添加依赖 <!--Spring Boot 依赖--> <parent><artifactId>sp…

WireShark下载安装

下载地址 WireShark站内下载资源&#xff1a;&#xff08;土豪方便下载&#xff09; https://download.csdn.net/download/qq_58662768/89377088 官网下载&#xff1a; Wireshark Go Deep 进入主页后&#xff0c;选择Get Acquainted&#xff0c;再选择Download。 选择合适…

TPL0401B使用教程

1.前言 前面做程控放大器的时候&#xff0c;有除开AD602&#xff0c;还有一个AD620&#xff0c;性能更好&#xff0c;不过是通过外部电阻来控制放大倍数的&#xff0c;不过要是接滑动变阻器就太不优雅了&#xff0c;而且单片机怎么控制滑动变阻器&#xff1f;&#xff08;难不…

linux 内核哪种锁可以递归调用 ?

当数据被多线程并发访问(读/写)时&#xff0c;需要对数据加锁。linux 内核中常用的锁有两类&#xff1a;自旋锁和互斥体。在使用锁的时候&#xff0c;最常见的 bug 是死锁问题&#xff0c;死锁问题很多时候比较难定位&#xff0c;并且影响较大。本文先会介绍两种引起死锁的原因…

简易版本的QFD质量屋

比如餐馆要考虑什么因素最重要&#xff0c;这里列出好吃&#xff0c;快速&#xff0c;便宜三类问题&#xff0c;然后设置上图的权重&#xff0c; 然后设置9&#xff0c;3&#xff0c;1三类因子&#xff0c;9比如是最重要的&#xff0c;3&#xff0c;1&#xff0c;依次没那么重要…

开发一个comfyui的自定义节点-支持输入中文prompt

文章目录 目标功能开发环境实现过程翻译中文CLIP编码拓展仓库地址完整代码目标功能 目前comfyui的prompt提示词输入节点 CLIP Text Encode 只支持输入英文的prompt,而有时候我们需要自己制定一些prompt,所以就得将我们想要的提示词翻译为英文后再复制粘贴到该节点的输入框中…

算法练习第26天|46.全排列、47全排列II

46.全排列 46. 全排列 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/permutations/description/ 题目描述&#xff1a; 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 示例 1&#xff1a;…

C++STL---vector模拟实现

通过上篇文章&#xff0c;我们知道vector的接口实际上和string是差不多的&#xff0c;但是他俩的内部结构却大不一样&#xff0c;vector内有三个成员变量&#xff1a;_start、_finish、_endofstorage: _start指向容器的头元素&#xff0c;_finish指向有效元素末尾的元素&#x…

【计算机毕业设计】359微信小程序校园失物招领系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

Java基础知识点(反射、注解、JDBC、TCP/UDP/URL)

文章目录 反射反射的定义class对象反射的操作 注解注解的定义注解的应用注解的分类基准注解元注解 自定义注解自定义规则自定义demo JDBCTCP/UDP/URLTCPUDPURL 反射 反射的定义 Java Reflection是Java被视为动态语言的基础啊&#xff0c; 反射机制允许程序在执行期间接入Refl…

Vue进阶之Vue无代码可视化项目(一)

Vue无代码可视化项目 项目搭建初始步骤拓展:工程项目从0-1项目规范化package.jsoncpell.jsoncustom-words.txtts-eslint规则.eslintrc.cjsgit钩子检查有没有问题type-checkspellchecklint:stylehusky操作安装pre-commitpnpm的commit规范package.json:commitlint.config.cjs安装…