【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、自定义Source介绍及示例
    • 1、示例-自定义数据源
      • 1)、java bean
      • 2)、自定义数据源实现
      • 3)、使用自定义的数据源
      • 4)、自定义数据源验证
  • 三、自定义数据源-MySQL
    • 1、maven依赖
    • 2、java bean
    • 3、mysql自定义数据源实现
    • 4、使用mysql自定义的数据源
    • 5、验证
      • 1)、准备mysql环境-建库表
      • 2)、启动TestCustomMySQLSourceDemo.java
      • 3)、往user表中添加数据,并观察应用程序控制台输出


本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、自定义Source介绍及示例

Flink提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

  • SourceFunction,非并行数据源,并行度只能=1
  • RichSourceFunction,多功能非并行数据源,并行度只能=1
  • ParallelSourceFunction,并行数据源,并行度能够>=1
  • RichParallelSourceFunction,多功能并行数据源,并行度能够>=1

1、示例-自定义数据源

本示例展示如何实现自定义数据源,并通过随机数产生数据信息,比较简单。

1)、java bean

如果使用其自定义的数据结构也可以,视情况需要。

package org.datastreamapi.source.custom.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private long clicks;
	private long ranks;
	private Long createTime;
}

2)、自定义数据源实现

继承RichParallelSourceFunction,并源源不断的产生数据,直到自己手动停止其运行或将启停标志flag设置成false。

package org.datastreamapi.source.custom;

import java.util.Random;
import java.util.UUID;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomUserSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;

	// 生产数据
	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		Random random = new Random();
		while (flag) {
			ctx.collect(new User(random.nextInt(100000001), "alanchan" + UUID.randomUUID().toString(), random.nextInt(9000001), random.nextInt(10001), System.currentTimeMillis()));
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		flag = false;
	}

}

3)、使用自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomUserSource());

完整的示例如下

package org.datastreamapi.source.custom;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomSourceDemo {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source,设置并行度是看每次产生多少条数据
		DataStream<User> userDS = env.addSource(new CustomUserSource()).setParallelism(2);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

4)、自定义数据源验证

运行TestCustomSourceDemo.java查看运行结果,本示例运行结果如下(每次运行结果均不同,供参考)

7> User(id=14317087, name=alanchan4ea184fd-472d-429c-b1b5-8fc16ba8c373, clicks=1815669, ranks=9648, createTime=1701824325507)
9> User(id=35956856, name=alanchanb715dab7-d3cb-40f3-b9e8-e0ba76d8b998, clicks=8888642, ranks=8143, createTime=1701824325507)
10> User(id=46536055, name=alanchan2aa31683-1af3-4d0b-81b6-c89265ba63f4, clicks=3766665, ranks=3282, createTime=1701824326536)
8> User(id=55937607, name=alanchan2ef54410-eaa4-47ed-8531-478957143051, clicks=5684939, ranks=8006, createTime=1701824326536)
9> User(id=92498863, name=alanchan1a324cdf-7ee6-4c6b-8059-7db01648428d, clicks=7199973, ranks=8005, createTime=1701824327552)
11> User(id=67794502, name=alanchanc73d8f15-abeb-4b86-9809-98094439e25c, clicks=654744, ranks=6799, createTime=1701824327552)

以上就完整的介绍了一个自定义数据源的过程,其中使用场景视情况而定。

三、自定义数据源-MySQL

上述示例中展示了自定义数据源的一种方式,就是自己产生数据或者其他的数据源获取,本示例展示的是以mysql数据源作为flink的数据源的实现。flink本身没有实现mysql数据源,需要自己实现。

1、maven依赖

本示例需要新增mysql的依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
	<!--<version>8.0.20</version> -->
</dependency>

2、java bean

package org.datastreamapi.source.custom.mysql.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}

3、mysql自定义数据源实现

package org.datastreamapi.source.custom.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomMySQLSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;
	private Connection conn = null;
	private PreparedStatement ps = null;
	private ResultSet rs = null;

	@Override
	public void open(Configuration parameters) throws Exception {
		conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
		String sql = "select id,name,pwd,email,age,balance from user";
		ps = conn.prepareStatement(sql);
	}

	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		while (flag) {
			rs = ps.executeQuery();
			while (rs.next()) {
				User user = new User(rs.getInt("id"), rs.getString("name"), rs.getString("pwd"), rs.getString("email"), rs.getInt("age"), rs.getDouble("balance"));
				ctx.collect(user);
			}
			//每5秒查询一次数据库
			Thread.sleep(5000);
		}

	}

	@Override
	public void cancel() {
		flag = false;
	}

}

4、使用mysql自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

完整示例如下:

package org.datastreamapi.source.custom.mysql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomMySQLSourceDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

5、验证

1)、准备mysql环境-建库表

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` smallint(6) NULL DEFAULT NULL,
  `balance` double(255, 0) NULL DEFAULT NULL,
  `email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pwd` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2)、启动TestCustomMySQLSourceDemo.java

启动TestCustomMySQLSourceDemo.java程序,并观察控制台输出。

3)、往user表中添加数据,并观察应用程序控制台输出

  • user 表数据
    在这里插入图片描述
  • 程序控制台输出(每5秒输出一次)
2> User(id=1, name=alan, pwd=123, email=alan.chan.chn@163.com, age=18, balance=20.0)
4> User(id=3, name=alanchanchn, pwd=123, email=alan.chan.chn@163.com, age=20, balance=30.0)
6> User(id=5, name=alan_chan_chn, pwd=123, email=alan.chan.chn@163.com, age=20, balance=46.0)
3> User(id=2, name=alanchan, pwd=123, email=alan.chan.chn@163.com, age=19, balance=25.0)
5> User(id=4, name=alan_chan, pwd=123, email=alan.chan.chn@163.com, age=19, balance=36.0)

以上,本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/239002.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1631. 最小体力消耗路径:广度优先搜索BFS

【LetMeFly】1631.最小体力消耗路径&#xff1a;广度优先搜索BFS 力扣题目链接&#xff1a;https://leetcode.cn/problems/path-with-minimum-effort/ 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights &#xff0c;其中 heights[row][col] 表示格子 (ro…

Leetcode—2961.双模幂运算【中等】

2023每日刷题&#xff08;五十六&#xff09; Leetcode—2961.双模幂运算 实现代码 class Solution { public:int func(int a, int b) {int ans 1;for(int i 0; i < b; i) {ans * a;ans % 10;}return ans;}int func2(int a, int b, int m) {int ans 1;for(int i 0; i …

使用Kali Linux端口扫描

端口扫描 【实训目的】 掌握端口扫描的基本概念和端口扫描的原理&#xff0c;掌握各种类型端口扫描的方法及其区别。 【场景描述】 在虚拟机环境下配置4个虚拟系统“Win XP1” “Win XP2” “Kali Linux”和“Metasploitable2”&#xff0c;使得4个系统之间能够相互通信。实…

深度学习(生成式模型)——ADM:Diffusion Models Beat GANs on Image Synthesis

文章目录 前言基础模型结构UNet结构Timestep Embedding关于为什么需要timestep embedding global attention layer 如何提升diffusion model生成图像的质量Classifier guidance实验结果 前言 在前几篇博文中&#xff0c;我们已经介绍了DDPM、DDIM、Classifier guidance等相关的…

EasyV易知微助力智慧城市未来趋势发展——数字孪生城市

“智慧城市的未来趋势就是数字孪生”——《基于数字孪生的智慧城市》 城市数字化管理、智慧城市和数字孪生城市的发展是相互促进、逐步深化的过程。 城市数字化管理作为起点&#xff0c;奠定了信息化、数据化的基础&#xff1b;而智慧城市则将数字城市管理进一步升级&#xff…

Could not resolve all dependencies for configuration ‘:app:androidApis‘.

android studio出现Could not resolve all dependencies for configuration ‘:app:androidApis’. 试过很多种方法&#xff0c;但是都不好使&#xff0c;不管怎么样都是提示如下报错&#xff1a; Using insecure protocols with repositories, without explicit opt-in, is un…

nginx配置正向代理支持https

操作系统版本&#xff1a; Alibaba Cloud Linux 3.2104 LTS 64位 nginx版本&#xff1a; nginx-1.25.3 1. 下载软件 切换目录 cd /server wget http://nginx.org/download/nginx-1.25.3.tar.gz 1.1解压 tar -zxvf nginx-1.25.3.tar.gz 1.2切换到源码所在目录…

Wireshark中的http协议包分析

Wireshark可以跟踪网络协议的通讯过程&#xff0c;本节通过http协议&#xff0c;在了解Wireshark使用的基础上&#xff0c;重温http协议的通讯过程。 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一种面向连接的、可靠的、基于 字节流…

lwIP 细节之三:errf 回调函数是何时调用的

使用 lwIP 协议栈进行 TCP 裸机编程&#xff0c;其本质就是编写协议栈指定的各种回调函数。将你的应用逻辑封装成函数&#xff0c;注册到协议栈&#xff0c;在适当的时候&#xff0c;由协议栈自动调用&#xff0c;所以称为回调。 注&#xff1a;除非特别说明&#xff0c;以下内…

评论送书:以企业架构为中心的SABOE数字化转型五环法

01 传统企业数字化转型面临诸多挑战 即将过去的2023年&#xff0c;chatGPT大模型、数据资产入表等事件的发生&#xff0c;标志着数字经济正在加速发展。数字经济是人类社会继农业经济、工业经济之后的第三种经济形态&#xff0c;将推动生产方式、生活方式和治理方式深刻变革&a…

Goby 漏洞发布| 亿赛通电子文档安全管理系统 LinkFilterService 接口权限绕过漏洞

漏洞名称&#xff1a;亿赛通电子文档安全管理系统 LinkFilterService 接口权限绕过漏洞 English Name&#xff1a;Esafenet Electronic Document Security Management System LinkFilterService API Permission Bypass Vulnerability CVSS core: 9.3 影响资产数&#xff1a;…

玩转大数据12:大数据安全与隐私保护策略

1. 引言 大数据的快速发展&#xff0c;为各行各业带来了巨大的变革&#xff0c;也带来了新的安全和隐私挑战。大数据系统通常处理大量敏感数据&#xff0c;包括个人身份信息、财务信息、健康信息等。如果这些数据被泄露或滥用&#xff0c;可能会对个人、企业和社会造成严重的损…

《opencv实用探索·十八》Camshift进行目标追踪流程

CamShift&#xff08;Continuously Adaptive Mean Shift&#xff09;是一种用于目标跟踪的方法&#xff0c;它是均值漂移&#xff08;Mean Shift&#xff09;的扩展&#xff0c;支持对目标的旋转跟踪&#xff0c;能够对目标的大小和形状进行自适应调整。 cv::CamShift和cv::me…

051:vue项目webpack打包后查看各个文件大小

第050个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…

SSM与SpringBoot面试题总结

什么是spring&#xff1f;谈谈你对IOC和AOP的理解。 Spring:是一个企业级java应用框架&#xff0c;他的作用主要是简化软件的开发以及配置过程&#xff0c;简化项目部署环境。 Spring的优点: 1、Spring低侵入设计&#xff0c;对业务代码的污染非常低。 2、Spring的DI机制将…

scala编码

1、Scala高级语言 Scala简介 Scala是一门类Java的多范式语言&#xff0c;它整合了面向对象编程和函数式编程的最佳特性。具体来讲Scala运行于Java虚拟机&#xff08;JVM)之上&#xff0c;井且兼容现有的Java程序&#xff0c;同样具有跨平台、可移植性好、方便的垃圾回收等特性…

JavaWeb三大组件(Servlet程序、Filter过滤器、Listener监听器)

文章目录 一、Servlet1、Servlet概述和运行流程2、开发过程&#xff08;xml和注解方式&#xff09;3、Servlet生命周期4、Servlet继承结构4.1、Servlet规范接口4.2、ServletConfig配置接口4.3、GenericServlet抽象类4.4、HttpServlet抽象类 5、ServletConfig和ServletContext6、…

解决高风险代码:Header Manipulation

Abstract HTTP 响应头文件中包含未验证的数据会引发 cache-poisoning、 cross-site scripting、 cross-user defacement、 page hijacking、 cookie manipulation 或 open redirect Explanation 以下情况中会出现 Header Manipulation 漏洞&#xff1a; 1. 数据通过一个不可信…

《opencv实用探索·十七》calcBackProject直方图反向投影

在了解反向投影前需要先了解下直方图的概念&#xff0c;可以看我上一章内容&#xff1a;opencv直方图计算calcHist函数解析 直方图反向投影是一种图像处理技术&#xff0c;通常用于目标检测和跟踪。通过计算反向投影&#xff0c;可以将图像中与给定模式&#xff08;目标对象&a…

Next.js中的App Router与Page Router,各自的作用和使用方式,如何理解和配置使用?

App Router介绍 Next.js中的App Router是全局的路由器&#xff0c;它用于在应用程序的所有页面之间进行导航。它可以用于在页面之间传递状态和数据&#xff0c;类似于React中的Context。 App Router是通过_app.js文件中的getInitialProps方法来配置的。 在 Next.js 中&#xf…