Flink写入数据到ClickHouse

文章目录

      • 1.ClickHouse建表
      • 1.ClickHouse依赖
      • 2.Bean实体类
      • 3.ClickHouse业务写入逻辑
      • 4.测试写入类
      • 5.发送数据

1.ClickHouse建表

ClickHouse中建表

CREATE TABLE default.test_write
(
    id   UInt16,
    name String,
    age  UInt16
) ENGINE = TinyLog();

1.ClickHouse依赖

Flink开发相关依赖

    <properties>
        <flink.version>1.12.1</flink.version>
        <scala.version>2.12.13</scala.version>
        <clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- 写入数据到clickhouse -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse-jdbc.version}</version>
        </dependency>
        <!-- flink核心API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

2.Bean实体类

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/

@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}

3.ClickHouse业务写入逻辑

ClickHouseSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 * @Description
 **/


public class ClickHouseSinkFunction extends RichSinkFunction<User> {
    Connection conn = null;
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // 定义具体的操作
    @Override
    public void invoke(User user, Context context) throws Exception {
        // 批量插入
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        preparedStatement.setLong(1, user.id);
        preparedStatement.setString(2, user.name);
        preparedStatement.setLong(3, user.age);
        preparedStatement.addBatch();

        long startTime = System.currentTimeMillis();
        int[] ints = preparedStatement.executeBatch();
        conn.commit();
        long endTime = System.currentTimeMillis();
        System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + ints.length);
    }

    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
  • open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

4.测试写入类

ClickHouseWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @Author daniel
 * @Date: 2023/7/3 15:37
 * @Description
 **/

public class ClickHouseWriteTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Source
        DataStream<String> ds = env.socketTextStream("localhost", 9999);

        // Transform
        SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
            String[] split = data.split(",");
            return User.builder()
                    .id(Integer.parseInt(split[0]))
                    .name(split[1])
                    .age(Integer.parseInt(split[2]))
                    .build();
        });

        // Sink
        String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
        ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
        dataStream.addSink(jdbcSink);
        env.execute("flink-clickhouse-write");
    }
}

5.发送数据

使用nc或者任意工具向指定端口发送数据
例如

nc -L -p 9999

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

然后启动ClickHouseWriteTest.java程序

在这里插入图片描述

查询数据

select *
from default.test_write;

由于这里是并行插入,所以没有顺序可言

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35093.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【文生图系列】文生图大模型合集与效果对比

文章目录 DELL EDELL E 1DELL E 2 ERNIE-ViLGERNIE-ViLG 1ERNIE-ViLG 2Paddlehub ImagenMidjourneyStable DiffusionAltDiffusioneDiff-I阿里通义 DELL E DALLE到目前为止有两个版本&#xff0c;2021年1月&#xff0c;OpenAI发布了DALLE&#xff1b;2022年,DALLE 迎来了升…

【电影推荐系统】实时推荐

目录 原因 由于实时性&#xff0c;所以算法设计需要满足一下两点 算法设计 算法实现 算法公式 完整代码 原因 用户对电影的偏好随着时间的推移总是会发生变化的。此时离线系统无法解决&#xff0c;需要实时推荐。 由于实时性&#xff0c;所以算法设计需要满足一下两点 …

Go语言远程调试

Go语言远程调试 1、安装dlv # 安装dlv $ go install github.com/go-delve/delve/cmd/dlvlatest$ dlv version Delve Debugger Version: 1.20.1 Build: $Id: 96e65b6c615845d42e0e31d903f6475b0e4ece6e $2、命令行远程调试 我们远程(Linux服务器)有如下代码&#xff1a; [ro…

自学大语言模型之GPT

GPT火爆的发展史 2017年6月OpenAI联合DeepMind首次正式提出的&#xff1a;Deep Reinforcement Learning from Human Preferences&#xff0c;即基于人类偏好的深度强化学习&#xff0c;简称RLHF 2017年7月的OpenAI团队提出的对TRPO算法的改进&#xff1a;PPO算法 GPT-1&#…

Tomcat的优化多实例部署

目录 一.tomcat核心组件模块 1.2. toncat功能组件结构 二.Tomcat 优化 三.简述Tomcat请求过程 四.Tomcat 多实例部署 多实例部署图示 1.关闭防火墙 拖入软件包 2.安装JDk 设置JDK环境变量 3.解压tomcat 创建目录 4.配置 tomcat 环境变量 5.修改 tomcat2 中的 server.xm…

学习系统编程No.29【线程执行过程之页表详解】

引言&#xff1a; 北京时间&#xff1a;2023/7/3/14:09&#xff0c;刚睡醒&#xff0c;放假在家起床时间确实不怎么好调整&#xff0c;根本固定不了一点&#xff0c;当然通俗点说也就是根本起不来&#xff0c;哈哈哈&#xff0c;已经很少见到那种7点起来码字的情形了&#xff…

UART-GD32

UART-GD32 通信的概念 同步通信和异步通信 数据帧格式 波特率 使用步骤 引脚分布

gitLab配置ssh实现私钥访问

1.配置ssh文件 1.cd C:\Users\用户名\.ssh 找到文件夹 删除.ssh 里面所有其他文件方面我们配置要最新的 2.win r cmd 呼出命令行 ssh-keygen -t rsa -C "必须对应gitLab用户名" 3.生成文件夹拿到ssh 4.复制id_rsa_pub 文件的全部字符串 公钥给到GitLab服务器 2.公…

Spring Boot 中的模板引擎是什么,如何使用

Spring Boot 中的模板引擎是什么&#xff0c;如何使用 在 Web 应用程序中&#xff0c;模板引擎是一种用于动态生成 HTML、XML、JSON 等文档的工具。Spring Boot 内置了多种常见的模板引擎&#xff0c;例如 Thymeleaf、Freemarker、Velocity 等&#xff0c;让我们可以轻松地创建…

线性代数行列式的几何含义

行列式可以看做是一系列列向量的排列&#xff0c;并且每个列向量的分量可以理解为其对应标准正交基下的坐标。 行列式有非常直观的几何意义&#xff0c;例如&#xff1a; 二维行列式按列向量排列依次是 a \mathbf{a} a和 b \mathbf{b} b&#xff0c;可以表示 a \mathbf{a} a和…

Lua学习笔记:浅谈对垃圾回收的理解

前言 本篇在讲什么 Lua的垃圾回收 本篇适合什么 适合初学Lua的小白 本篇需要什么 对Lua语法有简单认知 依赖Sublime Text编辑器 本篇的特色 具有全流程的图文教学 重实践&#xff0c;轻理论&#xff0c;快速上手 提供全流程的源码内容 ★提高阅读体验★ &#x1f…

3、boostrap图片视频上传展示

boostrap图片视频上传展示 1、展示效果2、html代码 1、展示效果 项目目录结构 2、html代码 html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><!--<link rel"st…

记一次 .NET 某工控视觉系统 卡死分析

一&#xff1a;背景 1. 讲故事 前段时间有位朋友找到我&#xff0c;说他们的工业视觉软件僵死了&#xff0c;让我帮忙看下到底是什么情况&#xff0c;哈哈&#xff0c;其实卡死的问题相对好定位&#xff0c;无非就是看主线程栈嘛&#xff0c;然后就是具体问题具体分析&#x…

一起来看看文档翻译哪个好吧

在繁忙的都市生活中&#xff0c;小玲是一位年轻的职场人士。她的工作经常需要处理各种文档和文件&#xff0c;而其中不乏需要与外国合作伙伴交流的时候。然而&#xff0c;她并不熟悉其他语言&#xff0c;这给她的工作带来了一定的困扰。于是&#xff0c;她开始寻找免费的文档翻…

什么是AOP?

目录 一、AOP简介 1、AOP简介和作用 2、AOP的概念 二、AOP的基本实现 三、AOP工作流程 1 、AOP工作流程 2、AOP核心概念 四、AOP切入点表达式 1、语法格式 2、通配符 五、AOP通知类型 1、AOP通知分类 2、AOP通知详解 &#xff08;1&#xff09;前置通知 &#xf…

MySQL-分库分表详解(四)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️努力不一定有回报&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xf…

【ArcGIS微课1000例】0069:用ArcGIS提取一条线的高程值

本实验讲解用ArcGIS软件,基于数字高程模型DEM提取一条线的高程值并导出。 文章目录 一、加载实验数据二、将线转为折点三、提取折点高程值四、导出高程值五、注意事项【相关阅读】:【GlobalMapper精品教程】060:用dem提取一条线的高程值 一、加载实验数据 本实验使用的数据…

初学者一步步学习python 学习提纲

当学习Python时&#xff0c;可以按照以下提纲逐步学习&#xff1a; 入门基础 了解Python的历史和应用领域安装Python解释器和开发环境&#xff08;如Anaconda、IDLE等&#xff09;学习使用Python的交互式解释器或集成开发环境&#xff08;IDE&#xff09;进行简单的代码编写和…

Seafile搭建个人云盘 - 内网穿透实现在外随时随地访问

文章目录 1. 前言2. SeaFile云盘设置2.1 Owncould的安装环境设置2.2 SeaFile下载安装2.3 SeaFile的配置 3. cpolar内网穿透3.1 Cpolar下载安装3.2 Cpolar的注册3.3 Cpolar云端设置3.4 Cpolar本地设置 4. 公网访问测试5. 结语 转载自cpolar极点云文章&#xff1a;使用SeaFile搭建…

【电影推荐系统】基于 ALS 的协同过滤推荐算法

目录 目的 用户电影推荐矩阵主要思路如下 1 UserId 和 MovieID 做笛卡尔积&#xff0c;产生&#xff08;uid&#xff0c;mid&#xff09;的元组 2 通过模型预测&#xff08;uid&#xff0c;mid&#xff09;的元组。 3 将预测结果通过预测分值进行排序。 4 返回分值最大的 …