Streamsets的版本为3.16.0 离线版
offset在jdbc模式中起到非常关键的作用,是滚动查询的基础,offset的准确直接影响数据同步的质量。
本文主要分享一下JDBC Query Consumer中的offset,包括变化逻辑、存储方式、处理器如何获取到最新的offset。
源:JDBC Query Consumer
处理器:Jython Evaluator
变化逻辑
管道的启动方式有三种,启动管道、重置源并启动、带参数启动。
启动管道时,会根据当前的管道ID读取offset,如果为首次启动,那以JDBC标签中设置的initial offset作为offset来进行数据同步。通过过程中会不停的将配置的offsetColumn的值赋给offset,不一定是最大的值,但一定是最后取到的值。取决于Sql Query中的排序规则。
重置源并启动时,字面意思,强制使用initial offset来同步数据。
带参数启动时,对管道添加外置参数,还没需求,没实际用过。
存储方式:
默认情况下,Streamsets使用文件对管道的offset进行持久化,地址在Streamsets的数据目录下,系统路径中找到数据路径。
数据存在的格式为:/{dataDir}/runInfo/{pipelineId}/{version:0}/offset.json
注意:源、处理器和目标不是完全独立的,目标没有结束不会更新offset。
这部分还没研究,猜测:目标执行结束后触发offset的持久化。
处理器如何获取到最新的offset。
在官方文档和网上搜索了很久,无果。offset无法直接获取。
处理器为Jython,既然使用了Jython,肯定想做更多Streamsets做不到的事情,如:标记一下数据、处理一下格式、打印一下日志等等。
但是内置的Jython模块中封装的方法较少,在看源码之前以为是开放程度很高,仔细研究了一下发现支持的方法是固定的几个,全写在注释里面了。
Jython实现方式:
import java.sql.DriverManager as DriverManager
import java.lang.Class as Class
import time
import json
url = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
Class.forName("com.mysql.jdbc.Driver")
username = "root"
password = "passwd"
records = sdc.records
conn = None
stmt = None
rs = None
if len(records) != 0:
try:
conn = DriverManager.getConnection(url,username,password)
if conn is not None:
stmt = conn.createStatement()
sourceId = records[0].sourceId
start_time = time.time()
# 源码修改之后 修改之前请用split切割后获取offset
originDict = json.loads(sourceId)
sql = originDict['preparedQuery']
if sql:
rs = smt.executeQuery(sql)
rs.last()
origin_count = rs.getRow()
end_time = time.time()
if len(records) != origin_count:
sdc.log.info("valid error")
# do something
else:
for record in records:
sdc.output.write(record)
except Exception as e:
raise RuntimeError(e)
finally:
if rs:
rs.close()
if stmt:
stmt.close()
if conn:
conn.close()
else:
sdc.log.trace('no more data')
soureId在origin阶段中的声明,存放在每条record的header中,格式为:{sqlQuery[0-100]}::rowCount:{rowCount}:{offset}。
此格式是固定的,在不修改源码的情况下,可以稍微花点时间使用分隔符将offset给分离出来。
final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));
但是不满足需求,解决更新时间作为同步字段丢失数据的问题,为了将查询和写入错峰,同时验证数据的准确性,在处理器中再次查询origin中的sql,对比传递过来的数量和再次查询的数量是否一致。但是sourceId中的sql只有100个字符,有风险。改之。
// final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));
// maven 引入 fastjson2
JSONObject jsonObject = new JSONObject();
jsonObject.put("preparedQuery", preparedQuery.replaceAll("[\n\r]", ""));
jsonObject.put("rowCount", rowCount);
if (StringUtils.isNotEmpty(offsetColumn)) {
jsonObject.put("offsetColumn", resultSet.getString(offsetColumn));
}
final String recordContext = jsonObject.toString();
在源的事件中获取offset:
在源的配置中勾选“制造事件”,添加处理器和目标。
sdc发送给下一阶段的数据和发送给处理事件的处理器的协议是一样的,但是数据内容不同,即同一个方法在不同的阶段返回的数据内容不同。
发送给事件处理器的数据中有offset、query、rows和timestamp。
默认发向事件处理器的事件有两种,分别为jdbc-query-success和jdbc-query-failure。
records = sdc.records
if len(records) != 0:
try:
offset = records[0].value['offset']
query= records[0].value['query']
rows= records[0].value['rows']
for record in records:
sdc.output.write(record)
except Exception as e:
raise RuntimeError(e)
else:
sdc.log.trace('no more data')
通道示意图:
结语:
Streamsets在网上的资料实在是太少了,官方文档也只是讲基本的使用,开发过程中要大胆猜测。
希望这篇文章对你有帮助,我的实现方案也未必完美,欢迎新的思路和idea,此致敬礼。