大数据学习之Flink算子、了解(Source)源算子(基础篇二)

Source源算子(基础篇二)


目录

Source源算子(基础篇二)

二、源算子(source)

1. 准备工作

2.从集合中读取数据

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

说明:

4. 从Socket读取数据

(1)编写StreamWordCount

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

(3)启动 StreamWordCount 程序

(4)从 bigdata1 发送数据:

(5)看控制台的输出结果

5.从Kafka读取数据

6.自定义源算子(source)

7.Flink支持的数据类型


二、源算子(source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理 程序的输入端。

Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法:

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

1. 准备工作

case class Event(user: String, url: String, timestamp: Long)

2.从集合中读取数据

  • 最简单的读取数据的方式,就是在代码中直接创建一个集合,然后调用执行环境的 fromCollection 方法进行读取。
  • 这相当于将数据临时存储到内存中,形成特殊的数据结构后, 作为数据源使用,一般用于测试。
import org.apache.flink.streaming.api.scala._

case class Event(user: String, url: String, timestamp: Long)

object SourceCollection {
  def main(args: Array[String]): Unit = {
    //获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度(并行任务的数量)为1
    env.setParallelism(1)
    // 创建包含点击事件的列表
    // 点击操作中包含两个事件
    val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
    //将列表作为流输出
    //把clicks作为数据流
    val stream = env.fromCollection(clicks)
    //fromElements从给定的元素集合中创建一个DataStream
    val stream1 = env.fromElements(
      Event("zhangsan","/.opt",1000L),
      Event("lisi","/.opt",2000L)
    )
    stream.print("stream")
    stream1.print("stream1")
    env.execute()
  }
}

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中 获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

val stream = env.readTextFile("input/words.txt")
说明:
  • 参数可以是文件,可以是目录

  • 可以是绝对路径,也可以是相对路径

  • 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录;

    • 系统属性 user.dir:这是一个Java系统属性,它表示用户当前的工作目录。在很多应用中,它通常被用作参考路径。

    • IDEA下是project的根目录:当你在IDEA中打开一个项目时,项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。

    • standalone模式下是集群节点根目录:如Hadoop分布式计算系统中的独立模式(standalone mode)。在这种模式下,路径可能是相对于集群节点的根目录。

  • 也可以从 HDFS 目录下读取, 使用路径 hdfs://...

    • 前提要在pom文件中添加hadoop相关依赖

4. 从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无 界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差,一般也是用于测试。

//通过主机名和端口号读取socket文本流
    val linDs = env.socketTextStream("bigdata1",7777)

具体实现案例:

(1)编写StreamWordCount

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //通过主机名和端口号读取socket文本流
    val linDs = env.socketTextStream("bigdata1",7777)
    //进行转换计算
    val result = linDs
      .flatMap(data => data.split(" ")) //用空格切分字符串
      .map((_,1)) //切分后的字符串转换为一个元组
      .keyBy(_._1) //使用元组的第一个字段进行分组
      .sum(1) //分组后的数据的第二个字段进行累加
    //打印计算结果
    result.print()
    env.execute()
  }
}

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

$ nc -lk 7777

(3)启动 StreamWordCount 程序

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从 bigdata1 发送数据:

hello flink
hello world
hello scala

(5)看控制台的输出结果

5.从Kafka读取数据

Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。

而消息队列的传输方式,恰恰和流处理是完全一致的。

所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。

在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选

调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic 列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条数据流中 去。
  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
更新中...

6.自定义源算子(source)

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

7.Flink支持的数据类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/343829.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

“探索C语言操作符的神秘世界:从入门到精通的全方位解析“

各位少年,我是博主那一脸阳光,今天来分享深度解析C语言操作符,C语言操作符能帮我们解决很多逻辑性的问题,减少很多代码量,就好比数学的各种符号,我们现在深度解剖一下他们。 前言 在追求爱情的道路上&…

Google ASPIRE框架:赋予大型语言模型(LLMs)自我评估的新动力

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

Vue生命周期;综合案例;工程化开发入门

Vue的生命周期 和 生命周期的四个阶段 思考: 什么时候可以发送初始化渲染请求?(越早越好:最早可以早到什么时候?) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命…

【C++】C++入门(一)

个人主页 : zxctsclrjjjcph 文章封面来自:艺术家–贤海林 如有转载请先通知 文章目录 1. 前言2. C关键字3. 命名空间3.1 命名空间定义3.2 命名空间的使用 4. C输入&输出 1. 前言 C是在C的基础之上,容纳进去了面向对象编程思想&#xff0…

4G物联网LED智慧路灯杆显示屏产品介绍

4GLED显示屏是一种具有4G网络连接功能的LED显示屏。它可以通过4G网络连接到互联网,实现远程管理和控制,方便进行内容更新和管理。同时,4GLED显示屏具有高亮度、高清晰度和高对比度的特点,可以提供清晰明亮的图像和视频展示效果。它…

【前端小点】Vue3中的IP输入框组件

本文章记录,如何在vue3项目开发中,使用ip输入框组件. 之前写过vue2版本的ip组件,为了更好的适应vue3,此次进行vue3代码重写 先上效果图: 禁用效果图: 主要是组件的开发,代码如下,可直接拷贝使用. 大概思路就是: 使用四个输入框拼接,然后给输入内容添加校验操作,添加光标移动,…

灵眸边缘计算产品学习

EASY EAI灵眸科技 | 让边缘AI落地更简单 (easy-eai.com) 产品简介 支持4路1080P30fps视频流采集,四核CPU1.5GHz与2Tops AI边缘算力能力。集成有以太网、Wi-Fi、4G等网络通信外设;RS232、RS485、UART等本地通信接口。HDMI显示屏接口、音频输入输出等交互…

ntp时间适配服务器和ssh免密登录

1.配置ntp时间服务器,确保客户端主机能和服务主机同步时间 服务端server向阿里时间服务器进行时间同步 第一步:定位服务端server #安装软件 [rootserver ~]# yum install chrony -y # 编辑配置文件,定位第3行,修改…

小程序直播项目搭建

项目功能: 登录实时聊天点赞功能刷礼物取消关注用户卡片直播带货优惠券直播功能 项目启动: 1 小程序项目创建与配置: 第一步 需要登录小程序公众平台的设置页面进行配置: 首先需要是企业注册的才可以个人不能开通直播功能。服务类…

动态gif图怎么在线做?简单三步快速上手

使用gif动态图片能够增加图片的吸引力和趣味性,在很多社交平台上gif动态都是用来表达自己的心情的。而且,gif动图可以用于创意设计和艺术制作的宣传等。那么,要怎么制作呢?这时候使用gif制作(https://www.gif.cn/&…

【学网攻】 第(6)节 -- 三层交换机实现VLAN间路由

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan【学网攻】 第(5)节 -- Cisco VTP的使用 前言 第5章给大家讲了VTP,也是为这节课铺垫,带领大家慢慢进入路由的区…

安全基础~通用漏洞2

文章目录 知识补充盲注Boolean盲注延时盲注报错注入二次注入 知识补充 盲注常用 if(条件,5,0) #条件成立 返回5 反之 返回0 left(database(),1),database() #left(a,b)从左侧截取a的前b位 盲注 盲注就是在注入过程中,获取的数据不能回显至前端页面。 …

vivado: 设置里配置改了之后,总是在下次重启时重置的解决

我以前改字体大小,和改notepad编辑器都遇到,下一次打开就又是默认配置 解决: 1. c盘路径下,找到这个.xml文件,用记事本打开 2. 直接拉到记事本最后,我圈起来这里的路径不能有中文,所以要去把…

C++面试宝典第24题:袋鼠过河

题目 一只袋鼠要从河这边跳到河对岸,河很宽,但是河中间打了很多桩子。每隔一米就有一个桩子,每个桩子上都有一个弹簧,袋鼠跳到弹簧上就可以跳得更远。每个弹簧力量不同,用一个数字代表它的力量,如果弹簧力量为5,就代表袋鼠下一跳最多能够跳5米;如果为0,就会陷进去无法…

【GitHub项目推荐--人脸识别】【转载】

01 带有移动应用程序的人脸识别库 OpenFace 作为用于人脸识别的通用库,能够实现瞬态和移动人脸识别,目前在 GitHub 上斩获 14291 Star。以下为 LFW 数据集 Sylvestor Stallone 输入单个图像的流程。 项目地址:https://github.com/cmusatya…

【zlm】针对单个设备的码率的设置

目录 代码修改 实验数据一 实验数据二 同时拉一路视频后 修改记录 使用方法 各库实操 代码修改 要被子类引用 ,所以放在protected 不能放private 下面的结论,可以在下面的实验数据里引用。“同时拉一路视频后” 实验数据一 https://10.60.3.45:1…

【ZYNQ入门】第十篇、基于FPGA的图像白平衡算法实现

目录 第一部分、关于白平衡的知识 1、MATLAB 自动白平衡算法的实现 1.1、matlab代码 1.2、测试效果 1.3 测试源图 2、为什么摄像头采集的图像要做白平衡 3、自动白平衡算法总结 4、FPGA设计思路 4.1、实时白平衡的实现 4.2、计算流程优化思路 第二部分、硬件实…

机器学习之numpy库

机器学习之numpy库 numpy库概述numpy库历史numpy的核心numpy基础ndarray数组内存中的ndarray对象ndarray数组对象的特点ndarray数组对象的创建ndarray对象属性的基本操作数组的维度元素的类型数组元素的个数数组元素索引(下标) ndarray对象数组的自定义类型切片操作一维数组切片…

后端开发_单元测试

后端开发_单元测试 1. 简介2. JUnit 4使用方法2.1 jar包引入2.2 测试用例1. 简介 2. JUnit 4使用方法 2.1 jar包引入 1. 本地依赖引入方式 Junit4.jar包 2. maven方式引入jar <dep

AMIS的组件学习使用

部分代码片段 {"id": "filterForm","className": " xysd-zbkb-pubquery","labelWidth": 130,"body": [{"type": "grid","className": "xysd-grid-query-input","c…