[flink 实时流基础] flink 源算子

学习笔记

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png


文章目录

        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据

在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/505070.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PostCSS的安装及使用(1):安装

在不同操作系统上安装PostCSS的步骤大致相同&#xff0c;因为PostCSS是基于Node.js的一个JavaScript工具&#xff0c;是依赖于Node.js环境和npm&#xff08;Node包管理器&#xff09;。 PostCSS官网&#xff1a; GitHub地址&#xff1a; GitHub - postcss/postcss: Transformi…

WebScraper网页数据爬取可视化工具使用(无需编码)

前言 Web Scraper 是一个浏览器扩展&#xff0c;可以实现无需编码即可爬取网页上的数据。只需按照规则进行配置&#xff0c;即可实现一键爬取导出数据。 安装 进入Google应用商店安装此插件&#xff0c;安装步骤如下&#xff1a; 进入Google应用商店需要外网VPN才能访问&…

学生寝室恶性负载识别模块

学生寝室恶性负载识别模块系统是石家庄光大远通电气公司一种专门用于学生寝室电力管理的系统&#xff0c;旨在识别并限制使用违规或高风险的电器设备&#xff0c;以保障学生的用电安全和节约能源。该系统通过先进的电力检测技术和算法&#xff0c;能够实时监测寝室内的电力使用…

react ts 封装搜索条件

封装 import React, { ReactNode, useImperativeHandle, forwardRef } from react; import { Card, Form, Input, Button, Row, Col } from antd;interface Iprops {formItem: any,getParams: (params: any) > void,reset?: () > void | undefined,isButton?: boolean…

MySQL学习之连接查询

笛卡尔乘积现象 在表的连接查询方面有一种现象被称为&#xff1a;笛卡尔积现象。 笛卡尔积现象&#xff1a; 当两张表进行连接查询的时候&#xff0c;没有任何条件进行限制&#xff0c;最终的查询结果条数是两张表记录条数的乘积。 select ename,dname from emp,dept; 避免…

YOLOV5训练自己的数据集教程(万字整理,实现0-1)

文章目录 一、YOLOV5下载地址 二、版本及配置说明 三、初步测试 四、制作自己的数据集及转txt格式 1、数据集要求 2、下载labelme 3、安装依赖库 4、labelme操作 五、.json转txt、.xml转txt 六、修改配置文件 1、coco128.yaml->ddjc_parameter.yaml 2、yolov5x.…

如何制作伸缩侧边栏?

目录 一、html-body 二、CSS 三、JS 四、完整代码 五、效果展示 一、html-body 侧边栏的伸缩需要用户触发事件&#xff0c;这里使用button为例&#xff0c;用户点击按钮实现侧边栏的打开和关闭。 <body><!-- 按钮&#xff0c;可以用文字、图片等作为事件源&am…

wails 创建Go 项目

##wails是一个可以让go和web技术编写桌面应用#安装wails go install github.com/wailsapp/wails/v2/cmd/wailslatest#检查环境 wails doctor 创建项目wails init -n AuxiliaryTools -t vue-ts拉取go.mod的依赖项 go mod tidy进入 frontend 前端安装依赖项npm install /pnpm ins…

透视未来安全:PIR技术引领数据隐私新时代

1.隐语实现PIR总体介绍 隐语实现的Private Information Retrieval (PIR) 是一种隐私增强技术&#xff0c;它使用户能够在不暴露他们实际查询内容的情况下从远程服务器数据库中检索所需信息。以下是隐语在实现PIR方面的概要说明和技术特点&#xff1a; 基本概念&#xff1a; PI…

Arcgis根据要素面获取要素中心点并计算中心点坐标

一、要素面获取要素中心点 1、加载数据 2、找到“要素转点”工具 打开ArcTool box工具&#xff0c;数据管理工具—要素—要素转点&#xff0c;或者打开搜索器直接搜索“要素转点”即可 3、要素转点 弹出转换界面之后&#xff0c;输入面状要素&#xff0c;设置保存路径&#…

零代码编程:用kimichat将PDF自动批量分割成多个图片

有一个PDF文件&#xff0c;现在想把pdf文件转换成图片&#xff0c; 可以在kimichat中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个将PDF文件自动批量分割成多个图片的任务&#xff0c;具体步骤如下&#xff1a; 打开d盘下的pdf文件&#xff1a;Ill …

Python开源项目月排行 2024年3月

Python 趋势月报&#xff0c;按月浏览往期 GitHub,Gitee 等最热门的Python开源项目&#xff0c;入选的项目主要参考GitHub Trending,部分参考了Gitee和其他。排名不分先后&#xff0c;都是当前月份内相对热门的项目。 入选公式&#xff1d;70%GitHub Trending20%Gitee10%其他 …

Emotet分析

Emotet分析 编写启动器方便调试。 #include <iostream> #include<windows.h>typedef int(WINAPI* fnRunDLL)(); fnRunDLL My_RunDLL;int main() {HMODULE hModule LoadLibraryA("C:\\Users\\xiao\\Desktop\\samples\\1.dll");My_RunDLL (fnRunDLL)Ge…

[ESP32]:基于esp-modbus实现serial从机

[ESP32]&#xff1a;基于esp-modbus实现serial从机 开发环境&#xff1a; esp idf 5.1esp-modbus 1.0.13 使用如下指令添加组件&#xff0c;或者访问esp-modbus idf.py add-dependency "espressif/esp-modbus^1.0.13"1.mb_register_area_descriptor_t 对于slave…

华为OD机试 - 芯片资源限制(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试…

【Linux】普通用户提升权限

概述 在Linux环境下&#xff0c;给普通用户提权的方式&#xff0c;su与sudo命令&#xff0c;su是将一个普通用户登录为root&#xff0c;而sudo则是将普通用户短暂提升权限 普通用户使用$ root使用# 使用su提升权限 如果我们使用su将用户提升为root&#xff0c;此时需要输入…

【AI+儿童绘本】从0到1制作儿童绘本故事操作思路

今天刷了会小H书&#xff0c;无意刷到一些 睡前儿童绘本故事&#xff0c; 下面一堆评论说 求软件什么的&#xff0c;博主只是引流没做任何回复。 这里写一篇文章科普下吧&#xff0c;免得有人被割韭菜。 制作儿童绘本&#xff0c; 大概这几个步骤。1、写生动有趣的故事&#x…

jmeter性能测试如何实现分布式部署

jmeter什么要做分布式部署&#xff1f; jmeter是运行在JVM虚拟机上的&#xff0c;当模拟大量并发时&#xff0c;对运行机器的性能/网络负载会很大。 此时就需要使用jmeter的分布式部署功能&#xff0c;实现多台被控机器同时并发访问被测系统。 原理图&#xff1a; 准备工作&…

【Spring】Spring框架中的一个核心接口ApplicationContext 简介,以及入口 Run() 的源码分析

一、简介 ApplicationContext 是Spring框架中的一个核心接口&#xff0c;它是Spring IoC容器的实现之一&#xff0c;用于管理和组织应用程序中的各种Bean&#xff0c;同时提供了一系列功能来支持依赖注入、AOP等特性。 简单来说&#xff0c;ApplicationContext 是一个大型的、…

算法学习——LeetCode力扣补充篇5 (52. N 皇后 II、649. Dota2 参议院、1221. 分割平衡字符串、5. 最长回文子串)

算法学习——LeetCode力扣补充篇5 52. N 皇后 II 52. N 皇后 II - 力扣&#xff08;LeetCode&#xff09; 描述 n 皇后问题 研究的是如何将 n 个皇后放置在 n n 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回 n 皇后问题 不同的…