执行flink sql连接clickhouse库

手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。

组件版本
jdk1.8
flink1.17.2
clickhouse23.12.2.59

1.背景

flink官方不支持clickhouse连接器,工作中难免会用到。

2.方案

利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16

3.编译

导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar

4.将此依赖包,导入flink工程

spring boot工程

4.1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
<!--	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.13</version>
		<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;
	</parent>-->

	<parent>
		<groupId>com.mit.microgrid</groupId>
		<artifactId>mit-microgrid</artifactId>
		<version>${project.build.version}</version>
	</parent>

	<artifactId>mit-microgrid-flink</artifactId>
	<name>mit-microgrid-flink</name>
	<description>flink connector clickhouse</description>

	<properties>
		<java.version>1.8</java.version>
		<flink.version>1.17.2</flink.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<!-- 排除SpringBoot自带的日志依赖 -->
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
		</dependency>

		<!--flink-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--flink connector-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!--flink connector clickhouse-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-clickhouse</artifactId>
			<version>1.16.0-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
<!--			<artifactId>flink-clients_2.12</artifactId>-->
			<version>${flink.version}</version>
<!--			<scope>provided</scope>-->
		</dependency>
		<!-- flink sql -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-loader</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Flink JDBC Connector -->
<!--		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>1.14.6</version> &lt;!&ndash; 与您的Flink版本匹配 &ndash;&gt;
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.2-1.17</version>
		</dependency>
		<!-- ClickHouse JDBC Driver -->
		<dependency>
			<groupId>ru.yandex.clickhouse</groupId>
			<artifactId>clickhouse-jdbc</artifactId>
			<version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 -->
		</dependency>
		<!-- 添加clickhouse-maven依赖-->
		<dependency>
			<groupId>ru.ivi.opensource</groupId>
			<artifactId>flink-clickhouse-sink</artifactId>
			<version>1.2.0</version>
		</dependency>

		<!--module-->
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-common-core</artifactId>
			<version>${project.build.version}</version>
		</dependency>
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-api-history</artifactId>
			<version>${project.build.version}</version>
		</dependency>

		<!--sql parse-->
		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-core</artifactId>
			<version>1.37.0</version>
		</dependency>
<!--		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-server</artifactId>
			<version>1.37.0</version>
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-parser</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!--mysql-->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.30</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>1.17.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-text</artifactId>
			<version>1.12.0</version>
		</dependency>
	</dependencies>

	<build>
		<!--<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>-->
<!--			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers combine.children="append">
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>-->
<!--		</plugins>-->

		<finalName>${project.artifactId}</finalName>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>2.7.3</version>
				<configuration>
					<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
					<fork>true</fork>
					<layout>ZIP</layout>
					<includeSystemScope>true</includeSystemScope>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
						<configuration>
							<classifier>-with-dependencies</classifier>
						</configuration>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<archive>
						<addMavenDescriptor>false</addMavenDescriptor>
						<manifest>
							<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
						</manifest>
					</archive>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<descriptors>
						<descriptor>src/main/resources/assembly/assembly.xml</descriptor>
					</descriptors>
					<outputDirectory>./../out</outputDirectory>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

</project>

4.2)核心方法:

    /**
     * multiple sql execute
     *
     * @param sqlList
     */
    public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {
        log.info("参数sqlList: {}", sqlList);
//        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(setting);
        if (CollectionUtil.isNullOrEmpty(sqlList)) {
            log.warn("sqlList参数为空");
            return null;
        }
        for (String s : sqlList) {
            TableResult tableResult = tEnv.executeSql(s);
            Optional<JobClient> jobClientOptional = tableResult.getJobClient();
            if (jobClientOptional.isPresent()) {
                JobClient jobClient = jobClientOptional.get();
                log.info("jobClient: " + jobClient);
                return jobClient;
            }
        }
        log.error("没有可执行的job");
        return null;
    }

5.源码地址

https://github.com/genghongsheng0/mit-microgrid-flink

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

力扣(leetcode)题目总结——辅助栈篇

leetcode 经典题分类 链表数组字符串哈希表二分法双指针滑动窗口递归/回溯动态规划二叉树辅助栈 本系列专栏&#xff1a;点击进入 leetcode题目分类 关注走一波 前言&#xff1a;本系列文章初衷是为了按类别整理出力扣&#xff08;leetcode&#xff09;最经典题目&#xff0c…

基于Java Springboot宠物猫售卖管理系统

一、作品包含 源码数据库全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据库&#xff1a;…

Windows docker下载minio出现“Using default tag: latestError response from daemon”

Windows docker下载minio出现 Using default tag: latest Error response from daemon: Get "https://registry-1.docker.io/v2/": context deadline exceeded 此类情况&#xff0c;一般为镜像地址问题。 {"registry-mirrors": ["https://docker.re…

数据结构查找-哈希表(开发地址法+线性探测法)+(创建+查找+删除代码)+(C语言代码)

#include<stdlib.h> #include<stdio.h> #include<stdbool.h> #define NULLKEY -1//单元为空 #define DELKEY -2//单元内容被删除 #define M 20 typedef struct {int key;//关键字int count;//统计哈希冲突探测次数 }HashTable; //插入到哈希表 void InsertHT…

视频直播5G CPE解决方案:ZX7981PG/ZX7981PMWIFI6网络覆盖

方案背景 视频直播蓬勃发展的当下&#xff0c;传统直播网络联网方式的局限性越来越明显。目前传统直播的局限性主要集中在以下几个方面&#xff1a; 传统直播间网络架构条件有限&#xff0c;可连接WIFI数量少&#xff0c;多终端同时直播难以维持&#xff1b;目前4G网络带宽有限…

【电子设计】按键LED控制与FreeRTOS

1. 安装Keilv5 打开野火资料,寻找软件包 解压后得到的信息 百度网盘 请输入提取码 提取码:gfpp 安装526或者533版本都可以 下载需要的 F1、F4、F7、H7 名字的 DFP pack 芯片包 安装完 keil 后直接双击安装 注册操作,解压注册文件夹后根据里面的图示步骤操作 打开说明 STM…

vue3【实战】切换白天黑夜(暗黑模式)【组件封装】DarkMode.vue

效果预览 原理解析 切换为暗黑模式时&#xff0c;会在 html 标签上添加样式类 dark导入 ElementPlus 的暗黑模式样式后&#xff0c; ElementPlus 组件会自动响应暗黑模式自定义组件需用 UnoCSS 的 dark: 语法自定义暗黑模式的样式 代码实现 技术方案 vue3 vite ElementPlus …

基于单片机的多功能环保宠物窝设计

本设计基于单片机设计的多功能环保宠物窝&#xff0c;利用温湿度传感器、压力传感模块、气味传感模块、红外测温传感器、通信模块、显示模块、清扫部件等&#xff0c;使其能够实现自动检测并调节温湿度、补充宠物食物、检测宠物体温健康并出现异常时进行报警、自动清扫消毒宠物…

MySql结合element-plus pagination的分页查询

实现效果如下&#xff1a; 重点&#xff1a;使用mysql查询的limit和offset 原生SQL写法&#xff1a; select c.id as deptid,c.name as department,position,a.name staffname,2024-11 as shijian ,CASE WHEN b.shijian IS NULL THEN no ELSE yes END AS submit from fa_wecom…

vue使用List.reduce实现统计

需要对集合的某些元素的值进行计算时&#xff0c;可以在计算属性中使用forEach方法 1.语法&#xff1a;集合.reduce ( ( 定义阶段性累加后的结果 , 定义遍历的每一项 ) > 定义每一项求和逻辑执行后的返回结果 , 定义起始值 ) 2、简单使用场景&#xff1a;例如下面…

Spring Boot汽车资讯:科技与速度的交响

3系统分析 3.1可行性分析 通过对本汽车资讯网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本汽车资讯网站采用SSM框架&#xff0c;JAVA作为开发语言&#…

前端页面自适应等比例缩放 Flexible+rem方案

在移动互联网时代&#xff0c;随着智能手机和平板电脑的普及&#xff0c;前端开发者面临的一个重要挑战是如何让网页在不同尺寸和分辨率的设备上都能良好地显示。为了应对这一挑战&#xff0c;阿里巴巴的前端团队开发了 flexible.js&#xff0c;旨在提供一种简单有效的解决方案…

记录一下在原有的接口中增加文件上传☞@RequestPart

首先&#xff0c;咱声明一下&#xff1a; RequestBody和 MultipartFile 不可以 同时使用&#xff01;&#xff01;&#xff01; 因为这两者预期的请求内容类型不同。RequestBody 预期请求的 Content-Type 是 application/json 或 application/xml&#xff0c;而 MultipartFile …

HTML5实现剪刀石头布小游戏(附源码)

文章目录 1.设计来源1.1 主界面1.2 皮肤风格1.2 游戏中界面 2.效果和源码源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/143798520 HTM…

[Qt platform plugin问题] Could not load the Qt platform plugin “xcb“

Qt platform plugin 是 Qt 应用程序启动时加载的插件。不同的平台有不同的插件。 常见的插件有:linuxfb Wayland xcb 简单来说就是启动一个GUI程序, 离不开这些插件.选择其中一个就好 出现这个问题要么就是没有插件&#xff0c;要么就是插件依赖的库没有。 要么就是插件选则的…

【qt】控件2

1.frameGeometry和Geometry区别 frameGeometry是开始从红圈开始算&#xff0c;Geometry从黑圈算 程序证明&#xff1a;使用一个按键&#xff0c;当按键按下,qdebug打印各自左上角的坐标&#xff08;相当于屏幕左上角&#xff09;&#xff0c;以及窗口大小 Widget::Widget(QWid…

Idea中创建和联系MySQL等数据库

备注&#xff1a;电脑中要已下好自己需要的MySQL数据库软件 MySQL社区版下载链接&#xff1a; https://dev.mysql.com/downloads/installer/ 优点&#xff1a; 1.相比与在命令行中管理数据库&#xff0c;idea提供了图形化管理&#xff0c;简单明了&#xff1b; 2.便于与后端…

【Unity】网格系统:物体使用网格坐标定位

需求分析 前面物体放置在地板上都是地板任意位置放置&#xff0c;本节开始对物体放置的位置做限制。 建立网格&#xff0c;网格可以设置起始世界坐标、单元格大小和规格&#xff1b;单元格中包括内部物体的信息&#xff1b;物体的位置通过网格的坐标确定&#xff1b;单元格中…

网络协议(4)拥塞控制

之前已经说过了tcp也是会考虑网络的情况的&#xff0c;也就是当网络出现问题的时候tcp不会再对报文进行重传。当所有的用户在网络不好的时候都不会对丢失的报文进行重传。这样就会防止网络瘫痪。 这样的机制也就是tcp会进行拥塞控制。 拥塞控制 所谓的慢启动看下面这张图就能…

集群聊天服务器(8)用户登录业务

目录 登录状态业务层代码数据模型层代码记录用户的连接信息以及线程安全问题客户端异常退出业务 登录状态 登录且状态变为online 业务层代码 #include "chatservice.hpp" #include "public.hpp" #include <string> #include <muduo/base/Loggi…