Flink_DataStreamAPI_执行环境

DataStreamAPI_执行环境

  • 1创建执行环境
    • 1.1getExecutionEnvironment
    • 1.2createLocalEnvironment
    • 1.3createRemoteEnvironment
  • 2执行模式(Execution Mode)
  • 3触发程序执行

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

在这里插入图片描述

1创建执行环境

我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

1.1getExecutionEnvironment

最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.2createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

1.3createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
  		.createRemoteEnvironment(
    		"host",                   // JobManager主机名
    		1234,                     // JobManager进程端口号
   			"path/to/jarFile.jar"  // 提交给JobManager的JAR包
		);

2执行模式(Execution Mode)

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式(Batch)
专门用于批处理的执行模式。

自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
批执行模式的使用。主要有两种方式:
(1)通过命令行配置
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

(2)通过代码配置
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

3触发程序执行

需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915212.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

46.第二阶段x86游戏实战2-拆解自动打怪流程

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 本次游戏没法给 内容参考于:微尘网络安全 本人写的内容纯属胡编乱造,全都是合成造假,仅仅只是为了娱乐,请不要…

解决C盘空间不足的三种方案

方案一:网上盛传的C盘磁盘碎片整理🧩(原理:将分散的文件片段整理到相邻的磁盘区域,减少文件的碎片化程度)(效果不明显) 方案二:把其他盘的空间给C盘 💽(效果显著&#xf…

同一套SDK 兼容第二块板卡

尽可能分开写,避免兼容性变差

计算机网络高频八股文面试题及参考答案

请简述 TCP 和 UDP 的区别? TCP(传输控制协议)和 UDP(用户数据报协议)是两种不同的传输层协议,它们有以下区别。 从连接方式上看,TCP 是面向连接的协议。在通信之前,需要通过三次握手…

前缀和算法习题篇(上)

1.一维前缀和 题目描述: 解法一:暴力解法:模拟 时间复杂度是O(n*q),会超时。 解法二:前缀和解法:快速求出数组中某一个连续区间的和 快速是指O(1),前缀和思想可把时间复杂度可降到O(q)。 算法思路: 先预处…

uniapp路由与页面跳转详解:API调用与Navigator组件实战

UniApp路由与页面跳转详解:API调用与Navigator组件实战 路由 uniapp页面路由为框架统一管理,开发者需要在page.json里面配置每个路由页面的路径及页面样式。 路由跳转 uniapp有两种页面路由跳转方式,调用API跳转和navigator组件跳转。 调…

linux-DNS解析

dns解析 dns:域名系统,将域名和ip地址互相映射的一个分布式的数据库,方便用户访问互联网。 ip地址:是所有设备和网站在互联网上的唯一地址,通信一定是ip和ip之间的通信。 dns解析:根据域名在互联网当中找…

Playwright 快速入门:Playwright 是一个用于浏览器自动化测试的 Node.js 库

Playwright 是一个用于浏览器自动化测试的 Node.js 库,它支持 Chromium, Firefox 和 WebKit 浏览器引擎。Playwright 提供了一套强大的 API 来进行网页自动化测试,包括页面导航、元素选择、表单提交等操作,并且能够处理现代网页中的异步加载内…

【maven踩坑】一个坑 junit报错 但真正导致这个的不是junit的原因

目录 事件起因环境和工具操作过程解决办法结束语 事件起因 报错一: Internal Error occurred. org.junit.platform.commons.JUnitException: TestEngine with ID junit-vintage failed to discover tests报错二: Internal Error occurred. org.junit.pl…

ONNX: export failure: DLL load failed while importing _message: 找不到指定的程序。

ONNX: export failure 问题其他解决快速解决 问题 使用pytorch导出onnx(Open Neural Network Exchange)模型,结果使用conda安装完onnx之后,问题就出现了 ONNX: export failure: DLL load failed while importing _message: 找不到…

Redis做分布式锁

(一)为什么要有分布式锁以及本质 在一个分布式的系统中,会涉及到多个客户端访问同一个公共资源的问题,这时候我们就需要通过锁来做互斥控制,来避免类似于线程安全的问题 因为我们学过的sychronized只能对线程加锁&…

用Tokio掌握Rust异步编程

在Rust中构建可伸缩且高效的应用程序时,异步编程必不可少。异步编程能显著提高性能,让代码在不阻塞的情况下并发处理多个任务。在本教程中,我们将探索Tokio,介绍异步编程原理及应用场景,并逐步带你编写异步代码。 Toki…

推荐一款3D建模软件:Agisoft Metashape Pro

Agisoft Metashape Pro是一款强大的多视点三维建模设计辅助软件,Agisoft Metashape是一款独立的软件产品,可对数字图像进行摄影测量处理,并生成3D空间数据,用于GIS应用,文化遗产文档和视觉效果制作,以及间接…

记录日志中logback和log4j2不能共存的问题

本文章记录设置两个日志时候,控制台直接报错 标黄处就是错误原因:1. SLF4J(W):类路径包含多个SLF4J提供程序。 SLF4J(W):找到提供程序[org.apache.logging.slf4j. net]。 SLF4J(W):找到提供程序[ch.qos.log .classi…

【论文阅读】Virtual Compiler Is All You Need For Assembly Code Search

阅读笔记:Virtual Compiler Is All You Need For Assembly Code Search 1. 研究背景 逆向工程:逆向工程需要在庞大的二进制文件中快速定位特定功能(例如恶意行为)。传统方法依赖于经验和启发式算法,效率低下。汇编代码搜索:通过自然语言搜索汇编代码功能,能够更高效地处…

洛古---越狱问题【快速幂】

今天和大家讲一个洛古的算法题,我觉得还是比较有含金量的,今天给大家分享一下 题目描述 监狱有 𝑛n个房间,每个房间关押一个犯人,有 𝑚 种宗教,每个犯人会信仰其中一种。如果相邻房间的犯人的宗…

Python3.11.9+selenium,选择证书用多线程+键盘enter解决

Python3.11.9+selenium,选择证书用多线程+键盘enter解决 1、遇到问题:弹出证书选择,无法点击确定 import pyautogui pyautogui.press(enter) 键盘enter也无法点击 2、解决办法:用多线程解决同时执行click链接和Enter点击证书的确定 1、点击操作 # # 通过文本链接文本…

1、使用vscode+eide+stm32cubeMx开发stm32

步骤1:在vscode中安装如下的插件 步骤2:点击Embedded IDE,点击“新建项目”-----空项目-----Cortex-M项目。 步骤3:输入项目名,回车后会要制定保存路径,此时就是一个已项目名命名的文件夹。 步骤4&#xff…

网站小程序app怎么查有没有备案?

网站小程序app怎么查有没有备案?只需要官方一个网址就可以,工信部备案查询官网地址有且只有一个,百度搜索 "ICP备案查询" 找到官方gov.cn网站即可查询! 注:网站小程序app备案查询,可通过输入单位…

SpringCloud篇(注册中心 - Nacos)

目录 一、Nacos安装指南 1. Windows安装 1.1. 下载安装包 1.2. 解压 1.3. 端口配置 1.4. 启动 1.5. 访问 2. Linux安装 2.1. 安装JDK 2.2. 上传安装包 2.3. 解压 2.4. 端口配置 2.5. 启动 3. Nacos的依赖 二、Nacos注册中心的入门使用 1. 认识和安装Nacos 2. 服…