Flink-Source的使用

Data Sources 是什么呢?就字面意思其实就可以知道:数据来源

Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。

flink在/流处理中常见的source主要有两大类。

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

预定义Source演示 

Collection [测试]--本地集合Source

在flink最常见的创建DataStream方式有四种:

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:

|使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue

l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了

l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.fromSequence(开始,结束);

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _01YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 各种获取数据的Source
        DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
        dataStreamSource.print();
        // 演示一个错误的
        //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
        //dataStreamSource2.print();
        DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
                Tuple2.of("张三", 18),
                Tuple2.of("lisi", 18),
                Tuple2.of("wangwu", 18)
        );
        elements.print();

        // 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的API
        String[] arr = {"hello","world"};
        System.out.println(arr.length);
        System.out.println(Arrays.toString(arr));
        List<String> list = Arrays.asList(arr);
        System.out.println(list);

        env.fromElements(
                Arrays.asList(arr),
                Arrays.asList(arr),
                Arrays.asList(arr)
        ).print();




        // 第二种加载数据的方式
        // Collection 的子接口只有 Set 和 List
        ArrayList<String> list1 = new ArrayList<>();
        list1.add("python");
        list1.add("scala");
        list1.add("java");
        DataStreamSource<String> ds1 = env.fromCollection(list1);
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));

        // 第三种
        DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
        ds3.print();


        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

本地文件的案例:

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _02YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取并行度
        System.out.println(env.getParallelism());

        // 讲第二种Source File类型的
        // 给了一个相对路径,说路径不对,老闫非要写,我咋办?
        // 相对路径,转绝对路径
        File file = new File("datas/wc.txt");
        File file2 = new File("./");
        System.out.println(file.getAbsoluteFile());
        System.out.println(file2.getAbsoluteFile());
        DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
        ds1.print();
        // 还可以获取hdfs路径上的数据
        DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
        ds2.print();



        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

Socket [测试] 

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

使用nc 进行数据的发送

yum install -y nc   
nc -lk 8888   --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!

如果是windows平台:nc -lp 8888 

代码演示:

//socketTextStream创建的DataStream,不论怎样,并行度永远是1
public class StreamSocketSource {

    public static void main(String[] args) throws Exception {

        //local模式默认的并行度是当前机器的逻辑核的数量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        int parallelism0 = env.getParallelism();

        System.out.println("执行环境默认的并行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //获取DataStream的并行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的并行度:" + parallelism);

        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        int parallelism2 = words.getParallelism();

        System.out.println("调用完FlatMap后DataStream的并行度:" + parallelism2);

        words.print();

        env.execute();
    }
}

以下用于演示:统计socket中的 单词数量,体会流式计算的魅力!

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo02_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source-加载数据
        DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);

        //TODO 3.transformation-数据转换处理
        //3.1对每一行数据进行分割并压扁
        DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        //3.2每个单词记为<单词,1>
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //3.4聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4.sink-数据输出
        result.print();

        //TODO 5.execute-执行
        env.execute();
    }
}

自定义数据源

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

package com.bigdata.day02;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * 要求:
 * - 随机生成订单ID(UUID)
 * - 随机生成用户ID(0-2)
 * - 随机生成订单金额(0-100)
 * - 时间戳为当前系统时间
 */

@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {
    boolean flag = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        // 源源不断的产生数据
        Random random = new Random();
        while(flag){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(random.nextInt(3));
            orderInfo.setMoney(random.nextInt(101));
            orderInfo.setTimeStamp(System.currentTimeMillis());
            ctx.collect(orderInfo);
            Thread.sleep(1000);// 间隔1s
        }
    }

    // source 停止之前需要干点啥
    @Override
    public void cancel() {
        flag = false;
    }
}
public class CustomSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);
        // 将自定义的数据源放入到env中
        DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
        System.out.println(dataStreamSource.getParallelism());
        dataStreamSource.print();
        env.execute();
    }


}

 通过ParallelSourceFunction创建可并行Source

/**
 * 自定义多并行度Source
 */
public class CustomerSourceWithParallelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();
        env.execute();
    }
    public static class MySource implements ParallelSourceFunction<String> {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
            /*
            如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
             */
        }

        @Override
        public void cancel() {}
    }
}

如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话。

总结:Rich富函数总结 ctrl + o

 Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntimeContext 方法可以获得当前的Runtime对象(底层API)

Kafka Source --从kafka中读取数据

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

创建一个topic1 这个主题:

cd /opt/installs/kafka3/

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1

通过控制台向topic1发送消息:
bin/kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1
package com.bigdata.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        // 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
        // 返回true 的数据就保留下来,返回false 直接丢弃
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // 查看单词中是否包含success 字样
                return word.contains("success");
            }
        }).print();

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/921653.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

.net的winfrom程序 窗体透明打开窗体时出现在屏幕右上角

窗体透明&#xff0c; 将Form的属性Opacity&#xff0c;由默认的100% 调整到 80%(尽量别低于50%)&#xff0c;这个数字越小越透明&#xff01; 打开窗体时出现在屏幕右上角 //构造函数 public frmCalendarList() {InitializeComponent();//打开窗体&#xff0c;窗体出现在屏幕…

分布式系统稳定性建设-性能优化篇

分布式系统稳定性建设-性能优化篇 系统稳定性建设是系统工程的核心内容之一。以下是一些重要的方面: 架构设计: 采用模块化、松耦合的架构设计,以提高系统的可扩展性和可维护性。合理划分系统功能模块,降低单个模块的复杂度。定义清晰的接口和数据交换标准,确保各模块之间协调…

【bug】使用transformers训练二分类任务时,训练损失异常大

使用transformers训练二分类任务时&#xff0c;训练损失异常大 问题分析 问题 training_loss异常大&#xff0c;在二分类损失中&#xff0c;收敛在1~2附近&#xff0c;而eval_loss却正常&#xff08;小于0.5&#xff09; 分析 参考&#xff1a; Bug in gradient accumulation…

电容测试流程

一、外观检测 1. 目的&#xff1a;检验电容样品外观是否与规格书一致&#xff0c;制程工艺是否良好&#xff0c;确保部品的品质。 2. 仪器&#xff1a;放大镜 3. 测试说明&#xff1a; &#xff08;1&#xff09;样品上丝印与规格书中相符&#xff0c;丝印信息&#xff08;…

C++设计模式行为模式———中介者模式

文章目录 一、引言二、中介者模式三、总结 一、引言 中介者模式是一种行为设计模式&#xff0c; 能让你减少对象之间混乱无序的依赖关系。 该模式会限制对象之间的直接交互&#xff0c; 迫使它们通过一个中介者对象进行合作。 中介者模式可以减少对象之间混乱无序的依赖关系&…

一篇保姆式centos/ubuntu安装docker

前言&#xff1a; 本章节分别演示centos虚拟机&#xff0c;ubuntu虚拟机进行安装docker。 上一篇介绍&#xff1a;docker一键部署springboot项目 一&#xff1a;centos 1.卸载旧版本 yum remove docker docker-client docker-client-latest docker-common docker-latest doc…

EasyAnimate:基于Transformer架构的高性能长视频生成方法

这里主要是对EasyAnimate的论文阅读记录&#xff0c;感兴趣的话可以参考一下&#xff0c;如果想要直接阅读原英文论文的话地址在这里&#xff0c;如下所示&#xff1a; 摘要 本文介绍了EasyAnimate&#xff0c;一种利用Transformer架构实现高性能视频生成的高级方法。我们将原…

李宏毅机器学习课程知识点摘要(6-13集)

pytorch简单的语法和结构 dataset就是数据集&#xff0c;dataloader就是分装好一堆一堆的 他们都是torch.utils.data里面常用的函数&#xff0c;已经封装好了 下面的步骤是把数据集读进来 这里是读进来之后&#xff0c;进行处理 声音信号&#xff0c;黑白照片&#xff0c;红…

gpt2的学习

现在学习下gpt2模型做摘要&#xff0c;我们都知道gpt2 是纯decoder&#xff0c;做摘要说话的效果较好。 把数据拆分 按照这个进行tokenizer 用这个tokenizer BertTokenizer.from_pretrained(‘bert-base-chinese’) 2w多词汇表 用交叉熵做lossf&#xff0c; 设好一些简单的…

网络安全设备

防火墙 防火墙是管理和控制网络流量的重要工具&#xff0c;防火墙适用于过滤流量的网络设备。防火墙根据一组定义的规则过滤流量。 静态数据包过滤防火墙 静态数据包过滤防火墙通过检查消息头中的数据来过滤流量。通常&#xff0c;规则涉及源、目标和端口号。静态数据包过滤防…

Python爬虫:深入探索1688关键词接口获取之道

在数字化经济的浪潮中&#xff0c;数据的价值愈发凸显&#xff0c;尤其是在电商领域。对于电商平台而言&#xff0c;关键词不仅是搜索流量的入口&#xff0c;也是洞察市场趋势、优化营销策略的重要工具。1688作为中国领先的B2B电商平台&#xff0c;其关键词接口的获取对于商家来…

SpringCloud Gateway转发请求到同一个服务的不同端口

SpringCloud Gateway默认不支持将请求路由到一个服务的多个端口 本文将结合Gateway的处理流程&#xff0c;提供一些解决思路 需求背景 公司有一个IM项目&#xff0c;对外暴露了两个端口8081和8082&#xff0c;8081是springboot启动使用的端口&#xff0c;对外提供一些http接口…

全面监测Exchange邮件服务器的关键指标

在当今高度信息化的社会&#xff0c;Exchange邮件服务器已成为企业日常通信的重要组成部分。为了确保邮件服务器的稳定运行&#xff0c;及时发现潜在问题并采取相应的解决措施显得尤为重要。监控易作为一款专业的监控工具&#xff0c;为Exchange邮件服务器提供了全方位的监测功…

实用功能,觊觎(Edge)浏览器的内置截(长)图功能

Edge浏览器内置截图功能 近年来&#xff0c;Edge浏览器不断更新和完善&#xff0c;也提供了长截图功能。在Edge中&#xff0c;只需点击右上角的“...”&#xff0c;然后选择“网页捕获”->“捕获整页”&#xff0c;即可实现长截图。这一功能的简单易用&#xff0c;使其成为…

IDEA2023版本配置项目全局编码

IDEA默认的项目编码是UTF-8&#xff0c;有时候拿到别人的代码使用的编码是GBK&#xff0c;虽然可以在idea右下角进行修改&#xff0c;但是一个一个的修改太慢了。所以需要去进行该项目的编码全局配置。接下来直接讲步骤&#xff0c;以IDEA2023版本为例。 第一步 File>Sett…

【Spiffo】环境配置:VScode+Windows开发环境

摘要&#xff1a; 在Linux下直接开发有时候不习惯快捷键和操作逻辑&#xff0c;用Windows的话其插件和工具都更齐全、方便&#xff0c;所以配置一个Windows的开发环境能一定程度提升效率。 思路&#xff1a; 自己本地网络内远程连接自己的虚拟机&#xff08;假定用的是虚拟机…

计算机网络 实验六 组网实验

一、实验目的 通过构造不同的网络拓扑结构图并进行验证&#xff0c;理解分组转发、网络通信及路由选择的原理&#xff0c;理解交换机和路由器在子网划分中的不同作用。 二、实验原理 组网实验是指将多个计算机通过网络连接起来&#xff0c;实现数据的共享和通信。 组网需要考虑…

springboot vue工资管理系统源码和答辩PPT论文

人类现已迈入二十一世纪&#xff0c;科学技术日新月异&#xff0c;经济、资讯等各方面都有了非常大的进步&#xff0c;尤其是资讯与网络技术的飞速发展&#xff0c;对政治、经济、军事、文化等各方面都有了极大的影响。 利用电脑网络的这些便利&#xff0c;发展一套工资管理系统…

【PPTist】添加PPT模版

前言&#xff1a;这篇文章来探索一下如何应用其他的PPT模版&#xff0c;给一个下拉菜单&#xff0c;列出几个项目中内置的模版 PPT模版数据 &#xff08;一&#xff09;增加菜单项 首先在下面这个菜单中增加一个“切换模版”的菜单项&#xff0c;点击之后在弹出框中显示所有的…

输入/输出管理 III(磁盘和固态硬盘)

一、磁盘 【总结】&#xff1a; 磁盘&#xff08;Disk&#xff09;是由表面涂有磁性物质的物理盘片&#xff0c;通过一个称为磁头的导体线圈从磁盘存取数据。在读&#xff0f;写操作期间&#xff0c;磁头固定&#xff0c;磁盘在下面高速旋转。如下图所示&#xff1a; 磁盘盘面…