ClickHouse--11--ClickHouse API操作

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1.Java 读写 ClickHouse API
    • 1.1 首先需要加入 maven 依赖
    • 1.2 Java 读取 ClickHouse 集群表数据
        • JDBC--01--简介
      • ClickHouse java代码
    • 1.3 Java 向 ClickHouse 表中写入数据
  • 2.Spark 写入 ClickHouse API
    • 2.1 导入依赖
    • 2.2 代码编写
  • 3.Flink 写入 ClickHouse API
    • 3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API
    • 3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
     <artifactId>clickhouse-jdbc</artifactId>
     <version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

在这里插入图片描述

public class Test01 {

    public static void main(String[] args) throws Exception {
        //1.注册数据库驱动
        Class.forName("com.mysql.jdbc.Driver");
        //2.获取数据库连接
        Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8",
                "root", "root");

        //3.获取传输器
        Statement stat = conn.createStatement();
        //4.发送SQL到服务器执行并返回执行结果
        String sql = "select * from account";
        ResultSet rs = stat.executeQuery( sql );
        //5.处理结果
        while( rs.next() ) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            double money = rs.getDouble("money");
            System.out.println(id+" : "+name+" : "+money);
        }
        //6.释放资源
        rs.close();
        stat.close();
        conn.close();
        System.out.println("TestJdbc.main()....");
    }

}
                                            

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        ResultSet rs = statement.executeQuery("select id,name,age from test");
        //5.处理结果
        while (rs.next()) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            System.out.println("id = " + id + ",name = " + name + ",age = " + age);
        }

        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条
       
        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

在这里插入图片描述

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包-->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <!-- 去除与 Spark 冲突的包 -->
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spark-core -->
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL ON Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

在这里插入图片描述

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/401390.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高校学科竞赛平台|基于springboot高校学科竞赛平台设计与实现(源码+数据库+文档)

高校学科竞赛平台目录 目录 基于springboot高校学科竞赛平台设计与实现 一、前言 二、系统功能设计 三、系统实现 1、竞赛题库管理 2、竞赛信息管理 3、晋级名单管理 4、往年成绩管理 5、参赛申请管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最…

typescript 索引签名类型

ts索引类型简介 在TypeScript中&#xff0c;索引签名类型&#xff08;Index Signature Type&#xff09;是一种特殊的类型&#xff0c;它定义了对象中键的类型以及相应的值的类型。通过使用索引签名类型&#xff0c;我们可以表示一个对象&#xff0c;该对象的键可以是任意类型…

Android全新UI框架之常用ComposeUI组件

在Compose中&#xff0c;每个组件都是一个带有Composable注解的函数&#xff0c;被称为Composable。Compose已经预置了很多基于MD设计规范的Composable组件。 在布局方面&#xff0c;Compose提供了Column、Row、Box三种布局组件(感觉跟flutter差不多)&#xff0c;类似于传统视图…

《Python 语音转换简易速速上手小册》第5章 音频数据处理(2024 最新版)

文章目录 5.1 音频数据的基本处理5.1.1 基础知识5.1.2 主要案例&#xff1a;音频剪辑工具案例介绍案例 Demo案例分析 5.1.3 扩展案例 1&#xff1a;自动音量调节器案例介绍案例 Demo案例分析 5.1.4 扩展案例 2&#xff1a;语音识别预处理案例介绍案例 Demo案例分析 5.2 使用 Py…

LLM 模型融合实践指南:低成本构建高性能语言模型

编者按&#xff1a;随着大语言模型技术的快速发展&#xff0c;模型融合成为一种低成本但高性能的模型构建新途径。本文作者 Maxime Labonne 利用 mergekit 库探索了四种模型融合方法&#xff1a;SLERP、TIES、DARE和passthrough。通过配置示例和案例分析&#xff0c;作者详细阐…

开启智能互动新纪元——ChatGPT提示词工程的引领力

目录 提示词工程的引领力 高效利用ChatGPT提示词方法 提示词工程的引领力 近年来&#xff0c;随着人工智能技术的迅猛发展&#xff0c;ChatGPT提示词工程正逐渐崭露头角&#xff0c;为智能互动注入了新的活力。这一技术的引入&#xff0c;使得人机交流更加流畅、贴近用户需求&…

S-35390A计时芯片介绍及开发方案

计时芯片 S-35390A芯片是计时芯片&#xff0c;一般用来计算时间。低功耗&#xff0c;宽电压&#xff0c;受温度影响小&#xff0c;适用于很多电路。它有一个问题&#xff0c;不阻止用户设置不存在的时间&#xff0c;设置进去之后计时或者闹钟定时会出错。 规格书阅读 首先我…

推荐几款项目经理常用的项目管理软件

随着科技的发展和项目需求&#xff0c;项目管理工具成为了确保工作顺利进行的关键。市场上有许多优秀的免费项目管理工具&#xff0c;它们功能强大、易于使用&#xff0c;并可以帮助团队更有效地规划、组织、执行和监控项目。以下是几款深受项目经理欢迎&#xff0c;好用且免费…

【转载】企业资产收集与脆弱性检查工具

简介 云图极速版是针对拥有攻击面管理需求的用户打造的 SaaS 应用&#xff0c;致力于协助用户管理互联网资产攻击面的 SaaS 化订阅服务产品。可实现对备案域名、子域名、IP、端口、服务、网站、漏洞、安全风险等场景进行周期性监控&#xff0c;支持多维度分析攻击面。利用可视化…

uni-app 开发调试自动打开手机屏幕大小界面(Aidex移动端开发项目)

上效果&#xff1a; 下载Aidex的移动端项目并打开&#xff1a; 若依-ruoyi-AiDex-Uniapp: 若依-Ruoyi APP 移动解决方案&#xff0c;基于uniappuView封装的一套基础模版&#xff0c;开箱即用&#xff0c;免费开源&#xff0c;一份代码多终端适配&#xff0c;支持H5、支付宝小程…

基于机器学习的青藏高原高寒沼泽湿地蒸散发插补研究_王秀英_2022

基于机器学习的青藏高原高寒沼泽湿地蒸散发插补研究_王秀英_2022 摘要关键词 1 材料和方法1.1 研究区概况与数据来源1.2 研究方法 2 结果和分析2.1 蒸散发通量观测数据缺省状况2.2 蒸散发与气象因子的相关性分析2.3 不同气象因子输入组合下各模型算法精度对比2.4 随机森林回归模…

【统计分析数学模型】聚类分析

【统计分析数学模型】聚类分析 一、聚类分析1. 基本原理2. 距离的度量&#xff08;1&#xff09;变量的测量尺度&#xff08;2&#xff09;距离&#xff08;3&#xff09;R语言计算距离 三、聚类方法1. 系统聚类法2. K均值法 三、示例1. Q型聚类&#xff08;1&#xff09;问题描…

springboot集成JWT实现token权限认证

vuespringboot登录与注册功能的实现 注&#xff1a;对于JWT的学习&#xff0c;首先要完成注册和登录的功能&#xff0c;本篇博客是基于上述博客的进阶学习&#xff0c;代码页也是在原有的基础上进行扩展 ①在pom.xml添加依赖 <!-- JWT --> <dependency><grou…

QPaint绘制自定义仪表盘组件01

网上抄别人的&#xff0c;只是放这里自己看一下&#xff0c;看完就删掉 ui Dashboard.pro QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomm…

5 原型模式 Prototype

1.模式定义: 指原型实例指定创建对象的种类&#xff0c;并且通过拷贝这些原型创建新的对象 2.应用场景&#xff1a; 当代码不应该依赖于需要复制的对象的具体类时&#xff0c;请使用Prototype模式。 Spring源码中的应用 org.springframework.beans.factory.support.AbstractB…

基于java springboot+mybatis OA办公自动化系统设计和实现

基于java springbootmybatis OA办公自动化系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末…

盘点那些世界名校计算机专业采用的教材

清华、北大、MIT、CMU、斯坦福的学霸们在新学期里要学什么&#xff1f;今天我们来盘点一下那些世界名校计算机专业采用的教材。 书单目录 1.《深入理解计算机系统》&#xff08;原书第3版&#xff09;2. 《算法导论》&#xff08;原书第3版&#xff09;3. 《计算机程序的构造和…

打架监测识别摄像机

打架监测识别摄像机是一种用于监控和识别打架行为的智能监控设备。这种摄像机利用先进的人工智能和计算机视觉技术&#xff0c;能够准确识别出监控画面中发生的打架事件&#xff0c;从而及时采取必要的应对措施。 打架监测识别摄像机的工作原理是通过对监控画面的实时分析和识别…

C语言每日一题(60)对链表进行插入排序

题目链接 力扣网 147 对链表进行插入排序 题目描述 给定单个链表的头 head &#xff0c;使用 插入排序 对链表进行排序&#xff0c;并返回 排序后链表的头 。 插入排序 算法的步骤: 插入排序是迭代的&#xff0c;每次只移动一个元素&#xff0c;直到所有元素可以形成一个有…

Jlink+OpenOCD+STM32 Vscode 下载和调试环境搭建

对于 Mingw 的安装比较困难&#xff0c;国内的网无法正常在线下载组件&#xff0c; 需要手动下载 x86_64-8.1.0-release-posix-seh-rt_v6-rev0.7z 版本的软件包&#xff0c;添加环境变量&#xff0c;并将 mingw32-make.exe 名字改成 make.exe。 对于 OpenOCD&#xff0c;需要…