Flink(一)【WordCount 快速入门】

前言

        学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

        最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

        这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
        就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

        我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties>
 <flink.version>1.13.0</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖-->
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-api</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-to-slf4j</artifactId>
 <version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties 

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 入门案例

0、数据准备

在 根目录下创建 words.txt

hello flink
hello java
hello spark
hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个执行批式数据处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSet
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. 将每行数据转换成一个二元组类型
        // 输入类型: String 输出类型: Tuple2
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =
                // String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型
                lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型

        // 4. 根据 word 分组
        UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置

        // 5. 分组内进行聚合
        AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置

        // 6. 打印结果
        res.print();

    }
}

运行结果:

(hadoop,1)
(flink,1)
(hello,4)
(java,1)
(spark,1)

Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);

        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);

        // 6. 输出结果
        res.print();

        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果:

3> (java,1)
13> (flink,1)
1> (spark,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
5> (hello,4)
15> (hadoop,1)

        我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

        Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

        我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnBoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");
        DataStreamSource<String> lineDS = env.socketTextStream(host,port);

        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        
        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);

        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);

        // 6. 输出结果
        res.print();

        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果: 

        可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

        Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

        还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123714.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue3错误排查-POST请求的body参数 传参方式form-data和json

问题&#xff1a;vue3实现登录功能&#xff0c;登录成功后 跳转到登陆后的界面 一秒后 闪退回登录页 对应的输出结果也一闪而过&#xff0c;反复复查了代码&#xff0c;没问题。&#xff08;封装的 post 请求未成功发起&#xff09; 自测&#xff1a;进行断点输出调试。强行跳…

接口幂等性详解

1. 什么是幂等性 幂等性指的是对同一个操作的多次执行所产生的影响与一次执行的影响相同。无论操作执行多少次&#xff0c;系统状态都应该保持一致。 在计算机科学和网络领域中&#xff0c;幂等性通常用来描述服务或操作的特性。对于RESTful API或HTTP方法&#xff0c;一个幂…

049-第三代软件开发-软件部署脚本(一)

第三代软件开发-软件部署脚本(一) 文章目录 第三代软件开发-软件部署脚本(一)项目介绍软件部署脚本(一)其他方式 关键字&#xff1a; Qt、 Qml、 bash、 shell、 脚本 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt Meta-Object…

Java 最常见的面试题:常用的 jvm 调优的参数都有哪些?

Java 最常见的面试题&#xff1a;常用的 jvm 调优的参数都有哪些? 常用的Java虚拟机&#xff08;JVM&#xff09;调优参数有很多&#xff0c;以下是一些重要的参数&#xff1a; -Xms 和 -Xmx&#xff1a;这两个参数分别设置了JVM启动内存的最小值和最大值&#xff0c;单位通…

毅速丨为什么不锈钢材料在金属3D打印中应用广泛

不锈钢材料作为一种常见材料&#xff0c;在金属3D打印中应用广泛&#xff0c;可以说是目前使用率最高的材料&#xff0c;为什么不锈钢大受欢迎&#xff0c;主要由几点原因。 第一、工艺适合性 金属3D打印的工艺&#xff0c;如直接金属激光烧结&#xff08;DMLS&#xff09;或选…

Leetcode—2578.最小和分割【简单】

2023每日刷题&#xff08;二十三&#xff09; Leetcode—2578.最小和分割 实现代码 class Solution { public:int splitNum(int num) {vector<int> a;while(num) {a.push_back(num % 10);num / 10;}int n a.size();sort(a.begin(), a.begin() n);int num1 0;int num…

【Spring】bean的自动装配

目录 一.byName 二.byType 快捷书写 people1 package org.example;public class People1 {public void eat(){System.out.println("吃饭");} }people2 package org.example;public class People2 {public void sleep(){System.out.println("睡觉");} …

【SVN】

SVN 1 svn使用1.1 主干合并到分支1.2 分支合并到主干1.3 分支建立1.4 创建分支1.5 切换分支1.6 合并分支1.7 删除分支 2 概念理解 1 svn使用 1.1 主干合并到分支 首先&#xff0c;在本地trunk中先update一下&#xff0c;有冲突的解决冲突&#xff0c;保证trunk和repository已…

软件测试/测试开发丨接口测试学习笔记,TcpDump与WireShark

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/27859 协议分析工具 网络监听&#xff1a;TcpDump WireShark 代理 Proxy 推荐工具&#xff1a;手工测试charles [全平台]、安全测试burpsuite [全平台 j…

10-Docker-分布式存储算法

01-哈希取余算法分区 哈希取余分区&#xff08;Hash Modulus Partitioning&#xff09;是一种在分布式计算和数据存储中常用的分区策略&#xff0c;其目的是将数据或计算任务分配到多个节点或服务器上&#xff0c;以实现负载均衡和提高性能。这种分区策略的核心思想是使用哈希…

java网络通信:Springboot整合Websocket

网络通信 什么是webSocket&#xff1f;WebSocket 原理springboot整合websocket过程 网络通信三要素&#xff1a;ip地址&#xff08;ipv4、ipv6&#xff09;、端口号&#xff08;应用程序的唯一标识&#xff09;、协议&#xff08;连接和通信的规则&#xff0c;常用&#xff1a;…

基于袋獾算法的无人机航迹规划-附代码

基于袋獾算法的无人机航迹规划 文章目录 基于袋獾算法的无人机航迹规划1.袋獾搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用袋獾算法来优化无人机航迹规划。 1.袋獾搜索算法 …

深入探讨 Presto 中的缓存

Presto是一种流行的开源分布式SQL引擎&#xff0c;使组织能够在多个数据源上大规模运行交互式分析查询。缓存是一种典型的提高 Presto 查询性能的优化技术。它为 Presto 平台提供了显着的性能和效率改进。 缓存通过将频繁访问的数据存储在内存或快速本地存储中&#xff0c;避免…

redis的数据类型及操作

一、redis 的数据库 Redis是一个字典结构的存储服务器。一个Redis实例提供了多个用来存储数据的字典,客户端可以指定将数据存在哪个字典中。这与关系型数据库例中可以创建多个数据库似。因此,可以将每个字典理解为一个独立的数据库。每个数据库对外都是以0开始的递增数字命名…

HTML的表单标签和无语义标签的讲解

HTML的表单标签 表单是让用户输入信息的重要途径, 分成两个部分: 表单域: 包含表单元素的区域. 重点是 form 标签. 表单控件: 输入框, 提交按钮等. 重点是 input 标签 form 标签 使用form进行前后端交互.把页面上,用户进行的操作/输入提交到服务器上 input 标签 有很多形态,能…

本地消息表分布式事务

BASE论文 论文链接&#xff1a;https://queue.acm.org/detail.cfm?id1394128 里面提到&#xff0c; The most critical factor in implementing the queue, however, is ensuring that the backing persistence is on the same resource as the database. This is necessary…

5G边缘计算网关的功能及作用

5G边缘计算网关具有多种功能。 首先&#xff0c;它支持智能云端控制&#xff0c;可以通过5G/4G/WIFI等无线网络将采集的数据直接上云&#xff0c;实现异地远程监测控制、预警通知、报告推送和设备连接等工作。 其次&#xff0c;5G边缘计算网关可以采集各种数据&#xff0c;包…

vue中 process.env 对象为空对象问题

问题&#xff1a;今天在处理vue项目环境问题的时候&#xff0c;发现直接打印 process 对象和打印 process.env 时 env 对象输出结果是不一样的&#xff0c;如下图所示&#xff1a; 在网上搜索了一番后发现还是有挺多朋友对此感到疑惑的&#xff0c;询问了同事&#xff0c;同…

数模之线性规划

线性规划 优化类问题&#xff1a;有限的资源&#xff0c;最大的收益 例子: 华强去水果摊找茬&#xff0c;水果摊上共3个瓜&#xff0c;华强总共有40点体力值,每劈一个瓜能带来40点挑衅值,每挑一个瓜问“你这瓜保熟吗”能带来30点挑衅值,劈瓜消耗20点体力值&#xff0c;问话消耗…

Vue3 简单实现虚拟Table,展示海量单词.利用WebAPI speechSynthesis,朗读英语单词

目录 本页面完整代码 视频演示 完整的页面代码 利用webapi speechSynthesis帮助我们自动郎读英语单词&#xff0c;可以利用这个API&#xff0c;做一些小说朗读或到账提示。 本页面完整代码 用Vue写了一个简单页面&#xff0c;里面还写了一个简单的虚拟Table支持海量数据展示…