【极数系列】Flink集成DataSource读取集合数据(07)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.util.*;

/**
 * @description flink的list集合source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        List<String> list = Arrays.asList("测试", "开发", "运维");

        // 01 从集合创建数据流
        DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);

        // 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);

        // 03 从给定的对象序列中创建数据流
        DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");

        // 04 从迭代器并行创建数据流
        NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);
        DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);

        // 05 基于给定间隔内的数字序列并行生成数据流
        DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);

        //自定义数据流
        DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                //自定义你自己的数据来源
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect("测试数据" + i);
                }
            }

            @Override
            public void cancel() {

            }
        });

        //5.输出打印
        dataStreamSource_01.print();
//        dataStreamSource_02.print();
        dataStreamSource_03.print();
        dataStreamSource_04.print();
        dataStreamSource_05.print();
        dataStreamSource_06.print();

        //6.启动运行
        env.execute();
    }

}


4.3 运行结果日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355061.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于Python 爬虫的房地产数据可视化分析与实现

摘要&#xff1a; 过去&#xff0c;不管是翻阅书籍&#xff0c;还是通过手机&#xff0c;电脑等从互联网上手动点击搜索信息&#xff0c;视野受限&#xff0c;信息面太过于狭窄&#xff0c;且数据量大而杂乱&#xff0c;爆炸式信息的更新速度是快速且不定时的。要想手动获取到海…

OpenAI、斯坦福大学提出Meta-Prompting,有效提升语言模型的性能

为了研究如何提高语言模型的性能&#xff0c;使其更充分有效地输出对于提问的回答&#xff0c;来自斯坦福和 OpenAI 的学者强强联手&#xff0c;通过提出一种名为元提示&#xff08;meta-prompting&#xff09;的方法来深入探索。元提示通过让单个语言模型&#xff08;如 GPT-4…

【JavaScript基础入门】04 JavaScript基础语法(二)

JavaScript基础语法&#xff08;二&#xff09; 目录 JavaScript基础语法&#xff08;二&#xff09;变量变量是什么声明变量变量类型动态类型注释 数字与运算符数字类型算术运算符操作运算符比较运算符逻辑运算符运算符的优先级 变量 变量是什么 在计算机中&#xff0c;数据…

练习12.6_横向射击_Python编程:从入门到实践(第3版)

编写一个游戏&#xff0c;将一艘飞船放在屏幕左侧&#xff0c;并允许玩家上下移动飞船。在玩家按空格键时&#xff0c; 让飞船发射一颗在屏幕中向右飞行的子弹&#xff0c;并在子弹从屏幕中消失后将其删除。 ship_shooting.py import pygame import sys from leftship impor…

RabbitMQ基础编程模型及详细使用

目录 RabbitMQ基础编程模型 引入依赖 创建连接&#xff0c;获取Channel 声明Exchange-可选 声明queue 声明Exchange与Queue的绑定关系-可选 Producer根据应用场景发送消息到queue Consumer消费消息 Consumer主要有两种消费方式 1、被动消费模式 2、主动消费模式 完成…

sqli-labs闯关

目录 1.安装靶场2.了解几个sql常用知识2.1联合查询union用法2.2MySQL中的通配符&#xff1a;2.3常用函数2.4数据分组 3.mysql中重要的数据库和表4.开始闯关4.1 Less-14.1.1 首先进行一次常规的注入4.1.2 深入解析 1.安装靶场 1.首先推荐使用github下载靶场源码 https://githu…

内网安全:PTH PTK PTT

目录 实验所用网络拓朴图 网络环境说明​​​​​​​ LM认证 NTLM认证 NTLM Hash Kerberos认证 TGT票据 服务票据 Windows系统密码存储 域控制器 - 用户登录 域用户 本地用户 域用户和本地管理员 用户登录 Mimikatz抓取密码来源 域内一台主机上可以得到非本地用…

js实现贪吃蛇

文章目录 实现方法_11实现效果2 实现步骤2.1 移动场地2.2 游戏难度2.3 造蛇和食物2.4 蛇的移动2.5 产生食物的随机位置 3 全部代码 实现方法_21 实现效果2实现想法2.1 蛇的存储 实现方法_1 1实现效果 2 实现步骤 html部分忽略&#xff0c;布局写的太辣眼了 2.1 移动场地 用的表…

遥感的CCDC连续变化监测的qgis插件

简介 今天我逛GitHub的时候&#xff0c;看到一个比较有意思的插件:CCD-Plugin&#xff0c;记录一下。 CCD-Plugin是一个qgis插件&#xff0c;它使用 Google Earth Engine 获取 Landsat 或 Sentinel2 数据集&#xff0c;并运行连续变化检测 (CCDC) 算法来分析给定点的多年时间序…

【C++】一题掌握空指针

今天看见一道面试题&#xff0c;比较有意思&#xff0c;这一分享出来&#xff1a; 1.下面程序能编译通过吗&#xff1f; 2.下面程序会崩溃吗&#xff1f;在哪里崩溃 class A {public:void PrintA(){cout<<_a<<endl;}void Show(){cout<<"Show()"&…

(自用)learnOpenGL学习总结-高级OpenGL-模板测试

模板测试 模板测试简单来说就是一个mask&#xff0c;根据你的mask来保留或者丢弃片段。 那么可以用来显示什么功能呢&#xff1f;剪切&#xff0c;镂空、透明度等操作。 和深度缓冲的关系是&#xff1a; 先片段着色器&#xff0c;然后进入深度测试&#xff0c;最后加入模板测…

Linux第37步_解决“Boot interface 6 not supported”之问题

在使用USB OTG将“自己移植的固件”烧写到eMMC中时&#xff0c;串口会输出“Boot interface 6 not supported”&#xff0c;发现很多人踩坑&#xff0c;我也一样。 见下图&#xff1a; 解决办法&#xff1a; 1、打开终端 输入“ls回车”&#xff0c;列出当前目录下所有的文件…

Centos7 升级Docker 至最新版本

卸载旧版本的Docker yum remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 安装需要的软件包 yum install -y yum-utils device-mapper-persistent-data lvm2 添加Docker的yum源 #yu…

防范[myers@airmail.cc].mkp攻击:解密[myers@airmail.cc].mkp勒索病毒的方法

引言&#xff1a; 随着科技的迅猛发展&#xff0c;网络安全问题日益突出&#xff0c;而勒索病毒也成为当前互联网威胁中的一大焦点。其中&#xff0c;[datastorecyberfear.com].mkp [hendersoncock.li].mkp [hudsonLcock.li].mkp[myersairmail.cc].mkp勒索病毒以其强大的加密能…

QT学习日记 | 初始QT

目录 一、创建QT文件 二、目录结构讲解 1、.pro文件 2、源文件与头文件 3、编译运行 4、界面文件 三、梦开始的地方&#xff08;Hello World&#xff01;&#xff09; 1、代码方式 2、拖拽方式 四、Qt中的“容器” 五、Qt的对象树机制 1、对象树的引入 2、对象树…

Java 的文件类的学习总结

目录 一、File 的创建 二、File 类的常用方法 一、File 的创建 二、File 类的常用方法

开始学习第二十五天(番外)

今天分享一下写的小游戏啦 头文件game.h #include<stdio.h> #include<time.h> #include<stdlib.h> #define H 3 #define L 3 void InitBoard(char Board[H][L], int h, int l); void DisplayBoard(char Board[H][L], int h, int l); void playermove(cha…

【开源】基于JAVA+Vue+SpringBoot的智慧家政系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询家政服务4.2 新增单条服务订单4.3 新增留言反馈4.4 小程序登录4.5 小程序数据展示 五、免责说明 一、摘要 1.1 项目介绍 基于微信小程序JAVAVueSpringBootMySQL的智慧家政系统&#xff0…

OpenCV-29 自适应阈值二值化

一、引入 在前面的部分我们使用的是全局阈值&#xff0c;整幅图像采用同一个数作为阈值。当时这种方法并不适应于所有情况。尤其是当同一幅图像上的不同部分具有不同的亮度时。这种情况下我们需要采用自适应阈值。此时的阈值时根据图像上的每一个小区域计算与其对应的阈值。因此…

Less-1(sqlmap自动注入攻击)--sqli

环境准备 打开火狐浏览器&#xff0c;进入sqli第一关的页面 工具准备 sqlmap 参数解释 -u URL 指定目标URL进行注入测试。--dataDATA指定POST请求的数据进行注入测试--cookieCOOKIE指定用于身份验证的cookie进行注入测试-p PARAMETER指定要测试的参数--levelLEVEL设置测试的深…