Flink自定义Source模拟数据流

maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxl</groupId>
    <artifactId>FlinkJoin</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <!--com.mysql.cj.jdbc.Driver-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--这个一定要加,否则会报错(5001好像是,记不清了)-->
        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
            <version>2.34</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

实体类

订单类

package com.zxl.bean;


// TODO: 2024/1/6 订单类 


public class Orders {
    //订单ID
    private Long order_id;
    //用户ID
    private Long user_id;
    //订单日期
    private Long order_date;
    //订单金额
    private Integer order_amount;
    //商品ID
    private Integer product_id;
    //订单数量
    private Long order_num;

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public Long getUser_id() {
        return user_id;
    }

    public void setUser_id(Long user_id) {
        this.user_id = user_id;
    }

    public Long getOrder_date() {
        return order_date;
    }

    public void setOrder_date(Long order_date) {
        this.order_date = order_date;
    }

    public Integer getOrder_amount() {
        return order_amount;
    }

    public void setOrder_amount(Integer order_amount) {
        this.order_amount = order_amount;
    }

    public Integer getProduct_id() {
        return product_id;
    }

    public void setProduct_id(Integer product_id) {
        this.product_id = product_id;
    }

    public Long getOrder_num() {
        return order_num;
    }

    public void setOrder_num(Long order_num) {
        this.order_num = order_num;
    }

    public Orders() {
    }

    public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {
        this.order_id = order_id;
        this.user_id = user_id;
        this.order_date = order_date;
        this.order_amount = order_amount;
        this.product_id = product_id;
        this.order_num = order_num;
    }

    @Override
    public String toString() {
        return "Orders{" +
                "order_id=" + order_id +
                ", user_id=" + user_id +
                ", order_date=" + order_date +
                ", order_amount=" + order_amount +
                ", product_id=" + product_id +
                ", order_num=" + order_num +
                '}';
    }
}

支付类

package com.zxl.bean;



// TODO: 2024/1/6 支付类 


public class Payments {
    //支付ID
    private Long payment_id;
    //订单号
    private Long order_id;
    //支付金额
    private Integer payment_amount;
    //支付类型
    private String payment_type;
    //支付日期
    private Long payment_date;

    public Long getPayment_id() {
        return payment_id;
    }

    public void setPayment_id(Long payment_id) {
        this.payment_id = payment_id;
    }

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public Integer getPayment_amount() {
        return payment_amount;
    }

    public void setPayment_amount(Integer payment_amount) {
        this.payment_amount = payment_amount;
    }

    public String getPayment_type() {
        return payment_type;
    }

    public void setPayment_type(String payment_type) {
        this.payment_type = payment_type;
    }

    public Long getPayment_date() {
        return payment_date;
    }

    public void setPayment_date(Long payment_date) {
        this.payment_date = payment_date;
    }

    public Payments() {
    }

    public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {
        this.payment_id = payment_id;
        this.order_id = order_id;
        this.payment_amount = payment_amount;
        this.payment_type = payment_type;
        this.payment_date = payment_date;
    }

    @Override
    public String toString() {
        return "payments{" +
                "payment_id=" + payment_id +
                ", order_id=" + order_id +
                ", payment_amount=" + payment_amount +
                ", payment_type='" + payment_type + '\'' +
                ", payment_date=" + payment_date +
                '}';
    }
}

商品类

用作维表测试

package com.zxl.bean;

// TODO: 2024/1/6 商品类


public class Products {
    //商品ID
    private Integer product_id;
    //商品名称
    private String product_name;
    //商品价格
    private Integer product_price;
    //商品库存
    private Long product_num;
    //商品分类
    private String product_type;

    public Integer getProduct_id() {
        return product_id;
    }

    public void setProduct_id(Integer product_id) {
        this.product_id = product_id;
    }

    public String getProduct_name() {
        return product_name;
    }

    public void setProduct_name(String product_name) {
        this.product_name = product_name;
    }

    public Integer getProduct_price() {
        return product_price;
    }

    public void setProduct_price(Integer product_price) {
        this.product_price = product_price;
    }

    public Long getProduct_num() {
        return product_num;
    }

    public void setProduct_num(Long product_num) {
        this.product_num = product_num;
    }

    public String getProduct_type() {
        return product_type;
    }

    public void setProduct_type(String product_type) {
        this.product_type = product_type;
    }

    public Products() {
    }

    public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {
        this.product_id = product_id;
        this.product_name = product_name;
        this.product_price = product_price;
        this.product_num = product_num;
        this.product_type = product_type;
    }

    @Override
    public String toString() {
        return "products{" +
                "product_id=" + product_id +
                ", product_name='" + product_name + '\'' +
                ", product_price=" + product_price +
                ", product_num=" + product_num +
                ", product_type='" + product_type + '\'' +
                '}';
    }
}

数据生成

订单数据生成

package com.zxl.datas;

import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;


import java.util.Random;


public class OrdersData implements SourceFunction<Orders> {

    private static Random random = new Random();

    private static boolean isRunning = true;

    private static Integer num = 0;

    //订单ID
    private static Long getOrder_id() {
        num++;
        long aLong = Long.parseLong(num.toString());
        return aLong;
    }

    //订单日期
    private static Long getOrder_date() {

        return System.currentTimeMillis();
    }

    //用户ID
    private static Long getUser_id() {
        return random.nextLong();
    }

    //订单金额
    private static Integer getOrder_amount() {
        return random.nextInt(100);
    }

    //商品ID
    private static Integer getProduct_id() {

        return random.nextInt(100);
    }

    //订单数量
    private static Long getOrder_num() {
        return random.nextLong();
    }

    //订单类
    private static Orders getOrders() {
        Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());
        return orders;
    }

    @Override
    public void run(SourceContext<Orders> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(getOrders());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

支付数据生成

package com.zxl.datas;


import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Date;
import java.util.Random;

public class PaymentData implements SourceFunction<Payments> {

    private static Random random = new Random();

    private static boolean isRunning = true;

    private static Integer num = 0;

    //支付ID
    private static Long getPayment_id(){
        return random.nextLong();
    }

    //订单ID
    private static Long getOrder_id() {
        num++;
        long aLong = Long.parseLong(num.toString());
        return aLong;
    }

    //支付金额
    private static Integer getPayment_amount(){
        return random.nextInt(1000);
    }

    //支付类型
    private static String getPayment_type(){
        String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};
        int are= random.nextInt(6);
        String area=type[are];
        return area;
    }

    //支付日期
    private static Long getPayment_date(){
        return System.currentTimeMillis();
    }

    //支付类
    private static Payments getPayments(){
        Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());
        return payments;
    }

    @Override
    public void run(SourceContext<Payments> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(getPayments());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

测试数据打印

package com.zxl.flink;


import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flinkWorks {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据 
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据 
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        //打印数据
        paymentsDataStreamSource.print();
        ordersDataStreamSource.print();
        //启动程序
        environment.execute();
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/298941.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

瓢虫目标检测数据集VOC格式400张

瓢虫&#xff0c;一种小巧玲珑、色彩鲜艳的昆虫&#xff0c;因其独特的形态和生态习性而受到广泛欢迎。 瓢虫的体型小巧&#xff0c;一般为圆球形&#xff0c;体色鲜艳&#xff0c;有红、黄、黑等多种颜色。它们通常有一个坚硬的外壳&#xff0c;可以保护自己不受天敌的侵害。…

12月笔记

#pragma once 防止多次引用头文件&#xff0c;保证同一个&#xff08;物理意义上&#xff09;文件被多次包含&#xff0c;内容相同的两个文件同样会被包含。 头文件.h与无.h的文件&#xff1a; iostream是C的头文件&#xff0c;iostream.h是C的头文件&#xff0c;即标准的C头文…

前端--基础 常用标签 - 超链接标签 ( 内部链接,空链接,下载链接,网页元素连接)

链接分类 &#xff1a; 外部链接 内部链接 空链接 下载链接 网页元素链接 内部链接 &#xff1a; 即 网站内部页面之间的相互链接&#xff0c;直接点击 链接内部页面名称即可 所谓内部链接&#xff0c;就是在同一个网站里面&#xff0c;有许多链接&#xff0c;当你在 a…

机器学习笔记 - 用于语义图像分割的空洞卷积DeepLabv3

一、什么是DeepLabv3&#xff1f; DeepLabv3 是用于语义分割任务的深度神经网络 (DNN) 架构。虽然不是比较新的网络模型&#xff0c;但是也是分割模型里的杰出代表之一&#xff0c;所以还是值得深入了解。 它使用Atrous&#xff08;Dilated&#xff09;卷积来控制感受野和特征图…

房贷计算器,妥妥的数学计算

根据给出的公式&#xff0c;编写房贷计算器。妥妥的数学计算&#xff0c;把数学公式“翻译”成代码就好。 (笔记模板由python脚本于2024年01月06日 18:08:55创建&#xff0c;本篇笔记适合初具基本编程能力的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;http…

服务器GPU温度过高挂掉排查记录

服务器GPU挂掉 跑深度学习的代码的时候发现中断了。通过命令查看&#xff1a; nvidia-smi显示 Unable to determine the device handle for GPU 0000:01:00.0: Unknown Error。感觉很莫名其妙。通过重启大法之后&#xff0c;又能用一段时间。 shutdown -r now但是过了一个小…

遗传算法(GA)、模拟退火算法(SAA)、蚁群算法(ACO)、粒子群算法(PSO)优缺点汇总

遗传算法 优点&#xff1a; 与问题领域无关且快速随机的搜索能力&#xff0c;不会陷入局部最优解&#xff1b;搜索从群体出发&#xff0c;具有潜在的并行性&#xff0c;提高运行速度&#xff0c;鲁棒性高&#xff1b;搜索使用评价函数启发&#xff0c;过程简单&#xff1b;使…

基于Java实现全功能电子商城

&#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩项目推荐订阅&#x1f447;&#x1f3fb; 不然下次找不到哟 基于SpringBoot的旅游网站 基于SpringBoot的MusiQ音乐网站 感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及…

小游戏实战丨基于PyGame的俄罗斯方块小游戏

文章目录 写在前面PyGame五子棋注意事项系列文章写在后面 写在前面 本期内容&#xff1a;基于pygame的俄罗斯方块小游戏 下载地址&#xff1a;https://download.csdn.net/download/m0_68111267/88700182 实验环境 python3.11及以上pycharmtkinter PyGame Pygame是一个非常…

Java设计模式-模板方法模式

目录 一、豆浆制作问题 二、模板方法模式基本介绍 三、原理类图 四、模板方法模式解决豆浆制作问题 五、模板方法模式的钩子方法 六、模板方法模式在Spring框架应用的源码分析 七、注意事项和细节 一、豆浆制作问题 编写制作豆浆的程序&#xff0c;说明如下 : 1) 制作…

案例098:基于微信小程序的电子购物系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

【大数据进阶第三阶段之Datax学习笔记】使用阿里云开源离线同步工具DataX 实现数据同步

【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax概述 【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax快速入门 【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax类图 【大数据进阶第三阶段之Datax学习笔记】使用…

RH850P1X芯片学习笔记-A/D Converter (ADCF)

文章目录 Features of RH850/P1x-C ADCFNumber of UnitsRegister Base AddressClock SupplyInterrupts and DMAHardware ResetExternal Input/Output SignalsVirtual Channel OverviewFunctional OverviewBlock DiagramPhysical Channels, Virtual Channels and Scan Groups Re…

SPRING BOOT发送邮件验证码(Gmail邮箱)

SPRING BOOT邮件发送验证码 一、Gmail邮箱配置 1、进入Gmail(https://mail.google.com) 2、打开谷歌右上角设置 3、启用POP/IMP 4、启用两步验证(https://myaccount.google.com/security) 5、建立应用程式密码 6、复制保存应用程式密码 二、代码 1、引入依赖 <d…

【LMM 012】TinyGPT-V:24G显存训练,8G显存推理的高效多模态大模型

论文标题&#xff1a;TinyGPT-V: Efficient Multimodal Large Language Model via Small Backbones 论文作者&#xff1a;Zhengqing Yuan, Zhaoxu Li, Lichao Sun 作者单位&#xff1a;Anhui Polytechnic University, Nanyang Technological University, Lehigh University 论文…

Window端口占用处理

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精…

C#编程-实现函数重载

考虑一个示例&#xff1a;您必须编写一个程序来实现计算器的功能。计算器执行各种运算&#xff0c;例如数字的加、减及乘等。可以对任何类型的数据执行这些运算。这是否意味着您必须定义单独的函数名&#xff08;如addInteger、addFloat和addDoublie&#xff09;对每种此类数字…

VMware VCP+VCAP持续通过

稳定通过&#xff0c;安全可靠

LOG滤波器原理探究---计算机视觉和特征检测

先来看几个滤波器公式&#xff1a; 高斯滤波器&#xff1a; G ( x , y ; σ ) 1 2 π σ 2 e − x 2 y 2 2 σ 2 G(x,y;\sigma) \frac{1}{2 \pi \sigma^2} e^{-\frac{x^2 y^2}{2\sigma^2}} G(x,y;σ)2πσ21​e−2σ2x2y2​ 图像的二阶导数&#xff1a; ∇ 2 f ∂ 2 f ∂…

看图识熊(二)

使用Tools for AI封装onnx模型并推理 进行这一步之前&#xff0c;请确保已正确安装配置了Visual Studio 2017 和 Microsoft Visual Studio Tools for AI环境。 项目的代码也可以在这里找到&#xff0c;下面的步骤是带着大家从头到尾做一遍。 界面设计 创建Windows窗体应用(…