flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurations
        Configuration configuration = new Configuration();
        ExecutionConfig executionConfig = new ExecutionConfig();
        CheckpointConfig checkpointConfig = new CheckpointConfig();
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

        // Step 2: Create a new StreamGraph instance
        StreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);

        // Step 3: Add a source operator

        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);
        SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());
        streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");

        // Step 4: Add a map operator to transform the data
        StreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);
        streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");

        // Step 5: Connect source and map operator
        streamGraph.addEdge(1, 2, 0);

        // Step 6: Add a sink operator to consume the data
        StreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        });
        SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);
        streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");

        // Step 7: Connect map and sink operator
        streamGraph.addEdge(2, 3, 0);
        streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamGraph.setMaxParallelism(1,1);
        streamGraph.setMaxParallelism(2,1);
        streamGraph.setMaxParallelism(3,1);
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);


        // Step 8: Convert StreamGraph to JobGraph
        JobGraph jobGraph = streamGraph.getJobGraph();


        // Step 9: Set up a MiniCluster for local execution
        MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(10)
                .setNumSlotsPerTaskManager(10)
                .build();
        MiniCluster miniCluster = new MiniCluster(miniClusterConfig);

        // Step 10: Start the MiniCluster
        miniCluster.start();

        // Step 11: Submit the job to the MiniCluster
        JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
        System.out.println("Job completed with result: " + result);

        // Step 12: Stop the MiniCluster
        miniCluster.close();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915586.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

D64【python 接口自动化学习】- python基础之数据库

day64 SQL-DQL-基础查询 学习日期&#xff1a;20241110 学习目标&#xff1a;MySQL数据库-- 133 SQL-DQL-基础查询 学习笔记&#xff1a; 基础数据查询 基础数据查询-过滤 总结 基础查询的语法&#xff1a;select 字段列表|* from 表过滤查询的语法&#xff1a;select 字段…

Unity插件-Smart Inspector 免费的,接近虚幻引擎的蓝图Tab管理

习惯了虚幻的一张蓝图&#xff0c;关联所有Tab &#xff08;才发现Unity&#xff0c;的Component一直被人吐槽&#xff0c;但实际上是&#xff1a;本身结构Unity 的GameObject-Comp结构&#xff0c;是好的不能再好了&#xff0c;只是配上 smart Inspector就更清晰了&#xff0…

2024 年Postman 如何安装汉化中文版?

2024 年 Postman 的汉化中文版安装教程

单元测试、集成测试、系统测试、验收测试、压力测试、性能测试、安全性测试、兼容性测试、回归测试(超详细的分类介绍及教学)

目录 1.单元测试 实现单元测试的方法&#xff1a; 注意事项&#xff1a; 2.集成测试 需注意事项&#xff1a; 实现集成测试的方法&#xff1a; 如何实现高效且可靠的集成测试&#xff1a; 3.系统测试 实现系统测试的方法: 须知注意事项&#xff1a; 4.验收测试 实现验…

MySQL 忘记 root 密码,使用跳过密码验证进行登录

操作系统版本&#xff1a;CentOS 7 MySQL 忘记 root 密码&#xff0c;使用跳过密码验证进行登录 修改 /etc/my.cnf 配置文件&#xff0c;在 [mysqld] 后面任意一行添加 skip-grant-tables vim /etc/my.cnf 重启 MySQL systemctl restart mysqld 登录 MySQL&#xff08;无 -…

3D Web渲染引擎HOOPS Communicator:助力企业打造定制化3D可视化产品的强大工具

HOOPS Communicator为开发人员提供了多样化的定制手段&#xff0c;使其在3D网页可视化领域保持领先地位。很多潜在客户都关心如何利用HOOPS Communicator将其打造成自己产品的独特解决方案。展示我们现有合作伙伴的成功案例正是分享此信息的最佳方式。 每家合作伙伴都在产品中…

【stablediffusion】阿里发布新ID保持项目EcomID, 可从单个ID参考图像生成定制的保ID图像,ComfyUI可使用。

今天&#xff0c;我们将向您介绍一款令人兴奋的更新——阿里发布的ID保持项目EcomID。这是一款基于Stable Diffusion技术的AI绘画工具&#xff0c;旨在为您提供一键式生成高质量保ID图像的便捷体验。无论您是AI绘画的新手还是专业人士&#xff0c;这个工具都能为您带来极大的便…

计算机网络(11)和流量控制补充

这一篇对数据链路层中的和流量控制进行详细学习 流量控制&#xff08;Flow Control&#xff09;是计算机网络中确保数据流平稳传输的技术&#xff0c;旨在防止数据发送方发送过多数据&#xff0c;导致接收方的缓冲区溢出&#xff0c;进而造成数据丢失或传输失败。流量控制通常…

【VLANPWN】一款针对VLAN的安全研究和渗透测试工具

关于VLANPWN VLANPWN是一款针对VLAN的安全研究和渗透测试工具&#xff0c;该工具可以帮助广大研究人员通过对VLAN执行渗透测试&#xff0c;来研究和分析目标VLAN的安全状况。该工具专为红队研究人员和安全学习爱好者设计&#xff0c;旨在训练网络工程师提升网络的安全性能&…

ES6代理和反射新特性,详细讲解

代理与反射 es6新增了代理和反射特性&#xff0c;这两个特性为开发者提供了拦截并向基本操作嵌入额外行为的能力。 代理基础 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta charset"UTF-8"&g…

MYSQL 精通索引【快速理解】

目录 1、什么是索引&#xff1f; 2、索引结构 1.为什么不使用二叉树呢&#xff1f; 2.B树数据结果 3.B树 4.Hash结构 3、索引语法 1.创建索引 2.查看索引 3.删除索引 4、SQL性能分析 1.SQL执行频次 2.慢查询日志 3.profile详情 4.EXPLAIN 5、索引规则 1.最左前缀法则 2.索…

光驱验证 MD5 校验和

步骤 1&#xff1a;在 Ubuntu 上打包文件并生成 MD5 校验和 打包文件 使用 tar 命令将文件夹打包成 tar.gz 文件&#xff1a; tar -czvf my_files.tar.gz /path/to/folder 生成 MD5 校验和 使用 md5sum 命令生成打包文件的 MD5 校验和&#xff1a; md5sum my_files.tar.g…

《网络数据安全管理条例》将于2025年1月1日起正式施行,从业者应如何解读?

2024年9月&#xff0c;国务院总理李强签署国务院令&#xff0c;公布了《网络数据安全管理条例》&#xff08;以下简称《条例》&#xff09;&#xff0c;该条例将于2025年1月1日起正式施行。 这一条例的出台&#xff0c;标志着我国在网络数据安全领域的管理迈上了新的台阶&#…

【MMIN】缺失模态想象网络用于不确定缺失模态的情绪识别

代码地址&#xff1a;https://github.com/AIM3RUC/MMIN abstract&#xff1a; 在以往的研究中&#xff0c;多模态融合已被证明可以提高情绪识别的性能。然而&#xff0c;在实际应用中&#xff0c;我们经常会遇到模态丢失的问题&#xff0c;而哪些模态会丢失是不确定的。这使得…

【Java Web】监听器类型及其使用

文章目录 监听器使用监听器类型ServletContextListenerHttpSessionListenerServletRequestListenerServletContextAttributeListenerHttpSessionAttributeListenerServletRequestAttributeListenerHttpSessionBindingListener 监听器&#xff08;Listener&#xff09;组件用于监…

conda创建 、查看、 激活、删除 python 虚拟环境

1、创建 python 虚拟环境 ,假设该环境命名为 “name”。 conda create -n name python3.11 2、查看 python 虚拟环境。 conda info -e 3、激活使用 python 虚拟环境。 conda activate name 4、删除 python 虚拟环境 conda remove -n name --all ​​ 助力快速掌握数据集…

LaTeX之四:如何兼容中文(上手中文简历和中文论文)、在win/mac上安装新字体。

改成中文版 如果你已经修改了.cls文件和主文档&#xff0c;但编译后的PDF仍然显示英文版本&#xff0c;可能有以下几个原因&#xff1a; 编译器问题&#xff1a;确保你使用的是XeLaTeX或LuaLaTeX进行编译&#xff0c;因为它们对Unicode和中文支持更好。你可以在你的LaTeX编辑器…

视频遥控打药履带机器人技术详解

视频遥控打药履带机器人技术是一种集成了遥控操作、视频监控和履带行走系统的现代化农业植保技术。以下是对该技术的详细解析&#xff1a; 一、技术概述 视频遥控打药履带机器人主要由履带行走系统、药箱、喷雾系统、遥控系统以及视频监控系统等部分组成。通过遥控操作&#…

BB1-NHS ester被用于将各种生物活性分子与蛋白质或其他生物大分子进行共轭连接,2082771-52-4

CAS号&#xff1a;2082771-52-4 中文名&#xff1a;BB1-琥珀酰亚胺酯&#xff0c;BB1-活性酯 英文名&#xff1a;BB1-NHS ester&#xff0c;或BB1-Succinimidyl Ester 分子式&#xff1a;C32H32N6O4 分子量&#xff1a;564.63 纯度&#xff1a;≥95% 供应商&#xff1a;陕…

MongoDB在现代Web开发中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 MongoDB在现代Web开发中的应用 MongoDB在现代Web开发中的应用 MongoDB在现代Web开发中的应用 引言 MongoDB 概述 定义与原理 发展…