【极数系列】Flink集成DataSource读取Socket请求数据(09)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于socket套接字读取数据
    • 3.1 从套接字读取。元素可以由分隔符分隔。
    • 3.2 windows安装netcat工具
      • (1)下载netcat工具
      • (2)安装部署
      • (3)启动socket端口监听
  • 04 源码实战demo
    • 4.1 pom.xm依赖
    • 4.2创建socket数据流作业
    • 4.3实时cmd窗口输入数据

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkSocketSourceJob(socket请求)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于socket套接字读取数据

3.1 从套接字读取。元素可以由分隔符分隔。

3.2 windows安装netcat工具

(1)下载netcat工具

下载地址:https://eternallybored.org/misc/netcat/

(2)安装部署

注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。

将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。

(3)启动socket端口监听

注意:该端口需要跟代码中监听的端口一致,否则获取不到数据

nc -l -p 8081

04 源码实战demo

4.1 pom.xm依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2创建socket数据流作业

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

/**
 * @description flink的socket请求的source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkSocketSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkSocketSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //3.基于socket请求的source使用
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",8081);

        //4.输出打印
        dataStreamSource.print();

        //5.启动运行
        env.execute();
    }

}

4.3实时cmd窗口输入数据

注意:先启动cmd窗口监听再启动程序,否则会报端口连接失败

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/360794.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[网络安全]IIS---FTP服务器 、serverU详解

一 . FTP服务器(File Transfor Protocol) : 协议:文件传输协议 端口号:TCP: 20(数据) / 21(控制) 二 . FTP工作方式: 1.主动模式 : (FTP服务器21端口与FTP客户端产生的随机端口先建立连接 建立连接后,再使用FTP服务器21端口与FTP客户端创建的一个新的随机端口进行发送…

【lesson29】MySQL事务不同隔离级别之间的区别演示

文章目录 读未提交读提交可重复读串行化总结 读未提交 我们看到这时的隔离级别是读提交&#xff0c;那么我们就要把隔离级别改为&#xff0c;读未提交。 我们可以看到两个终端的mysql隔离级别已经都被改成了读未提交。 开始演示读未提交&#xff1a; 开启2个事务 事务2读…

【C++】STL之空间配置器(了解)

一、什么是空间配置器 空间配置器 &#xff0c;顾名思义就是为各个容器高效的管理空间&#xff08;空间的申请与回收&#xff09;的&#xff0c;在默默地工作。虽然在常规使用 STL 时&#xff0c;可能用不到它&#xff0c;但站在学习研究的角度&#xff0c;学习它的实现原理对…

[BUUCTF]-PWN:cmcc_pwnme2解析

保护 ida 完整exp&#xff1a; from pwn import* context(log_leveldebug) #premote(node5.buuoj.cn,26964) pprocess(./pwnme2) addhome0x8048644 addflag0x8048682 getfile0x80485CB main0x80486F8 pop_ebp0x8048680 ret0x80483f2 pop_ebx0x8048409 pop_edi_ebp0x804867f st…

QT自制软键盘 最完美、最简单、支持中文输入(二)

目录 一、前言 二、本自制虚拟键盘特点 三、中文输入原理 四、组合键输入 五、键盘事件模拟 六、界面 七、代码 7.1 frmKeyBoard 头文件代码 7.2 frmKeyBoard 源文件代码 八、使用示例 九、效果 十、结语 一、前言 由于系统自带虚拟键盘不一定好用&#xff0c;也不一…

大学生以C语言为开始怎样学好编程呢?

大学生以C语言为开始怎样学好编程呢&#xff1f; 在开始前我分享下我的经历&#xff0c;刚入行时遇到一个好公司和师父&#xff0c;给了我机会&#xff0c;两年时间从3k薪资涨到18k的&#xff0c; 我师父给了一些C语言学习方法和资料&#xff0c;让我不断提升自己&#xff0c;…

【Node.js基础】Node.js的介绍与安装

文章目录 前言一、什么是Node.js&#xff1f;二、安装Node.js2.1 Windows系统2.2 macOS系统2.3 Linux系统 三、运行js代码总结 前言 随着互联网技术的不断发展&#xff0c;构建高性能、实时应用的需求日益增长。Node.js作为一种服务器端运行时环境&#xff0c;以其事件驱动、非…

2024新鲜出炉 Java集合常见面试题总结(下)

2024新鲜出炉 Java集合常见面试题总结(下) 文章目录 2024新鲜出炉 Java集合常见面试题总结(下)Map&#xff08;重要&#xff09;HashMap 和 Hashtable 的区别HashMap 和 HashSet 区别HashMap 和 TreeMap 区别HashSet 如何检查重复?HashMap 的底层实现JDK1.8 之前JDK1.8 之后 H…

java基于springboot的美妆化妆品商城购物网站ssm+vue

美妆购物网站分为管理员&#xff0c;商家&#xff0c;用户三种权限。 用户可以注册&#xff0c;可以登录&#xff0c;用户进入到首页可以看到热门化妆品和新品化妆品&#xff0c;可以选购化妆品&#xff08;可以通过搜索查询&#xff09;加入购物车&#xff0c;查看化妆品详细情…

(自用)learnOpenGL学习总结-高级OpenGL-抗锯齿

MSAA 光栅器会将一个图元的所有顶点作为输入&#xff0c;并将它转换为一系列的片段。顶点坐标理论上可以取任意值&#xff0c;但片段不行&#xff0c;因为它们受限于你窗口的分辨率。顶点坐标与片段之间几乎永远也不会有一对一的映射&#xff0c;所以光栅器必须以某种方式来决定…

github开源代码流程-初始化配置 quick start

开始前配置&#xff1a; 需要有一个github账号 需要安装git 一.配置github 登陆github 需要username&#xff08;这个后面会用到&#xff09;&#xff0c;password 1.配置pat密钥token 由于github已经移除了靠password进行验证的方式&#xff0c;所以必须进行个人令牌的设…

一键给家长私发成绩

各位老师&#xff0c;你们是否也有过这样的经历&#xff1a;每到考试后&#xff0c;为了将学生的成绩一一发给家长&#xff0c;费尽心思地整理、核对&#xff0c;甚至有时候还要加班。如今&#xff0c;有了易查分&#xff0c;这一切似乎变得轻松起来。但这个功能真的是老师们的…

2023 IoTDB Summit:Dr. Feinauer《Apache IoTDB在德国工业和关键基础设施中的应用》

12 月 3 日&#xff0c;2023 IoTDB 用户大会在北京成功举行&#xff0c;收获强烈反响。本次峰会汇集了超 20 位大咖嘉宾带来工业互联网行业、技术、应用方向的精彩议题&#xff0c;多位学术泰斗、企业代表、开发者&#xff0c;深度分享了工业物联网时序数据库 IoTDB 的技术创新…

Patch2QL:开源供应链漏洞挖掘和检测的新方向

背景 开源生态的上下游中&#xff0c;漏洞可能存在多种成因有渊源的其它缺陷&#xff0c;统称为“同源漏洞”&#xff0c;典型如&#xff1a; 上游代码复用缺陷。开源贡献者在实现功能相似的模块时&#xff0c;常复用已有模块代码或逻辑&#xff1b;当其中某个模块发现漏洞后…

成功解决Error:AttributeError: module ‘numpy‘ has no attribute ‘long‘.

成功解决Error&#xff1a;AttributeError: module ‘numpy‘ has no attribute ‘long‘. &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;报错分析&#x1f333;&#x1f333;解决方案1&#xff1a;降低NumPy版本&#x1f333;&#x1f333…

vue实现瀑布流

每个色块宽度一致&#xff0c;高度自适应 <!DOCTYPE html> <html><head><meta charset"utf-8"><meta http-equiv"X-UA-Compatible" content"IEedge,chrome1"><meta name"renderer" content"we…

【Docker】了解Docker Desktop桌面应用程序,TA是如何管理和运行Docker容器(1)

欢迎来到《小5讲堂》&#xff0c;大家好&#xff0c;我是全栈小5。 这是《Docker容器》序列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对…

EasyExcel导出Excel和多个图片到Zip,并实现超链接

前言&#xff1a;之前做了将图片直接插入到excel的需求&#xff0c;由于数据太多会导致导出慢或者直接报错&#xff0c;于是采用了将图片和excel分开放在一个zip压缩包中&#xff0c;并且&#xff0c;excel中对应图片的列点击后可以直接超链接到对应的图片。 实现效果&#xff…

东芝2323AMW数码复合机扫描文件至U盘无法选择JPEG问题解决方法

东芝eSTUDIO2323AMW数码复合机扫描文件至U盘无法选择JPEG问题解决方法 问题描述&#xff1a; 东芝eSTUDIO2323AMW数码复合机扫描文件至U盘只能输出PDF文件格式&#xff0c;而下方的JPEG是灰色的无法选择&#xff1b; 解决方法&#xff1a; 将模式选择为“彩色模式”之后“JPEG”…

【c语言】简单贪吃蛇的实现

目录 一、游戏说明 ​编辑 二、地图坐标​ ​编辑 三、头文件 四、蛇身和食物​ 五、数据结构设计​ 蛇节点结构如下&#xff1a; 封装一个Snake的结构来维护整条贪吃蛇&#xff1a;​ 蛇的方向&#xff0c;可以一一列举&#xff0c;使用枚举&#xff1a; 游戏状态&a…