Flink学习(七)-单词统计

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;

//2nd 设置流
DataSource xxxDS=env.xxxx();

//3rd 设置转换
Xxx transformation =xxxDS.xxxx();

//4th 设置sink
transformation.print();

//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {
        //1,创建一个执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2,获取输入流
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
        //3,处理数据
        FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                //3.1 分隔字符串
                String[] values = value.split(" ");
                //3.2 汇总统计
                for (String word : values) {
                    Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                    collector.collect(wordTuple);
                }
            }
        });
        //4,按单词聚合
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
        //5,分组内聚合
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);

        //6,输出结果
        sum.print();
    }
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                    collector.collect(temp);
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
        sum.print();
        env.execute();

    }
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                ).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();

        env.execute();

    }
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/570977.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

简述MASM宏汇编

Hello , 我是小恒不会java。今天写写x86相关底层的东西 寄存器 8086由BIU和EU组成 8088/8086寄存器有14个。8通用&#xff0c;4段&#xff0c;1指针&#xff0c;1标志 8个通用寄存器&#xff1a;这些寄存器可以用来存储任意类型的数据&#xff0c;包括整数、地址等。8086有8个…

Modbus转Profinet网关接电表与工控机通讯

Modbus转Profinet网关&#xff08;XD-MDPN100/300&#xff09;的主要功能是实现Modbus协议和Profinet协议之间的转换和通信。Modbus转Profinet网关集成了Modbus和Profinet两种协议&#xff0c;支持Modbus RTU主站/从站&#xff0c;并可以与RS485接口的设备&#xff0c;如变频器…

找对方法,单位信息宣传工作向媒体投稿其实也简单

曾经,作为一名肩负单位信息宣传重任的我,每当面对那堆叠如山的稿件与闪烁不定的电脑屏幕,心中总会涌起一股无尽的焦虑与疲惫。尤其在向媒体投稿这个环节,我仿佛陷入了一个难以挣脱的漩涡,邮箱投稿的艰辛、审核的严苛、出稿的迟缓以及成功发表的少之又少,如同一座座无形的大山压…

力扣面试 150二叉搜索树迭代器 中序遍历 栈模拟递归 步骤拆分

Problem: 173. 二叉搜索树迭代器 思路 &#x1f469;‍&#x1f3eb; 三叶 复杂度 时间复杂度: O ( 1 ) O(1) O(1) 空间复杂度: O ( h ) O(h) O(h) Code class BSTIterator { Stack<TreeNode> d new Stack<>();public BSTIterator(TreeNode root){dfsLe…

书生·浦语大模型第二期实战营第七节-OpenCompass 大模型评测实战 笔记和作业

来源&#xff1a; 视频教程&#xff1a;https://www.bilibili.com/video/BV1Pm41127jU/?spm_id_from333.788&vd_sourcef4a51f7f5a63e756f73ad0dff318c1a3 文字教程&#xff1a;https://github.com/InternLM/Tutorial/blob/camp2/opencompass/readme.md 作业来源&#x…

day12 过一遍Nestjs框架(java转ts全栈/3R教室)

介绍&#xff1a;NestJS是Ts技术栈的后端框架&#xff0c;相当于Java中的springboot。 学习方法&#xff1a;与java技术体系进行对比学习。学习目标&#xff1a;nest相关知识也是挺多&#xff0c;但对比学spring的时候&#xff0c;大部分在项目生产中都是套路化的&#xff0c;大…

SpringMVC基础篇(二)

文章目录 1.Postman1.基本介绍Postman是什么&#xff1f; 2.Postman快速入门1.Postman下载点击安装自动安装在系统盘 2.基本操作1.修改字体大小2.ctrl “” 放大页面3.进入创建请求界面 2.需求分析3.具体操作4.保存请求到文件夹中1.点击保存2.创建新的文件夹3.保存成功 3.使用…

Linux系统IO

Linux系统中的IO函数主要包括两大类&#xff1a;标准C库中的函数和Linux系统调用。这些函数可以用于文件操作、网络通信、设备控制等多种IO任务。以下是Linux系统中常用的IO函数和系统调用的概述&#xff1a; 标准C库IO函数 这些函数是高级的、封装好的&#xff0c;并且与操作…

一些好听且有心意的英文全名Burwood新南威尔士州伯伍德喝酒上脸就是乙醛中毒1. 康奈尔大学官宣恢复标化要求2. 香港城市大学(东莞)正式设立!

目录 一些好听且有心意的英文全名 Burwood新南威尔士州伯伍德 喝酒上脸就是乙醛中毒 1. 康奈尔大学官宣恢复标化要求 2. 香港城市大学&#xff08;东莞&#xff09;正式设立&#xff01; 一些好听且有心意的英文全名 在选择好听且有意义的英文全名时&#xff0c;我们可…

[MoeCTF-2022]Sqlmap_boy

title:[MoeCTF 2022]Sqlmap_boy 查看网页源代码&#xff0c;得到提示 <!-- $sql select username,password from users where username".$username." && password".$password.";; --> 用万能密码绕过&#xff0c;用’"闭合 爆数据库…

【NLP】大语言模型基础之GPT

大语言模型基础之GPT GPT简介1. 无监督预训练2. 有监督下游任务微调 GPT-4体系结构1. GPT-4的模型结构2. GPT-4并行策略3. GPT-4中的专家并行GPT-4的特点 参考连接 以ELMo为代表的动态词向量模型开启了语言模型预训练的大门&#xff0c;此后&#xff0c;出现了以GPT和BERT为代表…

Simulink从0搭建模型03-Enabled Subsystem 使能子系统

参考博客 b站视频 【Simulink 0基础入门教程 P4 使能子系统 Enabled Subsystem 的使用介绍】 个人听了这个博主的视频风格觉得很适合我入门学习&#xff0c;讲得很清楚。 另外&#xff0c;视频里面教得很详细了&#xff0c;我也不会再详细写怎么打开创建等步骤&#xff0c;跟着…

年如何在不丢失数据的情况下解锁锁定的 Android 手机?

当您忘记密码、PIN 码或图案并且想要解锁 Android 手机时&#xff0c;您可能会丢失 Android 手机上的数据。但您无需再担心&#xff0c;因为在这里&#xff0c;我们想出了几种解锁锁定的 Android 手机而不丢失数据的方法。 方法 1. 使用 Android Unlock 解锁锁定的 Android 且不…

拿捏 顺序表(1)

目录 1. 顺序表的分类2. 顺序表实现3. 顺序表实现完整代码4. 总结 前言: 一天xxx想存储一组数据, 并且能够轻松的实现删除和增加, 此时数组大胆站出, 但是每次都需要遍历一遍数组, 来确定已经存储的元素个数, 太麻烦了, 于是迎来了顺序表不屑的调侃: 数组你不行啊… 顺序表是一…

MSE实现全链路灰度实践

技术架构包括以下基础设施和云服务&#xff1a; 1个地域&#xff1a;ACK集群、微服务应用、MSE实例均部署在同一地域下。 1个专有网络VPC&#xff1a;形成云上私有网络&#xff0c;确保核心云资源的网络环境&#xff0c;如容器服务ACK、微服务引擎MSE。 ACK集群&#xff1a;简单…

升级 jQuery:努力打造健康的 Web 生态

jQuery 对 Web 的影响始终是显而易见的。当 jQuery 在 2006 年首次推出时&#xff0c;几乎立即成为 Web 开发人员的基本工具。它简化了 JavaScript 编程&#xff0c;使操作 HTML 文档、处理事件、执行动画等变得更加容易。从那时起&#xff0c;它在 Web 标准和浏览器功能的演变…

idea中打印日志不会乱码,但是部署到外部tomcat中乱码了。

问题&#xff1a;如图Tomcat乱码&#xff0c;而且启动时的系统日志不会乱码&#xff0c;webapp中的打印日志才乱码。 idea中的情况如下&#xff1a;正常中文展示。 问题分析&#xff1a;网上分析的原因是Tomcat配置的字符集和web应用的字符集不匹配&#xff0c;网上集中的解决…

Springboot的日常操作技巧

文章目录 1、自定义横幅2、容器刷新后触发方法自定义3、容器启动后触发方法自定义**CommandLineRunner**ApplicationRunner 不定时增加 参考文章 1、自定义横幅 简单就一点你需要把banner.text放到classpath 路径下 &#xff0c;默认它会找叫做banner的文件&#xff0c;各种格式…

“奇观”初见,祁门竞赛上海正式发

布给上下山水、左右人文的“徽州”&#xff0c;另起一笔“烟火” 城市更新从空间营造进入地方创生。何为地方&#xff1f;如何创生&#xff1f;其关键也许在于“持续打开”&#xff0c;源源不断吸引新生力量参与&#xff0c;从在地文化中生长出创作生态。 镶嵌于长三角腹地&a…

Ubuntu Pycharm安装

下载PyCharm&#xff0c;https://www.jetbrains.com/pycharm/download/?sectionlinux 然后按照下图执行安装&#xff1a; 安装的时候可能出现的问题&#xff1a; 问题1&#xff1a;No JDK found. Please validate either PYCHARM_JDK, JDK_HOME or JAVA_HOME environment var…