Flink 从入门到实战

Flink中的批和流

批处理的特点是有界、持久、大量,非常适合需要访问全部记录才能完成的计算工作,一般用于离线统计

流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统 传输的每个数据项执行操作,一般用于实时统计。

一个无界流可以分解为多个有界流

性能 Flink > Spark > Hadoop

Flink的四种安装模式:

  1. Local

  2. Standalone

  3. standaloneHA

  4. Yarn

flink在使用input、output执行测试文件WordCount.jar 的时候,报出找不到文件的错误(但是文件路径存在),原因是:

因为我们的flink是task节点在执行任务的,task在三台机器上都有分布,我们当前文件只在一台服务器中,所以当其他task运行的时候,就会报出找不到文件的错误,将此文件分发到每台服务器中就不会出现这个错误。(我们以后在使用flink的时候,数据都是存放在hdfs上(一式三份),就不存在找不到文件的错误)

Flink-WordCount案例:

  1. 第一版代码

这一版代码比较简单,看代码就可以看懂

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _01WorkCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");

        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        });

        // 将切割好的字符串形成 (word,1)的二元组的形式
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);

            }
        });

        // 聚合
        DataStream<Tuple2<String, Integer>> sumResult = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        }).sum(1);

        sumResult.print();
        env.execute();

    }
}
第二版代码:简化了第一版的书写形式

第一版中 SingleOutputStreamOperator、DataStreamSource的父类其实都是DataStream,所以可以连着写下来
 

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _02WorkCount {

    /**
     *
     *  简化版案例
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
                .flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);

            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        }).sum(1).print();
        env.execute();

    }
}
第三版,使用lambda表达式,更加简化的书写

不过在使用lambda的时候,需要在后面指定一个方法的返回值,要不然会报错

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _03WorkCount_lambda {

    /**
     * lambda 表达式简化版
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {

        // 使用lambda简化的时候,需要指定返回值类型,不指定的话会报错

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
                .flatMap((String line, Collector<String> collector) -> {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }).returns(Types.STRING).map((String word) ->  Tuple2.of(word, 1)

            ).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) ->  tuple2.f0).sum(1).print();
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        env.execute();

    }
}
复习lambda表达式:
  • lambda可以用来简化匿名内部类的书写

  • lambda只能简化函数式接口(有且仅有一个方法的接口)的匿名内部类的书写

省略规则:

  • 只拿小括号里面的 加上 -> 指向大括号

  • 只有一个参数的时候,参数类型可以省略

  • 如果方法体中的代码只有一行,大括号和return等都可以省略(但是需要同时省略)

没省略之前的 (第一版)

省略后(第三版)

第四版,自定义输入与输出的路径地址

可以打包到集群中使用,使用的时候在jar包的后面跟上input路径以及output路径即可

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _04WorkCount_zidingyipass {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // 将任务的并行度设置为2
        // env.setParallelism(2);

        // 通过args传参
        DataStreamSource<String> dataStream01 = null;
        if (args.length == 0){
            dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
        }else {
            String input = args[0];
            dataStream01 = env.readTextFile(input);
        }


        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
                .flatMap((String line, Collector<String> collector) -> {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }

        }).map((String word) -> Tuple2.of(word, 1)


        ).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0

         // 此处的1 指的是元组的第二个元素,进行相加的意思
        ).sum(1);

        if (args.length == 0){
            sumResult.print();
        }else {
            String output = args[1];
            sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        }

        env.execute();

    }
}

打包后执行结果如下:

第五版,采用和flink中相同的书写方式 即带(--input 以及 --output)

也可以打包到集群中使用,使用的时候在jar包的后面跟上 --input +路径以及 -output + 路径即可

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _05WorkCount_zidingyipass_input {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // 将任务的并行度设置为2
        // env.setParallelism(2);
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String input = "";
        String output = "";
        if (parameterTool.has("output") && parameterTool.has("input")) {
            input = parameterTool.get("input");
            output = parameterTool.get("output");
        } else {
            output = "hdfs://bigdata01:9820/home/wordcount/output";
        }

        // 通过args传参
        DataStreamSource<String> dataStream01 = null;
        if (args.length == 0){
            dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
        }else {
            dataStream01 = env.readTextFile(input);
        }
        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
                .flatMap((String line, Collector<String> collector) -> {
                
                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
        }).returns(Types.STRING).map((String word) -> Tuple2.of(word, 1)

        ).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0

         // 此处的1 指的是元组的第二个元素,进行相加的意思
        ).sum(1);

        if (args.length == 0){
            sumResult.print();
        }else {
            sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        }
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/925831.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS 5.0应用开发——列表(List)

【高心星出品】 文章目录 列表&#xff08;List&#xff09;列表介绍列表布局设置主轴方向设置交叉轴方向 列表填充分组列表填充 滚动条位置设置滚动位置滚到监听 列表项侧滑 列表&#xff08;List&#xff09; 列表介绍 列表作为一种容器&#xff0c;会自动按其滚动方向排列…

RBF神经网络预测结合NSGAII多目标优化

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 RBF神经网络预测结合NSGAII多目标优化 rbf神经网络预测结合nsga2多目标优化 题外话&#xff1a; 多目标优化是指在优化问题中同时考虑多个目标函数的优化过程。在多目标优化中&#xff0c;通常存在多个冲突的目标&am…

HTTPTomcatServlet

今日目标: 了解JavaWeb开发的技术栈理解HTTP协议和HTTP请求与响应数据的格式掌握Tomcat的使用掌握在IDEA中使用Tomcat插件理解Servlet的执行流程和生命周期掌握Servlet的使用和相关配置1,Web概述 1.1 Web和JavaWeb的概念 Web是全球广域网,也称为万维网(www),能够通过浏览…

理解Linux的select、poll 和 epoll:从原理到应用场景

I/O 多路复用并不是什么新东西&#xff0c;select 早在 1983 年就出现了&#xff0c;poll 在 1997 年&#xff0c;epoll 是 2002 年的产物。面试题总爱问“多路复用多厉害&#xff1f;”其实它就是把轮询的锅甩给了操作系统&#xff0c;而操作系统不过是用 CPU 指令帮你完成事件…

111.有效单词

class Solution {public boolean isValid(String word) {if(word.length()<3){return false;}int countV0,countC0;//分别统计原音和辅音for(int i0;i<word.length();i){if(Character.isLetterOrDigit(word.charAt(i))){if(word.charAt(i)a||word.charAt(i)e||word.charA…

蓝桥杯每日真题 - 第24天

题目&#xff1a;&#xff08;货物摆放&#xff09; 题目描述&#xff08;12届 C&C B组D题&#xff09; 解题思路&#xff1a; 这道题的核心是求因数以及枚举验证。具体步骤如下&#xff1a; 因数分解&#xff1a; 通过逐一尝试小于等于的数&#xff0c;找到 n 的所有因数…

设计模式 外观模式 门面模式

结构性模式-外观模式 门面模式 适用场景&#xff1a;如果你需要一个指向复杂子系统的直接接口&#xff0c; 且该接口的功能有限&#xff0c; 则可以使用外观模式。 不用关心后面的查询具体操作 /*** 聚合查询接口*/ RestController RequestMapping("/search") Slf…

【数据资产】数据资产管理体系概述

导读&#xff1a;数据资产管理是对企业或组织内部产生的海量数据进行全面、系统、规范的管理&#xff0c;包括数据的收集、存储、处理、分析、利用和保护等环节&#xff0c;旨在挖掘数据价值&#xff0c;提升数据质量&#xff0c;确保数据安全&#xff0c;从而支持业务决策&…

【论文笔记】Tool Learning with Foundation Models 论文笔记

Tool Learning with Foundation Models 论文笔记 文章目录 Tool Learning with Foundation Models 论文笔记摘要背景&#xff1a;工作&#xff1a; 引言工具学习的发展本文工作&#xff08;大纲&目录&#xff09; 背景2.1 工具使用的认知起源2.2 工具分类&#xff1a;用户界…

dbeaver如何批量执行sql脚本

场景:需要对数据库中的表做批量操作,需要加载多个sql文件,并批量执行 1.创建链接文件或链接文件夹(把脚本加载到dbeaver对应的目录下) 2.创建新任务(创建批量执行sql文件的任务) 3.执行任务

SpringBoot小知识(3):热部署知识

一、热部署 热部署是一个非常消耗内存的机制&#xff0c;在实际大型项目开发中几乎用不到&#xff0c;只有小型项目或者分模块或者不停机更新的时候才会用到&#xff0c;仁者见仁智者见智。 1.1 什么是热部署&#xff1f; 热部署是指在不停止应用程序或服务器的情况下&#xf…

信息学奥赛一本通 1448:【例题1】电路维修 | 洛谷 P4667 [BalticOI 2011 Day1] Switch the Lamp On 电路维修

【题目链接】 ybt 1448&#xff1a;【例题1】电路维修 洛谷 P4667 [BalticOI 2011 Day1] Switch the Lamp On 电路维修 【题目考点】 1. 双端队列广搜&#xff08;0-1BFS&#xff09; 【解题思路】 整个电路是由一个个的正方形的电路元件组成&#xff0c;每个正方形有四个…

SQL Server 实战 - 多种连接

目录 背景 一、多种连接 1. 复合连接条件 2. 跨数据库连接 3. 隐连接 4. 自连接 5. 多表外连接 6. UNION ALL 二、一个对比例子 背景 本专栏文章以 SAP 实施顾问在实施项目中需要掌握的 sql 语句为偏向进行选题&#xff1a; 用例&#xff1a;SAP B1 的数据库工具&am…

在openEuler中使用top命令

在openEuler中使用top命令 概述 top 命令是Linux系统中最常用的实时性能监控工具之一,允许用户查看系统的整体状态,包括CPU使用率、内存使用情况、运行中的进程等。本文档将详细介绍如何在openEuler操作系统中有效利用top命令进行系统监控。 启动top命令 打开终端并输入t…

架构-微服务-服务调用Dubbo

文章目录 前言一、Dubbo介绍1. 什么是Dubbo 二、实现1. 提供统一业务api2. 提供服务提供者3. 提供服务消费者 前言 服务调用方案--Dubbo‌ 基于 Java 的高性能 RPC分布式服务框架&#xff0c;致力于提供高性能和透明化的 RPC远程服务调用方案&#xff0c;以及SOA服务治理方案。…

【vue-router】Vue-router如何实现路由懒加载

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

26届JAVA 学习日记——Day17

2024.11.29 周五 今天把苍穹外卖的项目做完啦&#xff0c;准备开始做新的AI项目&#xff0c;今天的时间主要在修改简历&#xff0c;超级简历现在要收费&#xff0c;自己Word慢慢改真的很耗时间。 八股 今日暂无该内容学习。 算法 今日暂无该内容学习。 项目 苍穹外卖-完结…

工控网络安全系列一

工控网络安全 框架图 工业协议、PLC 工业网络安全危机国家安全 工控安全漏洞 禁止出口的技术壁垒&#xff0c;能源、电力、水处理成为重点攻击的安全 对抗和利益 国家、利益集团、民间 4层 工控安全基础设施&#xff1a; 工控网络安全和传统网络安全 工业安全病毒介绍 工业控…

springboot338it职业生涯规划系统--论文pf(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;it职业生涯规划系统的设计与实现 摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以…

技术创新与人才培养并重 软通动力子公司鸿湖万联亮相OpenHarmony人才生态大会

11月27日&#xff0c;由开放原子开源基金会指导&#xff0c;OpenHarmony项目群工作委员会主办的OpenHarmony人才生态大会2024在武汉隆重举办。软通动力子公司鸿湖万联作为OpenHarmony项目群A类捐赠人应邀出席。大会期间&#xff0c;鸿湖万联不仅深度参与了OpenHarmony人才生态年…