Flink:入门介绍

目录

一、Flink简介

2.1 Flink 架构

2.2 Flink 应用程序 运行模式

二、Flink 集群 部署

2.1 本地集群模式

2.1.1  安装JDK​编辑

2.1.3 启动集群

2.1.4 停止集群

2.2 Standalone 模式

2.2.0 集群规划

2.2.1 安装JDK

2.2.2 设置免密登录

2.2.3 修改配置文件

2.2.4 启动集群

2.2.5 关闭集群

2.2.6 Standalone 高可用服务

2.3 YARN 模式

2.4 K8S 模式

三、Flink 应用 开发

3.1 编写Flink 应用程序

3.1.1 maven引入jar

3.1.2 编写代码

3.1.3 打包程序

3.2 运行 Flink 应用程序

3.2.1 命令行运行

3.2.2 WEB管理端运行

3.2.3 查看完成的任务


一、Flink简介

        Apache Flink是一个框架和分布式处理引擎,用于在无界和有界数据流上进行有状态计算。Flink 提供了数据分发以及并行化计算的能力,并且可以部署在各种集群环境中,如Hadoop YARN、Kubernetes或独立集群。

2.1 Flink 架构

        Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

        JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager:负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
  • Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
  • JobMaster:负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

        始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby。

       TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。

        必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。一个 task slot 中可以执行多个算子。

2.2 Flink 应用程序 运行模式

        当Flink应用程序编写好后,可以通过会话模式、单作业模式和应用模式等三种方式运行:

  • 会话模式(Session Mode)

        会话模式需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

  • 单作业模式(Per-Job Mode)

        会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,可以为每个提交的作业启动一个集群,作业完成后,集群就会关闭,释放资源,这就是所谓的单作业(Per-Job)模式。单作业模式运行稳定,是实际应用的首选模式。
        需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes(K8s)。

  • 应用模式(Application Mode)

        会话模式和单作业模式应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
        所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。

        应用程序各种运行模式,需要的Flink集群不一样,本地集群和Standalone集群适合会话模式;YARN模式适合单作业模式和应用模式;K8S模式不熟悉,暂时不介绍。

应用程序运行模式会话模式
(Session Mode)
单作业模式
(Per-Job Mode)
应用模式
(Application Mode)
集群部署模式
本地模式×
Standalone模式×
YARN模式
K8S模式

二、Flink 集群 部署

2.1 本地集群模式

2.1.1  安装JDK

2.1.3 启动集群

$ ./bin/start-cluster.sh

2.1.4 停止集群

$ ./bin/stop-cluster.sh

2.2 Standalone 模式

2.2.0 集群规划

flink01
192.168.179.151
flink02
192.168.179.152
flink01
192.168.179.153
JobManager
TaskManagerTaskManagerTaskManager

2.2.1 安装JDK

        在所有部署Flink的服务器上安装jkd,要求jdk17+,jdk具体安装可参考搭建环境02:安装前准备(配置CentOS7) 中的安装jdk部分。

2.2.2 设置免密登录

        在所有部署Flink的服务器上设置免密登录,具体操作可参考搭建环境02:安装前准备(配置CentOS7) 中的设置免密登录部分。

2.2.3 修改配置文件

  • config.yaml 文件

vi conf/config.yaml

jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: 192.168.179.151 #修改成本节点IP

taskmanager:
  bind-host: 0.0.0.0
  host: 192.168.179.151  #修改成本节点IP

rest:
  address: 192.168.179.151  #修改成本节点IP
  bind-address: 0.0.0.0
  port: 8081

  • masters文件

vi conf/masters

192.168.179.151:8081

  • workers文件

vi conf/workers

192.168.179.151

192.168.179.152

192.168.179.153

2.2.4 启动集群

$ ./bin/start-cluster.sh

2.2.5 关闭集群

./bin/stop-cluster.sh

2.2.6 Standalone 高可用服务

        具体可参考官方文档。

2.3 YARN 模式

        比较复杂,另开一篇专门介绍。

2.4 K8S 模式

        不熟悉,暂时不做介绍。

三、Flink 应用 开发

        Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment);
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

3.1 编写Flink 应用程序

3.1.1 maven引入jar

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>

3.1.2 编写代码

package com.yichenkeji.demo.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo {
    public static void main(String[] args) throws Exception {

        //1.获取一个执行环境(execution environment)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.加载/创建初始数据
        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        //3.指定数据相关的转换:乘以2
        SingleOutputStreamOperator<Integer> map = source.map(x -> x * 2);

        //4.指定计算结果的存储位置:直接输出到控制台
        map.print();

        //5.触发执行
        env.execute();
    }
}

3.1.3 打包程序

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yichenkeji.demo.flink</groupId>
    <artifactId>yichen-demo-flink</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.20.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>com.yichenkeji.demo.flink.Demo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.2 运行 Flink 应用程序

3.2.1 命令行运行

]$ ./bin/flink run  /data/flink/demo/yichen-demo-flink-1.0.jar

3.2.2 WEB管理端运行

3.2.3 查看完成的任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/934033.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

重生之我在异世界学编程之C语言:深入结构体篇(上)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文《1》 结构体的两种声明一、结构…

移动充电服务如何打破传统的新能源汽车充电难、找桩难问题?

中国新能源汽车在全球市场中占据重要地位&#xff0c;2024年上半年&#xff0c;中国新能源汽车销量达到494.4万辆&#xff0c;占全球新能源市场的68%。新能源汽车作为国家实现“碳中和”的重要途径之一&#xff0c;国家陆续出台各项产业政策&#xff0c;推动新能源汽车行业往标…

AI大模型ollama结合Open-webui

AI大模型Ollama结合Open-webui 作者:行癫(盗版必究) 一:认识 Ollama 1.什么是Ollama ​ Ollama是一个开源的 LLM(大型语言模型)服务工具,用于简化在本地运行大语言模型,降低使用大语言模型的门槛,使得大模型的开发者、研究人员和爱好者能够在本地环境快速实验、管理和…

Ubuntu Linux 图形界面工具管理磁盘分区和文件系统(八)

本文为Ubuntu Linux操作系统- 第八弹~~ 今天接着上文的内容&#xff0c;讲Linux磁盘分区存储的相关知识~ 上期回顾&#xff1a;命令行-管理磁盘分区和文件系统 今天看酷酷的雪獒铠甲&#xff01;&#xff01;雪獒铠甲合体~ 文章目录 磁盘管理器GNOME Disks主要功能安装命令 磁盘…

AI大模型的实践应用-探索智能科技的未来(附学习教程资源)

第一章&#xff1a;AI大模型技术概览 1.1 AI大模型的定义与特点 AI大模型&#xff0c;通常指的是参数数量达到数亿甚至数千亿的深度学习模型。这些模型因其庞大的参数量而得名&#xff0c;能够捕捉和学习数据中的复杂模式和关系。与传统模型相比&#xff0c;AI大模型具有更强的…

AI 数字人模型 Hallo2:让图片开口说话,一键修复模糊人脸

Hallo2 是由复旦大学 (Fudan University)、百度公司 (Baidu Inc) 和南京大学 (Nanjing University) 于 2024 年联合开发的一项先进技术&#xff0c;旨在生成长时间、高质量的唇形视频。该技术在原有的 Hallo 模型基础上进行了多项创新和改进&#xff0c;使其能够应对长时间视频…

反向代理-缓存篇

文章目录 强缓存一、Expires(http1.0 规范)二、cache-control(http1.1 出现的 header 信息)Cache-Control 的常用选项Cache-Control 常用选项的选择三、弊端协商缓存一、ETag二、If-None-Match三、Last-modified四、If-Modified-Since浏览器的三种刷新方式静态资源部署策略…

uniapp扭蛋机组件

做了一个uniapp的扭蛋机组件&#xff0c;可以前往下载地址下载 仅测试了vue2、3、h5页面微信小程序&#xff0c;理论支持全平台 使用方法简单&#xff0c;具有待机动效、抽奖中动效、掉落奖品动效&#xff0c;可以替换奖品图片&#xff0c;足以满足大部分抽奖页面需求。 示例图…

QT实战--QTreeWidget实现两种行颜色+QListWidget样式

本文主要介绍了QTreeWidget实现两种行颜色、点击打开父节点以及设置父子节点之间距离,同时附带介绍了QListWidget样式 树效果图: 列表效果图: 1.树样式的实现 1)使用代码: m_pLeftTreeWidget = new QTreeWidget(this);m_pLeftTreeWidget->setObjectName("suolue_t…

React - useActionState、useFormStatus与表单处理

参考文档&#xff1a;react18.3.1官方文档 一些概念&#xff1a; React 的 Canary 和 Experimental 频道是 React 团队用于发布和测试新功能的渠道。 useActionState useActionState 是一个可以根据某个表单动作的结果更新 state 的 Hook。 const [state, formAction, isPe…

Oracle之表空间迁移

问题背景&#xff1a;一个数据表随着时间的累积&#xff0c;导致所在表空间占用很高&#xff0c;里面历史数据可以清除&#xff0c;保留近2个月数据即可 首先通过delete删除了2个月以前的数据。 按网上的教程进行空间压缩&#xff0c;以下sql在表所在用户执行: -- 允许表重新…

如何在UI自动化测试中创建稳定的定位器?

如何在UI自动化测试中创建稳定的定位器&#xff1f; 前言1. 避免使用绝对路径2. 避免在定位器中使用索引3. 避免多个类名的定位器4. 避免动态和自动生成的ID5. 确保定位器唯一6. 处理隐藏元素的策略7. 谨慎使用基于文本的定位器8. 使用AI创建稳定的定位器 总结 前言 在自动化测…

从一条慢sql优化,深入探讨mysql的优化器优化机制

在某环境发现一个前端请求报错&#xff0c;经查为一条复杂的sql耗时约70s&#xff0c;最终导致前端响应超时。下面叙述下本次问题排查及根因分析过程&#xff0c;供其他同学参考。 本文中使用到的数据库是mariadb&#xff0c;对于mysql也是基本适用。 一&#xff0c;连接算法…

SpringBoot+OSS文件(图片))上传

SpringBoot整合OSS实现文件上传 以前,文件上传到本地(服务器,磁盘),文件多,大,会影响服务器性能 如何解决? 使用文件服务器单独存储这些文件,例如商业版–>七牛云存储,阿里云OSS,腾讯云cos等等 也可以自己搭建文件服务器(FastDFS,minio) 0 过程中需要实名认证 … 1 开…

Linux其三,yum源配置,定时任务,免密登录和查找命令

目录 一、Linux的两种软件安装方式 1、Yum源配置 2、linux中软件安装的另一种方式 rpm 3、安装mysql8.0 二、对虚拟机进行克隆 1、先关机 2、最新的状态&#xff0c;整个快照 3、开始克隆 4、修改克隆的服务器的硬件设置 5、修改克隆机的IP地址&#xff08;因为跟第一…

linux环境GitLab服务部署安装及使用

一、GitLab介绍 GitLab是利用Ruby onRails一个开源的版本管理系统&#xff0c;实现一个自托管的Git项目仓库&#xff0c;可通过Web界面进行访问公开的或者私人项目。 二、GitLab安装 1、先安装相关依赖 yum -y install policycoreutils openssh-server openssh-clients postf…

/usr/local/go/bin/go: cannot execute binary file: Exec format error

现象&#xff1a;ubuntu中安装go软件环境&#xff0c;报上述错误 原因&#xff1a;系统与软件不适配 解决&#xff1a;查看本系统的版本 找到x86-64对应的go版本即可

技术 + 舞蹈,探秘 SpringBoot 硬核广场舞团

3 系统分析 3.1 系统可行性分析 3.1.1 经济可行性 由于本系统是作为毕业设计系统&#xff0c;且系统本身存在一些技术层面的缺陷&#xff0c;并不能直接用于商业用途&#xff0c;只想要通过该系统的开发提高自身学术水平&#xff0c;不需要特定服务器等额外花费。所有创造及工作…

MySQL Workbench基本使用

MySQL Workbench 是一款由 MySQL官方开发和提供的统一可视化工具&#xff0c;专为数据库管理员、开发者和数据架构师设计。它提供了数据建模、SQL 开发和数据库管理的全面功能&#xff0c;支持 Windows、Linux 和 macOS 操作系统。 MySQL Workbench 是一个强大的工具&#xff…

fedora下Jetbrains系列IDE窗口中文乱码解决方法

可以看到窗口右部分的中文内容为小方块。 进入 Settings - Appearance & Behavior - Appearance - Use custom font : Note Sans Mono CJK SC &#xff0c;设置后如下图&#xff1a;