Flink WordCount实践

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	<properties>
        <flink.version>1.17.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

</mirrors>上面一行添加阿里云仓库镜像

	<mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>        
    </mirror>

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world
hello java
hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件读取数据 按行读取
        DataSource<String> lineDS = env.readTextFile("data/words.txt");

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word,1L));
                }
            }
        });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                        String[] words = line.split(" ");

                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

运行结果

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");

                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/530215.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GitHub 仓库 (repository) Watch - Star - Fork - Follow

GitHub 仓库 [repository] Watch - Star - Fork - Follow References 眼睛图标旁边写着 Watch 字样。点击这个按钮就可以 Watch 该仓库&#xff0c;今后该仓库的更新信息会显示在用户的公开活动中。Star 旁边的数字表示给这个仓库添加 Star 的人数。这个数越高&#xff0c;代表…

基于ubuntu22.04系统安装nvidia A100驱动与NVLink启用

1、官方仓库 针对驱动包下载认准nvidia官网 dpkg -i nvidia-driver-local-repo-ubuntu2204-550.54.15_1.0-1_amd64.deb apt update apt search nvidia-driver-5502、安装 根据步骤1apt search nvidia-driver-550查出版本&#xff1a;此驱动包封在nvidia-driver-local-repo-ub…

大语言模型的多模态应用(多模态大语言模型的相关应用)

探索大语言模型在多模态领域的相关研究思路

【黑马头条】-day07APP端文章搜索-ES-mongoDB

文章目录 今日内容1 搭建es环境1.1 拉取es镜像1.2 创建容器1.3 配置中文分词器ik1.4 测试 2 app文章搜索2.1 需求说明2.2 思路分析2.3 创建索引和映射2.3.1 PUT请求添加映射2.3.2 其他操作 2.4 初始化索引库数据2.4.1 导入es-init2.4.2 es-init配置2.4.3 导入数据2.4.4 查询已导…

idea中MySQL数据库的配置

在IntelliJ IDEA中配置数据库可以通过以下步骤进行&#xff1a; 打开IntelliJ IDEA&#xff0c;在菜单栏中选择"View" -> "Tool Windows" -> "Database"&#xff0c;打开Database工具窗口。 在Database工具窗口上方&#xff0c;点击"…

springboot+vue2+elementui+mybatis- 批量导出导入

全部导出 批量导出 报错问题分析 经过排查&#xff0c;原因是因为在发起 axios 请求的时候&#xff0c;没有指定响应的数据类型&#xff08;这里需要指定响应的数据类型为 blob 二进制文件&#xff09; 当响应数据回来后&#xff0c;会执行 axios 后置拦截器的代码&#xff0…

相机模型浅析

相机模型 文章目录 相机模型四个坐标系针孔相机模型世界坐标系到相机坐标系相机坐标系到图像坐标系图像坐标到像素坐标 四个坐标系 ①世界坐标系&#xff1a;是客观三维世界的绝对坐标系&#xff0c;也称客观坐标系。因为数码相机安放在三维空间中&#xff0c;我们需要世界坐标…

主流排序简单集合

排序算法集合 选择排序 图解&#xff1a;以此类推直至 /*选择排序*/ void select_sort(vector<int>& nums) {/*选取一个基准元素逐个与后面的比较*/for (int i 0; i < nums.size() - 1-1; i) {int min i;/*定义随之变化的基准元素*/for (int j i 1; j <…

华为 2024 届校园招聘-硬件通⽤/单板开发——第一套(部分题目分享,完整版带答案,共十套)

华为 2024 届校园招聘-硬件通⽤/单板开发——第一套 部分题目分享&#xff0c;完整版带答案(有答案和解析&#xff0c;答案非官方&#xff0c;未仔细校正&#xff0c;仅供参考&#xff09;&#xff08;共十套&#xff09;获取&#xff08;WX:didadidadidida313&#xff0c;加我…

GEE:研究区(Polygon)样式设置

作者:CSDN @ _养乐多_ 本文将介绍在 Google Earth Engine (GEE)平台上为 polygon (面)数据设置样式的方法和代码,polygon 可以设置成任何颜色,以增加可视化效果更好理解数据分布。 结果如下图所示, 文章目录 一、统一样式1.1 示例代码1.2 示例代码链接二、根据区域名…

基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba并采用前后端分离的企业级微服务多租户系统架构

简介 基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba并采用前后端分离的企业级微服务多租户系统架构。并引入组件化的思想实现高内聚低耦合并且高度可配置化&#xff0c;适合学习和企业中使用。 真正实现了基于RBAC、jwt和oauth2的无状态统一权限认证的解决方案&#x…

element UI table合并单元格方法

废话不多讲&#xff0c;直接上代码&#xff0c;希望能帮到需要的朋友 // 合并单元格function spanMethod({ row, column, rowIndex, columnIndex }) {//定义需要合并的列字段&#xff0c;有哪些列需要合并&#xff0c;就自定义添加字段即可const fields [declareRegion] // …

hive-3.1.2分布式搭建与hive的三种交互方式

hive-3.1.2分布式搭建&#xff1a; 一、上传解压配置环境变量 在官网或者镜像站下载驱动包 华为云镜像站地址&#xff1a; hive&#xff1a;Index of apache-local/hive/hive-3.1.2 mysql驱动包&#xff1a;Index of mysql-local/Downloads/Connector-J # 1、解压 tar -zx…

采用Flink CDC操作SQL Server数据库获取增量变更数据

采用Flink CDC操作SQL Server数据库获取增量变更数据 Flink CDC 1.12版本引入了对SQL Server的支持&#xff0c;包括SqlServerCatalog和SqlServerTable。在SqlServerCatalog中&#xff0c;你可以根据表名获取对应的字段和字段类型。 SQL Server 2008 开始支持变更数据捕获 (C…

李廉洋:4.10黄金原油走势最新分析及策略。

美联储博斯蒂克重申了他对今年降息一次的预期&#xff0c;但他补充说&#xff0c;如果经济形势发生变化&#xff0c;他对推迟降息或进一步降息持开放态度。博斯蒂克强调了美国经济和劳动力市场的持续强劲&#xff0c;但表示就业市场的疲软迹象将促使他考虑比目前预期的更早和更…

GFS部署实验

目录 1、部署环境 ​编辑 2、更改节点名称 3、准备环境 4、磁盘分区&#xff0c;并挂载 5. 做主机映射--/etc/hosts/ 6. 复制脚本文件 7. 执行脚本完成分区 8. 安装客户端软件 1. 安装解压源包 2. 创建gfs 3. 安装 gfs 4. 开启服务 9、 添加节点到存储信任池中 1…

SVM向量支持机

1.通俗理解 svm&#xff1a;support vector machine目标&#xff1a;利用超平面将两类数据分割开来&#xff0c;这个超平面就是我们要设计的对象 如何设计&#xff1f;我们设计之后会有间隔&#xff0c;间隔越大分类效果就越好&#xff1b;距离决策边界最近的点我们成为支持向…

40.Python从入门到精通—Python3 JSON 数据解析 Python3 日期和时间 什么是时间元组? 获取当前时间 获取格式化的时间

40.Python从入门到精通—Python3 JSON 数据解析 Python3 日期和时间 什么是时间元组&#xff1f; 获取当前时间 获取格式化的时间 Python3 JSON 数据解析Python3 日期和时间什么是时间元组&#xff1f;获取当前时间获取格式化的时间 Python3 JSON 数据解析 Python3 中可以使用…

使用MongoDB 构建AI:轻松应对从预测式AI到生成式AI

毫无疑问&#xff0c;如今从生成式AI (GenAI )中获益最大的&#xff0c;是那些早已运用预测式AI (Predictive AI )的组织。 2023年6月&#xff0c;麦肯锡在2023年6月发布的《生成式人工智能的经济潜力》研究中也得出了与此相同的结论。 原因主要有以下几点&#xff1a; 内部文…

顺序表(C语言实现)

什么是顺序表 顺序表和数组的区别 顺序表本质就是数组 结构体初阶进阶 系统化的学习-CSDN博客 简单解释一下&#xff0c;就像大家去吃饭&#xff0c;然后左边是苍蝇馆子&#xff0c;右边是修饰过的苍蝇馆子&#xff0c;但是那个好看的苍蝇馆子一看&#xff0c;这不行啊&a…