sprintboot集成flink快速入门demo

一、flink介绍

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

二、环境搭建

安装flink

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口,发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用,所以在计算机安全领域也有着广泛的应用。

安装nc命令

 
 
yum install -y nc

启动socket端口

 
 
[root@node01 bin]# nc -lk 8888

三、代码工程

实验目的:无界流之读取socket文本流 

pom.xml

 
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>flink</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 添加 Flink 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>




    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>






</project>

SoketJob

 
 
package com.et.flink.job;


import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName SocketJob
 * @Description todo
 * @date 2024年02月29日 17:06
 */


public class SocketJob {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 指定并行度,默认电脑线程数
        env.setParallelism(3);
        // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);


        // 处理数据: 切换、转换、分组、聚合 得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
//                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);




        // 输出
        sum.print();


        // 执行
        env.execute();
    }


}

代码仓库

  • https://github.com/Harries/springboot-demo

四、测试

启动socket流

 
 
[root@cmn-zentao-002 ~]# nc -l 8888

本地执行

本地直接ideal启动main程序,在socket流中输入

 
 
abc bcd cde
bcd cde fgh
cde fgh hij

console日志显示

 
 
3> (abc,1)
1> (fgh,1)
3> (bcd,1)
3> (cde,1)
3> (bcd,2)
3> (cde,2)
3> (cde,3)
1> (fgh,2)
2> (hij,1)

集群执行

执行maven打包,将打包的jar上传到集群中94713cb0980fb13dd88d7785b93deef2.png在socker中输入字符,结果和本地一样

五、引用

  • https://juejin.cn/post/7283311024979066937

  • http://www.liuhaihua.cn/archives/710270.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/424172.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

⭐每天一道leetcode:13.罗马数字转整数(简单)

⭐今日份题目 罗马数字包含以下七种字符: I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M 100…

MATLAB知识点:条件判断switch-case-otherwise-end语句

​讲解视频&#xff1a;可以在bilibili搜索《MATLAB教程新手入门篇——数学建模清风主讲》。​ MATLAB教程新手入门篇&#xff08;数学建模清风主讲&#xff0c;适合零基础同学观看&#xff09;_哔哩哔哩_bilibili 节选自​第4章&#xff1a;MATLAB程序流程控制 switch翻译成…

使用C语言 打印出所有的水仙花数

水仙花数 一.什么是水仙花数二.如何获取一个数的每一位数三.如何计算一个数有几位数四.计算出所有的水仙花数 一.什么是水仙花数 水仙花数的定义&#xff1a;“水仙花数”是指一个n位数&#xff0c;其各位数字的n次方之和确好等于该数本身&#xff0c;如:153&#xff1d;1^ 3&a…

寻找峰值[中等]

优质博文IT-BLOG-CN 一、题目 峰值元素是指其值严格大于左右相邻值的元素。给你一个整数数组nums&#xff0c;找到峰值元素并返回其索引。数组可能包含多个峰值&#xff0c;在这种情况下&#xff0c;返回 任何一个峰值 所在位置即可。 你可以假设nums[-1] nums[n] -∞。 你…

【Python】批量读取文件夹中的excel文件

示例展示 代码 import os import pandas as pd folder_path r"C:\Users\admin\Desktop\excelfile" extension"xlsx" files [file for file in os.listdir(folder_path) if file.endswith(. extension)] for file in files:filepath os.path.join(folde…

ChatGPT支持下的PyTorch机器学习与深度学习技术应用

近年来&#xff0c;随着AlphaGo、无人驾驶汽车、医学影像智慧辅助诊疗、ImageNet竞赛等热点事件的发生&#xff0c;人工智能迎来了新一轮的发展浪潮。尤其是深度学习技术&#xff0c;在许多行业都取得了颠覆性的成果。另外&#xff0c;近年来&#xff0c;Pytorch深度学习框架受…

运用qsort函数进行快排并使用C语言模拟qsort

qsort 函数的使用 首先qsort函数是使用快速排序算法来进行排序的&#xff0c;下面我们打开官网来查看qsort是如何使用的。 这里有四个参数&#xff0c;首先base 是至待排序的数组的首元素的地址&#xff0c;num 是值这个数组的元素个数&#xff0c;size 是指每个元素的大小&am…

数字化转型导师坚鹏:证券公司数字化转型战略、方法与案例

证券公司数字化转型战略、方法与案例 课程背景&#xff1a; 数字化转型背景下&#xff0c;很多机构存在以下问题&#xff1a; 不清楚证券公司数字化转型的发展战略&#xff1f; 不知道证券公司数字化转型的核心方法&#xff1f; 不知道证券公司数字化转型的成功案例&am…

第四十八回 解珍解宝双越狱 孙立孙新大劫牢-Python模块和包概念与使用

吴用对宋江说&#xff0c;有个人&#xff0c;他是石勇的关系&#xff0c;与祝家庄的峦廷玉关系好&#xff0c;还是杨林、邓飞的老相识&#xff0c;他有一计.... 原来在宋江攻打祝家庄的时间段&#xff0c;山东海边登州也发生了一件事。登州山下有一家猎户&#xff0c;弟兄两个…

Linux下进程相关概念详解

目录 一、操作系统 概念 设计操作系统的目的 定位 如何理解“管理” 系统调用和库函数概念 二、进程 概念 描述进程—PCB&#xff08;process control block&#xff09; 查看进程 进程状态 进程优先级 三、其它的进程概念 一、操作系统 概念 任何计算机系统都包…

HPE ProLiant MicroServer Gen8更换坏硬盘(RAID 1+0)

HPE ProLiant MicroServer Gen8今天硬盘告警&#xff0c;坏了一块硬盘&#xff08;估计还是由于上次突然断电导致的&#xff09;&#xff0c;关机&#xff0c;拆下坏硬盘&#xff0c;更换新硬盘&#xff0c;开机后按了一次F1键&#xff0c;系统继续启动并正常使用&#xff0c;同…

VueCli的安装与卸载

文章目录 一.Node安装包的报读网盘提取码二、Vue脚手架Cli三、Vue-CLI使用步骤(自定义安装)1.转换路径并创建项目2.创建步骤的解释(保姆级)3.等待vue项目自己创建好(保姆级) 四、通过npm对vue的安装与卸载 一.Node安装包的报读网盘提取码 下面的链接为地址: Node的百度网盘提取…

面试准备:排序算法大汇总 C++

排序算法总结 直接插入排序 取出未排序部分的第一个元素&#xff0c;与已排序的部分从后往前比较&#xff0c;找到合适的位置。将大于它的已排序的元素向后移动&#xff0c;将该元素插入到合适的位置。 //1. 直接插入排序 void InsertionSort(vector<int>& nums){f…

#WEB前端(HTML属性)

1.实验&#xff1a;a,img 2.IDE&#xff1a;VSCODE 3.记录&#xff1a; a: href插入超链接 默认情况下在本窗口打开链接, target可以设置打开的窗口,parent在父窗口打开&#xff0c;blank新开串口打开,top在顶层串口打开,self为默认在本窗口打开 img: 插入图片 可以插…

走进SQL审计视图——《OceanBase诊断系列》之二

1. 前言 在SQL性能诊断上&#xff0c;OceanBase有一个非常实用的功能 —— SQL审计视图(gv$sql_audit)。在OceanBase 4.0.0及更高版本中&#xff0c;该功能是 gv$ob_sql_audit。它可以使开发和运维人员更方便地排查在OceanBase上运行过的任意一条SQL&#xff0c;无论这些SQL是成…

基于 Amazon EKS 的 Stable Diffusion ComfyUI 部署方案

01 背景介绍 Stable Diffusion 作为当下最流行的开源 AI 图像生成模型在游戏行业有着广泛的应用实践&#xff0c;无论是 ToC 面向玩家的游戏社区场景&#xff0c;还是 ToB 面向游戏工作室的美术制作场景&#xff0c;都可以发挥很大的价值&#xff0c;如何更好地使用 Stable Dif…

Day10:基础入门-HTTP数据包Postman构造请求方法请求头修改状态码判断

目录 数据-方法&头部&状态码 案例-文件探针 案例-登录爆破 工具-Postman自构造使用 思维导图 章节知识点&#xff1a; 应用架构&#xff1a;Web/APP/云应用/三方服务/负载均衡等 安全产品&#xff1a;CDN/WAF/IDS/IPS/蜜罐/防火墙/杀毒等 渗透命令&#xff1a;文件…

数字图像处理 SUJOY滤波器:用于图像边缘检测的通用一阶导数滤波器

1、前言 因为是比较旧的论文,但是据论文作者说SUJOY滤波器为图像边缘检测提供了比其他常用的一阶导数方法(如 Robert 算子、Prewitt 算子、Sobel 算子等)更好的方法。 经过测试感觉并没有作者说的那么好,很水的东西,另外图像领域的事情很少有绝对的,通常都是某些方法适合…

【二分】第k个缺失的数

第K个缺失的数 链接 . - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/kth-missing-positive-number/ 题目 题解 二段…

医学大数据|文献阅读|有关“胃癌+机器学习”的研究记录

目录 1.基于32基因特征构建的机器学习模型可有效预测胃癌患者的预后和治疗反应 2.胃癌患者术后90天死亡率的机器学习风险预测模型 3.使用机器学习模型预测幽门螺杆菌根除患者胃癌患病风险 4.利用初始内窥镜检查和组织学结果进行个性化胃癌发病率预测 1.基于32基因特征构建的…