【入门Flink】- 02Flink经典案例-WordCount

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
        String filePath = Objects.requireNonNull(
               BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataSource<String> lineDS = env.readTextFile(filePath);

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        String filePath = Objects.requireNonNull(
                StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataStreamSource<String> lineStream = env.readTextFile(filePath);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/116306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

根据Word模板,使用POI生成文档

突然想起来有个小作业&#xff1a;需要根据提供的Word模板填充数据。这里使用POI写了一个小demo验证下。 测试用模板&#xff1a; 执行结果 1.引入依赖坐标 <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId&…

Emscripten + CMakeLists.txt 将 C++ 项目编译成 WebAssembly(.wasm)/js,并编译 Html 测试

背景&#xff1a;Web 端需要使用已有的 C 库&#xff08;使用 CMake 编译&#xff09;&#xff0c;需要将 C 项目编译成 WebAssembly(.wasm) 供 js 调用。 上篇文章《Mac 上安装 Emscripten》 已讲解如何安装配置 Emscripten 环境。 本篇文章主要讲解如何将基于 CMakeLists 配…

CSS解决div行变块 ➕ CSS解决“table中的td文字溢出控制显示字数,显示省略号”的问题

CSS解决div行变块 ➕ CSS解决“table中的td文字溢出控制显示字数&#xff0c;显示省略号”的问题 1. div变块级设置1.1 先看不设置的效果1.2 再看设置之后的效果 2. 解决 table 中 td 内容过长问题2.1 CSS实现&#xff08;文字溢出控制td显示字数&#xff0c;显示省略号&#x…

Bytedance揭秘OpenAI大模型: GPT-3到GPT-4进化路径

文章目录 探秘GPT-3到GPT-4进化之路1、SFT&#xff1a;早期GPT进化的推动者2、RLHF和SFT&#xff1a;编码能力提升的功臣3、代码加入预训练&#xff0c;对推理帮助最大4、“跷跷板”现象 论文地址项目链接Reference GPT-Fathom: Benchmarking Large Language Models to Deciphe…

基于Qt Designer 操作教程

​本章将简介使用 Qt Creator 里自带的 Qt Designer,使用 Qt Designer 比较方便的构造 UI 界面。特点是方便布局,比较形象。 ## 使用 UI 设计器开发程序 在这小节里我们继续学习如何使用 Qt Designer 开发程序,Qt Designer 是属于 Qt Creator 的一个功能而已,大家不要搞混…

window10 mysql8.0 修改端口port不生效

mysql的默认端口是3306&#xff0c;我想修改成3307。 查了一下资料&#xff0c;基本上都是说先进入C:\Program Files\MySQL\MySQL Server 8.0这个目录。 看看有没有my.ini&#xff0c;没有就新建。 我这里没有&#xff0c;就新建一个&#xff0c;然后修改port&#xff1a; […

使用Renesas Flash Programmer(RFP)修改Option Byte及刷写程序

文章目录 前言配置Project修改OPBT程序刷写其他操作总结 前言 瑞萨RH850 P1H-C系列&#xff0c;在之前不知道OPBT对程序影响这么大&#xff0c;导致意外操作了其中的寄存器&#xff0c;板子锁死&#xff0c;不能再次刷写程序。本文记录下使用RFP工具刷写Option Byte需要注意的…

最新Ai系统ChatGPT程序源码+以图生图+Dall-E2绘画+支持GPT4+Midjourney绘画

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

怎样做好金融投资翻译

我们知道&#xff0c; 金融投资翻译所需的译文往往是会议文献、年终报表、信贷审批等重要企业金融资料&#xff0c;其准确性事关整个企业在今后一段时期内的发展战略与经营成效。尤其像年报&#xff0c;对于上市公司来说更是至关重要的。那么&#xff0c;怎样做好金融投资翻译&…

【漏洞复现】Apache Log4j Server 反序列化命令执行漏洞(CVE-2017-5645)

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞扫描3、漏洞验证 1.5、深度利用1、反弹Shell 说明内容漏洞编号CVE-2017-5645漏洞名称Log4j Server …

算法:Java构建二叉树并迭代实现二叉树的前序、中序、后序遍历

先自定义一下二叉树的类&#xff1a; // Definition for a binary tree node. public class TreeNode {int val;TreeNode left;TreeNode right;TreeNode() {}TreeNode(int val) { this.val val; }TreeNode(int val, TreeNode left, TreeNode right) {this.val val;this.left…

【C语言】扫雷游戏的一步一步的实现

文章目录 一、扫雷游戏分析和设计1.1 扫雷游戏的功能说明1.2 游戏的分析和设计1.2.1 数据结构的分析1.2.2 ⽂件结构设计 二、扫雷游戏代码实现总结 一、扫雷游戏分析和设计 1.1 扫雷游戏的功能说明 • 使⽤控制台实现经典的扫雷游戏 • 游戏可以通过菜单实现继续玩或者退出游…

HTTPS的加密方式超详细解读

在了解https的加密方式之前&#xff0c;我们需要先行了解两个特别经典的传统加密方式&#xff1a; 1、对称加密 1.1、定义 需要对加密和解密使用相同密钥的加密算法。所谓对称&#xff0c;就是采用这种加密方法的双方使用方式用同样的密钥进行加密和解密。密钥是控制加密及解…

运维知识点-Docker从小白到入土

Docker从小白到入土 安装问题-有podmanCentos8使用yum install docker -y时&#xff0c;默认安装的是podman-docker软件 安装docker启动dockeryum list installed | grep dockeryum -y remove xxxx安装Docker安装配置下载安装docker启动docker&#xff0c;并设置开机启动下载所…

企业级SpringBoot单体项目模板 —— 使用 AOP + JWT实现登陆鉴权

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;SpringBoot、企业级、项目模板☀️每日 一言&#xff1a;没学会走就学跑从来都不是问题&#xff0c;要问问自己是不是天才&#xff0c;如果不是&#xff0c;那就要一步步来 文章目录 使用JWT实现…

C语言之动态内存管理实现通讯录(完整版)

我们在之前的博客中写过静态版的通讯录&#xff0c;我们今天来写一个动态版的&#xff0c;不需要规定它到底需要多大空间&#xff0c;只要还有内存&#xff0c;我们都可以存放的下&#xff01;同时&#xff0c;函数实现原理&#xff0c;我在通讯录静态版的博客里做了详细的讲解…

当Dubbo遇到高并发:探究流量控制解决方案

系列文章目录 面试Dubbo &#xff0c;却问我和Springcloud有什么区别&#xff1f; 超简单&#xff0c;手把手教你搭建Dubbo工程&#xff08;内附源码&#xff09; 【收藏向】从用法到源码&#xff0c;一篇文章让你精通Dubbo的SPI机制 Dubbo最核心功能——服务暴露的配置、使用…

VSCode 设置平滑光标

1.点击左下角的设置按钮&#xff0c;再点击设置 2.点击文本编辑器&#xff0c;点击光标&#xff0c;勾选控制是否启用平滑插入动画。 3.随便打开一个文件&#xff0c;上下左右移动光标时&#xff0c;会发现非常的流畅。 原创作者&#xff1a;吴小糖 创作时间&#xff1a;2023…

华为RS设备状态及接口配置命令

1、查看硬件信息 ①查看序列号 查看整机序列号 display esn display sn ②、查看功率 电源功率 display power 查看光模块功率 display transceiver interface gigabitethernet 1/0/0 verbose ③、查看风扇 display fan ④、查看温度 display temperature all ⑤、查看硬…

安全防御——二、ENSP防火墙实验学习

安全防御 一、防火墙接口以及模式配置1、untrust区域2、trust区域3、DMZ区域4、接口对演示 二、防火墙的策略1、定义与原理2、防火墙策略配置2.1 安全策略工作流程2.2 查询和创建会话 3、实验策略配置3.1 trust-to-untrust3.2 trust-to-dmz3.3 untrust-to-dmz 三、防火墙的区域…