【Flink入门修炼】1-3 Flink WordCount 入门实现

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
image.png

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 创建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
image.png

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上:
image.png

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

wordfrequency
hello1
world1
flink1
flink1
flink1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程序如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 创建上下文环境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 读取一行模拟数据作为输入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 转 SQL,指定字段名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 注册为一个表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

执行,结果输出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/378746.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

54.螺旋矩阵(Java)

题目描述&#xff1a; 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 输入&#xff1a; matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a; [1,2,3,6,9,8,7,4,5] 代码实现&#xff1a; import java.util.ArrayLi…

设置idea中放缩字体大小

由于idea没默认支持ctrl滚轴对字体调节大小&#xff0c;下面一起设置一下吧&#xff01; 点击 文件 -> 设置 按键映射 -> 编辑器操作 -> 搜索栏输入f 点击减小字体大小 -> 选择增加鼠标快捷键 按着ctrl键&#xff0c;鼠标向下滚动后&#xff0c;点击确定即可 然后…

还是蓝海项目?浅谈steam海外道具搬运项目几个常见问题!

做steam这个项目做了已经3年多了。记得刚开始做的时候还是一个很冷门的项目&#xff0c;现在越来越多的朋友也开始了解这个项目。 其中不乏很多已经在别的地方了解过后来找我咨询的朋友。我发现一些同行或者说自媒体太过于虚假宣传&#xff0c;把steam这个项目说的太好了。也有…

2024阿里云GPU服务器租用费用价格表说明

阿里云GPU服务器租用价格表包括包年包月价格、一个小时收费以及学生GPU服务器租用费用&#xff0c;阿里云GPU计算卡包括NVIDIA V100计算卡、T4计算卡、A10计算卡和A100计算卡&#xff0c;GPU云服务器gn6i可享受3折优惠&#xff0c;阿里云百科aliyunbaike.com分享阿里云GPU服务器…

考研数据结构笔记(4)

链表&#xff08;链式存储&#xff09; 单链表定义基本操作的实现单链表的插入按位序插入指定节点的前插指定节点的后插 单链表的删除 小结 单链表 定义 顺序表优点:可随机存取&#xff0c;存储密度高&#xff0c;缺点:要求大片连续空间&#xff0c;改变容量不方便。 单链表优…

计算机网络基本知识(一)

文章目录 概要速率带宽、吞吐量带宽吞吐量 时延发送&#xff08;传输&#xff09;时延传播时延排队时延处理时延时延带宽积 利用率 概要 速率、带宽、吞吐量、时延、利用率 速率 记忆要点&#xff1a;10的三次方 记忆要点&#xff1a;2的10次方 带宽、吞吐量 带宽 单位&…

[C/C++] -- CMake使用

CMake&#xff08;Cross-platform Make&#xff09;是一个开源的跨平台构建工具&#xff0c;用于自动生成用于不同操作系统和编译器的构建脚本。它可以简化项目的构建过程&#xff0c;使得开发人员能够更方便地管理代码、依赖项和构建设置。 CMake 使用一个名为 CMakeLists.tx…

QML中常见热区及层级结构

目录 引言层级结构默认层级结构z值作用范围遮罩实现-1的作用 热区嵌套与普通元素与其他热区与Flickable 事件透传总结 引言 热区有很多种&#xff0c;诸如MouseArea、DropArea、PinchArea等等&#xff0c;基本都是拦截对应的事件&#xff0c;允许开发者在事件函数对事件进行响…

kubectl 创建 Pod 背后到底发生了什么?

想象一下&#xff0c;如果我想将 nginx 部署到 Kubernetes 集群&#xff0c;我可能会在终端中输入类似这样的命令&#xff1a; $ kubectl run --imagenginx --replicas3然后回车。几秒钟后&#xff0c;你就会看到三个 nginx pod 分布在所有的工作节点上。这一切就像变魔术一样…

Java学习15-- 面向对象学习3. 对象的创建分析【★】

&#xff08;本章看不懂多读几遍&#xff0c;弄懂后再往下章看&#xff09; 面向对象学习3. 对象的创建分析 Java Memory Structure: 如上图所示&#xff1a; 主要分为Stack和Heap Memory 其中Stack主要放method包括main 程序从main开始所以main最先进入Stack&#xff0c;等…

蓝桥杯第八届省赛题笔记------基于单片机的电子钟程序设计与调试

题目要求&#xff1a; 一、基本要求 1.1 使用 CT107D 单片机竞赛板&#xff0c;完成“电子钟”功能的程序设计与调试&#xff1b; 1.2 设计与调试过程中&#xff0c;可参考组委会提供的“资源数据包”&#xff1b; 1.3 Keil 工程文件以准考证号命名&#xff0c;保存在…

C语言系列-文件操作

&#x1f308;个人主页: 会编程的果子君 ​&#x1f4ab;个人格言:“成为自己未来的主人~” 为什么使用文件 如果没有文件&#xff0c;我们写的程序的数据是存储在电脑的内存上&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就会丢失了&#xff0c;等再次运行…

MySQL 日志管理

4.6&#xff09;日志管理 MySQL 支持丰富的日志类型&#xff0c;如下&#xff1a; 事务日志&#xff1a;transaction log 事务日志的写入类型为 "追加"&#xff0c;因此其操作为 "顺序IO"&#xff1b; 通常也被称为&#xff1a;预写式日志 write ahead…

[职场] 服务行业个人简历 #笔记#笔记

服务行业个人简历 服务员个人简历范文1 姓名: XXX国籍:中国 目前所在地:天河区民族:汉族 户口所在地:阳江身材: 160cm43kg 婚姻状况:未婚年龄: 21岁 培训认证:诚信徽章: 求职意向及工作经历 人才类型:普通求职 应聘职位: 工作年限:职称:初级 求职类型:全职可到职日期:随时 月薪…

住宅供暖设备行业调研:市场环境将稳定发展阶段中

为使人们生活或进行生产的空间保持在适宜的热状态而设置的供热设施。 向一定的空间加热量的办法&#xff0c;可以直接把产生热量的火炉装在其中;也可以抽出其中的空气&#xff0c;加热后再送回;也可以在其中装置保持在较高温度的物体&#xff0c;向所在空间放热。这种温度较高的…

极值图论基础

目录 一&#xff0c;普通子图禁图 二&#xff0c;Turan问题 三&#xff0c;Turan定理、Turan图 1&#xff0c;Turan定理 2&#xff0c;Turan图 四&#xff0c;以完全二部图为禁图的Turan问题 1&#xff0c;最大边数的上界 2&#xff0c;最大边数的下界 五&#xff0c;…

机器人学、机器视觉与控制 上机笔记(第一版译文版 2.1章节)

机器人学、机器视觉与控制 上机笔记&#xff08;第一版译文版 2.1章节&#xff09; 1、前言2、本篇内容3、代码记录3.1、新建se23.2、生成坐标系3.3、将T1表示的变换绘制3.4、完整绘制代码3.5、获取点*在坐标系1下的表示3.6、相对坐标获取完整代码 4、结语 1、前言 工作需要&a…

【前端web入门第四天】03 显示模式+综合案例热词与banner效果

文章目录: 1. 显示模式 1.1 块级元素,行内元素,行内块元素 1.2 转换显示模式 综合案例 综合案例一 热词综合案例二 banner效果 1. 显示模式 什么是显示模式 标签(元素)的显示方式 标签的作用是什么? 布局网页的时候&#xff0c;根据标签的显示模式选择合适的标签摆放内容。…

编码安全风险是什么,如何进行有效的防护

2011年6月28日晚20时左右&#xff0c;新浪微博突然爆发XSS&#xff0c;大批用户中招&#xff0c;被XSS攻击的用户点击恶意链接后并自动关注一位名为HELLOSAMY的用户&#xff0c;之后开始自动转发微博和私信好友来继续传播恶意地址。不少认证用户中招&#xff0c;也导致该XSS被更…

21、数据结构/单向循环链表练习20240203

一、请编程实现单向循环链表的头插&#xff0c;头删、尾插、尾删。 二、请编程实现单向循环链表约瑟夫环 约瑟夫环&#xff1a;用循环链表编程实现约瑟夫问题 n个人围成一圈&#xff0c;从某人开始报数1, 2, …, m&#xff0c;数到m的人出圈&#xff0c;然后从出圈的下一个人…