Flink滑动窗口(Sliding)中window和windowAll的区别

滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。

窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据

滑动窗口(window)

比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。

代码展示:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4Window {
    public static void main(String[] args) throws Exception {
        //1、创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //使用lambda表达式处理数据
        DataStream<String> wordsDS = linesDS
                .flatMap((line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING);

        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                //指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * SlidingProcessingTimeWindows:滑动的处理时间窗口
         */
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                //每隔5秒计算最近15秒的数据
                .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));


        //kv1代表之前的结果(状态),kv2代码最新一条数据
        //reduce:有状态计算
        DataStream<Tuple2<String, Integer>> countDS = windowDS
                .reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));

        countDS.print();

        //execute方法会触发任务执行(任务调度)
        env.execute("lambda");
    }
}

滑动窗口(windowAll) 

将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加

 代码展示:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4WindowAll {
    public static void main(String[] args) throws Exception {
        //1、创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //使用lambda表达式处理数据
        DataStream<String> wordsDS = linesDS
                .flatMap((line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING);

        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                //指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        /*
         * SlidingProcessingTimeWindows:滑动的处理时间窗口
         */
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS
                //每隔5秒计算最近15秒的数据
                //windowAll:将同一个窗口的数据发一起进行计算
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));

        //kv1代表之前的结果(状态),kv2代码最新一条数据
        //reduce:有状态计算
        DataStream<Tuple2<String, Integer>> countDS = windowAllDS
                .reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));

        countDS.print();

        //execute方法会触发任务执行(任务调度)
        env.execute("lambda");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/910079.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于YOLO11/v10/v8/v5深度学习的煤矿传送带异物检测系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

Golang--文件操作

1、文件 文件&#xff1a;文件用于保存数据&#xff0c;是数据源的一种 os包下的File结构体封装了对文件的操作&#xff08;记得包os包&#xff09; 2、File结构体--打开文件和关闭文件 2.1 打开文件 打开文件&#xff0c;用于读取&#xff08;函数&#xff09;&#xff1a; 传…

BSAchongsds、

一、 ## 统计基因组整体信息 srun -A 2022099 -p Debug -n 2 -N 1 seqkit stats ~/yiyaoran/workspace/06.BSRseq/guo_BSR_pipline/ref/genome.fasta > genome.allstatcat genome.allstat 文件名 格式 类型 序列数量 总长度 最小长度 平均长…

聊一聊Elasticsearch的基本原理与形成机制

1、搜索引擎的基本原理 通常搜索引擎包括&#xff1a;数据采集、文本分析、索引存储、搜索等模块&#xff0c;它们之间的协作流程如下图&#xff1a; 数据采集模块负责采集需要搜索的数据源。 文本分析模块是将结构化数据中的长文本切分成有实际意义的词&#xff0c;这样用户…

**AI的三大支柱:神经网络、大数据与GPU计算的崛起之路**

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Python | Leetcode Python题解之第542题01矩阵

题目&#xff1a; 题解&#xff1a; class Solution:def updateMatrix(self, matrix: List[List[int]]) -> List[List[int]]:m, n len(matrix), len(matrix[0])# 初始化动态规划的数组&#xff0c;所有的距离值都设置为一个很大的数dist [[10**9] * n for _ in range(m)]…

RabbitMQ 管理平台(控制中心)的介绍

文章目录 一、RabbitMQ 管理平台整体介绍二、Overview 总览三、Connections 连接四、Channels 通道五、Exchanges 交换机六、Queues 队列查看队列详细信息查看队列的消息内容 七、Admin 用户给用户分配虚拟主机 一、RabbitMQ 管理平台整体介绍 RabbitMQ 管理平台内有六个模块&…

【机器学习】聚类算法分类与探讨

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

易语言模拟真人动态生成鼠标滑动路径

一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序&#xff0c;它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言&#xff0c;原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势&#xff1a; 模拟…

Linux:防火墙和selinux对服务的影响

1-1selinux 1-1 SELinux是对程序、文件等权限设置依据的一个内核模块。由于启动网络服务的也是程序&#xff0c;因此刚好也 是能够控制网络服务能否访问系统资源的一道关卡。 1-2 SELinux是通过MAC的方式来控制管理进程&#xff0c;它控制的主体是进程&#xff0c;而目标则是…

华为eNSP:QinQ

一、什么是QinQ&#xff1f; QinQ是一种网络技术&#xff0c;全称为"Quantum Insertion"&#xff0c;也被称为"Q-in-Q"、"Double Tagging"或"VLAN stacking"。它是一种在现有的VLAN&#xff08;Virtual Local Area Network&#xff0…

运动控制 PID算法

文章目录 一、自动控制简介1.1 开环控制系统1.2 闭环控制系统1.3 电机速度闭环控制系统 二、PID算法2.1 比例2.1.1 静态误差 2.2 积分2.3 微分环节2.4 位置式PID2.5 增量式PID 一、自动控制简介 自动控制系统是使用自动控制装置对关键控制参数进行自动控制&#xff0c;使它在收…

Pytorch实现transformer语言模型

转载自&#xff1a;| 03_language_model/02_Transformer语言模型.ipynb | 从头训练Transformer语言模型 |Open In Colab | Transformer语言模型 本节训练一个 sequence-to-sequence 模型&#xff0c;使用pytorch的 nn.Transformer <https://pytorch.org/docs/master/nn.ht…

C语言例题练手(1)

前几篇博客的内容已经涉及了C语言的部分语法知识&#xff0c;我们可以尝试做一些编程题&#xff0c;或者换一种说法就是可以写出什么样的程序以此来解决一些问题。 题目来自牛客网https://www.nowcoder.com和C语言菜鸟教程C 语言教程 | 菜鸟教程 数值计算 【例1】带余除法计…

Spring AI 核心概念

SpringAI 核心概念 1. Models2. Prompts3. Prompt Templates4. Embeddings5. Tokens6. Structured Output7. Bringing Your Data & APIs to the AI Model7.1 Retrieval Augmented Generation7.2 Function Calling 1. Models AI 模型是用于处理和生成信息的算法&#xff0c…

http请求响应详解

http介绍 http协议&#xff1a; Http”协议称为是“超文本传输协议”&#xff08;HTTP-Hypertext transfer protocol&#xff09;。它定义了浏览器怎么向万维网服务器请求万维网文档&#xff0c;以及服务器怎么样把文档传送给浏览器。 https协议&#xff1a; 传统的HTTP协议…

直播系统搭建教程安装说明

需要安装的软件(宝塔【软件商店】中查找安装): 1.PHP7.0 ~ PHP7.3 需要安装的扩展:(宝塔【PHP管理】【安装扩展】中安装) *PDO PHP Extension * MBstring PHP Extension * CURL PHP Extension * Mylsqi PHP Extension * Redis PHP Extension * fileinfo PHP Extension …

redis7学习笔记

文章目录 1. 简介1.1 功能介绍1.1.1 分布式缓存1.1.2 内存存储和持久化(RDBAOF)1.1.3 高可用架构搭配1.1.4 缓存穿透、击穿、雪崩1.1.5 分布式锁1.1.6 队列 1.2 数据类型StringListHashSetZSetGEOHyperLogLogBitmapBitfieldStream 2. 命令2.1 通用命令copydeldumpexistsexpire …

51c~C语言~合集1

我自己的原文哦~ https://blog.51cto.com/whaosoft/12428240 一、C语言和C的区别 ​ C语言虽说经常和C在一起被大家提起&#xff0c;但可千万不要以为它们是一个东西。现在我们常用的C语言是C89标准&#xff0c;C是C99标准的。C89就是在1989年制定的标准&#xff0c;如今最新…

【论文解读】EdgeYOLO:一种边缘实时目标检测器(附论文地址)

论文地址&#xff1a;https://arxiv.org/pdf/2302.07483 这篇文章的标题是《EdgeYOLO: An Edge-Real-Time Object Detector》&#xff0c;由中国北京理工大学的Shihan Liu、Junlin Zha、Jian Sun、Zhuo Li和Gang Wang共同撰写。这篇论文提出了一个基于最新YOLO框架的高效、低复…