基于Flink的流式计算可视化开发实践之配置->任务生成->任务部署过程

1. 引言

在我们大数据平台(XSailboat)的DataStudio模块中实现了基于Hive的业务流程开发和基于Flink的实时计算管道开发。

DataStudio是用来进行数据开发的,属于开发环境,另外还有任务运维模块,负责离线分析任务和实时计算任务在生产环境的部署和运维。在开发环境开发好的业务流程和计算管道可以提交/发布到生产环境。
DataStudio
整个大数据平台的可视化开发其实都是一种配置驱动的思想。在界面上开发编辑的都是一种配置数据,在部署运行的时候,后台会有程序将其转为可执行程序或解释执行配置。

2. Flink计算任务的配置化实现

在我的另一篇文章《Flink的DAG可视化开发实践》中表述了我们对Flink任务的一种模式抽象思路。有了这种模式抽象之后,让Flink计算任务适合于可视化开发,即把一种完全自由的代码开发模式,转变成了一种“输入”、“输出”、“状态存储器”、“个性化配置”、“基础配置”、“前置处理”、“后置处理”等阶段性、部件型的可配置项。

Flink有很多的算子,在原来的算子特性的基础之上,套用模式,并且对它的功能进行一些个性化设定,就可以形成自己的界面开发用的算子及配置。

例如“SQL增量查询”,就是我们自定义实现的一种在支持JDBC和SQL的数据库上,对一个SQL查询,可以基于一个具有增长特性的字段(例如自增长的字段或者最近编辑时间,记录型数据的创建时间等)进行定时增量查询和读取的源节点。
它的个性化配置界面如下:
在这里插入图片描述
这个节点的配置面板中的各个配置项暂且不介绍。从界面中可以看出SQL增量查询的功能配置可以转化成一个多层次结构的Bean。这个Bean是“SQL增量查询”节点实例信息的一个组成部分。
如此,一个计算管道DAG图,就可以用“计算管道(图)–>算子节点–>节点配置”这样的多层数据结构,并映射到成多张关系数据库表,并在其中存储。
这样就实现了算子的配置化及配置存储。

3. 配置转成Flink计算任务

既然是配置驱动的程序,基本都是“配置+解释执行器”的构成方式。配置是多样的,而解释执行器就一种实现。

Flink的计算任务开发,都会开发至少一个带main函数的Java类,在部署运行的时候上传jar包并指定main函数所在Java类。

一般的Java程序,也会开发一个带main函数的类,在MANIFEST.MF文件中指定或在命令行里面指定,这样JVM就知道程序的入口。Flink的计算任务也类似,只是不是用来告诉JVM的,而是用来告诉JobManager的。JobManager从这里进去,执行里面的代码,构建出计算任务图(有多阶段的图,可以不用细究)。构建出图之后,再将其拆分交给一个或多个TaskManager执行。

所以我们的main函数里面的逻辑只是构建了一个计算任务以及每一个该怎么执行的执行流图。这和反应式编程很类似,先是构建计算管路,再塞入数据执行。这和我们一般的函数主动式调用有所不同,那是调用即执行。

既然Flink的Job的main函数里面是构建计算管路,那么我们按照计算管道的DAG图及其配置,生成计算管路即可。即一手解读配置,一手按配置构建算子,组成计算任务。

我们界面上定义的每一个节点,都有专门的构建器,将节点转成相应的算子。所以配置转计算任务的过程是:

1. 提交计算任务的时候,通过参数指定运行的是那个计算管道。因为在平台里面有很多工作空间,每个工作空间里面有许多计算管道。
2. Flink的JobManager运行“执行解释器”的jar,进去其main函数。
3. 在main函数中,获取相关入参,其中就有计算管道id,然后调用其它服务提供的通过id获取计算管道及其配置信息的接口,获取计算管道的详细信息。
4. 解析计算管道详细信息,构建计算任务,将计算管道中的每个算子配置信息转换成Flink的算子。

下面贴出上面例举的SQL增量查询节点的构建器,以便更好理解我们是怎么做的。

... 省略
public class SI_SQLIncQuery_Builder extends StreamSourceNodeBuilder
{
	
	@Override
	public CPipeNodeType getNodeType()
	{
		return CPipeNodeType.SI_SQLIncQuery ;
	}

	@Override
	public void buildStreamFlow(JSONObject aNodeJo, IStreamFlowBuilder aStreamFlowBuilder , WorkContext aCtx) throws Exception
	{
		String nodeId = aNodeJo.optString("id") ;
		String nodeName = aNodeJo.optString("name") ;
		StreamExecutionEnvironment env = aStreamFlowBuilder.getExecutionEnvironment() ;
		JSONObject execConfJo = aNodeJo.optJSONObject("execConf") ;
		JSONObject baseConfJo = aNodeJo.optJSONObject("baseConf") ;
		String dsId = execConfJo.optString("dataSourceId") ;
		JSONObject dsJo = aNodeJo.pathJSONObject("dataSources", dsId);
		ConnInfo connInfo = JacksonUtils.asBean(dsJo.toJSONString() , ConnInfo.class) ;
		DataSource ds = new DataSource() ;
		ds.setId(dsId) ;
		ds.setName(dsJo.optString("name")) ;
		ds.setType(dsJo.optEnum("dataSourceType" , DataSourceType.class)) ;
		WorkEnv workEnv = WorkEnv.valueOf(aCtx.getWorkEnv()) ;
		if(WorkEnv.dev == workEnv)
			ds.setDevConnInfo(connInfo) ;
		else
			ds.setProdConnInfo(connInfo) ;
		
		
		// 查询密码
		WorkContext ctx = aStreamFlowBuilder.getWorkContext();
		KeyPair keyPair = RSAKeyPairMaker.getDefault().newOne().getValue();
		HttpClient client = aCtx.getGatewayClient();
		String cipherText = client.askForString(Request	.GET()
														.path(IApis_Gateway.sGET_DataSourcePassword)
														.queryParam("env", ctx.getWorkEnv())
														.queryParam("id", dsId)
														.queryParam("publicKey", RSAUtils.toString(keyPair.getPublic()))
														.queryParam("usage", "TDengine类型的下沉节点[" + nodeName + "]"));
		
		String password = RSAUtils.decrypt(keyPair.getPrivate(), cipherText);
		((ConnInfo_Pswd)connInfo).setPassword(password);
		
		int periodMs = execConfJo.optInt("periodMs") ;
		List<String> storeStateFields = execConfJo.optJSONArray("storeStateFields").toCollection(CS.arrayList() , XClassUtil.sCSN_String) ;
		DatasetDescriptor dsDesc = JacksonUtils.asBean(execConfJo.optJSONObject("dataset").toJSONString() , DatasetDescriptor.class) ;
		Dataset dataset = new Dataset() ;
        dataset.setDatasetDescriptor(dsDesc) ;
        dataset.setName(nodeName) ;
        dataset.setDataSourceId(ds.getId()) ;
        dataset.setWorkEnv(workEnv) ;
        dataset.setDataSourceType(ds.getType()) ;
		
		JSONArray outRowFieldsJa = execConfJo.optJSONArray("outRowFields") ;
		Assert.notNull(outRowFieldsJa , "没有找到outRowFields!%s" , execConfJo);
		ERowTypeInfo rowTypeInfo = JSONKit.toRowTypeInfo(outRowFieldsJa) ;
		
		// 水位线设置待实现
		WatermarkStrategy<Row> watermarkStrategy = null ;
		WaterMarkGenMethod waterMarkGenMethod = execConfJo.optEnum("waterMarkGenMethod" , WaterMarkGenMethod.class) ;
		if(waterMarkGenMethod == null)
		{
			watermarkStrategy = WatermarkStrategy.noWatermarks() ;
			waterMarkGenMethod = WaterMarkGenMethod.NoWatermarks ;
		}
		else
		{
			switch(waterMarkGenMethod)
			{
			case NoWatermarks :	
			{
				watermarkStrategy = WatermarkStrategy.noWatermarks() ;
			}
				break ;
			case MonotonousTimestamps:
				watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps() ;
				break ; 
			case BoundedOutOfOrderness:
				JSONObject waterMarkGenMethodConfJo = execConfJo.getJSONObject("waterMarkGenMethodConf") ;
				TimeUnit timeUnit = TimeUnit.valueOf(waterMarkGenMethodConfJo.optString("timeUnit")) ;
				int timeLen = waterMarkGenMethodConfJo.optInt("timeLen" , 0) ;
				Assert.isTrue(timeLen>0 , "时间长度必须大于0!") ;
				Duration duration = null ;
				switch(timeUnit)
				{
				case NANOSECONDS:
					duration = Duration.ofNanos(timeLen) ;
					break ;
				case MILLISECONDS:
					duration = Duration.ofMillis(timeLen) ;
					break ;
				case SECONDS:
					duration = Duration.ofSeconds(timeLen) ;
					break ;
				case MINUTES:
					duration = Duration.ofMinutes(timeLen) ;
					break ;
				case HOURS:
					duration = Duration.ofHours(timeLen) ;
					break ;
				case DAYS:
					duration = Duration.ofDays(timeLen) ;
					break ;
				case MICROSECONDS:
					duration = Duration.of(timeLen, ChronoUnit.MICROS) ;
					break ;
				}
				watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness(duration) ;
				break ;
			case MaxWatermarks:
				watermarkStrategy = WatermarkStrategy.forGenerator(new MaxWaterMarkGenSupplier()) ;
				break ;
			default:
				throw new IllegalStateException("未支持的水位线生成方法:"+waterMarkGenMethod) ;
			}
		}

		String timestampExpr = execConfJo.optString("timestampExpr") ;
		if(XString.isNotEmpty(timestampExpr) && waterMarkGenMethod != WaterMarkGenMethod.NoWatermarks)
		{
			watermarkStrategy = watermarkStrategy.withTimestampAssigner(new ExprTimestampAssigner(aCtx.getPipeArgs() 
					, timestampExpr, rowTypeInfo)) ;
		}
		mLogger.info("水位线生成策略是:{} , 时间表达式是:{}" , waterMarkGenMethod , timestampExpr) ;
		SQLIncQuerySourceFunction sourceFunc = new SQLIncQuerySourceFunction(storeStateFields, periodMs
				, dataset
				, ds) ;
		SingleOutputStreamOperator<Row> dss = env.addSource(sourceFunc , nodeName , rowTypeInfo)
				.assignTimestampsAndWatermarks(watermarkStrategy)		// 2023-01-08 这一句是必需的,否则不会产生水位线
				.name(nodeName)
				.uid(nodeId)
				;
		
		int parallelism = baseConfJo.optInt("parallelism", 1) ;
		if(parallelism <=0 )
			mLogger.info("指定的并发度为 {} , 小于1,将不设置,采用缺省并发度。" , parallelism) ;
		else
		{
			dss.setParallelism(parallelism) ;
		}
		
		aStreamFlowBuilder.putFlowPoint(nodeId , dss);
	}
}

3. 计算任务的部署

我们要构建的是一套可视化开发、部署平台,在我们的界面上就能完成开发、调试、部署的过程。我们的大数据平台底层基础设施有Hadoop,所以我们考虑使用Hadoop Yarn的容器部署Flink集群。要使用Yarn容器部署Flink计算任务,首先需要将程序包上传到Hadoop FS中。
在这里插入图片描述
我们这里把我们自己开发的扩展部分(ext_jars,解释执行器及其相关jar)和Flink的原生程序包(app,扩展了一些数据库驱动)分成两部分,在我们进行容器化部署的时候,会将其合并。

Flink集群在容器中以Session模式运行,一个Flink集群可以运行多个计算任务。我们给Flink集群增加了一个标签,以区分各个Flink集群。我们设定开发环境,一个工作空间只能运行一个Flink集群,用来开发调试。一个工作空间,在生产环境可以运行1个或1个以上的不同标签的集群。在生产环境部署的时候,需要通过标签指定部署到那个集群,如果标签不存在,就会部署一个新的指定标签的集群,并在上面部署计算任务。
计算管道部署

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/872493.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

30岁程序员的焦虑:转行还是继续死磕?现在什么方向更有前景?

最适合转入AI大模型的莫过于程序员和在读大学生了吧。 对于程序员来说&#xff0c;码农之路并不是一帆风顺。对于每一个入行IT业的社会青年来说&#xff0c;谁不是抱着想要成为最高峰的技术大咖或者跃进管理岗的小目标&#xff1f; 然而往往更多的人并非互联网吹捧的如此耀眼…

低代码平台:加速企业制造业数字化转型的新引擎

近期&#xff0c;国家发布了中小企业数字化转型试点城市的政策&#xff0c;旨在通过先行先试&#xff0c;探索支持制造业特别是汽车制造行业数字化转型的有效模式。这一政策的出台&#xff0c;为汽车制造企业的数字化转型提供了强有力的政策支持和方向指引&#xff0c;标志着汽…

【论文速读】| SEAS:大语言模型的自进化对抗性安全优化

本次分享论文&#xff1a;SEAS: Self-Evolving Adversarial Safety Optimization for Large Language Models 基本信息 原文作者: Muxi Diao, Rumei Li, Shiyang Liu, Guogang Liao, Jingang Wang, Xunliang Cai, Weiran Xu 作者单位: 北京邮电大学, 美团 关键词: 大语言模…

vue.js项目实战案例详细源码讲解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; 为帮助大家更好地掌握Vue.js项目的开发流程&#xff0c;我将为你讲解一个完整的Vue.js实战案例&#xff0c;并提供详细的源码解析。这个案例将涵盖从项目创建到实现各种功能模块的全过程&#xff0c;适合用于…

基于空间结构光场照明的三维单像素成像

单像素成像是一种新兴的计算成像技术。该技术使用不具备空间分辨能力的单像素探测器来获取目标物体或场景的空间信息。单像素探测器具有高的时间分辨率、光探测效率和探测带宽&#xff0c;因此单像素光学成像技术在散射、弱光等复杂环境下相较于传统面阵成像技术展现了很大优势…

面试题:软件测试缺陷产生的原因有哪些?

软件缺陷产生的原因多种多样&#xff0c;一般可能有以下几种原因&#xff1a; 1.需求表述、理解、编写引起的错误。 2.系统架构设计引起的错误。 3.开发过程缺乏有效的沟通及监督&#xff0c;甚至没有沟通或监督。 4.程序员编程中产生的错误。 5.软件开发工具本身隐藏的问…

哨兵排序算法

代码展示 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h>#define MAXSIZE 20 //直接排序 typedef struct {int r[MAXSIZE 1];int length; } SqList; int InsertSort(SqList* L) {int i, j;for (i 2; i < L->length; i){if (L-…

mysql自增主键插入后返回id与实际插入id不同

加入这一段即可 GeneratedValue(strategy GenerationType.IDENTITY)

张飞硬件10-TVS管篇笔记

TVS管的原理 TVS或称瞬变电压抑制二极管&#xff0c;是在二极管工艺基础上发展起来的新产品&#xff0c;其电路符号和普通稳压管相同&#xff0c;外形也与普通二极管无异。当TVS管两端经受瞬间的高能量冲击时&#xff0c;它能以极高的速度将其阻抗骤然降低&#xff0c;同时吸收…

el-table 单元格,双击编辑

el-table 单元格&#xff0c;双击编辑 实现效果 代码如下 <template><el-table :data"tableData" style"width: 100%"><el-table-column prop"name" label"姓名" width"180"><template slot-scope&q…

【机器学习】梯度提升和随机森林的概念、两者在python中的实例以及梯度提升和随机森林的区别

引言 梯度提升&#xff08;Gradient Boosting&#xff09;是一种强大的机器学习技术&#xff0c;它通过迭代地训练决策树来最小化损失函数&#xff0c;以提高模型的预测性能 随机森林&#xff08;Random Forest&#xff09;是一种基于树的集成学习算法&#xff0c;它通过组合多…

Java队列详细解释

队列 一、什么是队列&#xff08;Queue&#xff09; java队列是一种线性数据结构&#xff0c;它的特点是先进先出。在队列中&#xff0c;元素的添加&#xff08;入队&#xff09;操作在队尾进行&#xff0c;而元素的移除&#xff08;出队&#xff09;操作则在队头进行。因此&a…

最近大模型最火的就业方向有哪些?

在2023和2024年&#xff0c;大语言模型的发展迎来了绝对风口&#xff0c;吸引了大量创业者和投资者。然而&#xff0c;经过一年的发展&#xff0c;许多公司已经销声匿迹。那么&#xff0c;未来大模型方向上还有哪些可以继续发展的方向呢? 基座大模型预训练 现状 - 展现出“胜…

TikTok Live与跨境电商的深度融合:直播带货引领品牌出海

在TikTok Live的应用中&#xff0c;品牌能够利用这一互动性极强的功能开辟新的销售渠道&#xff0c;推动全球业务的增长。本文Nox聚星将和大家探讨TikTok Live如何与跨境电商相结合&#xff0c;分析其应用场景。 一、TikTok Live与跨境电商的结合优势 庞大的用户基础&#xff…

使用 OpenCV 和 NumPy 进行图像处理:HSV 范围筛选实现PS抠图效果

使用 OpenCV 和 NumPy 进行图像处理&#xff1a;HSV 范围筛选实现PS抠图效果 在计算机视觉和图像处理领域&#xff0c;OpenCV 是一个非常强大的库&#xff0c;能够帮助我们执行各种图像操作。在这篇博客中&#xff0c;我们将通过一个简单的示例演示如何使用 OpenCV 和 NumPy 来…

machine learning - 2

泛化误差 也可以认为是预测时的误差。 训练误差 并不是越小越好&#xff0c;太小会过拟合。 获得测试集合的方法&#xff1a; 1&#xff09;&#xff1a; 2&#xff09;&#xff1a;例如&#xff1a;k-折交叉验证法&#xff0c; 就的每k个数据取一个座位测试集 3&#xff0…

1.39TB高清卫星影像更新(WGS84坐标投影)

最近对WGS84版的高清卫星影像数据进行了一次更新&#xff0c;并基于更新区域生成了相应的接图表。 1.39TB高清卫星影像更新 本次数据更新了1576个离线包&#xff0c;共1.39TB大小&#xff0c;并全部生成了更新接图表。 更新接图表范围 更新接图表由每一个离线包文件的范围构…

rancher upgrade 【rancher 升级】

文章目录 1. 背景2. 下载3. 安装4. 检查5. 测试5.1 创建项目5.2 创建应用5.3 删除集群5.4 注册集群 1. 背景 rancher v2.8.2 升级 v2.9.1 2. 下载 下载charts helm repo add rancher-latest https://releases.rancher.com/server-charts/latest helm repo update helm fetc…

支付宝开放平台-开发者社区——AI 日报「9 月 6 日」

1 3天把Llamaill成Mamba&#xff0c; 性能不降&#xff0c;推理更快&#xff01; 新智元 丨阅读原文 康奈尔和普林斯顿的研究人员成功将大型Transformer模型Llama提炼成Mamba模型&#xff0c;并设计了新的推测解码算法&#xff0c;显著提高了模型的推理速度。研究团队采用了渐…

Android OpenGLES开发:EGL环境搭建

努力&#xff0c;不是为了要感动谁&#xff0c;也不是要做给哪个人看&#xff0c;而是要让自己随时有能力跳出自己厌恶的圈子&#xff0c;并拥有选择的权利&#xff0c;用自己喜欢的方式过一生&#xff01; EGL是什么&#xff1f; 谈到openGL开发我们就不得不说EGL&#xff0c…