Streamsets-JDBC模式offset变化逻辑和如何向下传递offset

Streamsets的版本为3.16.0 离线版

offset在jdbc模式中起到非常关键的作用,是滚动查询的基础,offset的准确直接影响数据同步的质量。
本文主要分享一下JDBC Query Consumer中的offset,包括变化逻辑、存储方式、处理器如何获取到最新的offset。
源:JDBC Query Consumer
处理器:Jython Evaluator

变化逻辑

管道的启动方式有三种,启动管道、重置源并启动、带参数启动。
启动管道时,会根据当前的管道ID读取offset,如果为首次启动,那以JDBC标签中设置的initial offset作为offset来进行数据同步。通过过程中会不停的将配置的offsetColumn的值赋给offset,不一定是最大的值,但一定是最后取到的值。取决于Sql Query中的排序规则。
重置源并启动时,字面意思,强制使用initial offset来同步数据。
带参数启动时,对管道添加外置参数,还没需求,没实际用过。

存储方式:

默认情况下,Streamsets使用文件对管道的offset进行持久化,地址在Streamsets的数据目录下,系统路径中找到数据路径。
数据存在的格式为:/{dataDir}/runInfo/{pipelineId}/{version:0}/offset.json
注意:源、处理器和目标不是完全独立的,目标没有结束不会更新offset。
这部分还没研究,猜测:目标执行结束后触发offset的持久化。

处理器如何获取到最新的offset。

在官方文档和网上搜索了很久,无果。offset无法直接获取。
处理器为Jython,既然使用了Jython,肯定想做更多Streamsets做不到的事情,如:标记一下数据、处理一下格式、打印一下日志等等。
但是内置的Jython模块中封装的方法较少,在看源码之前以为是开放程度很高,仔细研究了一下发现支持的方法是固定的几个,全写在注释里面了。
Jython实现方式:

    import java.sql.DriverManager as DriverManager
    import java.lang.Class as Class
    import time
    import json
    
    url = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
    Class.forName("com.mysql.jdbc.Driver")
    username = "root"
    password = "passwd"

	records = sdc.records
	conn = None
	stmt = None
	rs = None
	if len(records) != 0:
	  try:
	    conn = DriverManager.getConnection(url,username,password)
	    if conn is not None:
	      stmt = conn.createStatement()
	      sourceId = records[0].sourceId
	      start_time = time.time()
	      # 源码修改之后 修改之前请用split切割后获取offset
	      originDict = json.loads(sourceId)
	      sql = originDict['preparedQuery']
	      if sql:
            rs = smt.executeQuery(sql)
            rs.last()
            origin_count = rs.getRow()
          end_time = time.time()
          if len(records) != origin_count:
            sdc.log.info("valid error")
            # do something
          else:
	        for record in records:
	          sdc.output.write(record)
	  except Exception as e:
	    raise RuntimeError(e)
	  finally:
	    if rs:
	      rs.close()
	    if stmt:
	      stmt.close()
	    if conn:
	      conn.close()
	else:
	  sdc.log.trace('no more data')

soureId在origin阶段中的声明,存放在每条record的header中,格式为:{sqlQuery[0-100]}::rowCount:{rowCount}:{offset}。
此格式是固定的,在不修改源码的情况下,可以稍微花点时间使用分隔符将offset给分离出来。

final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));

但是不满足需求,解决更新时间作为同步字段丢失数据的问题,为了将查询和写入错峰,同时验证数据的准确性,在处理器中再次查询origin中的sql,对比传递过来的数量和再次查询的数量是否一致。但是sourceId中的sql只有100个字符,有风险。改之。

	// final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));
	// maven 引入 fastjson2
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("preparedQuery", preparedQuery.replaceAll("[\n\r]", ""));
    jsonObject.put("rowCount", rowCount);
    if (StringUtils.isNotEmpty(offsetColumn)) {
      jsonObject.put("offsetColumn", resultSet.getString(offsetColumn));
    }
    final String recordContext = jsonObject.toString();

在源的事件中获取offset:

在源的配置中勾选“制造事件”,添加处理器和目标。
sdc发送给下一阶段的数据和发送给处理事件的处理器的协议是一样的,但是数据内容不同,即同一个方法在不同的阶段返回的数据内容不同。
发送给事件处理器的数据中有offset、query、rows和timestamp。
默认发向事件处理器的事件有两种,分别为jdbc-query-success和jdbc-query-failure。


	records = sdc.records
	if len(records) != 0:
	  try:
	      offset = records[0].value['offset']
	      query= records[0].value['query']
	      rows= records[0].value['rows']

	  for record in records:
	      sdc.output.write(record)
	  except Exception as e:
	    raise RuntimeError(e)
	else:
	  sdc.log.trace('no more data')

通道示意图:
在这里插入图片描述

结语:

Streamsets在网上的资料实在是太少了,官方文档也只是讲基本的使用,开发过程中要大胆猜测。
希望这篇文章对你有帮助,我的实现方案也未必完美,欢迎新的思路和idea,此致敬礼。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/675419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在QGIS中加载MapBox图源

在设计行业中需要多风格地图的调用,不管是规划、建筑设计还是景观,分析图的工作量都大,有好的底图,会事半功倍。 针对不同项目,会选择不同配色的底图,以便让设计内容中的呈现足够清晰。 这里就来分享一个…

如何在自己的电脑上添加静态路由

1.任务栏搜索powershell 选择以管理员身份运行 2.输入 route add -p (永久) 目的网络地址例如192.168.10.0 mask 255.255.255.0(子网掩码)192.168.20.1(下一跳地址)。回车即可生效

238.除以自身以外数组的乘积

给你一个整数数组 nums,返回数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积。 题目数据保证数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度内完…

网络编程(六)

网络编程(六) 广播&组播广播步骤 组播步骤 广播&组播 广播 是一种基于1发送多接收的模型 (发送方和接收方) 广播是在局域网内实现的(发送到广播地址上的消息是会被局域网内同网段的所有主机进行接收&#xf…

[Redis]Set类型

集合类型也是保存多个字符串类型的元素的,但和列表类型不同的是,集合中 1)元素之间是无序的 2)元素不允许重复 一个集合中最多可以存储2^32-1个元素。 Redis 除了支持集合内的增删查改操作,同时还支持多个集合取交…

深入探讨ChatGPT API中的Tokens计算方式和计算库

引言 在现代人工智能应用中,自然语言处理(NLP)技术无疑是最受关注的领域之一。OpenAI推出的ChatGPT,作为一种先进的对话模型,已经在多个领域展示了其强大的语言生成能力。为了更好地使用ChatGPT API,理解其…

操作系统(3) 处理机调度

目录 一、处理机调度概述 1.基本准则 (1)CPU利用率 (2)系统吞吐量 (3)周转时间 (4)等待时间 (5)响应时间 2.进程调度方式 (1&#xff0…

【Linux】深入理解进程的优先级(Linux 2.6版本O(1)调度算法)

进程的优先级 【前置知识】一、进程的优先级(一)为什么要有优先级?(二)进程的优先级的范围 二、操作系统是如何实现进程的优先级?(Linux内核2.6版本O(1)调度算法) 【前置知识】 首先我们要了解…

【excel】设置二级可变联动菜单

文章目录 【需求】在一级菜单选定后,二级菜单联动显示一级菜单下的可选项【步骤】step1 制作辅助列1.列转行2.在辅助列中匹配班级成员 step2 名称管理器step3 制作二级下拉菜单step4 消除二级菜单中的空白 【总结】 之前做完了 【excel】设置可变下拉菜单&#xff…

导航时间与坐标转换

前言: 该章节代码均在Gitee中开源:因为这章是学校作业,所以稍微正经点. 时空位置转换https://gitee.com/Ehundred/navigation-engineering/tree/master/%E5%8D%AB%E6%98%9F%E5%AF%BC%E8%88%AA%E5%8E%9F%E7%90%86/%E5%AF%BC%E8%88%AA%E6%97…

Idea-Linux远程开发部署

第一步:File->Remote Development 第二步: 第三步: 第四步:在Host位置填写Linux虚拟机的IP地址,在Username、Password填写对应的账号密码后点击Test Connection测试连接。 第五步: 第六步:在…

【leetcode--文本对齐(还没整理完)】

根据题干描述的贪心算法,对于每一行,我们首先确定最多的是可以放置多少单词,这样可以得到该行的空格个数,从而确定该行单词之间的空格个数。 根据题目中填充空格的细节,我们分以下三种情况讨论: 当前行是…

Vue——样式绑定的几种方式

文章目录 前言往期回顾绑定对象绑定对象的另一种写法绑定数组数组与对象的嵌套 前言 样式绑定在vue中属于一种很常见的操作。在之前博客中针对样式的绑定操作,介绍了一个指令v-bind。缩写为:xxx。 vue 官网 样式绑定 往期回顾 先简单回顾下最开始绑定标签样式的操…

搭建gateway网关

1.创建springBoot项目 可以将Server URL换成start.aliyun.com 2.配置路由与跨域处理 路由: server:port: 10010 # 网关端口 spring:application:name: gateway # 服务名称cloud:nacos:server-addr: localhost:8848 # nacos地址gateway:routes: # 网关路由配置- i…

Java的冷知识你知道吗?

1、方法参数不能超过255个 在Java中,方法的参数数量是有限制的,最多不能超过255个。这个知识点可能对于大多数程序员来说并不常用,因此即使是经验丰富的Java开发者也可能不清楚这一点。2、Java中的自动装箱与拆箱 自动装箱是Java 5引入的新特…

站点被篡改快照被劫持解决服务方法教程_一招制敌

站点被篡改快照被劫持解决服务方法教程_一招制敌 被篡改表现形式: 站点打不开或跳转到别的网站。 攻击者目的: 报复、勒索、卖防御产品(如DDOS防御产品)。 攻击成本: 工具(如VPN购买)成本、人…

当新手小白有了一块【香橙派OrangePi AIpro】.Demo

当新手小白有了一块【香橙派OrangePi AIpro】.Demo 文章目录 当新手小白有了一块【香橙派OrangePi AIpro】.Demo一、香橙派OrangePi AIpro概述1.简介2.引脚图 二、“点亮”香橙派OrangePi AIpro1.官方工具下载2.官方镜像下载3.镜像烧录4.访问香橙派 AIpro 三、香橙派OrangePi A…

数据结构第三篇【链表的相关知识点一及在线OJ习题】

数据结构第三篇【链表的相关知识点一及在线OJ习题】 链表链表的实现链表OJ习题顺序表和链表的区别和联系 本文章主要讲解关于链表的相关知识,喜欢的可以三连喔 😀😃😄😄😊😊🙃&#…

Dubbo 自定义 Filter 编码实例

Dubbo的Filter机制为我们做应用的扩展设计提供了很多可能性,这里的Filter也是“责任链”机制的一种实现场景,作为Java码农,我们也经常接触到很多责任链的实现场景,如Tomcat进入Servlet前的filter,如Spring Aop代理的链…

性能飙升50%,react-virtualized-list如何优化大数据集滚动渲染

在处理大规模数据集渲染时,前端性能常常面临巨大的挑战。本文将探讨 react-virtualized-list 库如何通过虚拟化技术和 Intersection Observer API,实现前端渲染性能飙升 50% 的突破!除此之外,我们一同探究下该库还支持哪些新的特性…