Flink 1.18.1的基本使用

系统示例应用
/usr/local/flink-1.18.1/bin/flink run /usr/local/flies/streaming/SocketWindowWordCount.jar --port 9010
nc -l 9010
asd asd sdfsf sdf sdfsdagd sdf

在这里插入图片描述

在这里插入图片描述


单次统计示例工程
cd C:\Dev\IdeaProjects


mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.18.1
 Define value for property 'groupId':
 Define value for property 'artifactId':
 Define value for property 'version' 1.0-SNAPSHOT: :
 Define value for property 'package' : :

 com.edu
 flink-example
 1.0.0
 com.edu.flink
package com.edu.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        //设置运行时环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        //设置输入流,并执行数据流的处理和转换
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.18.128", 9000)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
        );

        //设置输出流
        dataStream.print();
        //执行程序
        env.execute("Window WordCount");
        System.out.print("finished...");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String,
            Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/364220.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

单元测试实践

一、写在开始写单元测试前 1.1 背景 我们开发都知道单元测试的重要性&#xff0c;而且每个开发都有要写单元测试的意识单元测试和代码编写结构息息相关&#xff0c;业界常用专业名词TDD&#xff08;测试驱动开发&#xff09;&#xff0c;言外之意我们开始编写代码的时候就已经…

开发板——X210开发板的SD卡启动方式

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 参考博客&#xff1a; S5PV210 SD卡启动 - 简书 关于存储器的相关基础知识&#xff0c;见博文&#xff1a; 外存——SD卡/iNand芯片与S5PV210的SD/MMC/iNand控制器-CSDN博客 RAM、ROM和FLASH三…

Qt6入门教程 15:QRadioButton

目录 一.简介 二.常用接口 三.实战演练 1.径向渐变 2.QSS贴图 3.开关效果 4.非互斥 一.简介 QRadioButton控件提供了一个带有文本标签的单选按钮。 QRadioButton是一个可以切换选中&#xff08;checked&#xff09;或未选中&#xff08;unchecked&#xff09;状态的选项…

Flink 流式读取 Debezium CDC 数据写入 Hudi 表无法处理 -D / Delete 消息

问题场景是&#xff1a;使用 Kafka Connect 的 Debezium MySQL Source Connector 将 MySQL 的 CDC 数据 &#xff08;Avro 格式&#xff09;接入到 Kafka 之后&#xff0c;通过 Flink 读取并解析这些 CDC 数据&#xff0c;然后以流式方式写入到 Hudi 表中&#xff0c;测试中发现…

Java Springboot解决很多页面Whitelabel Error Page(404)问题

前言 最近接手了一个前后端一体的项目&#xff0c;发现其默认路径不是主机端口&#xff08;如&#xff1a;http://localhost:3453/&#xff09;的形式。很多页面的访问是加了一个层级。只要访问就会出现如下提示&#xff1a; Whitelabel Error Page This application has no …

双目相机立体匹配基础

双目匹配就是用左相机和右相机去拍摄同一个点&#xff0c;目的是找到三维世界的同一个点&#xff0c;也就是在左相机和右相机中的成像点之间的像素差&#xff08;视差&#xff09;&#xff0c;根据视差去求解深度&#xff0c;那么找到左相机点到右相机的同一个对应点这个过程就…

草图导入3d后模型贴材质的步骤?---模大狮模型网

3D模型在导入草图大师后出现混乱可能有多种原因&#xff0c;以下是一些可能的原因和解决方法&#xff1a; 模型尺寸问题&#xff1a;如果3D模型的尺寸在导入草图大师时与画布尺寸不匹配&#xff0c;可能导致模型混乱。解决方法是在3D建模软件中调整模型的尺寸&#xff0c;使其适…

【NodeJS】005- MongoDB数据库

1.简介 1.1 Mongodb 是什么 MongoDB 是一个基于分布式文件存储的数据库&#xff0c;官方地址 https://www.mongodb.com/ 1.2 数据库是什么 数据库&#xff08;DataBase&#xff09;是按照数据结构来组织、存储和管理数据的 应用程序 1.3 数据库的作用 数据库的主要作用就是…

目标检测:1预备知识

开始涉及目标检测内容&#xff0c;总结一下学习记录 1、目标检测的基本概念 &#xff08;一&#xff09;什么是目标检测 目标检测&#xff08;Object Detection&#xff09; 的任务是找出图像中所有感兴趣的目标&#xff08;物体&#xff09;&#xff0c;不同于分类和回归问题…

react-virtualized实现行元素不等高的虚拟列表滚动

前言&#xff1a; 当一个页面中需要接受接口返回的全部数据进行页面渲染时间&#xff0c;如果数据量比较庞大&#xff0c;前端在渲染dom的过程中需要花费时间&#xff0c;造成页面经常出现卡顿现象。 需求&#xff1a;通过虚拟加载&#xff0c;优化页面渲染速度 优点&#xff1…

如何批量删除文件名里的多余文字?

如何批量删除文件名里的多余文字&#xff1f;删除文件名中多余的文字可以提高文件管理的效率和可读性。简洁性&#xff1a;删除多余的文字可以使文件名更简洁&#xff0c;减少冗余信息。这样可以更轻松地浏览和识别文件&#xff0c;尤其是当文件数量较多时。可读性&#xff1a;…

tcp/ip模型中,帧是第几层的数据单元?

在网络通信的世界中&#xff0c;TCP/IP模型以其高效和可靠性而著称。这个模型是现代互联网通信的基石&#xff0c;它定义了数据在网络中如何被传输和接收。其中&#xff0c;一个核心的概念是数据单元的层级&#xff0c;特别是“帧”在这个模型中的位置。今天&#xff0c;我们就…

代码随想录day17--二叉树的应用5

LeetCode654.最大二叉树 题目描述&#xff1a; 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。递归地在最大值 左边 的 子数组前缀上 构建左子树。递归地在最大值 右边 的 子数组后…

后端性能优化的一些总结

目录 1、背景 2、优化实现 2.1查询数据表速度慢 2.2调别人接口速度慢 2.3导入速度慢、 2.4导出速度慢的做出介绍 2.5统计功能速度慢 3、总结 1、背景 系统上线后&#xff0c;被用户反应系统很多功能响应时长很慢。用户页面影响速度有要求&#xff0c;下面针对查询数据表…

C#,入门教程(36)——尝试(try)捕捉(catch)不同异常(Exception)的点滴知识与源代码

上一篇&#xff1a; C#&#xff0c;入门教程(35)——哈希表&#xff08;Hashtable&#xff09;的基础知识与用法https://blog.csdn.net/beijinghorn/article/details/124236243 1、try catch 错误机制 Try-catch 语句包含一个后接一个或多个 catch 子句的 try 块&#xff0c;这…

深度学习(7)--Keras项目详解(卷积神经网络)

目录 一.项目介绍 二.卷积神经网络构造 2.1.判断是否是channels first的back end 2.2.卷积层构造 2.3.添加激活函数 2.4.池化层构造 2.5.全连接FC层构造 三.完整代码 3.1.学习率衰减设置 四.首次运行结果 五.数据增强对结果的影响 六.BatchNormalization对结果的影…

LeetCode: 160.相交链表(令人赞叹的优雅)

160. 相交链表 - 力扣&#xff08;LeetCode&#xff09; 目录 官方双指针解法&#xff1a; 博主的辣眼代码&#xff1a; 每日一表情包&#xff1a; 博主还未学习哈希表&#xff0c;所以介绍的是双指针法&#xff0c;此题的哈希表解法时O&#xff08;nm&#xff09;空O&…

R语言入门笔记2.0

1.创建数据框 在R语言中&#xff0c;可以使用data.frame函数来创建数据框。以下是一个简单的示例&#xff0c;这段R语言代码创建了一个名为student的数据框&#xff0c;其中包含了学生的ID、性别、姓名和出生日期&#xff0c;首先创建一个包含学生出生日期的向量&#xff0c;再…

网络时间协议NTP

网络时间协议NTP(Network Time Protocol)是TCP/IP协议族里面的一个应用层协议。NTP用于在一系列分布式时间服务器与客户端之间同步时钟。NTP的实现基于IP和UDP。NTP报文通过UDP传输,端口号是123。 随着网络拓扑的日益复杂,整个网络内设备的时钟同步将变得十分重要。如果依靠…

Skywalking的Trace Profiling 代码级性能剖析功能应用详解

代码级性能剖析 Skywalking 提供了Trace Profiling功能对具体出现问题的span进行代码级性能剖析。 代码级性能剖析就是利用方法栈快照&#xff0c;并对方法执行情况进行分析和汇总。并结合有限的分布式追踪 span 上下文&#xff0c;对代码执行速度进行估算。性能剖析激活时&a…