Flink 06 聚合操作入门学习,真不难

抛砖引玉

  1. 让你统计1小时内每种商品的销售额,用Flink 该怎么实现。

  2. 还是让你统计1小时内每种商品的销售额,但是要过滤掉退款的订单,用Flink 该怎么实现。

学了本文两个操作,不信你还不会。

AggregateFunction

通常用于对数据流中的数据进行分组聚合。它可以将一组数据逐步合并、计算,最终得到一个聚合结果。

AggregateFunction 接口包含几个关键的方法,这些方法定义了如何进行状态初始化、累加、合并和获取结果:

createAccumulator():该方法在聚合前被调用,用于初始化聚合状态。

add(value, accumulator)该方法将新的输入值加到累加器上。在每个事件到达时调用会调用该方法。

getResult(accumulator):该方法用于返回最终聚合结果。这在聚合操作结束时被调用。

merge(acc1, acc2)(可选):该方法作用是,在并行流处理情况下,需要合并不同实例的聚合结果。

以下示例模拟统计每小时各商品的销售额

public class AggregateFunctionDemo {

    public static class Order{

        String goods;

        int amount;

        public Order(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }
    }

    public static class OrderACC{

        String goods;

        int amount;

        public OrderACC(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "OrderACC{" +
                    "goods='" + goods + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    public  static class OrderACCFunction implements AggregateFunction<Order, OrderACC, OrderACC> {


        @Override
        public OrderACC createAccumulator() {
            return new OrderACC(null,0);
        }

        @Override
        public OrderACC add(Order value, OrderACC accumulator) {
            
            if (accumulator.goods == null) {
                accumulator.goods = value.goods;
            }
            accumulator.amount += value.amount;
            return accumulator;
        }

        @Override
        public OrderACC getResult(OrderACC accumulator) {

            return accumulator;
        }

        @Override
        public OrderACC merge(OrderACC a, OrderACC b) {
            a.amount += b.amount;
            return a;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {

            boolean running = true;

            List<String> goods = Arrays.asList("书包","本子","笔");

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {

                Random random = new Random();
                while (running){
                  int goodsIndex =   random.nextInt(goods.size());
                  int amount = random.nextInt(1000);
                    Order order = new Order(goods.get(goodsIndex), amount);
                    ctx.collect(order);


                    Thread.sleep(200);
                }

            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<OrderACC> resultStream =
                dataStream.keyBy(order -> order.goods).
                        window(TumblingProcessingTimeWindows.of(Time.hours(5))).
                aggregate(new OrderACCFunction());

        resultStream.print();

        env.execute();

    }
}


AggregateFunction 小结

  • AggregateFunction 常用于对窗口内的数据进行聚合计算。

例如,你可能需要计算某个时间窗口内某个指标的平均值、总和、最大值或最小值等。

  • 在分布式计算环境中,通过实现 merge 方法,Flink 可以在不同的节点上并行地执行聚合计算,并在最后将结果合并。

ProcessWindowFunction

ProcessWindowFunction 是 Flink 提供的一个强大的窗口函数接口,允许开发者对窗口中的元素进行自定义处理,包括访问窗口的元数据和状态。

来看看ProcessWindowFunction中 process方法的定义

 void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

图片

从上面方法定义我们基本可以推断ProcessWindowFunction 的特点

  • Iterable<IN> elements 窗口中所有元素 ,这与 ReduceFunction 或 AggregateFunction 不同,后者主要关注于元素之间的聚合操作。我们可以遍历elements,实现自己的聚合逻辑。

  • Context context:你可以通过Context获取到窗口的元数据,如窗口的开始和结束时间戳。甚至进行状态管理

ProcessWindowFunction 的使用

public class AggregateFunctionDemo2 {

    public static class Order{

        String goods;

        int amount;

        boolean refund;

        public Order(String goods, int amount, boolean refund) {
            this.goods = goods;
            this.amount = amount;
            this.refund = refund;
        }
    }

    public static class OrderACC{

        String goods;

        int amount;


        public OrderACC(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "OrderACC{" +
                    "goods='" + goods + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    public  static class OrderProcessWindowFunction extends ProcessWindowFunction<Order,OrderACC,String, TimeWindow> {


        @Override
        public void process(String key, ProcessWindowFunction<Order, OrderACC, String, TimeWindow>.Context context, Iterable<Order> elements, Collector<OrderACC> out) throws Exception {

            int sum = 0;
            for(Order order : elements){
                if(!order.refund){
                    sum += order.amount;
                }
            }
            out.collect(new OrderACC(key,sum));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {

            boolean running = true;

            List<String> goods = Arrays.asList("书包","本子","笔");

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {

                Random random = new Random();
                while (running) {
                    int goodsIndex = random.nextInt(goods.size());
                    int amount = random.nextInt(1000);
                    boolean refund = random.nextBoolean();
                    Order order = new Order(goods.get(goodsIndex), amount, refund);
                    ctx.collect(order);
                    Thread.sleep(100);
                }

            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<OrderACC> resultStream = dataStream.keyBy(order -> order.goods).
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new OrderProcessWindowFunction());
        
        resultStream.print();

        env.execute();
        

    }
}

图片

ProcessWindowFunction小结

  • 可以实现复杂的聚合逻辑,比如对窗口内元素进行过滤、排序之后 再进行聚合。

  • 可以获取窗口的状态信息,(如窗口的开始和结束时间)来满足一些特定的需求

总结

本文介绍了如何使用ProcessWindowFunction/AggregateFunction 完成一些聚合操作。通过对比两端代码,相信聪明的你已经体会到两者差异。再回到开头的问题,相信已经不是问题,信手拈来了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/891900.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React国际化中英文切换实现

目录 概况 安装 文件结构 引入 使用 正常使用 传参使用 概况 react-intl-universal 是一个国际化库&#xff0c;专门为 React 应用提供多语言支持。与 React 原生的 react-intl 相比&#xff0c;react-intl-universal 支持从远程服务器加载语言包&#xff0c;动态切换语…

【Python】selenium遇到“InvalidArgumentException”的解决方法

在使用try……except 的时候捕获到这个错误&#xff1a; InvalidArgumentException: invalid argument (Session info: chrome112.0.5614.0) 这个错误代表的是&#xff0c;当传入的参数不符合期望时&#xff0c;就会抛出这个异常&#xff1a; InvalidArgumentException: invali…

【MySQL 保姆级教学】在Linux(CentoS 7)中安装MySQL(1)

目录 1. 卸载linux&#xff08;Centos7&#xff09; 中不要的环境2. 获取MySQL官方yum源2.1 获取yum源前先查看自己 linux&#xff08;Centos&#xff09;的版本2.2 获取官方yum源 3. 安装xftp和连接4. 开放连接端口5. 上传文件到Centos76. 安装MySQL6.1 顺利安装6.2 查询是否安…

QGraphics类型学习使用【Qt】【C++】

QGraphics类型学习使用 需求过程全部完整代码 首先已知&#xff0c;QGraphicsView&#xff0c;QGraphicsScene, QGraphicsItem&#xff0c;分别称为&#xff1a;视图&#xff0c;场景&#xff0c;图元&#xff0c;图表就是各种各样的元素&#xff0c;图片元素&#xff0c;线条元…

Unity Apple Vision Pro 开发:Metal 渲染模式开启透视遇到背景黑屏的解决方法

XR 开发者社区链接&#xff1a; SpatialXR社区&#xff1a;完整课程、项目下载、项目孵化宣发、答疑、投融资、专属圈子 以下步骤适用于 PolySpatial 2.0 及以上的版本。 我们可以在 Project Settings 中的 Apple visionOS 里将 App Mode 设为 Metal Rendering with Composit…

【C语言】分支结构switch

switch分支语句 多适用于明确表达式结果的情况&#xff0c;多个分支&#xff0c;用if过于繁琐。 case后跟具体的表达式值&#xff0c;break&#xff1b;跳出分支语句。 #include <stdio.h> #include <math.h> /* 功能&#xff1a;选择结构&#xff08;switch&…

Flink CDC同步mysql数据到doris

前置参考 flink快速安装&#xff1a;Flink入门-CSDN博客 doris快速安装&#xff1a;Apache Doris快速安装-CSDN博客 Flink CDC简介 Flink CDC 是一个基于流的数据集成工具&#xff0c;旨在为用户提供一套功能更加全面的编程接口&#xff08;API&#xff09;。 该工具使得用户能…

AI测试之 TestGPT

如今最火热的技术莫非OpenAI的ChatGPT莫属&#xff0c;AI技术也在很多方面得到广泛应用。今天我们要介绍的TestGPT就是一个软件测试领域中当红的应用。 TestGPT是什么&#xff1f; TestGPT是一家总部位于以色列特拉维夫的初创公司 CodiumAI Ltd.&#xff0c;发布的一款用于测…

hadoop集群搭建-克隆虚拟机,安装jdk,hadoop

2.2 hadoop运行环境的搭建 2.2.1 环境准备 1&#xff09;安装模板虚拟机&#xff0c;IP地址 192.168.10.100&#xff0c;主机名hadoop100&#xff0c;内存41GB&#xff0c;硬盘50GB 2&#xff09;虚拟机配置 首先测试虚拟机是否可以正常上网&#xff0c;测试方法ping www.b…

配置环境windows-IIS默认拒绝put,delete的解决方案

方法一&#xff1a; <system.webServer> </system.webServer> 方法二&#xff1a;移除网站“模块”中的"webdavmodule"

【芯智雲城】Boradcom(博通) 多领域技术解决方案介绍

Broadcom Inc. 是一家全球领先的技术企业&#xff0c;业务范围囊括多种半导体、企业用软件和安全解决方案的设计、开发和供应。Broadcom 的类别领先产品组合在许多重要的市场中发挥作用&#xff0c;其中包括云、数据中心、网络、带宽、无线技术、存储&#xff0c;以及工业和企业…

元数据 - iXML

在专业的音频和视频制作中&#xff0c;元数据的准确传递对于后期制作和编辑至关重要。iXML&#xff08;iXML Metadata&#xff09;是一种开放的、可扩展的元数据规范&#xff0c;旨在在录音设备和数字音频工作站&#xff08;DAW&#xff09;之间传递详细的录音信息。 一、什么是…

单目相机和双目相机定位

1、单目相机 1.1模型 单目相机成像模型为小孔成像&#xff0c;涉及的坐标系包括世界坐标系、相机坐标系、图像坐标系以及像素坐标系。坐标系之间的转换关系如下&#xff1a; 1.2参数求解 张正友相机标定方法、设定世界坐标系精确求解 2、双目相机 2.1、模型 一般双目立体视…

低代码策略量化平台更新|大模型agents生态的一些思考

原创内容第680篇&#xff0c;专注量化投资、个人成长与财富自由。 用户判断星球会员后&#xff0c;会获得10个积分&#xff1a; 当其他用户发布策略&#xff0c;设置为下载需要积分时&#xff1a; 下载策略会扣除相应的积分&#xff0c;扣除的积分属于策略所有者。 策略运行结…

大型企业软件开发是什么样子的? - Web Dev Cody

引用自大型企业软件开发是什么样子的&#xff1f; - Web Dev Cody_哔哩哔哩_bilibili 一般来说 学技术的时候 我们会关注 开发语言特性 &#xff0c;各种高级语法糖&#xff0c;底层技术 但是很少有关注到企业里面的开发流程&#xff0c;本着以终为始&#xff08;以就业为导向…

python源码:目录文件大小排序

前言 这个代码并不难懂&#xff0c;但是在一定情况下&#xff0c;能够为你的自动化脚本提供便利。 该代码主要是&#xff1a;根据大小&#xff0c;对某个目录的下级子文件和目录进行排序。 代码 效果 代码 import osdef get_dir_size(directory):"""计算给定…

【Linux线程】Linux线程编程基础:概念、创建与管理

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;Linux “ 登神长阶 ” &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀Linux多线程 &#x1f4d2;1. 线程概念&#x1f4dc;2. 进程VS线程&#x1f4da;3. 线程控制…

SpringBoot实现接口:统一返回值、全局异常处理、Swagger接口文档

在 Spring Boot 应用中实现统一返回值和全局异常处理可以带来多方面的好处&#xff0c;这些好处不仅提升了代码的可读性和可维护性&#xff0c;还增强了应用的健壮性和用户体验。以下是一些具体的好处&#xff1a; 代码一致性&#xff1a; 通过定义统一的返回值格式&#xff…

【从零开始的LeetCode-算法】3194. 最小元素和最大元素的最小平均值

你有一个初始为空的浮点数数组 averages。另给你一个包含 n 个整数的数组 nums&#xff0c;其中 n 为偶数。 你需要重复以下步骤 n / 2 次&#xff1a; 从 nums 中移除 最小 的元素 minElement 和 最大 的元素 maxElement。将 (minElement maxElement) / 2 加入到 averages …

Apache Linkis + OceanBase:如何提升数据分析效率

计算中间件 Apache Linkis 构建了一个计算中间件层&#xff0c;以实现上层应用程序和底层数据引擎之间的连接、治理和编排。目前&#xff0c;已经支持通过数据源的功能&#xff0c;实现用户通过Linkis 对接并使用 OceanBase数据库。 本文详细阐述了在 Apache Linkis v1.3.2中&a…