Flink从入门到实践(一):Flink入门、Flink部署

文章目录

  • 系列文章索引
  • 一、快速上手
    • 1、导包
    • 2、求词频demo
      • (1)要读取的数据
      • (2)demo1:批处理(离线处理)
      • (3)demo2 - lambda优化:批处理(离线处理)
      • (4)demo3:流处理(实时处理)
      • (5)总结:实时vs离线
      • (6)demo4:批流一体
      • (7)对接Socket
  • 二、Flink部署
    • 1、Flink架构
    • 2、Standalone部署
    • 3、自运行flink-web
    • 4、通过参数传递
    • 5、通过webui提交job
    • 6、停止作业
    • 7、常用命令
    • 8、集群
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、快速上手

1、导包

<!-- fink 相关依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>

2、求词频demo

注意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。

(1)要读取的数据

定义data内容:

pk,pk,pk
ruoze,ruoze
hello

(2)demo1:批处理(离线处理)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 使用Flink进行批处理,并统计wc
 *
 *
 * 结果:
 * (bye,2)
 * (hello,3)
 * (hi,1)
 */
public class BatchWordCountApp {

    public static void main(String[] args) throws Exception {
        // step0: Spark中有上下文,Flink中也有上下文,MR中也有
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // step1: 读取文件内容  ==> 一行一行的字符串而已
        DataSource<String> source = env.readTextFile("data/wc.data");

        // step2: 每一行的内容按照指定的分隔符进行拆分  1:N
        source.flatMap(new FlatMapFunction<String, String>() {

                    /**
                     *
                     * @param value 读取到的每一行数据
                     * @param out 输出的集合
                     */
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        // 使用,进行分割
                        String[] splits = value.split(",");
                        for(String split : splits) {
                            out.collect(split.toLowerCase().trim());
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String,Integer>>() {

                    /**
                     *
                     * @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)
                     */
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标
                .sum(1)  // ==> 求词频 sum,传下标
                .print(); // 打印
    }
}

(3)demo2 - lambda优化:批处理(离线处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * lambda表达式优化
 */
public class BatchWordCountAppV2 {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = env.readTextFile("data/wc.data");

        /**
         * lambda语法: (参数1,参数2,参数3...) -> {函数体}
         */
//        source.map(String::toUpperCase).print();

        // 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .groupBy(0).sum(1).print();

    }
}

(4)demo3:流处理(实时处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流式处理
 * 结果:
 * 8> (hi,1)
 * 6> (hello,1)
 * 5> (bye,1)
 * 6> (hello,2)
 * 6> (hello,3)
 * 5> (bye,2)
 */
public class StreamWCApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("data/wc.data");
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum
                .sum(1).print();

        // 需要手动开启
        env.execute("作业名字");
    }
}

(5)总结:实时vs离线

离线:结果是一次性出来的。
实时:来一个数据处理一次,数据是带状态的。

(6)demo4:批流一体

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 采用批流一体的方式进行处理
 */
public class FlinkWordCountApp {

    public static void main(String[] args) throws Exception {
        // 统一使用StreamExecutionEnvironment这个执行上下文环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动

        DataStreamSource<String> source = env.readTextFile("data/wc.data");

        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握
                .sum(1).print();

        // 执行
        env.execute("作业名字");
    }
}

(7)对接Socket

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 使用Flink对接Socket的数据并进行词频统计
 *
 * 大数据处理的三段论: 输入  处理  输出
 *
 */
public class FlinkSocket {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text
         *
         * 官网上描述的是 env.addSource(...)
         *
         * socket的方式对应的并行度是1,因为它来自于SourceFunction的实现
         */
        DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
        System.out.println(source.getParallelism());

        // 处理
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
                    String[] splits = value.split(",");
                    for(String split : splits) {
                        out.collect(Tuple2.of(split.trim(), 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握
                .sum(1)
                // 数据输出
                .print();  // 输出到外部系统中去

        env.execute("作业名字");
    }
}

二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。

一个JobManager(协调/分配),一个或多个TaskManager(工作)。
在这里插入图片描述
在这里插入图片描述

2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

可以根据官网来安装,需要下载、解压、安装。
也可以使用docker安装。

启动之后,localhost:8081就可以访问管控台了。

3、自运行flink-web

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.18.0</version>
</dependency>
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 新版本可以直接使用getExecutionEnvironment(conf)

以上亲测并不好使……具体原因未知,设置为flink1.16版本或许就好用了。

4、通过参数传递

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");

DataStreamSource<String> source = env.socketTextStream(host, port);
System.out.println(source.getParallelism());

可以通过命令行参数:–host localhost --port 8765

5、通过webui提交job

在这里插入图片描述
在这里插入图片描述

6、停止作业

在这里插入图片描述

7、常用命令

# 查看作业列表
flink list -a  # 所有
flink list -r  # 正在运行的
# 停止作业
flink cancel <jobid>

# 提交job
# -c,--class <classname> 指定main方法
# -C,--classpath <url> 指定classpath
# -p,--parallelism <paralle> 指定并行度
flink run -c com.demo.FlinkDemo FlinkTest.jar 

8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/

单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/

k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/

YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/

参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/379787.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

红队打靶练习:GLASGOW SMILE: 1.1

目录 信息收集 1、arp 2、nmap 3、nikto 4、whatweb 目录探测 1、gobuster 2、dirsearch WEB web信息收集 /how_to.txt /joomla CMS利用 1、爆破后台 2、登录 3、反弹shell 提权 系统信息收集 rob用户登录 abner用户 penguin用户 get root flag 信息收集…

蓝桥杯备赛Day9——链表进阶

给你一个链表,删除链表的倒数第 n 个结点,并且返回链表的头结点。 示例 1: 输入:head = [1,2,3,4,5], n = 2 输出:[1,2,3,5]示例 2: 输入:head = [1], n = 1 输出:[]示例 3: 输入:head = [1,2], n = 1 输出:[1]提示: 链表中结点的数目为 sz1 <= sz <= 300 &l…

PKI - 借助Nginx 实现Https 服务端单向认证、服务端客户端双向认证

文章目录 Openssl操系统默认的CA证书的公钥位置Nginx Https 自签证书Nginx Https 使用CA签发证书客户端使用自签证书供服务端验证客户端使用 根证书 签发客户端证书 供服务端验证 Openssl https://www.openssl.net.cn/ openssl是一个功能丰富且自包含的开源安全工具箱。 它提…

Centos7.9安装SQLserver2017数据库

Centos7.9安装SQLserver2017数据库 一、安装前准备 挂载系统盘 安装依赖 yum install libatomic* -y 二、yum方式安装 # 配置 yum 源 wget -O /etc/yum.repos.d/mssql-server.repo https://packages.microsoft.com/config/rhel/7/mssql-server-2017.repoyum clean all yum…

华为机考入门python3--(10)牛客10-字符个数统计

分类&#xff1a;字符 知识点&#xff1a; 字符的ASCII码 ord(char) 题目来自【牛客】 def count_unique_chars(s): # 创建一个空集合来保存不同的字符 unique_chars set() # 遍历字符串中的每个字符 for char in s: # 将字符转换为 ASCII 码并检查是否在范围内 #…

ARM PAC/BTI/MTE三剑客精讲与实战

一、PAC指针认证精讲与实战 思考 1、什么是栈溢出攻击&#xff1f;什么是代码重用攻击&#xff1f;区别与联系&#xff1f; 2、栈溢出攻击的软&硬件缓解技术有哪些&#xff1f;在TF-A&OPTEE上的应用&#xff1f; 3、什么是ROP攻击&#xff1f;对ROP攻击的缓解技术&…

MyBatis之动态代理实现增删改查以及MyBatis-config.xml中读取DB信息文件和SQL中JavaBean别名配置

MyBatis之环境搭建以及实现增删改查 前言实现步骤1. 编写MyBatis-config.xml配置文件2. 编写Mapper.xml文件&#xff08;增删改查SQL文&#xff09;3. 定义PeronMapper接口4. 编写测试类1. 执行步骤2. 代码实例3. 运行log 开发环境构造图总结 前言 上一篇文章&#xff0c;我们…

P3647 题解

文章目录 P3647 题解OverviewDescriptionSolutionLemmaProof Main Code P3647 题解 Overview 很好的题&#xff0c;但是难度较大。 模拟小数据&#xff01;——【数据删除】 Description 给定一颗树&#xff0c;有边权&#xff0c;已知这棵树是由这两个操作得到的&#xff1…

Leecode之环形链表

一.题目及剖析 https://leetcode.cn/problems/linked-list-cycle/description/ 这道题就是去判断一个链表是否带环,分两种情况,链表中只有一个元素则一定不带环,链表中有两个及以上的元素则要引入快慢指针 二.思路引入 设置两个快慢指针,快指针走2步,慢指针走1步(不论快慢指…

[BeginCTF]真龙之力

安装程序 双击安装 出现了安装失败的标签&#xff0c;开发者不允许测试。 查看Mainfest入口文件 <?xml version"1.0" encoding"utf-8"?> <manifest xmlns:android"http://schemas.android.com/apk/res/android" android:versionCo…

单片机学习笔记---LED点阵屏显示图形动画

目录 LED点阵屏显示图形 LED点阵屏显示动画 最后补充 上一节我们讲了点阵屏的工作原理&#xff0c;这节开始代码演示&#xff01; 前面我们已经说了74HC595模块也提供了8个LED&#xff0c;当我们不使用点阵屏的时候也可以单独使用74HC595&#xff0c;这8个LED可以用来测试7…

js中new操作符详解

文章目录 一、是什么二、流程三、手写new操作符 一、是什么 在JavaScript中&#xff0c;new操作符用于创建一个给定构造函数的实例对象 例子 function Person(name, age){this.name name;this.age age; } Person.prototype.sayName function () {console.log(this.name) …

零基础学Python(8)— 流程控制语句(上)

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。流程控制语句是编程语言中用于控制程序执行流程的语句&#xff0c;本节课就带大家认识下Python语言中常见的流程控制语句&#xff01;~&#x1f308; 目录 &#x1f680;1.程序结构 &#x1f680;2.最简单的if语句 &a…

NGINX upstream、stream、四/七层负载均衡以及案例示例

文章目录 前言1. 四/七层负载均衡1.1 开放式系统互联模型 —— OSI1.2 四/七层负载均衡 2. Nginx七层负载均衡2.1 upstream指令2.2 server指令和负载均衡状态与策略2.2.1 负载均衡状态2.2.2 负载均衡策略 2.3 案例 3. Nginx四层负载均衡的指令3.1 stream3.2 upstream指令3.3 四…

掌握Vue,开启你的前端开发之路!

介绍&#xff1a;Vue.js是一个构建数据驱动的Web应用的渐进式框架&#xff0c;它以简洁和轻量级著称。 首先&#xff0c;Vue.js的核心在于其视图层&#xff0c;它允许开发者通过简单的模板语法将数据渲染进DOM&#xff08;文档对象模型&#xff09;。以下是Vue.js的几个重要特点…

哈希表(Hash Table)-----运用实例【通过哈希表来管理雇员信息】(java详解) (✧∇✧)

目录 一.哈希表简介&#xff1a; 实例介绍&#xff1a; 类的创建与说明&#xff1a; 各功能图示&#xff1a; 1.class HashTab{ }; 2. class EmpLinkedList{ }&#xff1b; 3. class Emp{ }&#xff1b; 4.测试&#xff1a; 运行结果&#xff1a; 最后&#xff0c;完整…

LeetCode-第28题-找出字符串中第一个匹配项的下标

1.题目描述 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 2.样例描述 3.思路描述 可以让字符串 …

uTools工具使用

之前发现一款非常有用的小工具&#xff0c;叫uTools&#xff0c;该软件集成了比如进制转换、json格式化、markdown、翻译、取色等等集插件大成&#xff0c;插件市场提供了很多开源插件工具。可以帮助开发人员节省了寻找各种处理工具的时间&#xff0c;非常推荐。 1、软件官方下…

R语言rmarkdown使用

1、安装 install.packages(rmarkdown) library(rmarkdown) install.packages(tinytex) tinytex::install_tinytex() 2、新建R Markdown 3、基本框架 红色框内为YAML&#xff1a;包括标题、作者和日期等 黄色框内为代码块&#xff1a;执行后面的代码&#xff0c;并可以设置展…

LLMs之Llama2 70B:《Self-Rewarding Language Models自我奖励语言模型》翻译与解读

LLMs之Llama2 70B&#xff1a;《Self-Rewarding Language Models自我奖励语言模型》翻译与解读 目录 《Self-Rewarding Language Models》翻译与解读 Abstract 5 Conclusion结论 6 Limitations限制 《Self-Rewarding Language Models》翻译与解读 地址 文章地址&#xff1…