Flink - souce算子

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));
    
    env
            .fromCollection(waterSensors)
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("input/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();
    
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    //从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999
    env.socketTextStream("hadoop101",9999).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    Properties properties = new Properties();
    // 设置集群地址
    properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
    // 设置所属消费者组
    properties.setProperty("group.id", "flink_consumer_group");
    env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.addSource(new RandomWatersensor()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {
    private Boolean running = true;

    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        Random random = new Random();
        while (running){
            sourceContext.collect(new WaterSensor(
                    "sensor" + random.nextInt(50),
                    Calendar.getInstance().getTimeInMillis(),
                    random.nextInt(100)
            ));
            Thread.sleep(1000);
        }
    }

    /**
     * 大多数的source在run方法内部都会有一个while循环,
     * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
     */
    @Override
    public void cancel() {
        running = false;
    }

}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {
    public static void main(String[] args) throws Exception {


        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .addSource(new MySource("hadoop102", 9999))
          .print();

        env.execute();
    }

    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }


        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // 实现一个从socket读取数据的source
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        /**
         * 大多数的source在run方法内部都会有一个while循环,
         * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
         */

        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/54422.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【python】使用Selenium和Chrome WebDriver来获取 【腾讯云 Cloud Studio 实战训练营】中的文章信息

文章目录 前言导入依赖库设置ChromeDriver的路径创建Chrome WebDriver对象打开网页找到结果元素创建一个空列表用于存储数据遍历结果元素并提取数据提取标题、作者、发布时间等信息判断是否为目标文章提取目标文章的描述、阅读数量、点赞数量、评论数量等信息将提取的数据存储为…

【外卖系统】菜品信息分页查询

需求分析 当菜品数据很多时&#xff0c;用分页的形式来展示列表数据 代码开发 页面发送ajax请求&#xff0c;将分页查询参数提交到服务端&#xff0c;获取分页数据页面发送请求&#xff0c;请求服务端进行图片下载&#xff0c;用于页面图片展示 构造分页 注意&#xff1a;…

Java入门指南:Java语言优势及其特点

目录 1. Java语言简介及发展概述 2. Java语言的优势 2.1 可移植性 2.2 面向对象 2.3 安全性 2.4 大量类库 3. Java语言与C/C的区别 4. 初识Java程序入口之main方法 5. 注释、标识符、关键字 5.1 注释 5.2 标识符 5.3 关键字 1. Java语言简介及发展概述 Java是一种面…

iphone备份用什么软件?好用的苹果数据备份工具推荐!

众所周知&#xff0c;如果要将iPhone的数据跟电脑进行传输备份的话&#xff0c;我们需要用到iTunes这个pc工具。但是对于iTunes&#xff0c;不少人都反映这个软件比较难用&#xff0c;用不习惯。于是&#xff0c;顺应时代命运的iPhone备份同步工具就出现了。那iphone备份用什么…

[css]margin-top不起作用问题(外边距合并)

在初学css时&#xff0c;会遇到突然间margin-top不起作用的情况。如下面&#xff1a; 情况一&#xff1a; 代码&#xff1a; <html> <head><style type"text/css"> * {margin:0;padding:0;border:0; }#outer {width:300px;height:300px;backgroun…

数据库—数据库备份(三十四)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、数据备份的重要性 三、造成数据丢失的原因 四、备份类型 4.1物理与逻辑角度 4.2数据库备份策略角度 五、常见的备份方法 5.1 物理备份 5.2 使用专用备…

短视频平台视频怎么去掉水印?

短视频怎么去水印&#xff0c;困扰很多人&#xff0c;例如&#xff0c;有些logo水印&#xff0c;动态水印等等&#xff0c;分享操作经验&#xff1a; 抖音作为中国最受欢迎的社交娱乐应用程序之一&#xff0c;已成为许多人日常生活中不可或缺的一部分。在使用抖音过程中&#x…

融合大数据、物联网和人工智能的智慧校园云平台源码 智慧学校源码

电子班牌系统用以展示各个班级的考勤信息、授课信息、精品课程、德育宣传、班级荣誉、校园电视台、考场信息、校园通知、班级风采&#xff0c;是智慧校园和智慧教室的对外呈现窗口&#xff0c;也是学校校园文化宣传和各种信息展示的重要载体。将大数据、物联网和人工智能等新兴…

tinkerCAD案例:24. Ruler - Measuring Lengths 标尺 -量勺

tinkerCAD案例&#xff1a;24. Ruler - Measuring Lengths 标尺 - 测量长度 Project Overview: 项目概况&#xff1a; A machine shop, where any idea can become a reality, can cost millions and million of dollars. Still, the most important tool in the shop is the…

ELK高级搜索(一)

文章目录 ELK搜索1&#xff0e;简介1.1 内容1.2 面向 2&#xff0e;Elastic Stack2.1 简介2.2 特色2.3 组件介绍 3&#xff0e;Elasticsearch3.1 搜索是什么3.2 数据库搜索3.3 全文检索3.4 倒排索引3.5 Lucene3.6 Elasticsearch3.6.1 Elasticsearch的功能3.6.2 Elasticsearch使…

Patchwork 黑客组织瞄准我国大学和研究机构

据知道创宇404高级威胁情报团队近期发现&#xff0c;名为“Patchwork”的黑客组织正以中国的大学和研究机构为目标进行活动&#xff0c;部署名为EyeShell的后门。 Patchwork也被称为“Operation Hangover”和“Zinc Emerson”&#xff0c;被怀疑是来自印度的APT组织。该组织发起…

职业发展规划指南:如何成为成功的产品经理

导语&#xff1a;产品经理是当今互联网时代最炙手可热的职位之一。作为连接技术、商业和用户需求的桥梁&#xff0c;产品经理在公司中扮演着至关重要的角色。本文将为你提供一些关于产品经理职业发展的规划指南&#xff0c;帮助你在这个领域取得成功。 掌握核心技能&#xff1…

Coremail中睿天下|2023年第二季度企业邮箱安全态势观察

7月24日&#xff0c;Coremail邮件安全联合中睿天下发布《2023第二季度企业邮箱安全性研究报告》&#xff0c;对2023第二季度和2023上半年的企业邮箱的安全风险进行了分析。 一、垃圾邮件同比下降16.38% 根据Coremail邮件安全人工智能实验室&#xff08;以下简称AI实验室&#…

pgsql 查看某个表建立了那些索引sql

执行以下sql&#xff1a; SELECTns.nspname as schema_name,tab.relname as table_name,cls.relname as index_name,am.amname as index_type,idx.indisprimary as is_primary,idx.indisunique as is_unique FROMpg_index idx INNER JOIN pg_class cls ON cls.oididx.indexrel…

AI技术快讯:清华开源ChatGLM2双语对话语言模型

ChatGLM2-6B是一个开源项目&#xff0c;提供了ChatGLM2-6B模型的代码和资源。根据提供的搜索结果&#xff0c;以下是对该项目的介绍&#xff1a; 论文&#xff1a;https://arxiv.org/pdf/2103.10360.pdf ChatGLM2-6B是一个开源的双语对话语言模型&#xff0c;是ChatGLM-6B模…

如何利用plotly和geopandas根据美国邮政编码(Zip-Code)绘制美国地图

对于我自己来说&#xff0c;该需求源自于分析Movielens-1m数据集的用户数据&#xff1a; UserID::Gender::Age::Occupation::Zip-code 1::F::1::10::48067 2::M::56::16::70072 3::M::25::15::55117 4::M::45::7::02460 5::M::25::20::55455 6::F::50::9::55117我希望根据Zip-…

Windows下安装Hive(包安装成功)

Windows下安装Hive Hive与Hadoop的版本选择很关键&#xff0c;千万不能选错&#xff0c;否则各种报错。一、Hive下载1.1、官网下载Hive1.2、网盘下载Hive 二、解压安装包&#xff0c;配置Hive环境变量2.1、环境变量新增&#xff1a;HIVE_HOME2.2、修改Path环境变量&#xff0c;…

leetcode 860. 柠檬水找零

2023.8.1 简单的一个思路就是建一个大小为3的数组change &#xff0c;用于存储剩余的零钱&#xff0c;然后遍历账单&#xff0c;每次找零钱的时候判断一下是否有足够的零钱&#xff0c;不够的话直接返回false。 能坚持到结束遍历则返回true。 代码如下&#xff1a; class Solu…

【TypeScript】接口类型 Interfaces 的使用理解

导语&#xff1a; 什么是 类型接口&#xff1f; 在面向对象语言中&#xff0c;接口&#xff08;Interfaces&#xff09;是一个很重要的概念&#xff0c;它是对行为的抽象&#xff0c;而具体如何行动需要由类&#xff08;classes&#xff09;去实现&#xff08;implement&#x…

深入理解设计模式之门面模式

深入理解设计模式之门面模式 什么是门面模式&#xff1f; 门面模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它提供了一个简单的接口&#xff0c;用于访问复杂子系统中的一组接口。门面模式通过封装子系统的复杂性&#xff0c;提供了一个更简…