创建第一个 Flink 项目

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
</dependencies>

<build>
   <plugins>
       <!-- 该插件用于将Scala代码编译成class文件 -->
       <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>3.4.6</version>
           <executions>
               <execution>
                   <goals>
                       <!--声明绑定到 maven 的compile阶段-->
                       <goal>compile</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-assembly-plugin</artifactId>
           <version>3.0.0</version>
           <configuration>
               <descriptorRefs>
                   <descriptorRef>jar-with-dependencies</descriptorRef>
               </descriptorRefs>
           </configuration>
           <executions>
               <execution>
                   <id>make-assembly</id>
                   <phase>package</phase>
                   <goals>
                       <goal>single</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._

/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建一个批处理的执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据
    var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by
    val resultDataSet: DataSet[(String,Int)] = inputDateSet
      .flatMap(_.split(" "))//分词得到所有 word构成的数据集
      .map((_,1))//_表示当前 word 转换成一个二元组(word,count)
      .groupBy(0)//以二元组中第一个元素作为key
      .sum(1) //1表示聚合二元组的第二个元素的值

    //打印输出
    resultDataSet.print()
  }
}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flink

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流
   val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流  hostname:prot 从程序运行参数中读取
   val params: ParameterTool = ParameterTool.fromArgs(args);
   val hostname: String = params.get("host");
   val port: Int = params.getInt("port");
   val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/226725.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringbootWeb登录认证

登录校验 会话技术 会话&#xff1a;用户打开浏览器&#xff0c;访问web服务器的资源&#xff0c;会建立会话&#xff0c;直到有一方断开连接&#xff0c;会话结束。在一次会话中可以包含多次请求和响应。会话跟踪&#xff1a;一种维护浏览器状态的方法&#xff0c;服务器需要…

使用postman做接口测试

1.接口测试&#xff1a;针对软件对外提供服务的接口的输入输出进行测试&#xff0c;以及接口间相互逻辑的测试&#xff0c;验证接口功能与接口描述文档的一致性 2.接口测试流程&#xff1a; 1&#xff09;获取接口信息&#xff1a;通过接口文档或抓包来获取接口的基本调用方式和…

hive映射es表任务失败,无错误日志一直报Task Transitioned from NEW to SCHEDULED

一、背景 要利用gpt产生的存放在es种的日志表做统计分析&#xff0c;通过hive建es的映射表&#xff0c;将es的数据拉到hive里面。 在最初的时候同事写的是全量拉取&#xff0c;某一天突然任务报错&#xff0c;但是没有错误日志一直报&#xff1a;Task Transitioned from NEW t…

漏洞复现--Apache Ofbiz XML-RPC RCE(CVE-2023-49070)

免责声明&#xff1a; 文章中涉及的漏洞均已修复&#xff0c;敏感信息均已做打码处理&#xff0c;文章仅做经验分享用途&#xff0c;切勿当真&#xff0c;未授权的攻击属于非法行为&#xff01;文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…

P8 Linux 目录操作

目录 前言 01 mkdir 系统调用 mkdir的代码示例 02 rmdir删除目录 03 打开、读取以及关闭目录 3.1 opendir()函数原型&#xff1a; 04 读取目录 readdir() 05 struct dirent 结构体&#xff1a; 06 rewinddir ()函数重置目录流 07 关闭目录 closedir ()函数 测试:打印…

temu缺货订单号在哪里查

在拼多多的商家后台管理系统Temu中&#xff0c;查找缺货订单号是非常重要的。及时了解缺货订单的情况&#xff0c;可以帮助商家更好地处理订单&#xff0c;提供良好的客户服务。本文将介绍在Temu中如何查找缺货订单号&#xff0c;以及处理缺货订单的步骤和注意事项。 先给大家推…

【c】小红的漂亮串

#include<stdio.h> #include<string.h> int main() {char arr[1000];int count0;gets(arr);//在数组中输入字符串int lenstrlen(arr);//求字符串长度printf("%d\n",len);for(int i0;i<len;i){if(arr[i]r){if(arr[i1]e){if(arr[i2]d){countcount1;}}}}…

SpringBoot框架+原生HTML开发,基于云端SaaS服务方式的电子病历编辑器源码

一体化电子病历编辑器源码&#xff0c;电子病历系统 一体化电子病历系统基于云端SaaS服务的方式&#xff0c;采用B/S&#xff08;Browser/Server&#xff09;架构提供&#xff0c;覆盖了医疗机构电子病历模板制作到管理使用的整个流程。除实现在线制作内容丰富、图文并茂、功能…

软著项目推荐 疫情数据分析与3D可视化 - python 大数据

文章目录 0 前言1 课题背景2 实现效果3 设计原理4 部分代码5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 大数据全国疫情数据分析与3D可视化 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff0…

Leetcode 92 反转链表II

反转链表II 题解1 一遍遍历&#xff08;穿针引线&#xff09; 给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 提示&#xff1a; 链表中节点数目…

MacOS VSCode 配置远程服务器ssh remote链接,并上传文件文件服务器

环境&#xff1a; MacOS & VSCode & ssh remote 1. VSCode安装插件 2. 配置ssh remote链接 Host 10.128.200.101HostName 10.128.200.101User rootForwardAgent yesIdentityFile ~/.ssh/id_rsa # 服务器免密登录本地秘钥文件路径 配置完保存&#xff1b; 3. 在ssh…

98基于matlab的在MIMO通信系统中功率优化算法的仿真

基于matlab的在MIMO通信系统中&#xff0c;功率优化算法的仿真&#xff0c;重点研究了注水功率分配算法。数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 98matlab功率优化功率分配 (xiaohongshu.com)

Halcon reduce_domain和scale_image的作用

在Halcon中&#xff0c;reduce_domain是用于缩小图像域&#xff08;Image Domain&#xff09;的操作。 它的作用是通过指定一个感兴趣区域&#xff08;ROI&#xff0c;Region of Interest&#xff09;&#xff0c;将图像数据限制在该区域内&#xff0c;从而实现对图像进行裁剪…

地震反演基础知识3

文章目录 地震勘探原理1 地震波1. 1 地震波概念1. 2 波的传播1. 2. 1 波传播的基本原理1. 2. 2 地震波的反射,折射,透射的1. 2. 3 地震子波&#xff08;seismic wavelet&#xff09;1. 2. 4 地震合成记录 2 地震时距曲线2.1 地震时距曲线作用2.2 不同波的时距曲线2.2.1 直达波时…

在 Windows 桌面的redis中远程连接到 VMware 中运行的 Linux 上的 Redis

先修改一下docker容器中的redis(一会连上之后看效果) 我使用的是VMware的虚拟机 选择的网络设置为桥接模式 查到虚拟机独立的ip是如下 允许 Linux 虚拟机上的 Redis 监听外部连接&#xff1a; 打开 Linux 虚拟机上的 Redis 配置文件。在大多数系统上&#xff0c;配置文件位于…

94. 二叉树的中序遍历(Java)

目录 解法&#xff1a; 思路 官方解答&#xff1a; 方法一&#xff1a;递归 思路与算法 代码&#xff1a; 复杂度分析 时间复杂度&#xff1a; 空间复杂度&#xff1a; 方法二&#xff1a;迭代 思路与算法 复杂度分析 时间复杂度&#xff1a; 空间复杂度 给定一个…

【SpringBoot】SpringBoot配置Swagger

文章目录 前言配置步骤使用步骤总结 前言 使用Swagger只需要按照规范去定义接口及接口的相关信息&#xff0c;就可以做到生成接口文档和在线接口调试页面 官网&#xff1a;Swagger官网 Knife4j是为Java MVC框架集成Swagger生成Api文档的增强解决方案 配置步骤 1.导入knife4j的m…

新课程杂志新课程杂志社新课程编辑部2023年第13期目录

教育前沿_新时代教育 基于“互联网课程思政”的小学语文教学实施路径——以部编版六年级上册《桥》为例 潘霞; 1-3 立德树人&#xff0c;德育为先——以小学语文爱国主义教育为例 王蕾; 4-6《新课程》投稿&#xff1a;cn7kantougao163.com 发挥红色资源优势 培育新…

项目设计---智力冲刺

文章目录 一. 项目描述二. 核心技术三. 需求分析概要设计四. 详细设计4.1 实现用户模块4.1.1 约定前后端交互接口4.1.2 实现数据库设计4.1.3 客户端页面展示4.1.4 服务器功能实现 4.2 实现匹配模块4.2.1 约定前后端交互接口4.2.2 客户端页面展示4.2.3 服务器功能实现 4.3 实现对…

Linux学习笔记(九)MISC设备驱动

前言 misc 的意思是混合、杂项的&#xff0c;因此 MISC 驱动也叫做杂项驱动。也就是当我们板子上的某些外设无法进行分类的时候就可以使用 MISC 驱动。 MISC 驱动其实就是最简单的字符设备驱动&#xff0c;通常嵌套在 platform 总线驱动中&#xff0c;实现复杂的驱动&#xff0…