Flink 03 | 数据流基本操作

图片

Flink数据流结构

DataStream 转换

通常我们需要分析的业务数据可能存在如下问题:

  • 数据中包含一些我们不需要的数据

  • 数据格式不方面分析

因此我们需要对原始数据流进行加工,比如过滤、转换等操作才可以进行数据分析。

Flink DataStream 转换主要作用:对输入的数据流(DataStream)经过各种转换操作以生成新的数据流

操作分类

  • 单条记录操作

    • 比如 Map  、 Fliter

  • 基于窗口 (Window)操作

    • 窗口根据某些特征(例如,过去 5 秒内到达的数据)对所有流事件进行分组

  • 合并数据流

    • union 、join、connect 可以将多个DataStream 合并为一个DataStream 进行分析处理

  • 拆分数据流

    • 将数据流拆分为多个数据流分别对每个数据流进行分析

基本操作

操作描述备注
Map将数据流中每个元素转换为新的元素类似 Java 中 stream.map 操作
Filter筛选只保留符合条件的数据类似 Java 中 stream.filter 操作
FlatMap将一个输入"展开"为多个元素
KeyBy将流逻辑划分为不相交的分区。所有具有相同键的记录都分配到同一个分区。
Reduce对具有相同键的元素进行规约操作,如求和、求最大值

使用示例

Map

将数据流中每个元素转换为新的元素

使用场景很多,主要对原始数据进行加工转换,Java 8 中 stream().map 操作相信大家不陌生, Flink中map 操作类似。

以下展示对数据流中数字取绝对值例子。

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return value >=0 ? value : -value;
    }
});

Filter

筛选出数据流中符合条件的数据,进行分析, 该操作同样与Java 8 中 stream().filter 类型。

以下代码 保留数据流中正数。

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});

FlatMap

该操作将一个输入"展开"为多个元素,简单来说一个对象,变成一个List。

典型例子,将句子拆分为单词

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Reduce 操作

对具有相同键的元素进行规约操作,如求和、求最大值。单词统计能够很好的展示 Flink 基本操作,包括reduce操作。

数据源进行KeyBy 后, Reduce 操作即 数据流按Key 分组聚合

public class WordCount {  
    public static void main(String[] args) throws Exception {  
        // 设置执行环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 从文件中读取文本数据  
        DataStream<String> text = env.readTextFile("your file");
  
        // 使用 flatMap 将文本分割成单词  
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())  
                // 使用 keyBy 分组,然后使用 reduce 进行聚合  
                .keyBy(value->value.f0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {  
                    @Override  
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {  
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  
                    }  
                });  
  
        // 打印结果  
        counts.print();  
  
        // 执行程序  
        env.execute("Flink Word Count Example");  
    }  
  
    // 自定义 Tokenizer 用于分割文本  
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {  
  
        @Override  
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {  
            // 使用空格分割字符串  
            for (String word : value.toLowerCase().split("\\s+")) {  
                if (word.length() > 0) {  
                    out.collect(new Tuple2<>(word, 1));  
                }  
            }  
        }  
    }  
}

总结

本文介绍了Flink 数据流基本操作Map/Filter/FlatMap/KeyBy/Reduce 的用法以及使用场景,并通过一个完整的例子展示 这些基本操作同时使用,完成数据分析过程。

对于Flink 一些其他高级操作,会持续更新中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/888388.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PHP变量(第④篇)

本栏目教学是php零基础到精通&#xff0c;如果你还没有安装php开发工具请查看下方链接&#xff1a; Vscode、小皮面板安装-CSDN博客 今天来讲一讲php中的变量&#xff0c;变量是用于存储信息的"容器"&#xff0c;这些数据可以在程序执行期间被修改&#xff08;即其…

nginx配置https加密

安装nginx 官网&#xff1a; https://nginx.org/ yum安装&#xff1a; https://nginx.org/en/linux_packages.html /etc/yum.repos.d/nginx.repo [nginx-stable] namenginx stable repo baseurlhttp://nginx.org/packages/centos/$releasever/$basearch/ gpgcheck1 enabled1 …

(C语言贪吃蛇)15.贪吃蛇吃食物

目录 前言 注意事项⚠️ 效果预览 实现方法 运行效果 新的问题&#x1f64b; 最终效果 总结 前言 我们上一节实现了解决了贪吃蛇不合理走位的情况&#xff0c;不理解的再回去看看(传送门&#xff1a;解决贪吃蛇不合理走位)&#xff0c;那么贪吃蛇自然是要吃食物的啊&…

【GEE学习第三期】GEE常用函数总结

【GEE学习第三期】GEE常用函数总结 数据统计类ee.List.sequence函数 图像处理类ee.Geometry类‌defaultVisualizationVis函数 数据输入输出数值与绘图导出影像 参考 数据统计类 ee.List.sequence函数 用法如下&#xff1a; ee.List.sequence &#xff08;开始&#xff0c;结…

windows C++-创建图像处理的异步消息(二)

创建图像处理网络 此部分介绍如何创建对给定目录中的每个 JPEG (.jpg) 图像执行图像处理的异步消息块网络。 网络执行以下图像处理操作&#xff1a; 对于 Tom 创作的任何图像&#xff0c;转换为灰度。 对于任何以红色作为主色的图像&#xff0c;移除绿色和蓝色分量&#xff0…

提升开机速度:有效管理Windows电脑自启动项,打开、关闭自启动项教程分享

日常使用Windows电脑时&#xff0c;总会需要下载各种各样的办公软件。部分软件会默认开机自启功能&#xff0c;开机启动项是指那些在电脑启动时自动运行的程序和服务。电脑开机自启太多的情况下会导致电脑卡顿&#xff0c;开机慢&#xff0c;运行不流畅的情况出现&#xff0c;而…

Unity各个操作功能+基本游戏物体创建与编辑+Unity场景概念及文件导入导出

各个操作功能 部分功能 几种操作游戏物体的方式&#xff1a; Center:有游戏物体父子关系的时候&#xff0c;中心点位置 Global/Local:世界坐标系方向/自身坐标系方向 &#xff1a;调试/暂停/下一帧 快捷键 1.Alt鼠标左键&#xff1a;可以实现巡游角度查看场景 2.鼠标滚轮…

mysql join的使用

MySQL 支持以下 JOIN 语法用于 SELECT 语句和多表 DELETE 和 UPDATE 语句中的 table_references 部分&#xff1a; table_references: 查询中涉及的一个或多个表的引用&#xff0c;可以是简单表名或 JOIN 表达式的组合。 escaped_table_reference [, escaped_table_referenc…

10.7学习

1.安全认证 ●Session 认证中最常用的一种方式&#xff0c;也是最简单的。存在多节点session丢失的情况&#xff0c;可通过nginx粘性Cookie和Redis集中式Session存储解决 ●HTTP Basic Authentication 服务端针对请求头中base64加密的Authorization 和用户名和密码进行校验。…

《贪吃蛇小游戏 1.0》源码

好久不见&#xff01; 终于搞好了简易版贪吃蛇小游戏&#xff08;C语言版&#xff09;&#xff0c;邀请你来玩一下~ 目录 Snake.h Snake.c test.c Snake.h #include<stdio.h> #include<windows.h> #include<stdbool.h> #include<stdlib.h> #inclu…

Ascend C 自定义算子开发:高效的算子实现

Ascend C 自定义算子开发&#xff1a;高效的算子实现 在 Ascend C 平台上&#xff0c;开发自定义算子能够充分发挥硬件的性能优势&#xff0c;帮助开发者针对不同的应用场景进行优化。本文将以 AddCustom 算子为例&#xff0c;介绍 Ascend C 中自定义算子的开发流程及关键技术…

FireRedTTS - 小红书最新开源AI语音克隆合成系统 免训练一键音频克隆 本地一键整合包下载

小红书技术团队FireRed最近推出了一款名为FireRedTTS的先进语音合成系统&#xff0c;该系统能够基于少量参考音频快速模仿任意音色和说话风格&#xff0c;实现独特的音频内容创造。 FireRedTTS 只需要给定文本和几秒钟参考音频&#xff0c;无需训练&#xff0c;就可模仿任意音色…

[记录]-安装pycharm

官网下载安装包&#xff1a;https://www.jetbrains.com/pycharm/ 然后按照引导安装 全部勾选

【数据管理】DAMA-元数据专题

导读&#xff1a;元数据是关于数据的组织、数据域及其关系的信息&#xff0c;是描述数据的数据。在数据治理中&#xff0c;元数据扮演着至关重要的角色&#xff0c;是数据治理的基础和支撑。以下是对数据治理中元数据专题方案的详细介绍&#xff1a; 目录 一、元数据的重要性 …

VRRP协议个人理解+报文示例+典型配置-RFC2338/RFC3768/RFC5798/RFC9568

个人认为&#xff0c;理解报文就理解了协议。通过报文中的字段可以理解协议在交互过程中相关传递的信息&#xff0c;更加便于理解协议。 因此本文将在VRRP协议报文的基础上进行介绍。 VRRP协议发展 关于VRRPv2基本原理&#xff0c;可重点参考2004年发布的RFC3768-Virtual Ro…

【Python|接口自动化测试】使用requests发送http请求时添加headers

文章目录 1.前言2.HTTP请求头的作用3.在不添加headers时4.反爬虫是什么&#xff1f;5.在请求时添加headers 1.前言 本篇文章主要讲解如何使用requests请求时添加headers&#xff0c;为什么要加headers呢&#xff1f;是因为有些接口不添加headers时&#xff0c;请求会失败。 2…

【C++ Primer Plus】4

2 字符串 字符串是存储在内存的连续字节中的一系列字符&#xff1b;C处理字符串的方式有两种&#xff0c; c-风格字符串&#xff08;C-Style string&#xff09;string 类 2.1 c-风格字符串&#xff08;C-Style string&#xff09; 2.1.1 char数组存储字符串&#xff08;c-…

Python编码规范与常见问题纠正

Python编码规范与常见问题纠正 Python 是一种以简洁和易读性著称的编程语言&#xff0c;因此&#xff0c;遵循良好的编码规范不仅能使代码易于维护&#xff0c;还能提升代码的可读性和可扩展性。编写规范的 Python 代码也是开发者职业素养的一部分&#xff0c;本文将从 Python…

Linux聊天集群开发之环境准备

一.windows下远程操作Linux 第一步&#xff1a;在Linux终端下配置openssh&#xff0c;输入netstate -tanp,查看ssh服务是否启动&#xff0c;默认端口22.。 注&#xff1a;如果openssh服务&#xff0c;则需下载。输入命令ps -e|grep ssh, 查看如否配有&#xff0c; ssh-agent …

tensorflow快速入门--如何定义张量、定义网络结构、超参数设置、模型训练???

前言 由于最近学习的东西涉及到tensorflow的使用&#xff0c;故先简单的学习了一下tensorflow中如何定义张量、定义网络结构、超参数设置、模型训练的API调用过程&#xff1b;欢迎大家&#xff0c;收藏关注&#xff0c;本人将持续更新。 文章目录 1、基本操作1、张量基础操作创…