Flink入门实战详解

Flink入门实战

Flink项目构建

1)基于Maven+Idea创建项目:

使用maven进行项目构建,如图1所示。

图-34 构建maven项目

输入项目中的maven的坐标和存储坐标,如图2所示。

图2 maven坐标和存储位置

2)Maven依赖:

    <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <hadoop.version>2.6.0</hadoop.version>
    <flink.version>1.9.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <!-- flink-2-hadoop-->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <!-- lombok -->
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <type>pom</type>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--
                                    zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                    -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>chapter1.BatchWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Flink基础API概念

Flink编程是在分布式集合的基础的规律的编程模型(比如,执行filtering,mapping,updating,state,joining,grouping,defining,windows,aggregating)。这些集合可以通过外部数据源(比如从文件,kafka的topics、本地或者内存的集合)。通过下沉算子返回结果,比如将数据写入到一个分布式的文件中,或者控制台。Flink程序可以基于各种context、stanalone或者嵌入其他程序进行运行。可以在本地的jvm或者在多台机器间分布式运行。

基于外部的数据源,比如有界或者无界的数据源,我们可能会选择使用批处理的DataSet API或者流处理的DataStream API来处理。

需要注意的是,在DataStream和DataSet中的绝大多数的API是一致的,只需要替换对应的ExecutionEnvironment或者StreamExecutionEnvironment即可。

Flink在编程的过程中使用特定类——DataSet和DataStream来体现数据,类似Spark中的RDD。可以将其认为是一个可以拥有重复的不可变的集合。其中DataSet表示的是一个有界的数据集,DataStream则表示的是无界的集合。

这些集合在一些关键的地方和Java中的普通集合不同。首先,DataSet和DataStream是不可变的,这就意味着一旦被创建,便不能进行add或者remove的操作。同样也不能简单的查看集合内部的元素。

Flink可以通过外部的数据源来创建DataSet或者DataStream,也可以通过在一个已知的集合上面执行一系列的Transformation操作来转换产生新的集合。

Flink程序看起来就是一个普通的程序,进行数据的转换,每一个程序包含如下相同的集合基础概念,通用编程步骤如下:

1)创建一个执行环境ExecutionEnvironment。

2)加载或者创建初始化数据——DataSet或者DataStream。

3)在此数据基础之上进行特定的转化操作。

4)将计算的结果输出到特定的目的地。

5)触发作业的执行。

Flink编程的入口,便是ExecutionEnvironment,不同之处在于,DataSet和DataStream使用的ExecutionEnvironment不同。DataSet使用ExecutionEnviroment,而DataStream使用StreamExectionEnvironment。

获得ExecutionEnvironment可以通过ExecutionEnvironment的如下方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这种方式会自动选择正确的context。如果我们在IDE中执行,则会创建一个Local的Context,如果打包到集群中执行,会返回一个Cluster的Context。

加载数据源的方式有多种。可以一行一个的读入,比如CSV文件,或者自定义格式。如果只是从一个文本文件中按顺序读取行数据。只需要如下操作即可。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path")

创建了一个DataStream或者DataSet,接下来便可以执行各种transformation转换操作。比如执行一个map操作。

创建一个新的DataStream,类型为Integer的集合。

DataStream中包含了最终的结果,我们可以将结果通过创建一个sink操作,写入外部系统中,比如:writeAsText(path)

一旦我们完成整个程序,我们需要通过调用StreamExecutionEnvironment的execute()方法来触发作业的执行。基于ExecutionEnvironment会在本地或者集群中执行。

execute()方法返回值为JobExecutionResult,包含本次执行时间或者累加器结果信息。

与Spark中的Transformation操作相同,Flink中的Transformation操作是Lazy懒加载的,需要execute()去触发。基于此,我们可以创建并添加程序的执行计划。进行任务调度和数据分离,执行更加高效。

目前Flink支持7种数据类型,分别为:

1)Java Tuples和Scala Case Classes。

2)Java POJOS(一种数据结构类型)。

3)Primitive Types(Java的基本数据类型)。

4)Regular Classes(普通类)。

5)Values。

6)Hadoop Writables。

7)SpecialTypes。

DataSet批处理API

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接, 分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._

object WordCountOps {

def main(args: Array[String]): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements("Who's there?",

"I think I hear them. Stand, ho! Who's there?"

)

val wordCounts:DataSet[(String, Int)] = text

.flatMap(line => line.split("\\s+")).map((_, 1))

.groupBy(0)

.sum(1)

wordCounts.print()

}

}

Streaming流式处理API

Flink中的DataStream程序是实现数据流转换的常规程序(例如 filtering, updating state, defining windows, aggregating)。最初从各种源(例如, message queues, socket streams, files)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object StreamDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val file = env.socketTextStream("localhost", 9999)
        val spliFile: DataStream[String] = file.flatMap(_.split(" "))
        val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
        val keyed = wordAndOne.keyBy(data=>data._1)
        val wordAndCount: DataStream[(String, Int)] = keyed.sum(1)
        wordAndCount.print()
        env.execute()
    }
}

要运行示例程序,首先从终端使用netcat启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。

Flink程序提交到集群

1)Web提交方式:

图3 web提交方式

1)脚本方式:

#!/bin/sh

FLINK_HOME=/home/bigdata/apps/flink

$FLINK_HOME/bin/flink run \

-c BatchDemo \

/root/wc.jar \

hdfs://hadoop101:8020/wordcount/words.txt \

hdfs://hadoop101:8020/wordcount/output3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/728937.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

虚幻引擎 Gerstner Waves -GPU Gems 从物理模型中实现有效的水体模拟

1.1 目标与范围 我们从简单的正弦函数开始&#xff0c;然后逐步过渡到更复杂的函数&#xff0c;以适应需要。 本章主要解释系统参数的物理意义&#xff0c;表明将水表面近似为正弦波的总和并不像人们通常认为的那样是随意的。我们特别关注将基本模型转换为实际实现所需的数学方…

Linux系统资源监控nmon工具下载及使用介绍

一、资源下载 夸克网盘链接&#xff1a;https://pan.quark.cn/s/2684089bc34d 里面包含了各种分享的实用工具&#xff0c;nmon在 Linux服务器监控nmon工具 文件夹内 文件说明&#xff1a; nmon16p_binaries.tar.gz 为最新的nmon官方工具包&#xff0c;支持linux全平台 nmo…

钢琴块小游戏(附源码)

代码结构 app.png是游戏运行主界面的图片&#xff08;可以加载自己喜欢的主界面图片&#xff09; Assets文件夹里面装的是一些需要用到的游戏图片 全部都可以替换为自己喜欢的图片 Fonts里面装的是 Sounds文件夹里面装的是 一 . 主程序代码 1.运行这个代码使得游戏开始 2.主界面…

【机器学习 复习】第6章 支持向量机(SVM)

一、概念 1.支持向量机&#xff08;support vector machine&#xff0c;SVM&#xff09;&#xff1a; &#xff08;1&#xff09;基于统计学理论的监督学习方法&#xff0c;但不属于生成式模型&#xff0c;而是判别式模型。 &#xff08;2&#xff09;支持向量机在各个领域内的…

健康与生活助手:Kompas AI的高效应用

一、引言 在现代社会&#xff0c;随着生活节奏的加快和工作压力的增加&#xff0c;人们的健康问题日益凸显。健康管理已经成为每个人关注的重点。Kompas AI作为一款智能助手&#xff0c;通过其先进的人工智能技术&#xff0c;为用户提供全面的健康管理服务&#xff0c;帮助用户…

【C++知识点】类和对象:友元,运算符重载,多态

今天来继续了解类和对象&#xff01; PS.本博客参考b站up黑马程序员的相关课程&#xff0c;老师讲得非常非常好&#xff01; 封装 深拷贝与浅拷贝 浅拷贝&#xff1a;简单的赋值拷贝操作 深拷贝&#xff1a;在堆区重新申请空间&#xff0c;进行拷贝操作 首先&#xff0c…

【头歌】HBase扫描与过滤答案 解除复制粘贴限制

解除复制粘贴限制 当作者遇到这个限制的时候火气起来了三分&#xff0c;然后去网上搜索答案&#xff0c;然后发现了一位【碳烤小肥肠】居然不贴代码&#xff0c;XX链接&#xff0c;贴截图&#xff0c;瞬时火气冲顶&#xff0c;怒写此文 首先启动万能的控制台&#xff0c;然后C…

【Hadoop大数据技术】——期末复习(冲刺篇)

&#x1f4d6; 前言&#xff1a;快考试了&#xff0c;做篇期末总结&#xff0c;都是重点与必考点。 题型&#xff1a;简答题、编程题&#xff08;Java与Shell操作&#xff09;、看图分析题。题目大概率会从课后习题、实验里出。 课本&#xff1a; 目录 &#x1f552; 1. HDF…

数据结构--单链表(图文)

单链表的概念 在单链表中&#xff0c;每个元素&#xff08;称为节点&#xff09;包含两部分&#xff1a;一部分是存储数据的数据域&#xff0c;另一部分是存储下一个节点地址的指针域。这里的“单”指的是每个节点只有一个指向下一个节点的指针。 节点&#xff1a;链表中的基…

java-数据结构与算法-02-数据结构-01-数组

文章目录 1. 概述2. 动态数组3. 二维数组4. 局部性原理5. 越界检查6. 习题 1. 概述 定义 在计算机科学中&#xff0c;数组是由一组元素&#xff08;值或变量&#xff09;组成的数据结构&#xff0c;每个元素有至少一个索引或键来标识 In computer science, an array is a dat…

如何与精益管理咨询公司进行有效的沟通?

在现代企业管理中&#xff0c;精益管理咨询公司发挥着不可或缺的作用&#xff0c;它们通过提供专业的精益管理咨询服务&#xff0c;帮助企业优化运营流程&#xff0c;提升生产效率&#xff0c;降低成本&#xff0c;实现可持续发展。然而&#xff0c;与精益管理咨询公司进行有效…

软件测评中心▏软件安全测试的测试方法和注意事项介绍

软件安全测试是一种重要的测试活动&#xff0c;旨在评估和验证软件系统中潜在的安全风险&#xff0c;并提供可行的解决方案。通过对软件系统进行系统化的测试&#xff0c;可以及时发现和修复安全漏洞&#xff0c;保护软件系统的安全性。 软件安全测试的测试方法可以帮助测试人…

深度学习500问——Chapter11:迁移学习(4)

文章目录 11.3.8 流形学习方法 11.3.9 什么是finetune 11.3.10 finetune为什么有效 11.3.11 什么是网络自适应 11.3.12 GAN在迁移学习中的应用 参考文献 11.3.8 流形学习方法 什么是流行学习&#xff1f; 流行学习自从2000年在Science上被提出来以后&#xff0c;就成为了机器…

ASP.NET Core 中使用 Dapper 的 Oracle 存储过程输出参数

介绍 Oracle 数据库功能强大&#xff0c;在企业环境中使用广泛。在 ASP.NET Core 应用程序中使用 Oracle 存储过程时&#xff0c;处理输出参数可能具有挑战性。本教程将指导您完成使用 Dapper&#xff08;适用于 . NET 的轻量级 ORM&#xff08;对象关系映射器&#xff09;&am…

Python数据分析-对驾驶安全数据进行了预测

一、研究背景和意义 随着汽车保有量的不断增加&#xff0c;交通事故已成为全球范围内的重大公共安全问题。每年因交通事故造成的人员伤亡和财产损失给社会带来了巨大的负担。为了提高驾驶安全&#xff0c;减少交通事故的发生&#xff0c;许多研究致力于探索影响驾驶安全的因素…

模式分解的概念(上)-分解、无损连接性、保持函数依赖特性

一、分解的概念 1、分解的定义 2、判断一个关系模式的集合P是否为关系模式R的一个分解 只要满足以下三个条件&#xff0c;P就是R的一个分解 &#xff08;1&#xff09;P中所有关系模式属性集的并集是R的属性集 &#xff08;2&#xff09;P中所有不同的关系模式的属性集之间…

如何通过自定义模块DIY出专属个性化的CSDN主页?一招教你搞定!

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 &#x1f4af;如何通过HTMLCSS自定义模板diy出自己的个性化csdn主页&#x…

本地快速部署大语言模型开发平台Dify并实现远程访问保姆级教程

文章目录 前言1. Docker部署Dify2. 本地访问Dify3. Ubuntu安装Cpolar4. 配置公网地址5. 远程访问6. 固定Cpolar公网地址7. 固定地址访问 前言 本文主要介绍如何在Linux Ubuntu系统使用Docker快速部署大语言模型应用开发平台Dify,并结合cpolar内网穿透工具实现公网环境远程访问…

解决element-plus没有导出的成员FormInstance

使用element-plus的el-form时&#xff0c;报错“"element-plus"”没有导出的成员“FormInstance”。你是否指的是“FooterInstance”? 解决方法&#xff1a; 引入ElForm类型&#xff0c;在外重新定义FormInstance的类型为ElForm的实例类型 示例&#xff1a; import…

记录keras库中导入函数找不到的问题

1 . keras.preprocessing.text import Tokenizer 将最右边的点 " . " 修改成 " _ " : 2 . 相应函数/库找不到&#xff0c;在keras后面加一个api :