流数据源通常是动态、无界的,看起来与静态、有限的批数据源区别较大,传统的数据库技术在架构上难以直接处理流数据源,只能让位于后来者。heron\samza\storm\spark\flink等计算框架最先完成突破,在流计算技术中占得先发优势。这些框架非常成功,以至于一说到流计算,应用程序员通常都会去转头寻找某种框架,而不宣称是某种框架的计算技术,则通常被认为不适合实现流计算。
虽然计算框架最先突破流计算,但框架本身对流计算的意义不大。框架是针对特定的应用场景,事先准备的结构性代码,比如扩展方式(分布式、集中式)、主控流程(异步、同步)等,具体的应用程序只需往框架里填业务逻辑代码。框架优点鲜明,可以用较低的代码量建立成熟稳定的程序结构,但缺点也不能忽视,应用程序无法脱离框架的结构性束缚,只适用于框架预设的场景,这种预设场景很难面面俱到(否则太复杂太难用),通常只能针对单一场景。实际的应用场景多种多样日日有新,往往与预设场景不符,比如让spark被jasperReport报表调用,或让flink应用于小集群或边缘计算。这种场景下使用框架,开发者除了尽力适应预设场景,就只能对框架做出一定的修补,前者经常要牺牲计算性能或浪费硬件成本,后者则会付出巨大的工作量。如果针对实际场景设计结构性代码(并引入流计算类库),则可避免牺牲浪费和付出。即使实际场景恰好符合预设的场景,流计算框架也不一定是最优方案,比如分布式场景下,在降低结构性代码和计算代码的耦合性方面,分布式流计算框架就不如分布式服务框架(如Zookeeper、HSF、Dubbo)+流计算类库。
对流计算有意义的是访问能力和计算能力。流计算由流数据源和计算组成,也就是流数据访问能力和数据计算能力。访问能力指流数据源接口和流入机制。流数据源的种类很多,包括狭义的字符流和字节流,如http流、文件流、消息流(如kafka),也包括广义的流数据,如RDB的记录流(游标)、NoSQL的文档流。流入机制分为主动和被动两种,即系统主动从外界获取数据,以及系统被动等待外界输入数据。计算能力的门槛较高,作为流计算技术,至少应提供基本的结构化计算函数、基本的流程控制语法、基本的半结构化数据处理能力,即过滤、去重、排序、分组汇总、关联、归并等计算函数;分支结构(如if),循环结构(如for);将json\xml等半结构化数据解析为记录。
访问能力是基础,计算能力是核心。流计算还是计算,只不是数据源是流式的,那么计算能力本身才是根本,尤其是高级计算能力。访问能力的门槛较低,容易复制扩展,同质化严重,并非流计算的核心。高级计算能力的门槛很高,在根本上决定了流计算的开发效率和计算性能,是流计算的核心,比如流批混算、简化复杂计算、高性能计算。很多流计算框架往往将重点放在访问能力上(甚有的连这都不足),计算能力反倒不足,不能简化业务逻辑的复杂度,普遍还不如传统数据库。
高级计算能力:流批混算、简化复杂计算、高性能计算。除了监控等特殊场景外,单纯的流计算在现实中很少见,流数据和批数据的混合计算才是常态,实际上,监控类计算通常也要将批量数据(分钟小时级)和实时数据(秒级)合并后再计算。在实际项目中,虽然存在很多简单的计算逻辑如过滤、去重等,但更有价值的计算逻辑通常较复杂,需要流计算技术提供丰富的计算函数和自由的表达语法,从而直观快速地实现计算目标。流技术的实时性都不差,甚至超过人类感知范围,毫秒级对大多数应用意义并不大,但因为流批混算是常态,且很多流计算技术的批计算性能较弱(远不如RDB),导致整体性能较差,这种情况下必须提供丰富的高性能计算类库,才能提升整体计算性能。流数据经常要写出去再计算,比如暂存成温数据或长存成冷数据,流数据体积普遍偏大,再计算的时间普遍偏长,这种情况下也需要提供高性能的内外存存储格式,才能提升再计算(以及写读)的性能。流计算框架是对计算本身提供的支持并不丰富,专业性不足,高级计算能力还不成熟。
好的流计算技术,应当将重点放在流计算上,从而简化业务逻辑开发的复杂度,同时弱化框架,把结构性代码留给应用程序去做,从而适应各类应用场景。并在保证访问能力的基础上,重点提高计算能力,尤其是高级计算能力。
esProc SPL正是符合这些条件的流计算技术。
SPL是基于JVM的轻量级开源计算类库,支持灵活简单的JDBC集成接口,提供了方便的流数据访问能力和基本的计算能力,支持流批混算、简化复杂计算、高性能计算,是更加专业的计算语言。
灵活简单的集成接口
方便易用的JDBC接口。SPL计算代码以脚本文件的形式存于操作系统目录,Java代码通过JDBC调用SPL脚本文件,调用方法同存储过程。SPL的集成接口灵活简单,适用于广泛的应用场景,比如报表、桌面、Web、单机环境、分布式集群、手机应用、边缘计算。
例子:先编写SPL脚本。
A | B | |
1 | =connect("ORCL") | Oracle连接 |
2 | =A1.cursor@x("select OrderID,Amount,Client,SellerID,OrderDate from orders where OrderDate>? && OrderDate<=?",arg1,arg2) | Oracle有界流 |
3 | … |
将上面SPL脚本存为mix.splx,Java通过JDBC调用脚本:
Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("{call mix(?, ?)}");
statement.setObject(1, "2022-01-01");
statement.setObject(2, "2022-12-31");
statement.execute();
计算外置降低耦合性。SPL没有采用计算代码与非计算代码混合的框架,相反,SPL的计算代码外置于Java的非计算代码,两者各司其职专业互补,一方改动不影响另一方,程序员不用借助框架,就可以用简单代码实现计算逻辑和前端应用的解耦。
解释型语言支持热切换。SPL是解释型语言,修改后可立即执行,无须编译无须停机无须重启 JAVA 应用,程序员无需编写结构性代码,就能实现计算逻辑的热切换。
方便的流数据访问能力
SPL支持丰富的流数据源,既包括狭义的字符流和字节流,如http流、文件流、消息流(如kafka),也包括广义的流数据,如RDB的记录流(游标)、NoSQL的文档流。
例子:过滤Kafka的无界流:
A | B | C | |
1 | =kafka_open("D://kafka.properties","topic-test") | kafka连接 | |
2 | for | =kafka_poll(A1).(json(value)) | 循环取数 |
3 | =B2.news(Datas;~.TagFullName, round(Time,-3):Time, ~.Type, ~.Qualitie, ~.Value) | 转为记录 | |
4 | =B3.select(Value>3.0 && Value<=4.0) | 过滤 | |
5 | … |
SPL支持的流数据源还有:任意RDB、Cassandra、InfluxDB、Redis、Dynamodb、ElasticSearch、HBase、Hive、HDFS、MongoDB、SAP、S3、阿里云等。除了读取,SPL也支持将计算结果写入这些数据源。
主动和被动的流入机制。主动流入机制,即在SPL脚本中通过流数据源接口获取数据并完成计算。参考前面过滤kafka的例子。
被动流入,即SPL脚本被动接收数据并完成计算。例子,实时发现异常工况。部分SPL脚本如下:
A | B | |
1 | =func(A12,learn_interval,extrem_interval) | 调用A12格的子函数,计算极值序列 |
2 | =func(A16,A1.(~(1)), learn_interval,extrem_interval,seq,"up") | 调用A16格的子函数,计算阈值上限 |
3 | … | 余下代码略 |
先在Java中通过API访问工厂设备的传感器,获取实时的时间序列;再通过JDBC调用SPL脚本,主要参数是时间序列Seq,次要参数有学习区间learn_interval、极值区间extrem_interval、报警间隔warn_interval;最后在SPL中接收参数,利用流入的时间序列完成异常发现。
基本计算能力
SPL内置基本结构化计算函数,可以轻松完成日常的SQL式计算。
过滤:data.select(Amount>1000 && Amount<=3000 && like(Client,"*s*"))
排序:data.sort(Client,-Amount)
去重:data.id(Client)
分组汇总:data.groups(year(OrderDate);sum(Amount))
关联:join(T ("D:/data/Orders.csv"):O,SellerId; T("D:/data/Employees.txt"):E,EId)
TopN:data.top(-3;Amount)
SPL还提供了符合SQL92标准的语法,支持集合计算、case when、with、嵌套子查询等。
SPL提供基本流程控制语法,可以方便地实现日常业务逻辑。
分支结构:
A | B | |
2 | … | |
3 | if T.AMOUNT>10000 | =T.BONUS=T.AMOUNT*0.05 |
4 | else if T.AMOUNT>=5000 && T.AMOUNT<10000 | =T.BONUS=T.AMOUNT*0.03 |
5 | else if T.AMOUNT>=2000 && T.AMOUNT<5000 | =T.BONUS=T.AMOUNT*0.02 |
循环结构:
A | B | |
3 | for T | =A3.BONUS=A3.BONUS+A3.AMOUNT*0.01 |
4 | =A3.CLIENT=CONCAT(LEFT(A3.CLIENT,4), "co.,ltd.") | |
5 | … |
SPL具有基本的半结构化数据处理能力,可以方便地处理Json\XML或不规则文本,尤其适合kafka等消息队列或mongoDB等NoSQL。
例如:对多层Json串进行条件查询,并用Json串返回计算结果:
A | B | |
1 | =json(arg_JsonStr) | 解析参数Json串 |
2 | =A1.conj(Orders) | 合并下层记录 |
3 | =A2.select(Amount>1000 && Amount<=2000) | 条件查询 |
4 | =json(A3) | 结果转为Json串 |
高级计算能力:流批混算
计算能力不强的流计算技术往往不支持流批混算,即使表面支持,底层也是批、流两套引擎各算各的,需要事先转为同一类数据(通常是流数据),再用单一引擎(通常是流数据)进行伪混算。
SPL是专业的计算语言,流数据和批数据底层模型统一,单一引擎既能算流数据也能算批数据,自然也能直接进行混合计算,不必事先转为同类数据。
例子:Oracle与txt左关联,其中Oracle表太大无法读入内存。
A | B | |
1 | =connect("ORCL") | Oracle连接 |
2 | =A1.cursor@x("select OrderID,Amount,Client,SellerID,OrderDate from orders where OrderDate>? && OrderDate<=?",arg1,arg2) | Oracle有界流 |
3 | =T("d:/data/Employees.txt") | txt内存表 |
4 | =A2.join(SellerID,A3:EID,Dept) | oracle关联txt |
5 | =A4.groups(Dept, Client; sum(Amount),count(1)) | 分组汇总 |
有些混算在原理上必须转为同类数据再算,比如流+批的归并、批+流(量小且有界)的外关联,这种情况下需要进行流批间的类型转换。计算能力不强的流计算技术有多种流式结构化数据类型和批量结构化数据类型,转换关系繁多且互相难以直接转换,通常要硬编码实现。
SPL是专业的计算语言,只有序表(批)、游标(流)两种结构化数据类型,互相可以方便地转化。例子:将Oracle的流记录与外部传入的Json串进行归并关联。
A | B | |
1 | =connect("ORCL").cursor@x("select * from Orders Order by SellerID") | 有序流数据 |
2 | =json(arg_JsonStr).sort(EID) | 有序批数据 |
3 | =A2.cursor() | 批转流 |
4 | =joinx(A1,SellerID;A3,EID) | 双流归并 |
5 | =A4.groups(#2.Dept;sum(#1.Amount)) | 分组汇总 |
高级计算能力:简化复杂计算
SPL支持有序计算、集合计算、分步计算、关联计算,可以简化复杂的结构化数据计算。例子:计算每个传感器电压最高的3条记录:
data.group(SensorID).(~.top(3;V))
SPL有真正的行号字段,支持有序集合,可以用直观的代码进行计算,即先按SensorID分组,再对各组(即符号~)计算TopN。SPL的集合化更加彻底,可以实现真正的分组,即只分组不汇总,这就可以直观地计算组内数据。
例子:最大连续上涨天数:
A | |
1 | =tbl.sort(day) |
2 | =t=0,A1.max(t=if(price>price[-1],t+1,0)) |
SPL容易表达连续上涨的概念,先按日期排序;再遍历记录,发现上涨则计数器加1。代码中的max是循环函数,可依次遍历每条记录;代码中的[-1]是有序集合的用法,表示上一条,是相对位置的表示方法,price[-1]表示上一个交易日的股价,比整体移行(如SQL中的lag函数)更直观。
例子:求销售额占到一半的前n个客户:
A | B | |
2 | =sales.sort(amount:-1) | /销售额逆序排序,可在SQL中完成 |
3 | =A2.cumulate(amount) | /计算累计序列 |
4 | =A3.m(-1)/2 | /最后的累计即总额 |
5 | =A3.pselect(~>=A4) | /超过一半的位置 |
6 | =A2(to(A5)) | /按位置取值 |
SPL集合化成更彻底,可以用变量方便地表达集合,并在下一步用变量引用集合继续计算,因此特别适合多步骤计算。将大问题分解为多个小步骤,可以方便地实现复杂的计算目标,代码不仅简短,而且易于理解。此外,多步骤计算天然支持调试,无形中提高了开发效率。
SPL还支持游离记录,可以用点号直观地引用关联表,从而简化复杂的关联计算。
SPL有丰富的日期和字符串函数,能有效简化相关计算。
季度增减:elapse@q("2020-02-27",-3) //返回2019-05-27
N个工作日之后的日期:workday(date("2022-01-01"),25) //返回2022-02-04
字符串类函数,判断是否全为数字:isdigit("12345") //返回true
取子串前面的字符串:substr@l("abCDcdef","cd") //返回abCD
按竖线拆成字符串数组:"aa|bb|cc".split("|") //返回["aa","bb","cc"]
SPL还支持年份增减、求季度、按正则表达式拆分字符串、拆出单词、按标记拆HTML等大量函数。
值得一提的是,为了进一步提高开发效率,SPL还创造了独特的函数语法。比如用选项区分类似的函数,只过滤出符合条件的第1条记录,可使用选项@1:
T.select@1(Amount>1000)
从后往前查找第1条记录,可以使用@z:
T.select@z1(Amount>1000)
高级计算能力:高性能计算
SPL提供了大量高性能计算函数。有些函数(及语法)是独创的,可以用更低的时间复杂度达到同样的计算目标,比如遍历复用、倍增分段;有些函数是通用的,代码更简单用法更方便,比如二分查找、哈希索引。
SPL提供了高性能存储格式。高性能算法通常要基于高性能存储,像有序归并、单边分堆都要求数据有序才能实施,SPL 提供了名为组表的高性能外存存储格式,支持列存、有序、压缩、并行、分段、主键、索引等数据特征,信息密度和计算性能远高于普通格式,可极大提升加载速度,可充分释放高性算法的能力。SPL还提供了名为内表的高性能内存存储格式,支持内存压缩,可以与组表方便地互转,允许将更多的数据加载到内存,进行高性能内存计算。参考《SPL 计算性能系列测试:TPCH》
SPL支持温度分层和冷热路由。很多流数据产生的速度很快,计算涉及的数据又太多,必须先按温度(通常是时间)分层存储,并逐级溢出到下一层;计算时再按参数(通常是时间)路由到不同温度的数据,并进行混合计算。SPL支持并可简化这种计算结构,比如:计算涉及历史冷数据+近期温数据+实时热数据;秒/分钟级的热数据用内表存储,定期写入温数据;分钟/小时级的温数据用组表存储,定期写入冷数据;小时/天/月级的冷数据用RDB/数仓/数据湖存储,在必要时参与大跨度的混合计算。实际的项目结构更复杂分层更多,参考《超多位点高频时序数据的实时存储和统计》
SPL已开源免费,欢迎前往乾学院下载试用!